from modina.statistics_utils import pre_rescaling, post_rescaling, _df_to_numpy, _separate_types
import os
from typing import Optional, Tuple
import networkx as nx
import numpy as np
import pandas as pd
import igraph as ig
import napypi as napy
import logging
# Differential network computation
# [docs]  (Sphinx artifact, not executable code)
def compute_diff_network(scores1: pd.DataFrame, scores2: pd.DataFrame, context1: pd.DataFrame, context2: pd.DataFrame,
                         edge_metric: Optional[str] = None, node_metric: Optional[str] = None,
                         max_path_length: int = 2, correction: str = 'bh',
                         path: Optional[str] = None, format: str = 'csv',
                         meta_file: Optional[pd.DataFrame] = None, test_type: str = 'nonparametric', nan_value: Optional[int] = None) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    """
    Computation of a differential network defined by a node metric and an edge metric.
    :param scores1: Statistical association scores of Context 1, rescaled and potentially filtered.
    :param scores2: Statistical association scores of Context 2, rescaled and potentially filtered.
    :param context1: Observed data of Context 1, potentially filtered.
    :param context2: Observed data of Context 2, potentially filtered.
    :param edge_metric: Edge metric used to construct the differential network.
    :param node_metric: Node metric used to construct the differential network.
    :param max_path_length: Maximum length of paths to consider in the computation of integrated interaction scores. Defaults to 2.
    :param correction: Correction method for multiple testing. Defaults to 'bh'.
    :param path: Optional path to save the differential scores as CSV files. Defaults to None.
    :param format: File format to save the differential network. Options are 'csv' and 'graphml'. Defaults to 'csv'.
    :param meta_file: Meta file containing the node types. Only needed if node_metric is 'STC'. Defaults to None.
    :param test_type: Test type to use for continuous nodes in STC metric. Defaults to 'nonparametric'.
    :param nan_value: Numerical value used for NaN values in the context data. If None, an error will be raised if such values are present. Defaults to None.
    :return: A tuple (edges_diff, nodes_diff) containing the computed differential edges and nodes.
    :raises ValueError: If neither edge_metric nor node_metric is given, or if an invalid format is requested.
    """
    if edge_metric is None and node_metric is None:
        raise ValueError('Please provide at least one of edge_metric or node_metric to compute the differential network.')
    edges_diff = None
    nodes_diff = None
    # Rescaling: make sure both score tables carry the pre-rescaled columns.
    if 'pre-E' not in scores1.columns or 'pre-E' not in scores2.columns:
        scores1, scores2 = pre_rescaling(scores1=scores1, scores2=scores2, metric='pre-E')
    if 'pre-P' not in scores1.columns or 'pre-P' not in scores2.columns:
        scores1, scores2 = pre_rescaling(scores1=scores1, scores2=scores2, metric='pre-P')
    # Edges
    if edge_metric is not None:
        edges_diff = compute_diff_edges(scores1=scores1, scores2=scores2, edge_metric=edge_metric, max_path_length=max_path_length)
    # Nodes
    if node_metric is not None:
        nodes_diff = compute_diff_nodes(context1=context1, context2=context2, scores1=scores1, scores2=scores2,
                                        node_metric=node_metric, correction=correction, meta_file=meta_file, test_type=test_type, nan_value=nan_value)
    # Optionally persist the results before trimming to the relevant columns.
    if path is not None:
        if format == 'csv':
            if edges_diff is not None:
                edges_diff.to_csv(os.path.join(path, 'diff_edges.csv'))
            if nodes_diff is not None:
                nodes_diff.to_csv(os.path.join(path, 'diff_nodes.csv'))
        elif format == 'graphml':
            if edges_diff is None:
                raise ValueError("To save the differential network in 'graphml' format, please provide an 'edge_metric'.")
            diff_net = nx.from_pandas_edgelist(edges_diff, 'label1', 'label2', edge_metric)
            # Add node weights when a node metric was computed as well.
            if nodes_diff is not None:
                nx.set_node_attributes(diff_net, nodes_diff[node_metric].to_dict(), node_metric)
            nx.write_graphml(diff_net, os.path.join(path, 'diff_net.graphml'))
        else:
            raise ValueError(f"Invalid format {format}. Choose from 'csv' or 'graphml'.")
    # Reduce the returned tables to the relevant columns.
    if edge_metric is not None and edges_diff is not None:
        if 'post' in edge_metric:
            # Post-rescaled metrics have no signed counterpart.
            edges_diff = edges_diff[['label1', 'label2', 'test_type', edge_metric]]
        else:
            edges_diff = edges_diff[['label1', 'label2', 'test_type', edge_metric, edge_metric + '_signed']]
    if node_metric is not None and nodes_diff is not None:
        nodes_diff = nodes_diff[[node_metric]]
    return edges_diff, nodes_diff
