Source code for density_estimation

# Copyright 2021-2023 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""
The *density_estimation* module contains the *DensityEstimation* class.

The different algorithms of density estimation are implemented as methods of this class.
"""

import multiprocessing
import time
import warnings

import numpy as np

from dadapy._cython import cython_density as cd
from dadapy._utils.density_estimation import (
    return_not_normalised_density_kstarNN,
    return_not_normalised_density_PAk,
    return_not_normalised_density_PAk_optimized,
)
from dadapy._utils.utils import compute_cross_nn_distances
from dadapy.kstar import KStar

cores = multiprocessing.cpu_count()


[docs] class DensityEstimation(KStar): """Computes the log-density and its error at each point and other properties. Inherits from class KStar. Can compute the log-density and its error at each point choosing among various kNN-based methods. Attributes: log_den (np.array(float), optional): array containing the N log-densities log_den_err (np.array(float), optional): array containing the N errors on the log_den """ def __init__( self, coordinates=None, distances=None, maxk=None, verbose=False, n_jobs=cores ): """Initialise the DensityEstimation class.""" super().__init__( coordinates=coordinates, distances=distances, maxk=maxk, verbose=verbose, n_jobs=n_jobs, ) self.log_den = None self.log_den_err = None # ----------------------------------------------------------------------------------------------
[docs] def set_kstar(self, k=0): """Set all elements of kstar to a fixed value k. Overload the set_kstar method from the superior class. First, call the set_kstar from the superior class. Then also reset all other DensityEstimation attributes depending on kstar to None. Args: k: number of neighbours used to compute the density. It can be an iteger or an array of integers """ super().set_kstar(k) self.log_den = None self.log_den_err = None
# ----------------------------------------------------------------------------------------------
[docs] def compute_density_kNN(self, k=10, bias=False): """Compute the density of each point using a simple kNN estimator. Args: k (int): number of neighbours used to compute the density Returns: log_den (np.ndarray(float)): estimated log density log_den_err (np.ndarray(float)): estimated error on log density """ if self.intrinsic_dim is None: _ = self.compute_id_2NN() if self.verb: print(f"k-NN density estimation started (k={k})") self.set_kstar(k) log_den, log_den_err, dc = return_not_normalised_density_kstarNN( self.distances, self.intrinsic_dim, self.kstar, interpolation=False, bias=bias, ) # Normalise density log_den -= np.log(self.N) self.log_den = log_den self.log_den_err = log_den_err self.dc = dc if self.verb: print("k-NN density estimation finished") return self.log_den, self.log_den_err
# ----------------------------------------------------------------------------------------------
[docs] def compute_density_kstarNN(self, Dthr=23.92812698, bias=False): """Compute the density of each point using a simple kNN estimator with an optimal choice of k. Args: Dthr (float): Likelihood ratio parameter used to compute optimal k, the value of Dthr=23.92 corresponds to a p-value of 1e-6. Returns: log_den (np.ndarray(float)): estimated log density log_den_err (np.ndarray(float)): estimated error on log density """ self.compute_kstar(Dthr) if self.verb: print("kstar-NN density estimation started") log_den, log_den_err, dc = return_not_normalised_density_kstarNN( self.distances, self.intrinsic_dim, self.kstar, interpolation=False, bias=bias, ) # Normalise density log_den -= np.log(self.N) self.log_den = log_den self.log_den_err = log_den_err self.dc = dc if self.verb: print("k-NN density estimation finished") return self.log_den, self.log_den_err
# ----------------------------------------------------------------------------------------------
[docs] def compute_density_kpeaks(self, Dthr=23.92812698): """Compute the density of each point as proportional to the optimal k value found for that point. This method is mostly useful for the kpeaks clustering algorithm. Args: Dthr: Likelihood ratio parameter used to compute optimal k, the value of Dthr=23.92 corresponds to a p-value of 1e-6. Returns: log_den (np.ndarray(float)): estimated log density log_den_err (np.ndarray(float)): estimated error on log density """ self.compute_kstar(Dthr) if self.verb: print("Density estimation for k-peaks clustering started") dc = np.zeros(self.N, dtype=float) log_den = np.zeros(self.N, dtype=float) log_den_err = np.zeros(self.N, dtype=float) log_den_min = 9.9e300 for i in range(self.N): k = self.kstar[i] dc[i] = self.distances[i, k] log_den[i] = k log_den_err[i] = 0 for j in range(1, k): jj = self.dist_indices[i, j] log_den_err[i] = log_den_err[i] + (self.kstar[jj] - k) ** 2 log_den_err[i] = np.sqrt(log_den_err[i] / k) if log_den[i] < log_den_min: log_den_min = log_den[i] # Normalise density self.log_den = log_den self.log_den_err = log_den_err self.dc = dc if self.verb: print("k-peaks density estimation finished") return self.log_den, self.log_den_err
# ----------------------------------------------------------------------------------------------
[docs] def compute_density_PAk(self, Dthr=23.92812698, optimized=True): """Compute the density of each point using the PAk estimator. Args: Dthr (float): Likelihood ratio parameter used to compute optimal k, the value of Dthr=23.92 corresponds to a p-value of 1e-6. Returns: log_den (np.ndarray(float)): estimated log density log_den_err (np.ndarray(float)): estimated error on log density """ # compute optimal k if self.kstar is None: self.compute_kstar(Dthr=Dthr) elif len(np.unique(self.kstar)) == 1: warnings.warn( "Found pointwise optimal k already computed and CONSTANT over the datapoints. \ Make sure to have used a point-adaptive k selection function such as \ 'self.compute_kstar()'' ", stacklevel=2, ) if self.verb: print("PAk density estimation started") sec = time.time() if optimized: log_den, log_den_err, dc = return_not_normalised_density_PAk_optimized( self.distances, self.intrinsic_dim, self.kstar, interpolation=False, ) else: log_den, log_den_err, dc = return_not_normalised_density_PAk( self.distances, self.intrinsic_dim, self.kstar, interpolation=False, ) sec2 = time.time() if self.verb: print( "{0:0.2f} seconds optimizing the likelihood for all the points".format( sec2 - sec ) ) # Normalize density log_den -= np.log(self.N) self.log_den = log_den self.log_den_err = log_den_err self.dc = dc if self.verb: print("PAk density estimation finished") return self.log_den, self.log_den_err
# ----------------------------------------------------------------------------------------------
[docs] def return_entropy(self): """Compute a very rough estimate of the sample Shannon entropy of the data distribution. The computation simply returns the average negative log probability estimates. Returns: H (float): the estimated entropy of the distribution """ assert self.log_den is not None H = -np.mean(self.log_den) return H
# ----------------------------------------------------------------------------------------------
[docs] def return_interpolated_density_kNN(self, X_new, k): """Return the kNN density of the primary dataset, evaluated on a new set of points "X_new". Args: X_new (np.ndarray(float)): The points onto which the density should be computed k (int): the number of neighbours considered for the kNN estimator Returns: log_den (np.ndarray(float)): log density of dataset evaluated on X_new log_den_err (np.ndarray(float)): error on log density estimates """ assert self.X is not None if self.intrinsic_dim is None: _ = self.compute_id_2NN() cross_distances, _ = compute_cross_nn_distances( X_new, self.X, self.maxk, self.metric, self.period ) kstar = np.ones(X_new.shape[0], dtype=int) * k log_den, log_den_err, _ = return_not_normalised_density_kstarNN( cross_distances, self.intrinsic_dim, kstar, interpolation=True ) # Normalise density log_den -= np.log(self.N) return log_den, log_den_err
# ----------------------------------------------------------------------------------------------
[docs] def return_interpolated_density_kstarNN(self, X_new, Dthr=23.92812698): """Return the kstarNN density of the primary dataset, evaluated on a new set of points "X_new". Args: X_new (np.ndarray(float)): The points onto which the density should be computed Dthr: Likelihood ratio parameter used to compute optimal k Returns: log_den (np.ndarray(float)): log density of dataset evaluated on X_new log_den_err (np.ndarray(float)): error on log density estimates """ assert self.X is not None if self.intrinsic_dim is None: _ = self.compute_id_2NN() cross_distances, cross_dist_indices = compute_cross_nn_distances( X_new, self.X, self.maxk, self.metric, self.period ) kstar = cd._compute_kstar_interp( self.intrinsic_dim, X_new.shape[0], self.maxk, Dthr, cross_dist_indices, cross_distances, self.distances, ) log_den, log_den_err, _ = return_not_normalised_density_kstarNN( cross_distances, self.intrinsic_dim, kstar, interpolation=True ) # Normalise density log_den -= np.log(self.N) return log_den, log_den_err
# ----------------------------------------------------------------------------------------------
[docs] def return_interpolated_density_PAk(self, X_new, Dthr=23.92812698): """Return the PAk density of the primary dataset, evaluated on a new set of points "X_new". Args: X_new (np.ndarray(float)): The points onto which the density should be computed Dthr: Likelihood ratio parameter used to compute optimal k Returns: log_den (np.ndarray(float)): log density of dataset evaluated on X_new log_den_err (np.ndarray(float)): error on log density estimates """ assert self.X is not None if self.intrinsic_dim is None: _ = self.compute_id_2NN() cross_distances, cross_dist_indices = compute_cross_nn_distances( X_new, self.X, self.maxk, self.metric, self.period ) kstar = cd._compute_kstar_interp( self.intrinsic_dim, X_new.shape[0], self.maxk, Dthr, cross_dist_indices, cross_distances, self.distances, ) log_den, log_den_err, _ = return_not_normalised_density_PAk( cross_distances, self.intrinsic_dim, kstar, self.maxk, interpolation=True ) # Normalise density log_den -= np.log(self.N) return log_den, log_den_err