Source code for data

# Copyright 2021-2023 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""
The *data* module contains the *Data* class.

This class inherits from all the other classes defined in the package and therefore provides a convenient
container for all the algorithms implemented in DADApy.
"""

import multiprocessing
import os

import numpy as np

from dadapy._utils import utils as ut
from dadapy.clustering import Clustering
from dadapy.density_advanced import DensityAdvanced
from dadapy.feature_weighting import FeatureWeighting
from dadapy.metric_comparisons import MetricComparisons

# Module-level random generator. It is created without a seed, so values drawn
# from it differ from run to run.
rng = np.random.default_rng()

# Number of CPUs reported by the system; used as the default ``n_jobs`` of ``Data``.
cores = multiprocessing.cpu_count()

# NOTE: import-time side effect -- this changes NumPy's *global* print precision
# for the whole process, not only for this module.
np.set_printoptions(precision=2)


class Data(Clustering, DensityAdvanced, MetricComparisons, FeatureWeighting):
    """Container class exposing the DADApy algorithms on a single object.

    The class inherits from ``Clustering``, ``DensityAdvanced``,
    ``MetricComparisons`` and ``FeatureWeighting``. On top of the inherited
    methods it defines two iterative intrinsic-dimension (ID) estimators in
    which the neighbourhood size ``kstar`` is re-selected at every iteration.
    """

    def __init__(
        self,
        coordinates=None,
        distances=None,
        maxk=None,
        verbose=False,
        n_jobs=cores,
        working_memory=1024,
    ):
        """Initialise a Data object, container of all DADApy methods.

        It is initialised with a set of coordinates or a set of distances, and all
        methods can be called on the generated class instance.

        Args:
            coordinates (np.ndarray(float)): the data points loaded, of shape
                (N, dimension of embedding space)
            distances (np.ndarray(float)): a matrix of dimension N x mask containing
                distances between points (NOTE(review): "mask" is presumably a typo
                for "maxk" -- confirm against the parent classes)
            maxk (int): maximum number of neighbours to be considered for the
                calculation of distances
            verbose (bool): whether progress information should be printed
            n_jobs (int): number of cores to be used; defaults to the module-level
                ``cores`` value
            working_memory (int): working memory (TODO: currently unused, it is not
                forwarded to the parent initialiser)
        """
        super().__init__(
            coordinates=coordinates,
            distances=distances,
            maxk=maxk,
            verbose=verbose,
            n_jobs=n_jobs,
        )

    def return_ids_kstar_gride(
        self, initial_id=None, n_iter=5, Dthr=23.92812698, d0=0.001, d1=1000, eps=1e-7
    ):
        """Return the id estimates of the Gride algorithm coupled with the kstar estimation of the scale.

        Each iteration takes the current ``self.kstar``, rounds it up to the next even
        number ``n2`` per point, sets ``n1 = n2 / 2``, estimates the ID from the
        distance ratios ``mu = r_n2 / r_n1`` and finally recomputes ``kstar`` with
        the new ID.

        On return (when at least one iteration ran) ``self.intrinsic_dim``,
        ``self.intrinsic_dim_err`` and ``self.intrinsic_dim_scale`` hold the values of
        the last iteration; the scale is the mean of the ``n1``-th and ``n2``-th
        neighbour distances over all points. With ``n_iter <= 0`` only the initial
        estimate is returned and these attributes are left as set by the
        initialisation step.

        Args:
            initial_id: initial estimate of the id; by default ``compute_id_2NN`` is used
            n_iter: number of iterations
            Dthr: threshold value for the kstar test
            d0: minimum id value
            d1: maximum id value
            eps: threshold for the convergence of the Gride algorithm

        Returns:
            ids, ids_err, kstars, log_likelihoods: arrays with ``n_iter + 1`` entries
            along the first axis. Entry 0 is the initial estimate (its log-likelihood
            is the placeholder 0). ``kstars`` holds snapshots of ``self.kstar`` exactly
            as selected, i.e. before the rounding to even values.
        """
        # start with an initial estimate of the ID
        if initial_id is None:
            self.compute_id_2NN()
        else:
            self.set_id(initial_id)

        if self.distances is None:
            self.compute_distances()

        # compute kstar
        self.compute_kstar(Dthr)

        ids = [self.intrinsic_dim]
        ids_err = [self.intrinsic_dim_err]
        # Store copies so that later updates of self.kstar cannot alter the history.
        kstars = [np.copy(self.kstar)]
        log_likelihoods = [0]

        id_est = self.intrinsic_dim
        id_err = self.intrinsic_dim_err
        rows = n1s = n2s = None  # stay None when no iteration is executed

        for iteration in range(n_iter):
            print("iteration ", iteration)
            print("id ", self.intrinsic_dim)

            # n2 is kstar rounded up to the next even number, n1 is half of it.
            # The sum builds a new array, so self.kstar itself is never modified.
            # NOTE(review): rounding up can index one column past kstar; this assumes
            # self.distances has enough columns -- confirm against compute_kstar.
            kstar = np.asarray(self.kstar)
            n2s = kstar + kstar % 2
            n1s = (n2s // 2).astype(int)

            # compute the mus: per-point ratio between the n2-th and n1-th distances
            rows = np.arange(n2s.shape[0])
            mus = self.distances[rows, n2s] / self.distances[rows, n1s]

            # compute the id using Gride
            id_est, id_err = self._compute_id_gride_single_scale(
                d0, d1, mus, n1s, n2s, eps
            )
            self.set_id(id_est)

            log_lik = -ut._neg_loglik(self.dtype, id_est, mus, n1s, n2s)

            # update the k* with the new ID
            self.compute_kstar(Dthr)

            ids.append(id_est)
            ids_err.append(id_err)
            kstars.append(np.copy(self.kstar))
            log_likelihoods.append(log_lik)

        ids = np.array(ids)
        ids_err = np.array(ids_err)
        kstars = np.array(kstars)
        log_likelihoods = np.array(log_likelihoods)

        if n2s is not None:
            # Mean of the n1-th and n2-th neighbour distances used in the last iteration.
            id_scale = np.sum(
                self.distances[rows, n1s] + self.distances[rows, n2s],
                dtype=np.float64,
            ) / (2 * self.N)

            self.intrinsic_dim = id_est
            self.intrinsic_dim_err = id_err
            self.intrinsic_dim_scale = id_scale

        return ids, ids_err, kstars, log_likelihoods

    def return_ids_kstar_binomial(
        self,
        initial_id=None,
        n_iter=5,
        Dthr=23.92812698,
        r=None,
        plot_mv=False,
        k_bootstrap=1,
    ):
        """Return the id estimates of the binomial algorithm coupled with the kstar estimation of the scale.

        Each iteration estimates the ID with ``compute_id_binomial_k`` using the
        current ``self.kstar`` and then recomputes ``kstar``.

        Args:
            initial_id (float): initial estimate of the id; by default
                ``compute_id_2NN(algorithm="base")`` is used
            n_iter (int): number of iterations
            Dthr (float): threshold value for the kstar test
            r (float, default=None): parameter of binomial estimator, 0 < r < 1.
                If None, ``min(0.975, 0.2032 ** (1 / intrinsic_dim))`` is recomputed
                from the current ID at every iteration
            plot_mv (bool, default=False): if True, plots the observed and the
                theoretical distributions
            k_bootstrap (int, default=1): forwarded unchanged to
                ``compute_id_binomial_k``

        Returns:
            ids (np.ndarray(float)): intrinsic dimension across iterations
            ids_err (np.ndarray(float)): intrinsic dimension error across iterations
            kstars (np.ndarray(int)): arrays of kstars across iterations
            p-values (np.ndarray(float)): p-values from model validation across
                iterations; entry 0, which belongs to the initial estimate, is the
                placeholder 0
        """
        # start with an initial estimate of the ID and the associated k*
        if initial_id is None:
            self.compute_id_2NN(algorithm="base")
        else:
            self.set_id(initial_id)

        if self.distances is None:
            self.compute_distances()

        self.compute_kstar(Dthr)

        ids = [self.intrinsic_dim]
        ids_err = [self.intrinsic_dim_err]
        # Store copies so that later updates of self.kstar cannot alter the history.
        kstars = [np.copy(self.kstar)]
        pvalues = [0]

        for iteration in range(n_iter):
            print("iteration ", iteration)
            print("id ", self.intrinsic_dim)

            # set new ratio
            r_eff = min(0.975, 0.2032 ** (1.0 / self.intrinsic_dim)) if r is None else r

            # compute id using the k*; the scale returned by the estimator is not
            # part of this method's output. The log-likelihood is not evaluated,
            # the p-value returned by the estimator is tracked instead.
            ide, id_err, _, pv = self.compute_id_binomial_k(
                self.kstar, r_eff, bayes=False, plot_mv=plot_mv, k_bootstrap=k_bootstrap
            )

            # update the k*
            self.compute_kstar(Dthr)

            # store the obtained values
            ids.append(ide)
            ids_err.append(id_err)
            kstars.append(np.copy(self.kstar))
            pvalues.append(pv)

        ids = np.array(ids)
        ids_err = np.array(ids_err)
        kstars = np.array(kstars)
        pvalues = np.array(pvalues)

        return ids, ids_err, kstars, pvalues