# Adjusted DrDimont implementation to compute integrated interaction scores
# [docs]  (Sphinx artifact, not executable code)
def interaction_score(data, max_path_length=3, metric='pre-E'):
    """Compute integrated interaction scores ('int-IS') for every edge.

    Adapted from the DrDimont approach: for each edge, all simple paths of up
    to ``max_path_length`` edges between its endpoints are enumerated, the
    products of edge weights along paths of each length are averaged, and the
    averages are summed into a single score.

    :param data: Edge table with 'label1', 'label2' and a weight column ``metric``.
    :param max_path_length: Longest path (in edges) to consider; must be smaller than 5. Defaults to 3.
    :param metric: Name of the edge-weight column. Defaults to 'pre-E'.
    :return: Copy of ``data`` with an added 'int-IS' column.
    :raises ValueError: If ``max_path_length`` is 5 or larger.
    """
    if max_path_length >= 5:
        raise ValueError('The maximum path length considered in interaction scores has to be smaller than 5.')
    result = data.copy()
    # Build the weighted (undirected) graph from the edge table.
    net = ig.Graph()
    net.add_vertices(pd.unique(data[['label1', 'label2']].values.ravel()))
    net.add_edges(list(zip(data['label1'], data['label2'])))
    net.es[metric] = data[metric].tolist()
    for row_idx, row in data.iterrows():
        source, target = row['label1'], row['label2']
        # Per path length (1..max_path_length): running sum of weight products
        # and number of paths of that length.
        weight_sums = np.zeros(max_path_length, dtype=float)
        path_counts = np.zeros(max_path_length, dtype=float)
        # NOTE(review): keyword 'maxlen' kept from the original implementation;
        # python-igraph documents this parameter as 'cutoff' — confirm against
        # the pinned igraph version.
        for vertex_path in net.get_all_simple_paths(source, target, maxlen=max_path_length):
            n_edges = len(vertex_path) - 1
            hop_weights = [net.es.find(_source=vertex_path[i], _target=vertex_path[i + 1])[metric]
                           for i in range(n_edges)]
            weight_sums[n_edges - 1] += np.prod(hop_weights)
            path_counts[n_edges - 1] += 1
        # Average the products per path length (0 where no path of that length exists),
        # then sum the averages into the integrated interaction score.
        per_length_means = np.divide(weight_sums, path_counts,
                                     out=np.zeros_like(path_counts), where=path_counts != 0)
        result.loc[row_idx, 'int-IS'] = np.sum(per_length_means)
    return result
