Source code for _utils.utils

# Copyright 2021-2023 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import multiprocessing
import warnings

import numpy as np
import scipy.special as sp
from scipy.spatial import cKDTree
from scipy.special import binom
from scipy.stats import beta as beta_d
from sklearn.metrics import pairwise_distances
from sklearn.neighbors import NearestNeighbors

cores = multiprocessing.cpu_count()


def compute_all_distances(X, n_jobs=cores, metric="euclidean"):
    """Compute the distances among all available points of the dataset X.

    Args:
        X (np.ndarray): array of dimension N x D
        n_jobs (int): number of cores to use for the computation
        metric (str): metric used to compute the distances

    Returns:
        (np.ndarray(float)): N x N array containing the distances between the N points
    """
    dists = pairwise_distances(X, Y=None, n_jobs=n_jobs, metric=metric)

    return dists
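

# Minimal usage sketch (illustrative, not part of the library; the _example_*
# name is hypothetical): building the full pairwise distance matrix.
def _example_compute_all_distances():
    rng = np.random.default_rng(0)
    X = rng.random((100, 3))  # N = 100 points in D = 3 dimensions
    dists = compute_all_distances(X)  # shape (100, 100), symmetric, zero diagonal
    return dists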


def compute_NN_PBC(X, maxk, box_size=None, p=2, cutoff=np.inf):
    """Compute the neighbours of each point, taking into account periodic boundary conditions and an optional cutoff.

    Args:
        X (np.ndarray): array of dimension N x D
        maxk (int): number of neighbours to save
        box_size (float, np.ndarray(float)): sizes of PBC walls. A single value is interpreted as a cubic box.
        p (int): Minkowski p-norm used
        cutoff (float): upper bound on the distances. Distances beyond this threshold are returned as np.inf

    Returns:
        dist (np.ndarray(float)): N x maxk array containing the distances from each point to its first maxk neighbours
        ind (np.ndarray(int)): N x maxk array containing the indices of the neighbours of each point
    """
    tree = cKDTree(X, boxsize=box_size)
    dist, ind = tree.query(X, k=maxk, p=p, distance_upper_bound=cutoff)

    return dist, ind
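

# Usage sketch (illustrative; the _example_* name is hypothetical): neighbours in a
# unit cubic box with periodic boundaries. Coordinates must lie inside the box.
def _example_compute_NN_PBC():
    rng = np.random.default_rng(0)
    X = rng.random((100, 3))  # points already inside [0, 1)^3
    dist, ind = compute_NN_PBC(X, maxk=5, box_size=1.0)
    # column 0 is the point itself: dist[:, 0] == 0 and ind[:, 0] == np.arange(100)
    return dist, ind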


def from_all_distances_to_nndistances(pdist_matrix, maxk):
    """Extract the first maxk nearest neighbours of each point from the full matrix of distances.

    Args:
        pdist_matrix (np.ndarray(float)): N x N matrix of distances
        maxk (int): number of neighbours to save

    Returns:
        distances (np.ndarray(float)): N x (maxk+1) matrix, distances of the neighbours of each point
            (the point itself, at distance 0, occupies the first column)
        dist_indices (np.ndarray(int)): N x (maxk+1) matrix, indices of the neighbours of each point
    """
    dist_indices = np.asarray(np.argsort(pdist_matrix, axis=1)[:, 0 : maxk + 1])
    distances = np.asarray(np.take_along_axis(pdist_matrix, dist_indices, axis=1))

    return distances, dist_indices
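

# Usage sketch (illustrative; the _example_* name is hypothetical): neighbour lists
# from a precomputed distance matrix; the output has maxk + 1 columns because the
# point itself, at distance zero, occupies column 0.
def _example_from_all_distances_to_nndistances():
    rng = np.random.default_rng(0)
    X = rng.random((50, 2))
    pdist_matrix = compute_all_distances(X)
    distances, dist_indices = from_all_distances_to_nndistances(pdist_matrix, maxk=10)
    return distances, dist_indices  # both of shape (50, 11)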


def compute_cross_nn_distances(
    X_new, X, maxk, metric="euclidean", period=None, n_jobs=None
):
    """Compute distances, up to neighbour maxk, between the points of X_new and the points of X.

    The element distances[i,j] is the distance between point i of the dataset X_new
    and its j-th neighbour in the dataset X, whose index is dist_indices[i,j].

    Args:
        X_new (np.array(float)): dataset of points for which the distances are computed
        X (np.array(float)): reference dataset of points, towards which the distances are computed
        maxk (int): number of neighbours to save
        metric (str): metric used to compute the distances
        period (float, np.ndarray(float)): sizes of PBC walls. A single value is interpreted as a cubic box.
        n_jobs (int): number of cores to use for the computation

    Returns:
        distances (np.ndarray(float)): N_new x maxk matrix, distances of the neighbours of each point of X_new
        dist_indices (np.ndarray(int)): N_new x maxk matrix, indices of the neighbours of each point of X_new
    """
    if period is None:
        nbrs = NearestNeighbors(n_neighbors=maxk, metric=metric, n_jobs=n_jobs).fit(X)
        distances, dist_indices = nbrs.kneighbors(X_new)
        # in case of hamming distance, make the distances integer counts
        if metric == "hamming":
            distances *= X.shape[1]
    else:
        if metric == "euclidean" or metric == "minkowski":
            p = 2
        elif metric == "manhattan":
            p = 1
        else:
            raise KeyError(
                "periodic distance computation is supported only for euclidean and manhattan metrics"
            )
        tree = cKDTree(X, boxsize=period)
        distances, dist_indices = tree.query(X_new, k=maxk, p=p, workers=n_jobs)

    return distances, dist_indices
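

# Usage sketch (illustrative; the _example_* name is hypothetical): distances from a
# query set X_new to its neighbours in a reference set X, with and without PBC.
def _example_compute_cross_nn_distances():
    rng = np.random.default_rng(0)
    X = rng.random((200, 3))
    X_new = rng.random((20, 3))
    d_free, i_free = compute_cross_nn_distances(X_new, X, maxk=5)
    d_pbc, i_pbc = compute_cross_nn_distances(X_new, X, maxk=5, period=1.0, n_jobs=1)
    # the minimum-image convention can only shrink distances: d_pbc <= d_free
    return d_free, d_pbc  # both of shape (20, 5)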


def compute_nn_distances(X, maxk, metric="euclidean", period=None, n_jobs=None):
    """For each point, compute the distances from its first maxk nearest neighbours.

    Args:
        X (np.ndarray): points array of dimension N x D
        maxk (int): number of neighbours to save
        metric (str): metric used to compute the distances
        period (float, np.ndarray(float)): sizes of PBC walls. A single value is interpreted as a cubic box.
        n_jobs (int): number of cores to use for the computation

    Returns:
        distances (np.ndarray(float)): N x (maxk+1) matrix, distances of the neighbours of each point
            (the point itself occupies the first column)
        dist_indices (np.ndarray(int)): N x (maxk+1) matrix, indices of the neighbours of each point
    """
    distances, dist_indices = compute_cross_nn_distances(
        X, X, maxk + 1, metric=metric, period=period, n_jobs=n_jobs
    )

    zero_dists = np.sum(distances[:, 1:] <= 1.01 * np.finfo(np.float32).eps)
    if zero_dists > 0:
        warnings.warn(
            "There are points with neighbours at 0 distance, meaning the dataset probably has identical points.\n"
            "This can cause problems in various routines.\n"
            "We suggest either smearing the distances using remove_zero_dists()\n"
            "or removing the identical points using remove_identical_points()."
        )

    return distances, dist_indices
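

# Usage sketch (illustrative; the _example_* name is hypothetical): the standard
# entry point for building neighbour lists within a single dataset.
def _example_compute_nn_distances():
    rng = np.random.default_rng(0)
    X = rng.random((100, 3))
    distances, dist_indices = compute_nn_distances(X, maxk=10)
    # distances[:, 0] is identically zero: each point is its own first neighbour
    return distances, dist_indices  # both of shape (100, 11)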


def cast_to64(myarray):
    """Cast a float32 array to float64."""
    if myarray.dtype == "float32":
        myarray = myarray.astype("float64")
    return myarray


# --------------------------------------------------------------------------------------
# Helper functions


def _neg_loglik(dtype, d, mus, n1, n2):
    """Compute the negative log likelihood of the mu ratios given the id d."""
    mus, n1, n2 = _filter_mus(dtype, mus, n1, n2)

    N = len(mus)
    term1 = (N - 1) * np.log(d)
    term2 = np.sum((n2 - n1 - 1) * np.log(mus**d - 1))
    term3 = -np.sum(np.log(sp.beta(n2 - n1, n1)))
    term4 = -np.sum((((n2 - 1) * d) + 1) * np.log(mus))
    return -(term1 + term2 + term3 + term4)


def _neg_dloglik_did(d, mus, n1, n2, N, eps):
    """Compute the negative derivative of the log likelihood with respect to the id."""
    one_m_mus_d = 1.0 - mus ** (-d)
    # regularize small numbers
    one_m_mus_d[one_m_mus_d < 2 * eps] = 2 * eps

    summation = np.sum(((1 - n2 + n1) / one_m_mus_d + n2 - 1.0) * np.log(mus))
    return summation - (N - 1) / d


def _filter_mus(dtype, mus, n1, n2):
    """Regularize mu values equal to 1 and discard implausibly large ones."""
    indx = np.nonzero(mus == 1)
    mus[indx] += 10 * np.finfo(dtype).eps

    q3, q2 = np.percentile(mus, [95, 50])
    # very generous threshold, to accommodate fat-tailed distributions
    mu_max = 20 * (q3 - q2) + q2
    # remove high mu values, very likely related to overlapping datapoints
    select = mus < mu_max

    mus = mus[select]
    if isinstance(n1, np.ndarray):
        n1 = n1[select]
    if isinstance(n2, np.ndarray):
        n2 = n2[select]

    return mus, n1, n2


def _argmax_loglik(dtype, d0, d1, mus, n1, n2, eps=1.0e-7):
    """Find the id maximizing the log likelihood, by bisection on its derivative."""
    mus, n1, n2 = _filter_mus(dtype, mus, n1, n2)

    N = len(mus)
    l1 = _neg_dloglik_did(d1, mus, n1, n2, N, eps)
    while abs(d0 - d1) > eps:
        d2 = (d0 + d1) / 2.0
        l2 = _neg_dloglik_did(d2, mus, n1, n2, N, eps)
        if l2 * l1 > 0:
            d1 = d2
        else:
            d0 = d2
    d = (d0 + d1) / 2.0

    return d


def _fisher_info_scaling(id_ml, mus, n1, n2, eps):
    """Compute the Fisher information of the maximum-likelihood id estimate."""
    N = len(mus)
    one_m_mus_d = 1.0 - mus ** (-1.0 * id_ml)
    # regularize small numbers
    one_m_mus_d[one_m_mus_d < eps] = eps

    log_mu = np.log(mus)
    j0 = N / id_ml**2

    factor1 = np.divide(log_mu, one_m_mus_d)
    factor2 = mus ** (-id_ml)
    tmp = np.multiply(factor1**2, factor2)
    j1 = np.sum((n2 - n1 - 1) * tmp)

    return j0 + j1


def log_binom_stirling(k, n):
    """Stirling approximation of log(binom(k, n)), used when the exact value overflows."""
    return (
        (k + 0.5) * np.log(k)
        - (n + 0.5) * np.log(n)
        - (k - n + 0.5) * np.log(k - n)
        - 0.5 * np.log(2 * np.pi)
    )


def binomial_loglik(d, k, n, r):
    """Compute the negative log likelihood of the binomial id estimator."""
    if isinstance(k, np.ndarray):
        pk = np.histogram(k, bins=np.arange(-0.5, k.max() + 1.5))[0]
        pk = pk / pk.sum()
    else:
        pk = np.ones(k + 1)
        k = np.array([k])

    log_binom = np.log(binom(k, n))
    # for large k and n the binomial coefficient overflows: fall back to Stirling
    if np.any(log_binom == np.inf):
        mask = np.where(log_binom == np.inf)[0]
        log_binom[mask] = log_binom_stirling(k[mask], n[mask])

    return -np.sum(
        n * d * np.log(r)
        + (k - n) * np.log(1.0 - r**d)
        + log_binom
        + np.log([pk[ki] for ki in k])
    )


def _compute_binomial_cramerrao(d, k, r, n):
    """Calculate the Cramer-Rao lower bound for the variance associated with the binomial estimator.

    Args:
        d (float): intrinsic dimension
        k (float, np.ndarray(float)): number of neighbours within the external shell
        r (float): ratio between the internal and external radii
        n (int): number of points of the dataset

    Returns:
        Cramer-Rao lower bound (float)
    """
    if isinstance(k, np.ndarray):
        k = k.mean()

    return (r ** (-d) - 1) / (k * n * np.log(r) ** 2)
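

# Usage sketch (illustrative; the _example_* name is hypothetical): maximum-likelihood
# intrinsic dimension from the ratios mu = r2/r1 of second- to first-neighbour
# distances (n1 = 1, n2 = 2), found by the bisection routine _argmax_loglik.
def _example_argmax_loglik():
    rng = np.random.default_rng(0)
    X = rng.random((1000, 2))  # points on a plane: the true id is 2
    distances, _ = compute_nn_distances(X, maxk=2)
    mus = distances[:, 2] / distances[:, 1]  # mu = r2 / r1 for each point
    d_ml = _argmax_loglik(np.float64, 0.001, 50.0, mus, n1=1, n2=2)
    return d_ml  # should be close to 2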


# --------------------------------------------------------------------------------------
# Others
# --------------------------------------------------------------------------------------


def _align_arrays(set1, err1, set2, err2=None):
    """Compute the constant offset between two sets of error-affected measures and return set1 aligned to set2.

    The offset is computed by an inverse-variance-weighted least-squares fit of a
    constant law to the differences between the two sets.

    Args:
        set1 (np.array(float)): array containing the first set of values, to be aligned to set2.
        err1 (np.array(float)): array containing the statistical errors on the values of set1.
        set2 (np.array(float)): array containing the reference set of values, to which set1 will be aligned.
        err2 (np.array(float), optional): array containing the statistical errors on the values of set2.
            If not given, set2 is assumed to contain errorless measures.

    Returns:
        offset (float): constant offset between the two sets
        new_set1 (np.array(float)): set1 - offset, i.e. set1 aligned to set2
    """
    if err2 is None:
        assert set1.shape == set2.shape == err1.shape
        w = 1.0 / np.square(err1)
    else:
        assert set1.shape == set2.shape == err1.shape == err2.shape
        w = 1.0 / (np.square(err1) + np.square(err2))

    diffs = set1 - set2
    offset = np.average(diffs, weights=w)

    return offset, set1 - offset


# --------------------------------------------------------------------------------------


def _compute_pull_variables(set1, err1, set2, err2=None):
    """Compute the pull distribution between two sets of error-affected measures.

    For each value i, the pull variable is defined as
    chi[i] = (set1[i] - set2[i]) / sqrt(err1[i]^2 + err2[i]^2).
    If err2 is not given, set2 is assumed to contain errorless measures.

    Args:
        set1 (np.array(float)): array containing the first set of values.
        err1 (np.array(float)): array containing the statistical errors on the values of set1.
        set2 (np.array(float)): array containing the reference set of values.
        err2 (np.array(float), optional): array containing the statistical errors on the values of set2.
            If not given, set2 is assumed to contain errorless measures.

    Returns:
        pull (np.array(float)): array of the pull variables
    """
    if err2 is None:
        assert set1.shape == set2.shape == err1.shape
        return (set1 - set2) / err1
    else:
        assert set1.shape == set2.shape == err1.shape == err2.shape
        den = np.sqrt(np.square(err1) + np.square(err2))
        return (set1 - set2) / den
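

# Usage sketch (illustrative; the _example_* name is hypothetical): align two noisy
# measurements of the same quantity, then check their pull distribution (roughly
# standard normal when the errors are estimated correctly).
def _example_align_and_pull():
    rng = np.random.default_rng(0)
    set2 = np.linspace(0.0, 1.0, 50)  # errorless reference values
    err1 = np.full(50, 0.05)
    set1 = set2 + 0.3 + rng.normal(0.0, 0.05, size=50)  # shifted, noisy copy
    offset, aligned = _align_arrays(set1, err1, set2)
    pull = _compute_pull_variables(aligned, err1, set2)
    return offset, pull  # offset close to 0.3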


def _beta_prior(k, n, r, a0=1, b0=1, posterior_profile=False):
    """Compute the posterior distribution of d given the input aggregates.

    Since the likelihood is given by a binomial distribution, its conjugate prior is a
    beta distribution. However, the binomial is defined on the ratio of volumes, and so
    is the beta distribution. As a consequence, one has to change variable to obtain
    the distribution over d.

    Args:
        k (np.ndarray(int) or int): number of points within the external shells
        n (np.ndarray(int)): number of points within the internal shells
        r (float): ratio between the shells' radii
        a0 (float): beta distribution parameter, default 1 for a flat prior
        b0 (float): beta distribution parameter, default 1 for a flat prior
        posterior_profile (bool, default=False): compute and plot the posterior profile,
            giving a numerical estimate in addition to the analytical one

    Returns:
        mean_bayes (float): mean value of the posterior
        std_bayes (float): std of the posterior
        d_range (np.ndarray(float), optional): domain of the posterior (if posterior_profile is True)
        P (np.ndarray(float), optional): values of the posterior on the domain d_range (if posterior_profile is True)
    """
    D_MAX = 100.0
    D_MIN = 0.0001

    a = a0 + n.sum()
    if isinstance(k, (np.int64, int)):
        b = b0 + k * n.shape[0] - n.sum()
    else:
        b = b0 + k.sum() - n.sum()
    posterior = beta_d(a, b)

    if posterior_profile:
        import matplotlib.pyplot as plt

        def p_d(d):
            return abs(posterior.pdf(r**d) * (r**d) * np.log(r))

        dx = 0.1
        d_left = D_MIN
        d_right = D_MAX + dx + d_left
        d_range = np.arange(d_left, d_right, dx)

        P = np.array([p_d(di) for di in d_range]) * dx
        mask = P != 0
        elements = mask.sum()
        # if fewer than 3 nonzero points are found, reduce the spacing
        while elements < 3:
            dx /= 10
            d_range = np.arange(d_left, d_right, dx)
            P = np.array([p_d(di) for di in d_range]) * dx
            mask = P != 0
            elements = mask.sum()

        # with at least 3 nonzero points we can restrict the domain and obtain a smooth
        # distribution; 1000 points are used here, but this number can be varied as needed
        ind = np.where(mask)[0]
        d_left = d_range[ind[0]] - 0.5 * dx if d_range[ind[0]] - dx > 0 else D_MIN
        d_right = d_range[ind[-1]] + 0.5 * dx
        d_range = np.linspace(d_left, d_right, 1000)
        dx = d_range[1] - d_range[0]
        P = np.array([p_d(di) for di in d_range]) * dx

        plt.plot(d_range, P)
        plt.xlabel("d")
        plt.ylabel("P(d)")

        E_d_emp = (d_range * P).sum()
        S_d_emp = np.sqrt((d_range * d_range * P).sum() - E_d_emp * E_d_emp)
        print("empirical average:\t", E_d_emp, "\nempirical std:\t\t", S_d_emp)

    E_d = (sp.digamma(a) - sp.digamma(a + b)) / np.log(r)
    S_d = np.sqrt((sp.polygamma(1, a) - sp.polygamma(1, a + b)) / np.log(r) ** 2)

    if posterior_profile:
        return E_d, S_d, d_range, P
    else:
        return E_d, S_d, None, None
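

# Usage sketch (illustrative; the _example_* name is hypothetical, and the counts are
# fabricated toy data): Bayesian estimate of d from binomial shell counts. Here
# n/k is about 0.5 = r, so the posterior mean is close to 1.
def _example_beta_prior():
    rng = np.random.default_rng(0)
    k = 20                              # points within each external shell
    n = rng.binomial(k, 0.5, size=500)  # points within the internal shells
    E_d, S_d, _, _ = _beta_prior(k, n, r=0.5)
    return E_d, S_d  # posterior mean and std of the intrinsic dimension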