Source code for density_advanced

# Copyright 2021-2024 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""
The *density_advanced* module contains the *DensityEstimation* class.

Different algorithms to estimate the logdensity, the logdensity gradientest and the logdensity differences are
implemented as methods of this class. In particular, differently from the methods implemented in the DensityEstimation,
the methods in the DensityAdvanced class are based on the sparse neighbourhood graph structure which is implemented
in the NeighGraph class.
"""

import multiprocessing
import time
import warnings

import numpy as np
from scipy import linalg as slin
from scipy import sparse

from dadapy._cython import cython_grads as cgr
from dadapy._utils.density_estimation import return_not_normalised_density_kstarNN
from dadapy.density_estimation import DensityEstimation
from dadapy.neigh_graph import NeighGraph

cores = multiprocessing.cpu_count()


[docs] class DensityAdvanced(DensityEstimation, NeighGraph): """Computes the log-density gradient and its covariance at each point and other log-density-related properties. Can return an estimate of the gradient of the log-density at each point and an estimate of the error on each component. Can return an estimate of log-density differences and their error each point based on the gradient estimates. Can compute the log-density and its error at each point using BMTI, i.e. integrating the log-density differences on the neighbourhood graph Attributes: grads (np.ndarray(float), optional): size N. Contains the gradient components estimated at each point i grads_var (np.ndarray(float), optional): size N x dims. For each line i contains the estimated variance of the gradient components at point i grads_covmat (np.ndarray(float), optional): size N x dims x dims. For each line i contains the estimated covariance matrix of the gradient components at point i pearson_array (np.ndarray(float), optional): size nspar. At position p corresponding to the directed edge (i,j) of the neighbourhood graph, it contains an estimate of the Pearson correlation coefficient between the directed deltaFij computed with the gradients in i and in j, namely between dot(g_i,(x_j-x_i)) and dot(g_j,(x_j-x_i)). Fij_array (list(np.array(float)), optional): size nspar. Stores for each couple in nind_list the estimates of deltaF_ij computed from point i as semisum of the gradients in i and minus the gradient in j Fij_var_array (np.array(float), optional): size nspar. Stores for each couple in nind_list the estimates of the squared errors on the values in Fij_array inv_deltaFs_cov (np.array(float), optional): size nspar. Stores for each couple in nind_list the estimates of the inverse cross-covariance of the deltaFs, that is: cov [ deltaFij , deltaFlm ] . """ def __init__( self, coordinates=None, distances=None, maxk=None, verbose=False, n_jobs=cores ): """Initialise the DensityEstimation class.""" super().__init__( coordinates=coordinates, distances=distances, maxk=maxk, verbose=verbose, n_jobs=n_jobs, ) self.grads = None self.grads_var = None self.grads_covmat = None self.pearson_array = None self.Fij_array = None self.Fij_var_array = None self.inv_deltaFs_cov = None # ----------------------------------------------------------------------------------------------
[docs] def set_kstar(self, k=0): """Set all elements of kstar to a fixed value k. Overload the set_kstar method from the superior classes. First, call the set_kstar from the superior classes. Then also reset all other AdvanceDensity attributes depending on kstar to None. Args: k: number of neighbours used to compute the density. It can be an iteger or an array of integers """ super().set_kstar(k) self.grads = None self.grads_var = None self.grads_covmat = None self.pearson_array = None self.Fij_array = None self.Fij_var_array = None self.inv_deltaFs_cov = None
# ----------------------------------------------------------------------------------------------
[docs] def compute_grads(self, comp_covmat=False): """Compute the gradient of the log density each point using kstar nearest neighbors and store Estimate the gradient using an improved version of the mean-shift gradient algorithm [Fukunaga1975] as presented in [Carli2024]. Store the computed gradients in grads. Also compute the variance of the gradient and store it in grads_var. Optionally, the whole covariance matrix can be estimated for gradient Args: comp_covmat (bool): if True, the whole covariance matrix is computed for each gradient and stored in grads_covmat """ # compute optimal k if self.kstar is None: self.compute_kstar() # check or compute vector_diffs if self.neigh_vector_diffs is None: self.compute_neigh_vector_diffs() if self.verb: print("Estimation of the density gradient started") sec = time.time() if comp_covmat is False: self.grads, self.grads_var = cgr.return_grads_and_var_from_nnvecdiffs( self.neigh_vector_diffs, self.nind_list, self.nind_iptr, self.kstar, self.intrinsic_dim, ) self.grads_var = np.einsum( "ij, i -> ij", self.grads_var, self.kstar / (self.kstar - 1) ) # Bessel's correction for the unbiased sample variance estimator else: self.grads, self.grads_covmat = cgr.return_grads_and_covmat_from_nnvecdiffs( self.neigh_vector_diffs, self.nind_list, self.nind_iptr, self.kstar, self.intrinsic_dim, ) # Bessel's correction for the unbiased sample variance estimator self.grads_covmat = np.einsum( "ijk, i -> ijk", self.grads_covmat, self.kstar / (self.kstar - 1) ) smallnumber = 1.0e-10 self.grads_covmat += smallnumber * np.tile( np.eye(self.dims), (self.N, 1, 1) ) # get diagonal elements of the covariance matrix self.grads_var = np.zeros((self.N, self.dims)) for i in range(self.N): self.grads_var[i, :] = np.diag(self.grads_covmat[i, :, :]) sec2 = time.time() if self.verb: print("{0:0.2f} seconds computing gradients".format(sec2 - sec))
# ----------------------------------------------------------------------------------------------
[docs] def compute_pearson(self, similarity_method="jaccard"): """ Compute, for any couple (i,j) of points connected on the directed neighbourhood graph, an estimate of the Pearson correlation coefficient between the directed deltaFij computed with the gradients in i and in j, namely between dot(g_i,(x_j-x_i)) and dot(g_j,(x_j-x_i)). These are needed in order to compute the errors on the deltaFs. They are estimated as the neighbourhood similarity index (see documentation for compute_neigh_similarity_index) times the sign of the product of the two directed deltaFijs. The Pearson coefficients take values between -1 and 1 and are stored in the pearson_array attribute. Args: similarity_method (str): similarity_method to compute the neighbourhood similarity index (see documentation for compute_neigh_similarity_index). """ # check or compute neigh_similarity_index if self.neigh_similarity_index is None: self.compute_neigh_similarity_index(method=similarity_method) # check or compute grads if self.grads is None: self.compute_grads() sec = time.time() # estimate pearson sign Fij_i_oneway = np.einsum( "ij, ij -> i", self.grads[self.nind_list[:, 0]], self.neigh_vector_diffs ) Fij_j_oneway = np.einsum( "ij, ij -> i", self.grads[self.nind_list[:, 1]], self.neigh_vector_diffs ) psign_est = np.sign(Fij_i_oneway * Fij_j_oneway) self.pearson_array = self.neigh_similarity_index * psign_est sec2 = time.time() if self.verb: print( "{0:0.2f} seconds to carry out Pearson coefficients estimation.".format( sec2 - sec ) )
[docs] def compute_deltaFs(self, similarity_method="jaccard", comp_p_mat=False): """Compute deviations deltaFij to standard kNN log-densities at point j as seen from point i using a linear expansion with as slope the semisum of the average gradient of the log-density over the neighbourhood of points i and j. If not defined, compute the Pearson coefficients p (see docs for pearson_array) by running compute_pearson. Then use these p in the estimate of the variances on the deltaFij as 1/4*(E_i^2+E_j^2+2*E_i*E_j*chi), where E_i is the error on the estimate of grad_i*DeltaX_ij (see [Carli2024]). The log-density differences are stored Fij_array, their variances in Fij_array_var. Args: similarity_method: see docs for neigh_graph.compute_neigh_similarity_index function comp_p_mat: see docs for compute_pearson function """ if self.grads_covmat is None: self.compute_grads(comp_covmat=True) if self.verb: print( "Estimation of the gradient semisum (linear) corrections deltaFij to the log-density started" ) sec = time.time() Fij_array = np.zeros(self.nspar) self.Fij_var_array = np.zeros(self.nspar) g0 = self.grads[self.nind_list[:, 0]] g1 = self.grads[self.nind_list[:, 1]] g_var0 = self.grads_covmat[self.nind_list[:, 0]] g_var1 = self.grads_covmat[self.nind_list[:, 1]] # check or compute common_neighs if self.pearson_array is None: self.compute_pearson(similarity_method=similarity_method) Fij_array = 0.5 * np.einsum("ij, ij -> i", g0 + g1, self.neigh_vector_diffs) vari = np.einsum( "ij, ij -> i", self.neigh_vector_diffs, np.einsum("ijk, ik -> ij", g_var0, self.neigh_vector_diffs), ) varj = np.einsum( "ij, ij -> i", self.neigh_vector_diffs, np.einsum("ijk, ik -> ij", g_var1, self.neigh_vector_diffs), ) self.Fij_var_array = 0.25 * ( vari + varj + 2 * self.pearson_array * np.sqrt(vari * varj) ) sec2 = time.time() if self.verb: print("{0:0.2f} seconds computing gradient corrections".format(sec2 - sec)) self.Fij_array = Fij_array self.Fij_var_array = self.Fij_var_array
# ----------------------------------------------------------------------------------------------
[docs] def compute_diag_inv_deltaFs_cross_covariance_LSDI( self, similarity_method="jaccard" ): """Compute the diagonal of the appoximate inverse of the deltaFs cross-covariance cov[deltaFij,deltaFlm] using the LSDI approximation (see compute_density_BMTI docs) Args: similarity_method: see docs for neigh_graph.compute_neigh_similarity_index function """ # check for deltaFs if self.neigh_similarity_index_mat is None: self.compute_neigh_similarity_index_mat(method=similarity_method) # check or compute deltaFs_grads_semisum if self.Fij_var_array is None: self.compute_deltaFs() # estimate directional deltaFs if self.verb: print("Estimation of the directional deltaFs started") sec = time.time() Fij_i_oneway = np.einsum( "ij, ij -> i", self.grads[self.nind_list[:, 0]], self.neigh_vector_diffs ) Fij_j_oneway = np.einsum( "ij, ij -> i", self.grads[self.nind_list[:, 1]], self.neigh_vector_diffs ) sec2 = time.time() if self.verb: print( "{0:0.2f} seconds estimating the directional deltaFs".format(sec2 - sec) ) # get grads covariance matrices g_var0 = self.grads_covmat[self.nind_list[:, 0]] g_var1 = self.grads_covmat[self.nind_list[:, 1]] # estimate standard deviations on directional deltaFs epsi = np.sqrt( np.einsum( "ij, ij -> i", self.neigh_vector_diffs, np.einsum("ijk, ik -> ij", g_var0, self.neigh_vector_diffs), ) ) epsj = np.sqrt( np.einsum( "ij, ij -> i", self.neigh_vector_diffs, np.einsum("ijk, ik -> ij", g_var1, self.neigh_vector_diffs), ) ) # compute epsilon^i_ij * sgn(deltaF^i_ij) seps0 = epsi * np.sign(Fij_i_oneway) seps1 = epsj * np.sign(Fij_j_oneway) if self.verb: print( "Estimation of the diagonal of the inverse of the deltaFs cross-covariance started" ) sec = time.time() # compute a diagonal approximation of the inverse of the cross-covariance matrix self.inv_deltaFs_cov = cgr.return_diag_inv_deltaFs_cross_covariance_LSDI( self.nind_list, self.neigh_similarity_index_mat, self.Fij_var_array, seps0, seps1, ) sec2 = time.time() if self.verb: print( "{0:0.2f} seconds computing the diagonal of the inverse of the deltaFs cross-covariance".format( sec2 - sec ) )
# ----------------------------------------------------------------------------------------------
[docs] def compute_density_BMTI( self, delta_F_inv_cov="uncorr", comp_log_den_err=False, solver="sp_direct", sp_direct_perm_spec="NATURAL", alpha=1, log_den=None, log_den_err=None, ): """Compute the log-density for each point using BMTI. If alpha<1, the algorithm also includes a regularisatin. The regulariser log-density and its errors can be passed as arguments: log_den and log_den_err. If any of these two is not specified, use kstarNN estimator as a regulariser. Args: delta_F_inv_cov (str): specify the method used to invert the cross-covariance matrix C of the log-density deviations cov[deltaF_ij,deltaF_kl]. Currently implemented methods: "uncorr" (default): all the deltaFs are assumed uncorrelated, i.e. C is assumed to be diagonal with diagonal = Fij_var_array "identity": C is assumed as the identity matrix, so that all terms in the BMTI likelihood are taken unweighted (variance of deltaF_ij = 1 for all (i,j) couples) "LSDI": (Least Squares with respect to a Diagonal Inverse). Invert the cross-covariance C by finding the approximate diagonal inverse which multiplied by C gives the least-squares closest matrix to the identity in the Frobenius norm comp_log_den_err (bool): if True, compute the error on the BMTI estimates. Can be highly time consuming solver (str): specify the solver to use when solving the BMSTI linear system. Three sparse (memory efficient) and a dense solvers are implemented: 'sp_direct' (default): scipy.sparse.linalg.spsolve. Performs a LU decomposition of the matrix and then solves the linear system directly. More robust but less memory efficient than other implemented sparse solvers. Slower than iterative solvers for very sparse and large matrices. 'sp_cg': scipy.sparse.linalg.cg. This is the iterative conjugate gradient method. It might be preferred to 'direct' for large and sparse matrices. If a log-density estimate is alredy stored in self.log_den, it will be used as a guess for the solution for a great spedup. If this option is chosen, we suggest you call compute_density_kstarNN() right before computing BMTI. 'sp_cg_precond': same as 'cg', scipy.sparse.linalg.cg, but with a preconditioner estimated unsuperivisedly with a partial LU decomposition (scipy.sparse.linalg.spilu) of the matrix. In settings where 'direct' performs better than 'cg', 'cg_precond' is likely to perform better than 'spolve' and 'cg'. If 'cg' already performs better than 'direct', 'cg_precond' is likely to perform worse than 'cg' alone. 'dense': numpy.linalg.solve. Direct solver for dense matrices. O(N^3) complexity, O(N^2) memory complexity. The solver automatically uses multiprocessing if available. This option is suited for small datasets or when memory and cores are not an issue. sp_direct_perm_spec (str): specify the permutation strategy to use when solving the linear system with the 'sp_direct' solver. See the scipy.sparse.linalg.spsolve documentation for more information. alpha (float): can take values from 0.0 to 1.0. Indicates the portion of BMTI in the sum of the likelihoods alpha*L_BMTI + (1-alpha)*L_kstarNN. Setting alpha=1.0 corresponds to not reguarising BMTI. log_den (np.ndarray(float)): size N. The array of the log-densities of the regulariser. log_den_err (np.ndarray(float)): size N. The array of the log-density errors of the regulariser. """ # compute changes in free energy if self.Fij_array is None: self.compute_deltaFs() # note: this should be called after the computation of the deltaFs # since otherwhise self.log_den and self.log_den_err are redefined to None via set kstar if log_den is not None and log_den_err is not None: self.log_den = log_den self.log_den_err = log_den_err else: log_den, log_den_err, _ = return_not_normalised_density_kstarNN( self.distances, self.intrinsic_dim, self.kstar, interpolation=False, bias=False, ) # Normalise density log_den -= np.log(self.N) self.log_den = log_den self.log_den_err = log_den_err # add a warnings.warning if self.N > 10000 and solver is 'dense' if self.N > 10000 and solver == "dense": warnings.warn( "The number of points is large and you are not using a memory efficient option. \ If you run into memory issues, consider using other options." ) if self.verb: print("BMTI density estimation started") sec = time.time() # define the likelihood covarince matrix A, deltaFcum = self._get_BMTI_reg_linear_system(delta_F_inv_cov, alpha) sec2 = time.time() if self.verb: print("{0:0.2f} seconds to fill get linear system ready".format(sec2 - sec)) # solve linear system log_den = self._solve_BMTI_reg_linar_system( A, deltaFcum, solver, sp_direct_perm_spec ) self.log_den = log_den if self.verb: print("{0:0.2f} seconds to solve linear system".format(time.time() - sec2)) sec2 = time.time() # compute error if comp_log_den_err is True: A = A.todense() B = slin.pinvh(A) self.log_den_err = np.sqrt(np.diag(B)) if self.verb: print("{0:0.2f} seconds inverting A matrix".format(time.time() - sec2)) sec2 = time.time() sec2 = time.time() if self.verb: print("{0:0.2f} seconds for BMTI density estimation".format(sec2 - sec))
# ---------------------------------------------------------------------------------------------- def _get_BMTI_reg_linear_system(self, delta_F_inv_cov, alpha): sec = time.time() if delta_F_inv_cov == "uncorr": # define redundancy factor for each A matrix entry as the geometric mean of the 2 corresponding kstar k1 = self.kstar[self.nind_list[:, 0]] k2 = self.kstar[self.nind_list[:, 1]] redundancy = np.sqrt(k1 * k2) tmpvec = ( np.ones(self.nspar, dtype=np.float_) / self.Fij_var_array / redundancy ) elif delta_F_inv_cov == "LSDI": # self.compute_deltaFs_inv_cross_covariance() self.compute_diag_inv_deltaFs_cross_covariance_LSDI() tmpvec = self.inv_deltaFs_cov elif delta_F_inv_cov == "identity": tmpvec = np.ones(self.nspar, dtype=np.float_) else: raise ValueError( "The delta_F_inv_cov parameter is not valid, choose 'uncorr', 'LSDI' or 'none'" ) if self.verb: print( "{0:0.2f} seconds finding the diagonal of the deltaFs cross-covariance matrix".format( time.time() - sec ) ) sec = time.time() # compute adjacency matrix A = sparse.csr_matrix( (-tmpvec, (self.nind_list[:, 0], self.nind_list[:, 1])), shape=(self.N, self.N), dtype=np.float_, ) # compute coefficients vector supp_deltaF = sparse.csr_matrix( (self.Fij_array * tmpvec, (self.nind_list[:, 0], self.nind_list[:, 1])), shape=(self.N, self.N), dtype=np.float_, ) # make A symmetric A = alpha * sparse.lil_matrix(A + A.transpose()) # insert kstarNN with factor 1-alpha in the Gaussian approximation # ALREADY MULTIPLIED A BY ALPHA diag = ( np.array(-A.sum(axis=1)).reshape((self.N,)) + (1.0 - alpha) / self.log_den_err**2 ) A.setdiag(diag) deltaFcum = ( alpha * ( np.array(supp_deltaF.sum(axis=0)).reshape((self.N,)) - np.array(supp_deltaF.sum(axis=1)).reshape((self.N,)) ) + (1.0 - alpha) * self.log_den / self.log_den_err**2 ) if self.verb: print("{0:0.2f} seconds to fill sparse matrix".format(time.time() - sec)) return A, deltaFcum def _solve_BMTI_reg_linar_system(self, A, deltaFcum, solver, sp_direct_perm_spec): if solver == "dense": # dense solver O(N^3) complexity if self.verb: print("Solving dense linear system") log_den = np.linalg.solve(A.todense(), deltaFcum) elif solver == "sp_cg": # conjugate gradient without preconditioner if self.verb: print( "Solving by conjugate gradient sparse solver without preconditioner" ) log_den = sparse.linalg.cg( A.tocsr(), deltaFcum, x0=self.log_den, atol=0.0, maxiter=None )[0] elif solver == "sp_cg_precond": # conjugate gradient with preconditioner if self.verb: print( "Solving by conjugate gradient sparse solver with estimated (spilu) preconditioner" ) # Create preconditioner sec = time.time() A_csc = sparse.csc_matrix(A) # Ensure CSC format for spilu M = sparse.linalg.spilu(A_csc) preconditioner = sparse.linalg.LinearOperator(A_csc.shape, matvec=M.solve) if self.verb: print("{0:0.2f} seconds preconditioning".format(time.time() - sec)) log_den = sparse.linalg.cg( A.tocsr(), deltaFcum, M=preconditioner, x0=self.log_den, atol=0.0, maxiter=None, )[0] else: # default solver: sp_direct if solver != "sp_direct": warnings.warn( f"The solver '{solver}' selected is not among the options. Using 'sp_direct' instead." ) if self.verb: print( f"Solving with 'sp_direct' sparse solver with perm_spec='{sp_direct_perm_spec}'" ) print("cast to csr") log_den = sparse.linalg.spsolve( A.tocsr(), deltaFcum, permc_spec=sp_direct_perm_spec ) return log_den