# Calculate differential (weighted) degree centralities
# [docs]  (Sphinx artifact, not executable code)
def degree_centrality(nodes_diff, scores1, scores2, metric='pre-P', weighted=False):
    """Compute differential (weighted) degree centralities.

    For every node in ``nodes_diff.index`` the (weighted) degree is determined
    in each context, the per-context values are normalized by their maximum,
    and the absolute difference is stored in a new metric column.

    :param nodes_diff: DataFrame indexed by node labels; returned as a copy with the metric column added.
    :param scores1: Edge scores of Context 1 ('label1', 'label2' and the metric column).
    :param scores2: Edge scores of Context 2 ('label1', 'label2' and the metric column).
    :param metric: Edge-score column to use: 'pre-P' or 'pre-E'. Defaults to 'pre-P'.
    :param weighted: If True, sum edge weights per node; otherwise count non-neutral edges. Defaults to False.
    :return: Copy of ``nodes_diff`` with the differential centrality column ('DC-P', 'DC-E', 'WDC-P' or 'WDC-E').
    :raises ValueError: If ``metric`` is not supported.
    """
    from collections import defaultdict
    if metric not in ('pre-P', 'pre-E'):
        raise ValueError(f"Invalid metric '{metric}' for {'weighted ' if weighted else ''}degree centrality.")
    method = ('WDC-' if weighted else 'DC-') + metric[-1]
    if weighted and metric == 'pre-P':
        # Invert p-value scores so that stronger associations get larger weights.
        # Done on local Series to avoid mutating the caller's DataFrames (the
        # previous implementation added a 'pre-P_inverted' column in place).
        values1 = 1 - scores1[metric]
        values2 = 1 - scores2[metric]
    else:
        values1 = scores1[metric]
        values2 = scores2[metric]
    # An edge is 'neutral' (does not count towards unweighted degree) when its
    # score is 1 for the p-value metric and 0 for the effect-size metric.
    neutral = 1 if metric == 'pre-P' else 0

    def _accumulate(scores, values):
        # Single O(edges) pass: each row contributes to its incident nodes
        # (replaces the previous O(nodes x edges) nested scan, cf. the old TODO).
        totals = defaultdict(float)
        for node_a, node_b, value in zip(scores['label1'], scores['label2'], values):
            contribution = value if weighted else float(value != neutral)
            totals[node_a] += contribution
            if node_b != node_a:  # a self-loop row contributes once, as before
                totals[node_b] += contribution
        return totals

    totals1 = _accumulate(scores1, values1)
    totals2 = _accumulate(scores2, values2)
    nodes_diff = nodes_diff.copy()
    nodes = nodes_diff.index
    context_a = pd.Series([totals1.get(node, 0.0) for node in nodes], index=nodes, dtype=float)
    context_b = pd.Series([totals2.get(node, 0.0) for node in nodes], index=nodes, dtype=float)
    # Normalize each context by its maximum (guard against all-zero networks).
    max_a = context_a.max()
    max_b = context_b.max()
    if max_a == 0.:
        max_a = 1.
    if max_b == 0.:
        max_b = 1.
    # Differential centrality: absolute difference of normalized degrees.
    nodes_diff[method] = (context_a / max_a - context_b / max_b).abs()
    return nodes_diff
