Source code for base

# Copyright 2021-2023 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""
The *base* module contains the *Base* class.

This class contains essential methods and attributes needed for all other classes.
"""
import multiprocessing
import time
import warnings

import numpy as np

from dadapy._utils.utils import compute_nn_distances, from_all_distances_to_nndistances

cores = multiprocessing.cpu_count()


[docs] class Base: """Base class.""" def __init__( self, coordinates=None, distances=None, maxk=None, period=None, verbose=False, n_jobs=cores, rng_seed=42, ): """Containing coordinates and/or distances and some basic methods. Args: coordinates (np.ndarray(float)): the data points loaded, of shape (N , dimension of embedding space) distances (np.ndarray(float), tuple(np.ndarray(float), np.ndarray(float)) ): Distance matrix (N x N), or tuple of nearest neighbor distances (N x maxk) and their indices (N x maxk). maxk (int): maximum number of neighbours to be considered for the calculation of distances period (np.array(float), optional): array containing the periodicity of each coordinate. Default is None verbose (bool): whether you want the code to speak or shut up njobs (int): number of cores to be used """ self.X = coordinates self.maxk = maxk # remove from here self.verb = verbose self.n_jobs = n_jobs self.dims = None self.N = None self.metric = "euclidean" # remove from here self.period = period # remove from here self.rng = np.random.default_rng(rng_seed) if self.X is not None: assert isinstance( self.X, np.ndarray ), "Coordinates must be in numpy ndarray format" if ( self.X.dtype == np.float32 or self.X.dtype == np.float16 or self.X.dtype == np.float64 ): self.X = self.X.astype(np.float64, casting="safe") else: warnings.warn( f"data type is {self.X.dtype}: most methods work only with float-type inputs", stacklevel=2, ) self.N = self.X.shape[0] self.dims = coordinates.shape[1] self.distances = None self.dist_indices = None if self.maxk is None: # remove from here self.maxk = min(100, self.N - 1) if distances is not None: self.distances, self.dist_indices, self.N, self.maxk = self._init_distances( distances, self.maxk ) self.dtype = np.float64 self.eps = np.finfo(self.dtype).eps # this function is useful for overlap with another dataset and makes __init__ little bit shorter def _init_distances(self, distances, maxk=None): if isinstance(distances, tuple): assert isinstance( distances[0], np.ndarray ), "distances must be in numpy ndarray format" assert isinstance( distances[1], np.ndarray ), "distance indices must be in numpy ndarray format" assert ( distances[0].shape[0] == distances[1].shape[0] ), "distances and indices must have the same shape" if maxk is None: maxk = min(100, distances[0].shape[1] - 1) elif maxk > (distances[0].shape[1] - 1): maxk = distances[0].shape[1] - 1 warnings.warn( f"maxk requested bigger than number of features: setting maxk to {maxk}", stacklevel=2, ) N = distances[0].shape[0] dist = distances[0][:, : maxk + 1] dist_indices = distances[1][:, : maxk + 1] else: assert isinstance( distances, np.ndarray ), "distances must be in numpy ndarray format" assert ( distances.shape[0] == distances.shape[1] ), "distance matrix shape must be N x N" N = distances.shape[0] if maxk is None: maxk = min(100, distances.shape[1] - 1) elif maxk > (distances.shape[1] - 1): maxk = distances.shape[1] - 1 warnings.warn( f"maxk requested bigger than number of features: setting maxk to {maxk}", stacklevel=2, ) dist, dist_indices = from_all_distances_to_nndistances(distances, maxk) if dist.dtype == np.float32: dist = dist.astype(np.float64, casting="safe") return dist, dist_indices, N, maxk # ----------------------------------------------------------------------------------------------
[docs] def compute_distances( self, maxk=None, metric="euclidean", period=None, n_jobs=None ): """Compute distaces between points up to the maxk nearest neighbour. Args: maxk: maximum number of neighbours for which distance is computed and stored metric: type of metric period (float or np.array): periodicity (only used for periodic distance computation). Default is None. """ if self.verb: print("Computation of distances started") sec = time.time() self.metric = metric if period is not None: if isinstance(period, np.ndarray) and period.shape == (self.dims,): self.period = period elif isinstance(period, (int, float)): self.period = np.full((self.dims), fill_value=period, dtype=float) else: raise ValueError( f"'period' must be either a float scalar or a numpy array of floats of shape ({self.dims},)" ) if maxk is not None: self.maxk = maxk else: assert ( self.maxk is not None ), "set parameter maxk in the function or for the class" if self.verb and period is not None: print( "Computing periodic distances.", "The coordinates are assumed to be in the range [0, period]", ) if n_jobs is not None: self.n_jobs = n_jobs if self.verb: print(f"Computation of the distances up to {self.maxk} NNs started") self.distances, self.dist_indices = compute_nn_distances( self.X, self.maxk, self.metric, self.period, self.n_jobs ) sec2 = time.time() if self.verb: print("{0:0.2f} seconds for computing distances".format(sec2 - sec))
# ------------------------------------------------------------------------------- # better to use this formulation which can be applied to _mus_scaling_reduce_func
[docs] def remove_zero_dists(self, distances): """Find zero neighbour distances and substitute the numerical zeros with a very small number. This method is mostly useful to regularize the computation of certain id estimators. """ # find all distances which are 0 indices = np.nonzero(distances[:, 1:] < np.finfo(self.dtype).eps) # set distance to epsilon distances[:, 1:][indices] = np.finfo(self.dtype).eps return distances
[docs] def remove_identical_points(self): """Find points that are numerically identical and remove them. For very large datasets this method might be slow; you might want to use a command like: awk '!seen[$0]++' . See https://unix.stackexchange.com/questions/11939/how-to-get-only-the-unique-results-without-having-to-sort-data for more information """ if self.N > 100000: print( "WARNING: this method might be very slow for large datasets.\n" "We suggest to use something like \"awk '!seen[$0]++'\"" ) # removal of overlapping data points x_unique = np.unique(self.X, axis=0) n_unique = x_unique.shape[0] if n_unique < self.N: print( f"{self.N - n_unique} overlapping datapoints found: keeping {n_unique} unique elements", "WARNING: the order of points has been changed!", ) self.X = x_unique self.N = n_unique self.maxk = min(self.maxk, self.N - 1) if self.distances is not None: print("distances between points will be recomputed") self.compute_distances() else: print("No identical identical points were found")