# Copyright 2021-2025 The DADApy Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
The *causal_graph* module contains the *CausalGraph* class, which inherits from the *DiffImbalance* class.
The code can be runned on gpu using the command
jax.config.update('jax_platform_name', 'gpu') # set 'cpu' or 'gpu'
"""
import string
import warnings
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from tqdm.auto import tqdm
from dadapy import DiffImbalance
[docs]
class CausalGraph(DiffImbalance):
"""Constructs a community causal graph where variables are grouped into single nodes.
Attributes:
time_series (np.array(float)): array of shape (N_times,D), where N_times is the length of
trajectory and D is the number of dynamical variables. The sampling time is supposed to
be constant along the trajectory and for all the variables.
coords_present (np.array(float)): array of shape (N_samples,D) containing the samples of the
D-dimensional trajectory at time t=0; read only when time_series is None.
coords_future (np.array(float)): array of shape (N_samples,D,n_lags) containing the samples of
the D-dimensional trajectory at different future time lags; read only when time_series is None.
If you want to test a single time lag, reshape the dataset with coords_future[:,:,np.newaxis].
periods (np.ndarray(float)): array of shape (D,) containing the periods of the dynamical variables.
The default is None, which means that the variables are treated as nonperiodic. If not all
variables are periodic, the entries of the nonperiodic ones should be set to 0.
standardize (bool): whether to standardize each of the D variables, dividing by its standard
deviation along the trajectory or the provided samples. Default is True.
seed (int): seed of JAX random generator.
"""
def __init__(
self,
time_series=None,
coords_present=None,
coords_future=None,
periods=None,
standardize=True,
seed=0,
):
self.time_series = time_series
self.coords_present = coords_present
self.coords_future = coords_future
self.standardize = standardize
self.num_variables, self.periods = self._check_and_initialize_args(periods)
self.seed = seed
self.imbs_training = None
self.weights_training = None
self.weights_final = None
self.imbs_final = None
self.errors_final = None
self.adj_matrix = None
self.community_dictionary = None
def _check_and_initialize_args(self, periods):
"""Checks input arguments to constructor of CausalGraph object."""
num_variables = None
periods = periods
if (
self.time_series is None
and self.coords_present is not None
and self.coords_future is not None
):
assert len(self.coords_future.shape) == 3, (
f"Coords_future has shape {self.coords_future.shape}, while the expected shape "
+ f"is (N_samples, D_features, n_lags).\nIf you want to test a single time lag, "
+ f"provide as input coords_future[:,:,np.newaxis]."
)
n_lags = self.coords_future.shape[2]
assert self.coords_present.shape == self.coords_future.shape[:2], (
"Arguments coords_present and coords_future should have shapes (N_samples, D_features) "
+ f"and (N_samples, D_features, n_lags),\n but the number of samples and/or the "
+ f"number of features do not match."
)
num_variables = self.coords_present.shape[1]
if periods is not None:
periods = np.ones(self.coords_present.shape[1]) * np.array(periods)
if (
self.standardize is False
and (
np.std(self.coords_present, ddof=1, axis=0)
!= np.ones(num_variables)
).any()
):
warnings.warn(
f"The {num_variables} variables in 'coords_present' are not standardized."
)
if self.standardize is True:
self.coords_present /= np.std(
self.coords_present, ddof=1, axis=0, keepdims=True
)
elif self.time_series is not None:
if self.coords_present is not None or self.coords_future is not None:
warnings.warn(
f"You passed the whole time series as input; the arguments coords_present and "
+ f"coords_future will be ignored"
)
num_variables = self.time_series.shape[1]
if periods is not None:
periods = np.ones(time_series.shape[1]) * np.array(periods)
if (
self.standardize is False
and (
np.std(self.time_series, ddof=1, axis=0) != np.ones(num_variables)
).any()
):
warnings.warn(
f"Warning: the {num_variables} variables of the input time series are not standardized."
)
if self.standardize is True:
self.time_series /= np.std(
self.time_series, ddof=1, axis=0, keepdims=True
)
return num_variables, periods
[docs]
def return_nn_indices(
self,
variables,
num_samples,
time_lags,
embedding_dim=1,
embedding_time=1,
discard_close_ind=None,
):
"""Returns the indices of the nearest neighbor of each point.
Args:
variables (list, jnp.array(int)): array of the coordinates used to build the distance space (with weights 1).
num_samples (int): number of samples harvested from the full time series.
time_lags (list(int), np.array(int)): tested time lags between 'present' and 'future'.
embedding_dim (int): dimension of the time-delay embedding vector built on each variable. Default is 1,
which means the time-delay embeddings are not employed.
embedding_time (int): lag between consecutive samples in the time-delay embedding vectors of each
variable. Default is 1.
discard_close_ind (int): defines the "close points" for which distances and ranks are not computed: for each
point i, distances between i and points within [i-discard_close_ind, i+discard_close_ind] are discarded.
Returns:
nn_indices (np.array(float)): array of the nearest neighbors indices: nn_indices[i] is the index of the column
with value 1 in the rank matrix.
"""
assert (
self.time_series is not None
), "Error: to call this method, provide the time series while initializing the CausalGraph class."
assert num_samples <= self.time_series.shape[0] - max(time_lags), (
f"Error: cannot extract {num_samples} samples from {self.time_series.shape[0]} initial samples, "
+ f"if the maximum time lag is {max(time_lags)}.\nChoose a value of num_samples such that "
+ f"num_samples < {self.time_series.shape[0]} - {max(time_lags)}"
)
indices_present = np.linspace(
(embedding_dim - 1)
* embedding_time, # select times defining the ensemble of trajectories
self.time_series.shape[0] - max(time_lags) - 1,
num_samples,
dtype=int,
)
indices_present = [
indices_present - embedding_time * i for i in range(embedding_dim)
]
coords_present = self.time_series[
indices_present
] # has shape (embedding_dim, num_samples, n_variables)
coords_present = np.transpose(
coords_present, axes=[1, 2, 0]
) # convert to shape (num_samples, n_variables, embedding_dim)
dii = DiffImbalance(
data_A=coords_present[:, variables].reshape(
(num_samples, len(variables) * embedding_dim)
),
data_B=coords_present[:, variables].reshape(
(num_samples, len(variables) * embedding_dim)
), # dummy argument
)
nn_indices = dii._return_nn_indices(discard_close_ind=discard_close_ind)
return np.array(nn_indices)
[docs]
def optimize_present_to_future(
self,
num_samples,
time_lags,
embedding_dim_present=1,
embedding_dim_future=1,
embedding_time=1,
target_variables="all",
save_weights=False,
num_epochs=200,
batches_per_epoch=1,
l1_strength=0.0,
point_adapt_lambda=False,
k_init=1,
k_final=1,
lambda_factor=0.1,
params_init=None,
optimizer_name="sgd",
learning_rate=1e-2,
learning_rate_decay=None,
num_points_rows=None,
compute_imb_final=False,
compute_error=False,
ratio_rows_columns=1,
discard_close_ind=None,
):
"""Iteratively optimizes the DII from the full space in the present to a target space in the future.
Arguments 'num_samples', 'time_lags', 'embedding_dim_present', 'embedding_dim_future' and 'embedding_time'
are read only when data are provided to the CausalGraph object through the argument 'time_series'.
Arguments 'compute_error', 'ratio_rows_columns' and 'discard_close_ind' are only read when 'compute_imb_final'
is set to True.
Args:
num_samples (int): number of samples harvested from the full time series, interpreted as
independent initial conditions of the same dynamical process.
time_lags (list(int), np.ndarray(int)): tested time lags between 'present' and 'future'.
embedding_dim_present (int): dimension of the time-delay embedding vectors built in the present
space (t=0, t=-1, ...). Default is 1, which means the time-delay embeddings are not employed.
embedding_dim_future (int): dimension of the time-delay embedding vectors built in the space of
the target variable (t=tau, t=tau-1, ...). Default is 1.
embedding_time (int): lag between consecutive samples in the time-delay embedding vectors of each
variable. Default is 1.
target_variables (str or list(int), np.array(int)): list or np.array of the target variables
defining the distance space in the future. Default is "all", for which the optimization is
iterated over all variables as target.
save_weights (bool): whether to save or not the weights during training, rather than only the final
weights. If True, weights are saved in the attribute 'weights_training' of the CausalGraph object,
which is an array of shape (n_target_variables, n_time_lags, num_epochs+1, num_variables).
Default is False.
num_epochs (int): number of training epochs. Default is 200.
batches_per_epoch (int): number of minibatches; must be a divisor of n_points. Each weight update is
carried out by computing the DII gradient over n_points / batches_per_epoch points. Default is 1,
which means that the gradient is computed over all the available points (batch GD).
seed (int): seed of JAX random generator, default is 0. Different seeds determine different mini-batch
partitions.
l1_strength (float): strength of the L1 regularization (LASSO) term. Default is 0.
point_adapt_lambda (bool): whether to use a global smoothing parameter lambda for the c_ij coefficients
in the DII (if False), or a different parameter for each point (if True). Default is True.
k_init (int): initial rank of neighbors used to set lambda. Ranks are defined starting from 1. If
batches_per_epoch > 1, neighbors are recomputed within each mini-batch. Default is 1.
k_final (int): final rank of neighbors used to set lambda. If batches_per_epoch > 1, neighbors are
recomputed within each mini-batch. Default is 1.
lambda_factor (float): factor defining the scale of lambda. Default is 0.1.
params_init (np.array(float), jnp.array(float)): array of shape (n_features_A,) containing the initial
values of the scaling weights to be optimized. If None, params_init is set to [0.1, 0.1, ..., 0.1].
optimizer_name (str): name of the optimizer, calling the Optax library. Possible choices are 'sgd'
(default), 'adam' and 'adamw'. See https://optax.readthedocs.io/en/latest/api/optimizers.html for
additional details.
learning_rate (float): value of the learning rate. Default is 1e-2.
learning_rate_decay (str): schedule to damp the learning rate to zero starting from the value provided
with the attribute learning_rate. The available schedules are: cosine decay ("cos"), exponential
decay ("exp"; the initial learning rate is halved every 10 steps), or constant learning rate (None).
Default is None (constant learning rate).
num_points_rows (int): number of points sampled from the rows of rank and distance matrices. In case of large
datasets, choosing num_points_rows < n_points can significantly speed up the training. The default is
None, for which num_points_rows == n_points.
compute_imb_final (bool): whether to compute the final DII over the full data set, using the options
specified by 'compute_error', 'ratio_rows_columns' and 'discard_close_ind'. Default is False, for
which those arguments are ignored.
compute_error (bool): whether to compute the final DII and its error by sampling different points along
rows and columns of the distance matrix. If False, the final DII is computed using the same points
along rows and columns, which does not allow for an error estimation. Default is True.
ratio_rows_columns (float): only read when compute_error is True; defines the ratio between the number
of points along rows (nrows) and along columns (ncolumns) of distance and rank matrices, in two groups
randomly sampled. In general, nrows and ncolumns are determined by solving the equations
nrows / ncolumns = ratio_rows_columns,
nrows + ncolumns = n_total_points.
Default is 1, which means that both groups have n_points / 2 elements.
discard_close_ind (int): given any point i, defines the "close" points (following the time ordering
along axis=0 of 'time_series' or 'coords_present') that are known to be significantly correlated with i.
If compute_error is True, "time-correlated" points are excluded by subsampling the data along axis=0
with stride discard_close_ind + 1. If compute_error is False, distances between each point i and points
within the time window [i-discard_close_ind, i+discard_close_ind] are discarded. Default is 0, for which
no distances between points close in the time are discarded.
Returns:
weights_final (np.array(float)): array of shape (n_target_variables, n_time_lags, D) containing the
D final scaling weights for each optimization, where D is the number of variables in the time series.
If embedding_dim_present > 1, the shape is (n_target_variables, n_time_lags, D, embedding_dim_present).
Also accessible as attribute of the CausalGraph object.
imbs_training (np.array(float)): array of shape (n_target_variables, n_time_lags, num_epochs+1)
containing the DII during the trainings. Also accessible as attribute of the CausalGraph object.
imbs_final (np.array(float)): array of shape (n_target_variables, n_time_lags) containing the DII at
the end of each training computed over the full data set. If 'compute_imb_final' is False, imbs_final
is set to None. Also accessible as attribute of the CausalGraph object.
errors_final (np.array(float)): array of shape (n_target_variables, n_time_lags) containing the errors
of the DII at the end of each training, computed over the full data set. If 'compute_imb_final' is False,
or if 'compute_imb_final' is True and 'compute_error' is False, errors_final is set to None. Also
accessible as attribute of the CausalGraph object.
"""
coords_present = None
if self.time_series is not None:
assert num_samples <= self.time_series.shape[0] - max(time_lags), (
f"Error: cannot extract {num_samples} samples from {self.time_series.shape[0]} initial "
+ f"samples, if the maximum time lag is {np.max(time_lags)}.\nChoose a smaller value of "
+ f"num_samples."
)
t0s = np.linspace(
(max(embedding_dim_present, embedding_dim_future) - 1)
* embedding_time, # select times defining the ensemble of trajectories
self.time_series.shape[0] - max(time_lags) - 1,
num_samples,
dtype=int,
)
indices_present = np.array(
[t0s - embedding_time * i for i in range(embedding_dim_present)]
)
coords_present = self.time_series[
indices_present
] # has shape (embedding_dim_present, num_samples, n_variables)
coords_present = np.transpose(
coords_present, axes=[1, 2, 0]
) # convert to shape (num_samples, n_variables, embedding_dim_present)
coords_present = coords_present.reshape(
(num_samples, self.num_variables * embedding_dim_present)
)
elif self.coords_present is not None:
if num_samples is not None:
warninings.warn(
f"Argument 'num_samples' will be ignored, as you already provided the independent "
+ f"initial conditions through arguments 'coords_present' and 'coords_future'.\n "
+ f"To suppress this warning, set 'num_samples' to None."
)
if time_lags is not None:
warninings.warn(
f"Argument 'time_lags' will be ignored, as the samples at different time lags t=tau "
+ f"are already read from the last dimension of 'coords_future'.\n "
+ f"To suppress this warning, set 'time_lags' to None."
)
num_samples = self.coords_present.shape[0]
time_lags = np.arange(1, self.coords_future.shape[2] + 1)
coords_present = self.coords_present
else:
print(
"To call this method, provide either a time series or directly the present and future samples "
+ f"while initializing the CausalGraph class."
)
if target_variables == "all":
target_variables = np.arange(self.num_variables)
# initialize output variables
imbs_training = np.zeros(
(len(target_variables), len(time_lags), num_epochs + 1)
)
if embedding_dim_present == 1:
weights_final = np.zeros(
(len(target_variables), len(time_lags), self.num_variables)
)
if save_weights is True:
weights_training = np.zeros(
(
len(target_variables),
len(time_lags),
num_epochs + 1,
self.num_variables,
)
)
elif embedding_dim_present > 1:
weights_final = np.zeros(
(
len(target_variables),
len(time_lags),
self.num_variables,
embedding_dim_present,
)
)
if save_weights is True:
weights_training = np.zeros(
(
len(target_variables),
len(time_lags),
num_epochs + 1,
self.num_variables,
embedding_dim_present,
)
)
imbs_final = None
errors_final = None
if compute_imb_final:
imbs_final = np.zeros((len(target_variables), len(time_lags)))
if compute_error:
errors_final = np.zeros((len(target_variables), len(time_lags)))
# loop over target variables and time lags
for i_var, target_var in enumerate(target_variables):
for j_tau, tau in enumerate(time_lags):
indices_future = (
np.array(
[t0s - embedding_time * i for i in range(embedding_dim_future)]
)
+ tau
)
if self.time_series is not None:
coords_future = self.time_series[
indices_future, target_var
] # has shape (embedding_dim_future, num_samples)
coords_future = np.transpose(
coords_future, axes=[1, 0]
) # convert to shape (num_samples, embedding_dim_future)
else:
coords_future = self.coords_future[:, :, j_tau]
dii = DiffImbalance(
data_A=coords_present,
data_B=coords_future,
periods_A=self.periods,
periods_B=None
if self.periods is None
else self.periods[target_var],
seed=self.seed,
num_epochs=num_epochs,
batches_per_epoch=batches_per_epoch,
l1_strength=l1_strength,
point_adapt_lambda=point_adapt_lambda,
k_init=k_init,
k_final=k_final,
lambda_factor=lambda_factor,
params_init=params_init,
optimizer_name=optimizer_name,
learning_rate=learning_rate,
learning_rate_decay=learning_rate_decay,
num_points_rows=num_points_rows,
)
weights_temp, imbs_training[i_var, j_tau] = dii.train(
bar_label=f"target_var={target_var}, tau={tau}"
)
# compute final DII and its error
if compute_imb_final:
imb, err = dii.return_final_dii(
compute_error=compute_error,
ratio_rows_columns=ratio_rows_columns,
seed=self.seed,
discard_close_ind=discard_close_ind,
)
imbs_final[i_var, j_tau] = imb
if compute_error:
errors_final[i_var, j_tau] = dii.error_final
# save weights
if embedding_dim_present == 1:
weights_final[i_var, j_tau] = weights_temp[-1]
if save_weights is True:
weights_training[i_var, j_tau] = weights_temp.reshape(
(num_epochs + 1, self.num_variables)
)
elif embedding_dim_present > 1:
weights_final[i_var, j_tau] = weights_temp[-1].reshape(
(self.num_variables, embedding_dim_present)
)
if save_weights is True:
weights_training[i_var, j_tau] = weights_temp.reshape(
(num_epochs + 1, self.num_variables, embedding_dim_present)
)
self.weights_final = weights_final
self.imbs_training = imbs_training
if save_weights:
self.weights_training = weights_training
self.imbs_final = imbs_final
self.errors_final = errors_final
return weights_final, imbs_training, imbs_final, errors_final
[docs]
def compute_adj_matrix(self, weights, threshold=1e-1):
"""Computes the adjacency matrix from the optimal weights returned by optimize_present_to_future.
As a preliminary step before applying the threshold, the maximum weight over the tested time lags is
taken for each pair X_i(0) -> X_j(tau) (i,j=1,...,D). If the weights are referred to time-delay
embeddings, the maximum is also taken along the embedding dimension.
Args:
weights (np.ndarray(float)): array of shape (D, n_time_lags, D) containing the optimal scaling
weights produced by optimize_present_to_future with the option target_variables="all". If the
optimization was carried out with embedding_dim_present > 1, the array should have an additional
dimension, i.e. shape (D, n_time_lags, D, embedding_dim_present).
threshold (float): value of the threshold used to construct the adjacency matrix. If a weight is
smaller than the threshold the corresponding entry in the adjacency matrix is set to 0, otherwise
it is set to 1.
Returns:
adj_matrix (np.ndarray(float)): array of shape (D,D) defining the adjacency matrix of a directed
graph, where each arrow defines a direct or indirect link. Also accessible as attribute of the
CausalGraph object.
"""
assert weights is not None, (
f"To call this method, provide the weights obtained with the method optimize_present_to_future, "
+ f"with the option target_variables='all'"
)
assert len(weights.shape) == 3 or len(weights.shape) == 4, (
"The array of weight must have shape (D,n_time_lags,D), or (D,n_time_lags,D,embedding_dim_present). "
+ f"If you are testing a single time lag, reshape this input as weights[:,np.newaxis,:]."
)
assert weights.shape[0] == weights.shape[2], (
"The array of weight must have shape (D,n_time_lags,D), or (D,n_time_lags,D,embedding_dim_present), "
+ f"where D is the number of variables."
)
if len(weights.shape) == 3:
weights_max = np.max(weights, axis=1) # maximum over all tested time lags
elif len(weights.shape) == 4:
weights_max = np.max(
weights, axis=(1, 3)
) # maximum over all tested time lags and embedding components
D = weights.shape[0]
adj_matrix = np.zeros((D, D))
adj_matrix[weights_max.T > threshold] = 1 # apply threshold
self.adj_matrix = adj_matrix
return adj_matrix
def _ancestors(self, adj_matrix):
"""Finds ancestors of each node in the directed graph described by the input adjacency matrix.
Args:
adj_matrix (np.ndarray(float)): binary matrix of shape (D,D) defining the links of a directed
graph.
Returns:
auto_sets (list): list of lists, such that auto_sets[i] is a list containing the indices of all
ancestors of node i in the graph
"""
G = nx.DiGraph(adj_matrix)
auto_sets = []
for var in np.arange(adj_matrix.shape[0]):
auto_sets.append(sorted(nx.ancestors(G, var) | {var}))
return auto_sets
[docs]
def find_communities(self, adj_matrix):
"""Finds dynamical communities, i.e. groups of variables defining single nodes in the community causal graph.
Args:
adj_matrix (np.ndarray(float)): binary matrix of shape (D,D) defining the links of a directed
graph with D nodes.
Returns:
community_dictionary (dict): dictionary with pairs (comm_id, level) as keys and lists containing
the indices of the variables in each community as values. 'comm_id' is an integer number
identifying the dynamical community, while 'level' is an integer identifying the step of the
algorithm at which the community is identified, namely its level of autonomy. The keys are sorted
from the smallest to the largest level. Both 'comm_id' and 'level' start from 0.
"""
auto_sets = self._ancestors(adj_matrix)
# re-order minimal autonomous sets from smallest to largest size
sizes_auto_sets = [len(auto_set) for auto_set in auto_sets]
auto_sets = [auto_sets[i_sorted] for i_sorted in np.argsort(sizes_auto_sets)]
comm_index = 0
level_index = 0
variables_assigned = []
community_dictionary = {}
auto_sets_left = auto_sets.copy()
while len(auto_sets_left) != 0:
nvariables_left = len(auto_sets_left)
sets_sizes = [
len(auto_sets_left[i_comm]) for i_comm in range(nvariables_left)
]
smallest_set_index = np.argmin(sets_sizes)
community_dictionary[comm_index, level_index] = auto_sets_left[
smallest_set_index
]
variables_assigned.extend(auto_sets_left[smallest_set_index])
auto_sets_left.pop(
smallest_set_index
) # delete set from list of autonomous sets
comm_index += 1
parallel_communities = (
0 # number of communities found to be autonomous at same level
)
auto_sets_left_temp = auto_sets_left.copy()
for try_set_index, try_set in enumerate(auto_sets_left):
intersection = set(variables_assigned).intersection(try_set)
if intersection == set():
community_dictionary[comm_index, level_index] = auto_sets_left[
try_set_index
]
variables_assigned.extend(auto_sets_left[try_set_index])
auto_sets_left_temp.pop(
try_set_index - parallel_communities
) # delete set from list of minimal autonomous subsets
comm_index += 1
parallel_communities += 1
auto_sets_left = auto_sets_left_temp.copy()
for left_set_index, left_set in enumerate(auto_sets_left):
auto_sets_left[left_set_index] = list(
set(left_set).difference(variables_assigned)
)
# delete empty lists
auto_sets_left = [
set_left for set_left in auto_sets_left if len(set_left) > 0
]
level_index += 1
self.community_dictionary = community_dictionary
return community_dictionary