# (Differential PageRank centrality is provided by pagerank_centrality, referenced below but defined elsewhere.)
# Compute absolute mean difference and statistical significance for each node between two contexts
# [docs]  (Sphinx artifact, not executable code)
def stat_test_centrality(context1, context2, meta_file, test_type='nonparametric', correction='bh', nan_value: Optional[int] = None):
    """
    Compute statistical test centrality (STC) for every node.

    Each node is tested for a distributional difference between the two
    contexts with a type-appropriate test (chi-squared for nominal and binary
    nodes, MWU for ordinal nodes, MWU or t-test for continuous nodes) and
    scored as STC = 1 - adjusted p-value.

    :param context1: Observed data of Context 1 (samples x nodes).
    :param context2: Observed data of Context 2, same columns as context1.
    :param meta_file: Meta file describing the type of each node.
    :param test_type: Test for continuous nodes: 'parametric' (t-test) or 'nonparametric' (MWU). Defaults to 'nonparametric'.
    :param correction: Multiple-testing correction: 'bh' or 'by'. Defaults to 'bh'.
    :param nan_value: Numeric placeholder for missing values passed to napy. Defaults to None.
    :return: DataFrame indexed by node with columns 'test_p' and 'STC'.
    :raises ValueError: On mismatched contexts, unhandled NaN values, or invalid correction/test_type.
    """
    if not context1.columns.equals(context2.columns):
        raise ValueError('Context a and b need to have the same structure.')
    # Search for non-numeric and NaN values.
    has_invalid1 = context1.apply(lambda col: pd.to_numeric(col, errors="coerce").isna()).values.any()
    has_invalid2 = context2.apply(lambda col: pd.to_numeric(col, errors="coerce").isna()).values.any()
    if has_invalid1 or has_invalid2:
        if nan_value is not None:
            logging.warning(f'The context data contains non-numeric or NaN values. These will be replaced by the specified nan_value {nan_value}.')
            context1 = context1.apply(pd.to_numeric, errors="coerce").fillna(nan_value)
            context2 = context2.apply(pd.to_numeric, errors="coerce").fillna(nan_value)
        else:
            raise ValueError('The context data contains non-numeric or NaN values. Please clean the data and/or specify a nan_value to replace these values.')
    elif nan_value is None:
        # napy requires a numeric missing-value placeholder; pick a random
        # value that does not occur anywhere in the data.
        existing = set(context1.stack().values) | set(context2.stack().values)
        while True:
            nan_value = np.random.randint(-10**5, -10**3)
            if nan_value not in existing:
                break
        logging.warning(f'No nan_value was specified for the context data. For statistical tests, the randomly generated value {nan_value} will be used. '
                        'If you want to specify a different value, please provide it as an argument.')
    # Initialize nodes_diff with the neutral scores (p = 1.0, STC = 0.0).
    nodes_diff = pd.DataFrame(index=context1.columns)
    nodes_diff['test_p'] = 1.0
    nodes_diff['STC'] = 0.0
    # Separate data types.
    ord1, nom1, cont1, bi1 = _separate_types(context1, meta_file)
    ord2, nom2, cont2, bi2 = _separate_types(context2, meta_file)
    if not (nom1.columns.equals(nom2.columns) and ord1.columns.equals(ord2.columns)
            and cont1.columns.equals(cont2.columns) and bi1.columns.equals(bi2.columns)):
        raise ValueError('Context a and b need to have the same structure.')
    # Determine the return type for p-values based on the correction method.
    if correction == 'bh':
        return_p = 'p_benjamini_hb'
    elif correction == 'by':
        return_p = 'p_benjamini_yek'
    else:
        raise ValueError(f"Invalid correction method '{correction}'. Choose from: 'bh' or 'by'.")

    def _categorical_pvals(data1, data2):
        # Chi-squared test of node value vs. context membership (nominal/binary nodes).
        pvals = {}
        for node in data1.columns:
            combined = pd.DataFrame({node: pd.concat([data1[node], data2[node]]), 'context': [0] * len(data1) + [1] * len(data2)})
            combined, _ = _df_to_numpy(combined)
            result = napy.chi_squared(combined, axis=1, threads=1, use_numba=False, return_types=[return_p], nan_value=nan_value)[return_p]
            # [0, 1]: node-vs-context entry of napy's pairwise result matrix.
            pvals[node] = float(result[0, 1])
        return pvals

    def _two_sample_pvals(data1, data2, test):
        # Two-sample comparison across contexts (ordinal/continuous nodes).
        pvals = {}
        context_labels = [0] * len(data1) + [1] * len(data2)
        for node in data1.columns:
            context_info, _ = _df_to_numpy(pd.DataFrame({'context': context_labels}))
            combined, _ = _df_to_numpy(pd.DataFrame({node: pd.concat([data1[node], data2[node]])}))
            result = test(bin_data=context_info, cont_data=combined, axis=1, threads=1, use_numba=False, return_types=[return_p], nan_value=nan_value)[return_p]
            pvals[node] = float(result[0, 0])
        return pvals

    # Validate the continuous-node test upfront (fail fast on bad arguments).
    if test_type == 'parametric':
        cont_test = napy.ttest
    elif test_type == 'nonparametric':
        cont_test = napy.mwu
    else:
        raise ValueError(f"Invalid test type '{test_type}' for continuous nodes. Choose from 'parametric' or 'nonparametric'.")
    p_ord = _two_sample_pvals(ord1, ord2, napy.mwu)
    p_nom = _categorical_pvals(nom1, nom2)
    p_bi = _categorical_pvals(bi1, bi2)
    p_cont = _two_sample_pvals(cont1, cont2, cont_test)
    # STC = 1 - p-value.
    for pvals_dict in (p_ord, p_nom, p_bi, p_cont):
        if pvals_dict:
            pvals = pd.Series(pvals_dict)
            nodes_diff.loc[pvals.index, 'test_p'] = pvals
            nodes_diff.loc[pvals.index, 'STC'] = 1 - pvals
    # In case a test failed and returned NaN, fall back to the neutral scores.
    nodes_diff['test_p'] = nodes_diff['test_p'].fillna(1.0)
    nodes_diff['STC'] = nodes_diff['STC'].fillna(0.0)
    return nodes_diff
