Source code for id_discrete

# Copyright 2021 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""
The *id_discrete* module contains the *IdDiscrete* class.

The different algorithms of intrinsic dimension estimation for discrete spaces
  are implemented as methods of this class.
"""

import multiprocessing

import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import ks_2samp as KS

import dadapy._utils.discrete_functions as df
from dadapy import plot as ddp
from dadapy.base import Base

cores = multiprocessing.cpu_count()
rng = np.random.default_rng()


[docs] class IdDiscrete(Base): """Estimates the intrinsic dimension of a dataset with discrete features using a binomial likelihood. Inherits from class Base. Attributes: lk (int, np.ndarray(int)): radius of the external shells ln (int, np.ndarray(int)): radius of the internal shells k (np.ndarray(int)): total number of points within the external shell n (np.ndarray(int)): total number of points within the internal shell ratio (float): ratio between internal and external radii intrinsic_dim (float): intrinsic dimension obtained with the binomial estimator intrinsic_dim_err (float): error associated with the id estimation. Computed through Cramer-Rao or Bayesian inference intrinsic_dim_scale (float): scale at which the id has been computed posterior_domain (np.ndarray(float)): eventual support of the posterior distribution of the id posterior (np.ndarray(float)): posterior distribution when evaluated with Bayesian inference """ def __init__( self, coordinates=None, distances=None, is_network=False, maxk=None, condensed=None, weights=None, verbose=False, n_jobs=cores, ): """Instantiate the IdDiscrete object. Args: coordinates (np.ndarray(float)): the data points loaded, of shape (N , dimension of embedding space) distances (np.ndarray(float)): A matrix of dimension N x mask containing distances between points is_network (bool, default=False): if True a network is assumed to be loaded, meaning that in each computation the central points must not be removed from the enumeration maxk (int): maximum number of neighbours to be considered for the calculation of distances condensed (bool, default=False): if True, distances are saved in a cumulative fashion: the position in the self.distances array is the distance and the number represents the amount of points up to that distance (self is included) weights (np.ndarray(int), default=None): keeps into account explicitly possible repetitions of datapoints verbose (bool): whether you want the code to speak or shut up n_jobs (int): number of cores to be used """ super().__init__( coordinates=coordinates, distances=distances, maxk=maxk, verbose=verbose, n_jobs=n_jobs, ) self.central_point = 0 if is_network else 1 if weights is None: self._is_w = False self._weights = None else: self.set_w(weights) self._is_w = True self.lk = None self.ln = None self.k = None self.n = None self.ratio = None self._k = None self._mask = None if distances is not None: self._condensed = False else: self._condensed = condensed self.intrinsic_dim = None self.intrinsic_dim_err = None self.intrinsic_dim_scale = None self.posterior_domain = None self.posterior = None # ----------------------------------------------------------------------------------------------
[docs] def compute_distances( self, maxk=None, metric="manhattan", period=None, condensed=None, d_max=100 ): """Compute distances between datapoints. Distances can be saved in an extended or compacted shape. If condensed is True, self.distances will store how many points are present up to that distance. Conversely, the usual scheme is adopted (see same method in Base) Args: maxk: maximum number of neighbours for which distance is computed and stored metric: type of metric period (float or np.ndarray): periodicity (only used for periodic distance computation). Default is None. condensed (bool): save how many points one finds at each distance instead of all distances d_max (int, default=100): decide the farthest distance to be saved (only if condensed is True) """ if condensed is not None: self._condensed = condensed else: assert ( self._condensed is not None ), 'initialize "condensed" parameter in init or compute distances' if self._condensed: self.metric = metric self.period = period if period is not None: if isinstance(period, np.ndarray) and period.shape == (self.dims,): self.period = np.array(period, dtype=float) elif isinstance(period, int): self.period = np.full(self.dims, fill_value=period, dtype=float) else: raise ValueError( f"'period' must be either a float scalar or a numpy array of floats of shape ({self.dims},)" ) if ~isinstance(self.X[0, 0], int): print( "N.B. the data will be passed to the routine to compute distances as integers" ) self.distances, self.dist_indices = df.return_condensed_distances( np.array(self.X, dtype=int), self.metric, d_max, self.period, self.n_jobs, ) else: super().compute_distances(maxk, metric, period)
# ----------------------------------------------------------------------------------------------
[docs] def fix_lk(self, lk=None, ln=None): """Compute the k points within the given Rk and n points within given Rn. For each point, computes the number self.k of points within a sphere of radius Rk and the number self.n within an inner sphere of radius Rk. It also provides a mask to take into account those points for which the statistics might be wrong, i.e. k == self.maxk, meaning that all available points were selected or when we are not sure to have all the point of the selected shell. If self.maxk is equal to the number of points of the dataset no mask will be applied, as all the point were surely taken into account. The mask is not necessary (i.e. is set to True) if condensed is True, as all points will be considered. Eventually add the weights of the points if they have an explicit multiplicity Args: lk (int): external shell radius ln (int): internal shell radius """ # checks-in and initializations assert ( self.distances is not None ), "first compute distances with the proper metric (manhattan of hamming presumably)" if lk is not None and ln is not None: self.set_lk_ln(lk, ln) else: assert ( self.lk is not None and self.ln is not None ), "set lk and ln or insert proper values for the lk and ln parameters" self.set_ratio(float(self.ln) / float(self.lk)) self._k = ( 0 # just a flag to remember that the id was computed at constant radius ) if self._condensed: self.n = np.copy(self.distances[:, self.ln]) self.k = np.copy(self.distances[:, self.lk]) self._mask = np.ones(self.N, dtype=bool) else: if self._is_w is False: self.k = (self.distances <= self.lk).sum(axis=1) self.n = (self.distances <= self.ln).sum(axis=1) else: assert ( self._weights is not None ), "first insert the weights if you want to use them!" self.k = np.array( [ sum(self._weights[self.dist_indices[i][el]]) for i, el in enumerate(self.distances <= self.lk) ], dtype=np.int, ) self.n = np.array( [ sum(self._weights[self.dist_indices[i][el]]) for i, el in enumerate(self.distances <= self.ln) ], dtype=np.int, ) # checks-out # compute mask if self.maxk == self.N - 1: self._mask = np.ones(self.N, dtype=bool) else: # if not all possible NN were taken into account (i.e. maxk < N) and k is equal to self.maxk # or distances[:,-1]<lk, it is likely that there are other points within lk that are not being # considered and thus potentially altering the statistics -> neglect them through self._mask # in the calculation of likelihood self._mask = self.distances[:, -1] > self.lk # or self.k == self.maxk if np.any(~self._mask): print( "NB: for " + str(sum(~(self._mask))) + " points, the counting of k could be wrong, " + "as more points might be present within the selected Rk. In order not to affect " + "the statistics a mask is provided to remove them from the calculation of the " + "likelihood or posterior.\nConsiself.kder recomputing NN with higher maxk or lowering Rk." ) if self.verb: print("n and k computed")
# ----------------------------------------------------------------------------------------------
[docs] def compute_id_binomial_lk( # noqa: C901 self, lk=None, ln=None, method="bayes", subset=None, plot=True, set_attr=True, ): """Calculate Id using the binomial estimator by fixing the eternal radius for all the points. In the estimation of d one has to remove the central point from the counting of n and k as it generates the process, but it is not effectively part of it Args: lk (int): radius of the external shell ln (int): radius of the internal shell subset (int): choose a random subset of the dataset to make the Id estimate method (str, default 'bayes'): choose method between 'bayes' and 'mle'. The bayesian estimate gives the distribution of the id, while mle only the max of the likelihood plot (bool): if bayes method is used, one can decide whether to plot the posterior set_attr (bool): changes class attributes after computation Returns: intrinsic_dim (float): the id esteem intrinsic_dim_err (float): the error estimate on the id intrinsic_dim_scale (float): the scale at which the id was computed (lk) """ # checks-in and initialisations assert ( self.distances is not None ), "first compute distances with the proper metric (manhattan of hamming presumably)" if lk is not None and ln is not None: if isinstance(self.lk, np.ndarray): # id previously computed by fixing k self.set_lk_ln(lk, ln) self.fix_lk() elif ( lk != self.lk or ln != self.ln ): # ln or lk changing from a previous estimation self.set_lk_ln(lk, ln) self.fix_lk() else: assert ( self.lk is not None and self.ln is not None ), "set lk and ln through set_lk_ln or insert proper values for the lk and ln parameters" mask = self._my_mask(subset) n_eff = self.n[mask] - self.central_point k_eff = self.k[mask] - self.central_point if self._is_w: w_eff = self._weights[mask] # check statistics before performing id estimation if ~self._is_w: e_n = n_eff.mean() if e_n == 0.0: print( "no points in the inner shell, returning 0. Consider increasing ln and possibly lk" ) self.intrinsic_dim = 0 self.intrinsic_dim_err = 0 return 0, 0, self.lk # choice of the method if method == "mle": if self._is_w: # necessary only if using the root finding method N = w_eff.sum() k_eff = sum(k_eff * w_eff) / float(N) n_eff = sum(n_eff * w_eff) / float(N) else: w_eff = 1 N = k_eff.shape[0] k_eff = k_eff.mean() n_eff = n_eff.mean() # explicit computation of likelihood, not necessary when ln and lk are fixed, but apparently\ # more stable than root searching intrinsic_dim = df.find_d_likelihood(self.ln, self.lk, n_eff, k_eff, w_eff) """ intrinsic_dim = df.find_d_root(self.ln, self.lk, n_eff, k_eff) """ intrinsic_dim_err = ( df.binomial_cramer_rao( d=intrinsic_dim, ln=self.ln, lk=self.lk, N=N, k=k_eff ) ** 0.5 ) elif method == "bayes": if self._is_w: k_tot = w_eff * k_eff n_tot = w_eff * n_eff else: k_tot = k_eff n_tot = n_eff if self.verb: print("starting bayesian estimation") ( intrinsic_dim, intrinsic_dim_err, self.posterior_domain, self.posterior, ) = df.beta_prior_d(k_tot, n_tot, self.lk, self.ln, plot=plot) else: print("select a proper method for id computation") return 0 if set_attr: self.intrinsic_dim = intrinsic_dim self.intrinsic_dim_err = intrinsic_dim_err self.intrinsic_dim_scale = self.lk return intrinsic_dim, intrinsic_dim_err, self.lk
# ----------------------------------------------------------------------------------------------
[docs] def return_id_scaling(self, Lks, r=0.5, method="mle", subset=None, plot=True): """Compute the ID by varying the radii. Args: Lks (list or np.ndarray(int)): external radii lk r (float, default = 0.5): ratio among ln and lk method (string, default='mle'): method to compute the id subset (np.ndarray(int) or int, default=None): indices of points to be used for the estimate plot (bool, default=True): whether to plot the id scaling Returns: ids (np.ndarray): intrinsic dimension at different scales ids_err (np.ndarray): error estimate on intrinsic dimension at different scales Quick Start: =========== .. code-block:: python import numpy as np import matplotlib.pyplot as plt rng = np.random.default_rng(12345) from dadapy.id_discrete import IdDiscrete #uniformly sampled points on 5d lattice N = 2500 box = 50 d = 5 data = rng.integers(0,box,size=(N, d)) I3D = IdDiscrete(data, condensed=True, maxk=data.shape[0]) I3D.compute_distances(metric='manhattan',d_max=box*d,period=box) scales = np.arange(15,30) ids_scaling, ids_scaling_err = I3D.return_id_scaling(scales,r=0.75) ids_scaling: array([4.91, 4.86, 4.86, 4.94, 4.98, 5. , 5. , 4.99, 5.01, 5. , 5.01, 5.01, 5.02, 5.01, 5.01]) ids_scaling_err: array([0.09, 0.08, 0.07, 0.06, 0.05, 0.05, 0.04, 0.04, 0.03, 0.03, 0.03, 0.02, 0.02, 0.02, 0.02]) References: Iuri Macocco, Aldo Glielmo, Jacopo Grilli, and Alessandro Laio, Intrinsic Dimension Estimation for Discrete Metrics, Phys. Rev. Lett. 130, 067401 – Published 8 February 2023 """ ids = np.zeros_like(Lks, dtype=float) ids_e = np.zeros_like(Lks, dtype=float) for i, lk in enumerate(Lks): ln = np.ceil(lk * r).astype(int) if ln == lk: ln -= 1 id_i, id_er, scale = self.compute_id_binomial_lk( lk, ln, method=method, subset=subset, set_attr=False ) ids[i] = id_i ids_e[i] = id_er if plot: plt.figure() plt.plot(Lks, ids) plt.errorbar(Lks, ids, ids_e, fmt="None") plt.xlabel("scale") plt.ylabel("ID estimate") return ids, ids_e
# ----------------------------------------------------------------------------------------------
[docs] def fix_k(self, k_eff=None, ratio=0.5): # noqa: C901 """Compute Rk, Rn, n for each point given a selected value of k. This routine computes external radii lk, internal radii ln and internal points n. It also ensures that lk is scaled onto the most external shell for which we are sure to have all points. A mask will exclude "badly behaved" points if condensed is False. NB As a consequence the k will be point dependent Args: k_eff (int, default=self.k_max): selected (max) number of NN ratio (float, default = 0.5): approximate ratio among ln and lk """ # TODO: what if we have multiplicity???? use it straightly on the plain points # or take it into account when counting the number of NN? # checks-in and initialisations assert ( self.distances is not None ), "first compute distances with the proper metric (manhattan of hamming presumably)" self.set_ratio(ratio) self._k = k_eff # mark that the id is computed at fixed k if self._condensed: self.lk = np.ones(self.N, dtype=int) for i, ddi in enumerate(self.distances): lk = 0 appo = 0 while ( k_eff > ddi[lk + 1] ): # increase lk until you reach the proper amount of neighbours if ddi[lk + 1] != ddi[lk]: # save at which range you last found a neighbour at a different distance appo = np.copy(lk) + 1 lk += 1 # go to next radius if it has exactly the number of k_eff if ddi[lk + 1] == k_eff: lk += 1 self.lk[i] = np.copy(lk) continue # go back to lower radius with same amount of neighbours if the next one is too big if ddi[lk] == ddi[appo]: self.lk[i] = np.copy(appo) else: self.lk[i] = np.copy(lk) self.ln = np.rint(self.lk * self.ratio).astype(int) self.k = np.take_along_axis(self.distances, self.lk[:, None], 1)[:, 0] self.n = np.take_along_axis(self.distances, self.ln[:, None], 1)[:, 0] self._mask = self.lk > 0 else: if k_eff is None: k_eff = self.maxk - 1 else: assert k_eff < self.maxk, ( "A k_eff > maxk was selected, recompute the distances with the proper amount on NN to see points " "up to that k_eff " ) # routine self.lk, self.k, self.ln, self.n = ( np.zeros(self.N), np.zeros(self.N, dtype=np.int64), np.zeros(self.N), np.zeros(self.N, dtype=np.int64), ) self._mask = np.ones(self.N, dtype=bool) # cut distances at the k-th NN and cycle over each point for i, dist_i in enumerate(self.distances[:, : k_eff + 1]): # case 1: all possible neighbours have been considered, no extra possible unseen points if k_eff == self.N: lk_temp = dist_i[-1] k_temp = k_eff # case 2: only some NN are considered -> work on surely "complete" shells else: # Group distances according to shell. Returns radii of shell and cumulative number # of points at that radius. # The index at which a distance is first met == number of points at smaller distance # EXAMPLE: """ a = np.array([0,1,1,2,3,3,4,4,4,6,6]) and suppose it would go on as 6,6,6,7,8... un, ind = np.unique(a, return_index=True) un: array([0, 1, 2, 3, 4, 6]) ind: array([0, 1, 3, 4, 6, 9]) """ unique, index = np.unique(dist_i, return_index=True) if unique.shape[0] < 3: # lk=0 may happen when we have smtg like dist_i = [0,0,0,0,0] or dist_i = [0,3,3,3,3,3]. # As we don't have the full information for at least two shells, we skip the point self._mask[i] = False continue # set lk to the distance of the second-to-last shell, for which we are sure to have full info lk_temp = unique[-2] # EX: 4 # set k to cumulative up to last complete shell k_temp = index[-1] # EX: 9 # fix internal radius ln_temp = np.rint(lk_temp * self.ratio) # if the inner shell is accidentally the same of the outer, go to the innerer one if ln_temp == lk_temp: ln_temp -= 1 n_temp = sum(dist_i <= ln_temp) self.k[i] = k_temp.astype(np.int64) self.n[i] = n_temp.astype(np.int64) self.lk[i] = lk_temp self.ln[i] = ln_temp # checks out if any(~self._mask): print( "BE CAREFUL: " + str(sum(~self._mask)) + " points would have lk set to 0 " + "and thus will not be considered when computing the id. Consider increasing k." ) if self.verb: print("n and k computed")
# --------------------------------------------------------------------------------------
[docs] def fix_k_shell(self, k_shell, ratio=0.5): # noqa: C901 """Compute the lk, ln, n given k_shell. This routine computes the external radius lk, the associated points k, internal radius ln and associated points n. The computation is performed starting from a given number of shells. It ensures that Rk is scaled onto the most external shell for which we are sure to have all points. NB in this case we will have an effective, point dependent k Args: k_shell (int): selected (max) number of considered shells ratio (float): ratio among Rn and Rk """ # TODO: one might want to use it even with less shells available # initial checks assert ( self.distances is not None ), "first compute distances with the proper metric (manhattan of hamming presumably)" self.set_ratio(ratio) if self._condensed: assert k_shell < self.distances.shape[1] self.lk = np.zeros(self.N, dtype=int) for i, ddi in enumerate(self.distances): counter = 0 # number of non-empty shell visited lk = 1 # radius iterator while counter < k_shell: if ddi[lk - 1] != ddi[lk]: counter += 1 lk += 1 self.lk[i] = np.copy(lk) - 1 self.ln = np.rint(self.lk * self.ratio).astype(int) # check whether ln==lk, in that case reduce ln by 1 mask_ll = np.where(self.lk == self.ln)[0] self.ln[mask_ll] -= 1 self.k = np.take_along_axis(self.distances, self.lk[:, None], 1)[:, 0] self.n = np.take_along_axis(self.distances, self.ln[:, None], 1)[:, 0] self._mask = np.ones(self.N, dtype=bool) else: assert k_shell < self.maxk, "asking for a much too large number of shells" self.lk, self.k, self.ln, self.n = ( np.zeros(self.N), np.zeros(self.N, dtype=np.int64), np.zeros(self.N), np.zeros(self.N, dtype=np.int64), ) self._mask = np.ones(self.N, dtype=bool) for i, dist_i in enumerate(self.distances): counter = 0 cycle = 0 while counter < k_shell + 1: if dist_i[cycle] != dist_i[cycle + 1]: counter += 1 # update shell counter when neighbours at different distance are found cycle += 1 if cycle == self.maxk: # if you reach maxk do not use the point self._mask[i] = False continue self.k[i] = np.copy(cycle) lk_temp = np.copy( dist_i[cycle - 1] ) # set lk to the distance of the selected shell # and ln according to the ratio ln_temp = np.rint(lk_temp * self.ratio) # if the inner shell is accidentally the same of the outer, go to the innerer one if ln_temp == lk_temp: ln_temp -= 1 # compute k and n if self._is_w: which_k = dist_i <= lk_temp self.k[i] = sum( self._weights[self.dist_indices[i][which_k]] ).astype(np.int64) which_n = dist_i <= ln_temp self.n[i] = sum( self._weights[self.dist_indices[i][which_n]] ).astype(np.int64) else: self.n[i] = sum(dist_i <= ln_temp).astype(np.int64) self.lk[i] = lk_temp self.ln[i] = ln_temp # checks out if any(~self._mask): print( "BE CAREFUL: " + str(sum(~self._mask)) + " points would have Rk set to 0 " + "and thus will be kept out of the statistics. Consider increasing k." ) self._k = k_shell if self.verb: print("n and k computed")
# --------------------------------------------------------------------------------------
[docs] def compute_id_binomial_k_discrete( self, k, ratio=0.5, shell=False, subset=None, approx_err=True, set_attr=True ): """Calculate Id using the binomial estimator by fixing the number of neighbours or shells. As in the case in which one fix lk, also in this version of the estimation one removes the central point from n and k. Two different ways of computing k,n,lk,ln are available, whether one intends k as the k-th neighbour or the k-th shell. In both cases, one wants to be sure that the shell chosen is Args: k (int): order of neighbour that set the external shell ratio (float): ratio between internal and external shell shell (bool): k stands for number of neighbours or number of occupied shells subset (int): choose a random subset of the dataset to make the Id estimate approx_err (bool, default=True): if True, computes the error on the estimate using the CR.\ Otherwise it profiles the likelihood and compute its std (takes longer) set_attr (bool, default=True): assign id, id error and scale to the class Returns: intrinsic_dim (float): the id estimation intrinsic_dim_err (float): the error estimate on the id intrinsic_dim_scale (float): the scale at which the id was computed: <lk> """ # checks-in and initialisations assert ( self.distances is not None ), "first compute distances with the proper metric (manhattan of hamming presumably)" if k != self._k or ratio != self.ratio: if shell: self.fix_k_shell(k, ratio) else: self.fix_k(k, ratio) mask = self._my_mask(subset) n_eff = self.n[mask] - self.central_point k_eff = self.k[mask] - self.central_point ln_eff = self.ln[mask] lk_eff = self.lk[mask] if self._is_w: ww = self._weights[mask] else: ww = 1 e_n = n_eff.mean() if e_n == 0.0: print( "no points in the inner shell, returning 0. Consider increasing k and/or the ratio," "especially if your embedding space is very high dimensional." ) self.intrinsic_dim = 0 self.intrinsic_dim_err = 0 return 0, 0, lk_eff.mean() intrinsic_dim = df.find_d_likelihood(ln_eff, lk_eff, n_eff, k_eff, ww) if approx_err: intrinsic_dim_err = ( df.binomial_cramer_rao( d=intrinsic_dim, ln=int(ln_eff.mean()), lk=int(lk_eff.mean()), N=mask.sum(), k=k_eff.mean(), ) ** 0.5 ) else: _, b, _, _ = df.profile_likelihood(ln_eff, lk_eff, n_eff, k_eff, ww) intrinsic_dim_err = b if set_attr: self.intrinsic_dim = intrinsic_dim self.intrinsic_dim_err = intrinsic_dim_err self.intrinsic_dim_scale = lk_eff.mean() return intrinsic_dim, intrinsic_dim_err, lk_eff.mean()
# ----------------------------------------------------------------------------------------------
[docs] def return_id_scaling_k(self, Ks, r=0.5, subset=None, plot=True): """Compute the ID by varying the number of neighbours considered. Args: Ks (list or np.ndarray(int)): number of neighbours r (float, default = 0.5): ratio between ln and lk subset (int): choose a random subset of the dataset to make the Id estimate plot (bool, default=True): whether to plot the id as a function of Ks Returns: ids (np.ndarray(float)): intrinsic dimension at different scales ids_err (np.ndarray(float)): error estimate on intrinsic dimension at different scales scale (np.ndarray(float)): scale at which the id was computed (mean values of lk) """ ids = np.zeros_like(Ks, dtype=float) ids_e = np.zeros_like(Ks, dtype=float) scale = np.zeros_like(Ks, dtype=float) for i, k in enumerate(Ks): ids[i], ids_e[i], scale[i] = self.compute_id_binomial_k_discrete( k, r, False, subset=subset, set_attr=False ) if plot: plt.plot(Ks, ids) plt.errorbar(Ks, ids, ids_e, fmt="None") plt.xlabel("scale") plt.ylabel("ID estimate") return ids, ids_e, scale
# ----------------------------------------------------------------------------------------------
[docs] def return_id_fit(self, scales, subset=None, plot=True): r"""Find the ID as least square fit between the actual and the theoretical number of points. Compute the id as least square fit between the average number of neighbours N(r) and the theoretical number of points within a given shell \rho*V(r). Given a radius r, the fit takes into account all points with r_i <= r. The fit is performed both between N and \rho*V and their logarithms. Args: scales (list(int) or np.ndarray(int)): radii at which the number of neighbours is computed subset (np.ndarray(int)): indices of points to be used for the estimate plot (bool): plot the fit taking into account all given radii in R and ID(r) Returns: ids (np.ndarray(float)): the intrinsic dimension estimates as a function of R with "normal" fit ids (np.ndarray(float)): the intrinsic dimension estimates as a function of R with "log" fit scales (np.ndarray(int)): the scales at which the ID is computed (the first point is excluded) """ from scipy.optimize import curve_fit def fit_func(r, d, rho): return df.compute_discrete_volume(r, d) * rho def fit_func_log(r, d, rho): return np.log(df.compute_discrete_volume(r, d)) + np.log(rho) ids, idsl = [], [] N = np.zeros_like(scales) mask = self._my_mask(subset) for i, ri in enumerate(scales): if self._condensed: N[i] = np.mean(self.distances[mask, ri]) else: N[i] = np.mean(self.distances[mask] <= ri) if i == 0: continue popt, pcov = curve_fit(fit_func, scales[: i + 1], N[: i + 1]) ids.append(popt) poptl, pcovl = curve_fit(fit_func_log, scales[: i + 1], np.log(N[: i + 1])) idsl.append(poptl) ids = np.array(ids) idsl = np.array(idsl) if plot: plt.figure() plt.plot(N, fit_func(scales, *popt), label="fit") plt.plot(N, np.exp(fit_func_log(scales, *poptl)), label="log fit") plt.scatter(N, N, label="x=y") plt.legend() plt.figure() plt.plot(scales[1:], ids[:, 0], label="fit") plt.plot(scales[1:], idsl[:, 0], label="log fit") plt.legend() return ids[:, 0], idsl[:, 0], scales[1:]
# ----------------------------------------------------------------------------------------------
[docs] def return_id_fit_continuum(self, R, fit_intercept=True, subset=None, plot=True): r"""Find the ID as linear fit between Log(n) vs Log(r), assuming N~r**d. Compute the id fitting a line between Log(n) and Log(r). Given a radius r_i, the fit takes into account all points with r <= r_i. Since no prescription has been given for this method on how to deal with points lying at discrete distances, we suppose to apply a small smearing, so that half of the points will end up a bit further than their initial distance and half will be slightly closer. As a consequence, when looking at distance r, we will be considering all points up to r-1 and half of those at distance r Args: R (list(int) or np.ndarray(int)): radii at which the number of neighbours is computed fit_intercept (bool, default=True): decide whether to fit the intercept or fix it to N(r=1) subset (np.ndarray(int)): indices of points to be used for the estimate plot (bool): plot the fit taking into account all given radii in R and ID(r) Returns: ids (np.ndarray(float)): the intrinsic dimension estimates as a function of R R (np.ndarray(int)): the scales at which the ID is computed (the first point is excluded) """ self._mask = np.ones(self.N, dtype=bool) mask = self._my_mask(subset) el = mask.sum() from scipy.optimize import curve_fit if fit_intercept: def fit_func(x, a, b): return a * x + b else: if self._condensed: b = np.log(np.mean(self.distances[mask, 1])) - el else: b = np.log(np.mean(self.distances[mask] <= 1)) - el def fit_func(x, a): return a * x + b N = [] ids_i = [] if np.any(R < 1e-10): print("we cannot use the R=0 as we have to take the log, start with R=1") for i, r in enumerate(R): if self._condensed: shell = ( np.sum(self.distances[mask, r] - self.distances[mask, r - 1]) * 0.5 ) n = np.sum(self.distances[mask, r - 1]) + shell - el else: shell = ( np.sum((r - 1 < self.distances[mask]) * (self.distances[mask] <= r)) * 0.5 ) n = np.sum(self.distances[mask] < r) + shell - el N.append(n) if i == 0: continue popt, pcov = curve_fit(fit_func, np.log(R[: i + 1]), np.log(N[: i + 1])) ids_i.append(popt) ids = np.array(ids_i) if plot: plt.figure(figsize=(5, 3)) plt.scatter(np.log(R), np.log(N), label="points") plt.plot(np.log(R), fit_func(np.log(R), *ids[-1]), label="linear fit") plt.legend() plt.figure(figsize=(5, 3)) plt.plot(R[1:], ids[:, 0], label="curve_fit") return ids[:, 0], R[1:]
# -------------------------------MODEL VALIDATION----------------------------------------------- # ----------------------------------------------------------------------------------------------
[docs] def compute_local_density(self, plot=True): """Compute and compare the rescaled local density of points. Compute and compare the local density of points inside inner and outer shells after fixing lk and ln as n/<n> and (k-n)/<k-n>. The closest the points to the y=x line, the more the local uniform density hypothesis is respected, the better will be the id estimate Args: plot (bool, default=True): decide whether to plot the local density Returns: n (np.ndarray(float)): n/<n> m (np.ndarray(float)): (k-n)/<k-n> """ assert self.n is not None and self.k is not None, "find k and n first!" assert self.intrinsic_dim is not None, "compute the ID first!" n = np.copy(self.n).astype(float) m = np.copy(self.k - self.n).astype(float) # id computed at fixed lk if self._k == 0: assert isinstance(self.lk, int) n /= n.mean() m /= m.mean() # id computed at fixed k else: Vk = df.compute_discrete_volume(self.lk, self.intrinsic_dim) Vn = df.compute_discrete_volume(self.ln, self.intrinsic_dim) n /= Vn m /= Vk - Vn dist = abs(n - m) / np.sqrt(n**2 + m**2) print("average distance from line: ", dist.mean()) if plot: plt.scatter(n, m, s=1.0) plt.plot( np.linspace(min(min(n), min(m)), max(max(n), max(m)), 100), np.linspace(min(min(n), min(m)), max(max(n), max(m)), 100), ) plt.xlabel(r"$n/\langle n \rangle$") plt.ylabel(r"$(k-n)/\langle k-n \rangle$") return n, m
# ----------------------------------------------------------------------------------------------
[docs] def model_validation_full( # noqa: C901 self, alpha=0.05, subset=None, artificial_samples=100000, pdf=False, cdf=True, filename=None, ): """Use Kolmogorov-Smirnoff test to assess the goodness of the id estimate. In order to validate estimate of the intrinsic dimension and the model and the goodness of the binomial estimator we perform a KS test. In particular, once the ID has been computed, we generate a new set n_i starting from the k_i, lk_, ln_i and d using the binomial distribution (ln and lk can be scalars if the id estimate has been performed with fixed radii). We then compare the CDF obtained from both the n_empirical and the new n_i using the KS test, looking at the maximum distance between the two CDF Args: alpha (float, default=0.05): tolerance for the KS test subset (int or np.ndarray(int)): subset of points used to perform the model validation artificial_samples (int, default=100000): number of points sampled from the theoretical distribution pdf (bool, default=False): plot histogram of n_emp and n_i cdf (bool, default=False): plot cdf of n_emp and n_i filename (str, default=None): directory where to save plots Returns: s (float): KS statistics, max distance between empirical and theoretical cdfs pv (float): p-value associated to the KS statistics """ if self.intrinsic_dim is None: print("compute the id before validating the model!") return 0 mask = self._my_mask(subset) n_eff = self.n[mask] - self.central_point k_eff = self.k[mask] - self.central_point if isinstance(self.ln, np.ndarray): # id estimated at fixed K ln_eff = self.ln[mask] lk_eff = self.lk[mask] title = "K=" + str(self._k) p = df.compute_discrete_volume( ln_eff, self.intrinsic_dim ) / df.compute_discrete_volume(lk_eff, self.intrinsic_dim) else: # id estimated at fixed lk title = "R=" + str(self.lk) p = df.compute_discrete_volume( self.ln, self.intrinsic_dim ) / df.compute_discrete_volume(self.lk, self.intrinsic_dim) if self.n.shape[0] < artificial_samples: replicas = int(artificial_samples / self.n.shape[0]) if isinstance(self.ln, np.ndarray): n_model = np.array( [rng.binomial(ki, pi, size=replicas) for ki, pi in zip(k_eff, p)] ).reshape(-1) else: n_model = np.array( [rng.binomial(ki, p, size=replicas) for ki in k_eff] ).reshape(-1) else: n_model = rng.binomial(k_eff, p) if self.central_point == 0: n_model = n_model[n_model > 0] s, pv = KS(n_eff, n_model) if self.verb: if pv > alpha: print( "We cannot reject the null hypothesis: the empirical and theoretical\ distributions has to be considered equivalent" ) else: print( "We have to reject the null hypothesis: the two distributions are not\ equivalent and thus the model as it is cannot be used to infer the id" ) if pdf: fileout = filename + "_mv_pdf.pdf" if (filename is not None) else None ddp.plot_pdf(n_eff, n_model, title, fileout) if cdf: fileout = filename + "_mv_cdf.pdf" if (filename is not None) else None ddp.plot_cdf(n_eff, n_model, title, fileout) return s, pv
# ----------------------------------------------------------------------------------------------
[docs] def set_id(self, d): """Set id manually. Args: d (float): intrinsic dimension to assign to the class """ assert d > 0, "cannot support negative dimensions (yet)" self.intrinsic_dim = d
# ----------------------------------------------------------------------------------------------
[docs] def set_lk_ln(self, lk, ln): """Assign lk and ln to class. Args: lk (int): radius of external shells ln (int): radius of internal shells """ assert ( isinstance(ln, (np.int8, np.int16, np.int32, np.int64, int)) and ln >= 0 ), "select a proper integer ln>=0" assert ( isinstance(lk, (np.int8, np.int16, np.int32, np.int64, int)) and lk > 0 ), "select a proper integer lk>0" assert lk > ln, "select lk and ln, s.t. lk > ln" self.ln = ln self.lk = lk
# ----------------------------------------------------------------------------------------------
[docs] def set_ratio(self, r): """Set ratio between internal and external shells. Args: r (float): ratio, 0<r<1 is required """ assert isinstance(r, float) and 0 <= r < 1, "select a proper ratio 0<r<1" self.ratio = r
# ----------------------------------------------------------------------------------------------
[docs] def set_w(self, w): """Set weights of points. Args: w (np.ndarray(int)): multiplicity of points. Needs to have the same length of points """ assert len(w) == self.N and all( [wi > 0 and isinstance(wi, (np.int, int))] for wi in w ), "load proper integer weights" self._weights = np.array(w, dtype=np.int)
# ---------------------------------------------------------------------------------------------- def _my_mask(self, subset=None): """Compute the mask to select points to be used when computing the id. This function create a mask used to compute the id on a subset of datapoints. It takes into account mask given by the fix_k or fix_lk methods and integrate it with the subset given as an argument. This can be an integer (and points will be chosen randomly) or a list/np.ndarray of indices. If subset is left empty, just passes the mask from fix_k or fix_lk Args: subset (int or np.ndarray(int) or list(int), default=None) Returns: my_mask (np.ndarray(bool)): mask indicating which points will be used in id computation """ assert self._mask is not None if subset is None: return np.copy(self._mask) if isinstance(subset, (np.ndarray, list)): assert isinstance( subset[0], (int, np.integer) ), "elements of list/array must be integers, in order to be used as indexes" assert ( max(subset) < self.N ), "the array must contain elements with indexes lower than the total number of elements" my_mask = np.zeros(self._mask.shape[0], dtype=bool) my_mask[subset] = True my_mask *= self._mask # remove points with bad statistics elif isinstance(subset, (int, np.integer)): if subset > self._mask.sum(): my_mask = np.copy(self._mask) else: my_mask = np.zeros(self._mask.shape[0], dtype=bool) idx = np.sort( self.rng.choice( np.where(self._mask)[0], subset, replace=False, shuffle=False, ) ) my_mask[idx] = True assert my_mask.sum() == subset else: print("use a proper format for the subset, returning no subset") return np.copy(self._mask) return my_mask
# ---------------------------------------------------------------------------------------------- if __name__ == "__main__": X = rng.integers(0, 20, size=(1000, 2)) ide = IdDiscrete(coordinates=X) ide.compute_distances(maxk=30, metric="manhattan", period=20) ide.compute_id_binomial_lk(4, 2) print(ide.intrinsic_dim)