# Differential edge computation
# [docs]  (Sphinx artifact, not executable code)
def _log_weighted_scores(scores1, scores2, p_col, e_col):
    """Return -log10(p) * effect for both contexts, with zero p-values replaced by epsilon."""
    # Epsilon: 1/10 of the smallest non-zero p-value across both contexts.
    p_vals_combined = np.concatenate([scores1[scores1[p_col] > 0][p_col].to_numpy(),
                                      scores2[scores2[p_col] > 0][p_col].to_numpy()])
    epsilon = p_vals_combined.min() / 10.0

    def _transform(scores):
        p_vals = scores[p_col].to_numpy()
        p_vals = np.where(p_vals == 0, epsilon, p_vals)
        values = - np.log10(p_vals) * scores[e_col]
        # Replace -0.0 with +0.0.
        return np.where(values == -0.0, 0.0, values)

    return _transform(scores1), _transform(scores2)
def compute_diff_edges(scores1: pd.DataFrame, scores2: pd.DataFrame, edge_metric: str, max_path_length: int = 2,
                       path: Optional[str] = None) -> pd.DataFrame:
    """
    Compute differential edge scores based on the specified edge metric.
    :param scores1: Statistical association scores of Context 1, rescaled and potentially filtered.
    :param scores2: Statistical association scores of Context 2, rescaled and potentially filtered.
    :param edge_metric: Edge metric to compute the differential edge scores.
    :param max_path_length: Maximum length of paths to consider in the computation of integrated interaction scores. Defaults to 2.
    :param path: Optional path to save the differential edge scores as a CSV file. Defaults to None.
    :return: A DataFrame containing the computed differential edge scores.
    :raises ValueError: If an unknown edge_metric is requested.
    """
    edges_diff = None
    # Rescaling: ensure both score tables carry the pre-rescaled columns.
    if 'pre-E' not in scores1.columns or 'pre-E' not in scores2.columns:
        scores1, scores2 = pre_rescaling(scores1=scores1, scores2=scores2, metric='pre-E')
    if 'pre-P' not in scores1.columns or 'pre-P' not in scores2.columns:
        scores1, scores2 = pre_rescaling(scores1=scores1, scores2=scores2, metric='pre-P')
    # Pre-rescaled effect size (pre-E) or rescaled multiple-testing adjusted p-value (pre-P)
    if edge_metric == 'pre-P' or edge_metric == 'pre-E':
        pass  # handled by the generic subtraction below
    # Post-rescaled p-value (post-P)
    elif edge_metric == 'post-P':
        edges_diff = _subtract_edges(scores1, scores2, metrics=['raw-P'], included_cols=['test_type'])
        # Min-Max rescaling
        edges_diff = post_rescaling(diff_scores=edges_diff, metric=edge_metric)
    # Post-rescaled effect size (post-E)
    elif edge_metric == 'post-E':
        edges_diff = _subtract_edges(scores1, scores2, metrics=['raw-E'], included_cols=['test_type'])
        # Min-Max rescaling
        edges_diff = post_rescaling(diff_scores=edges_diff, metric=edge_metric)
    # Sum of pre-P and pre-E (pre-PE)
    elif edge_metric == 'pre-PE':
        edges_diff = _subtract_edges(scores1, scores2, metrics=['pre-P', 'pre-E'], included_cols=['test_type'])
        # Sum the two scores and compute the signed version.
        edges_diff[edge_metric] = edges_diff['pre-P'] + edges_diff['pre-E']
        edges_diff['pre-PE_signed'] = (scores1['pre-P'] - scores2['pre-P']) + (scores1['pre-E'] - scores2['pre-E'])
    # Sum of post-P and post-E (post-PE)
    elif edge_metric == 'post-PE':
        # Differences in raw association scores, rescaled, then summed.
        edges_diff = _subtract_edges(scores1, scores2, metrics=['raw-P', 'raw-E'], included_cols=['test_type'])
        edges_diff = post_rescaling(diff_scores=edges_diff, metric='post-E')
        edges_diff = post_rescaling(diff_scores=edges_diff, metric='post-P')
        edges_diff[edge_metric] = edges_diff['post-P'] + edges_diff['post-E']
    # Integrated Interaction Score (int-IS)
    elif edge_metric == 'int-IS':
        # Compute interaction score using the DrDimont method.
        scores1 = interaction_score(scores1, metric='pre-E', max_path_length=max_path_length)
        scores2 = interaction_score(scores2, metric='pre-E', max_path_length=max_path_length)
    # Log-transformed p-value and pre-rescaled effect size combined score (pre-LS)
    elif edge_metric == 'pre-LS':
        # Work on copies so the caller's score tables are not mutated.
        scores1, scores2 = scores1.copy(), scores2.copy()
        scores1[edge_metric], scores2[edge_metric] = _log_weighted_scores(scores1, scores2, 'pre-P', 'pre-E')
    # Post rescaled absolute difference in (log-transformed raw p-value multiplied by raw effect size) (post-LS)
    elif edge_metric == 'post-LS':
        scores1, scores2 = scores1.copy(), scores2.copy()
        scores1['raw-LS'], scores2['raw-LS'] = _log_weighted_scores(scores1, scores2, 'raw-P', 'raw-E')
        edges_diff = _subtract_edges(scores1, scores2, metrics=['raw-LS'], included_cols=['test_type'])
        # Min-Max rescaling
        edges_diff = post_rescaling(diff_scores=edges_diff, metric=edge_metric)
    else:
        raise ValueError(f"Invalid edge metric '{edge_metric}'. Choose from: 'pre-P', 'post-P', 'pre-E', 'post-E', 'pre-PE', 'post-PE', 'pre-LS', 'post-LS' or 'int-IS'.")
    if edges_diff is None:
        # pre-P / pre-E / pre-LS / int-IS: plain (signed and absolute) difference in edge scores.
        edges_diff = _subtract_edges(scores1, scores2, metrics=[edge_metric], included_cols=['test_type'])
    if path is not None:
        edges_diff.to_csv(path)
    return edges_diff
# Differential node computation
# [docs]  (Sphinx artifact, not executable code)
def compute_diff_nodes(scores1: pd.DataFrame, scores2: pd.DataFrame, context1: pd.DataFrame, context2: pd.DataFrame,
                       node_metric: str, correction: str = 'bh', meta_file: Optional[pd.DataFrame] = None, test_type: str = 'nonparametric',
                       nan_value: Optional[int] = None,
                       path: Optional[str] = None) -> pd.DataFrame:
    """
    Compute differential node scores based on the specified node metric.
    :param scores1: Statistical association scores of Context 1, rescaled and potentially filtered.
    :param scores2: Statistical association scores of Context 2, rescaled and potentially filtered.
    :param context1: Observed data of Context 1, potentially filtered.
    :param context2: Observed data of Context 2, potentially filtered.
    :param node_metric: Node metric to compute the differential node scores.
    :param correction: Correction method for multiple testing. Only needed if node_metric is 'STC'. Defaults to 'bh'.
    :param meta_file: Meta file containing the node types. Only needed if node_metric is 'STC'. Defaults to None.
    :param test_type: Test type to compare continuous variables across contexts for the 'STC' node metric. Defaults to 'nonparametric'.
    :param nan_value: Numerical value used for NaN values in the context data. If None, an error will be raised if such values are present. Defaults to None.
    :param path: Optional path to save the differential node scores as a CSV file. Defaults to None.
    :return: A DataFrame containing the computed differential node scores.
    :raises ValueError: If the contexts differ in structure, an unknown node_metric is given,
        or 'STC' is requested without a meta_file.
    """
    # Explicit check instead of assert: asserts are stripped under 'python -O'.
    if not context1.columns.equals(context2.columns):
        raise ValueError('Context a and b need to have the same structure.')
    nodes_diff = pd.DataFrame(index=context1.columns)
    # Rescaling: ensure both score tables carry the pre-rescaled columns.
    if 'pre-E' not in scores1.columns or 'pre-E' not in scores2.columns:
        scores1, scores2 = pre_rescaling(scores1=scores1, scores2=scores2, metric='pre-E')
    if 'pre-P' not in scores1.columns or 'pre-P' not in scores2.columns:
        scores1, scores2 = pre_rescaling(scores1=scores1, scores2=scores2, metric='pre-P')
    # Statistical test centrality (STC)
    if node_metric == 'STC':
        if meta_file is None:
            raise ValueError("To compute the 'STC' node metric, please provide a 'meta_file' containing the node types.")
        nodes_diff = stat_test_centrality(context1=context1, context2=context2, correction=correction, meta_file=meta_file, test_type=test_type, nan_value=nan_value)
    # (Weighted) degree centrality; the metric suffix selects pre-P vs pre-E.
    elif node_metric in ('DC-P', 'DC-E', 'WDC-P', 'WDC-E'):
        nodes_diff = degree_centrality(nodes_diff=nodes_diff, weighted=node_metric.startswith('W'),
                                       scores1=scores1, scores2=scores2,
                                       metric='pre-P' if node_metric.endswith('P') else 'pre-E')
    # PageRank centrality based on pre-P / pre-E.
    elif node_metric in ('PRC-P', 'PRC-E'):
        nodes_diff = pagerank_centrality(nodes_diff=nodes_diff,
                                         metric='pre-P' if node_metric.endswith('P') else 'pre-E',
                                         scores1=scores1, scores2=scores2)
    else:
        raise ValueError(f"Invalid node metric '{node_metric}'. Choose from: 'STC', 'DC-P', 'DC-E', 'WDC-P', 'WDC-E', 'PRC-P' or 'PRC-E'.")
    if path is not None:
        nodes_diff.to_csv(path)
    return nodes_diff
# Compute absolute differences in edge scores between two contexts
def _subtract_edges(scores1, scores2, metrics, included_cols=None):
if not scores1['label1'].equals(scores2['label1']) or not scores1['label2'].equals(scores2['label2']):
raise ValueError('Context a and b need to have the same structure.')
edges_diff = scores1[['label1', 'label2']].copy()
if included_cols is not None:
for column in included_cols:
edges_diff[column] = scores1[column].copy()
for met in metrics:
signed_metric = met + '_signed'
edges_diff[signed_metric] = scores1[met] - scores2[met]
edges_diff[met] = abs(scores1[met] - scores2[met])
return edges_diff