from collections import defaultdict
import networkx as nx
import numpy as np
import pandas as pd
import logging
import os
from typing import Tuple, Optional
# Compute ranking
def compute_ranking(nodes_diff: Optional[pd.DataFrame], edges_diff: Optional[pd.DataFrame], ranking_alg: str,
                    path: Optional[str] = None, meta_file: Optional[pd.DataFrame] = None) -> Tuple[list, dict]:
    """
    Compute a ranking based on the specified ranking algorithm.

    The node metric is taken from the first column of ``nodes_diff`` and the edge metric from the
    fourth column of ``edges_diff`` (both are selected by position, not by name).

    :param nodes_diff: Differential node scores.
    :param edges_diff: Differential edge scores. Must contain the columns 'label1' and 'label2' for 'direct_edge'.
    :param ranking_alg: Ranking algorithm to compute. Options are 'PageRank+', 'PageRank', 'absDimontRank', 'DimontRank', 'direct_node' and 'direct_edge'.
    :param path: Optional path to save the ranking as a CSV file. If per-type rankings are available, they are
        written next to it with the suffixes '_continuous', '_ordinal', '_nominal' and '_binary'.
    :param meta_file: Metadata file containing a 'label' and 'type' column to specify the data type of each variable.
    :return: A tuple containing the list of ranked nodes and a dictionary with ranked nodes per data type
        (keys 'cont', 'ord', 'nom' and 'bi'). For 'direct_edge' the ranked items are edges, written as the two
        sorted labels joined by '_', and the per-type lists stay empty.
    :raises ValueError: If the algorithm is unknown, a required input is missing, or meta_file contains an
        invalid or ambiguous type for a ranked node.
    :raises KeyError: If a ranked node has no entry in meta_file.
    """
    node_metric = nodes_diff.columns[0] if nodes_diff is not None else None
    edge_metric = edges_diff.columns[3] if edges_diff is not None else None

    ranking_scores = None
    # Personalized PageRank
    if ranking_alg == 'PageRank+':
        if nodes_diff is None or edges_diff is None:
            raise ValueError("To compute 'PageRank+', please provide both 'nodes_diff' and 'edges_diff'.")
        ranking_scores = pagerank(nodes_diff=nodes_diff, edges_diff=edges_diff,
                                  node_metric=node_metric, edge_metric=edge_metric,
                                  personalization=True)
    # PageRank
    elif ranking_alg == 'PageRank':
        if edges_diff is None:
            raise ValueError("To compute 'PageRank', please provide 'edges_diff'.")
        ranking_scores = pagerank(edges_diff=edges_diff, edge_metric=edge_metric,
                                  personalization=False)
    # DimontRank with absolute ('absDimontRank') or signed ('DimontRank') difference
    elif ranking_alg in ('absDimontRank', 'DimontRank'):
        if edges_diff is None:
            raise ValueError(f"To compute '{ranking_alg}', please provide 'edges_diff'.")
        dimont_mode = 'abs' if ranking_alg == 'absDimontRank' else 'signed'
        ranking_scores = dimontrank(edges_diff=edges_diff, edge_metric=edge_metric, mode=dimont_mode)
    # Direct ranking based on node metric
    elif ranking_alg == 'direct_node':
        if nodes_diff is None:
            raise ValueError("To compute 'direct_node', please provide 'nodes_diff'.")
        ranking_scores = nodes_diff[node_metric]
    # Direct ranking based on edge metric
    elif ranking_alg == 'direct_edge':
        if edges_diff is None:
            raise ValueError("To compute 'direct_edge', please provide 'edges_diff'.")
        sorted_edges = edges_diff[['label1', 'label2', edge_metric]].sort_values(by=edge_metric, ascending=False)
        # Plain iteration instead of DataFrame.apply(axis=1): apply returns a DataFrame (without
        # .tolist()) when there are no rows, which made an empty edge list crash.
        ranks = ['_'.join(sorted(pair)) for pair in zip(sorted_edges['label1'], sorted_edges['label2'])]
    else:
        raise ValueError(f"Invalid ranking algorithm {ranking_alg}. "
                         "Choose from: 'PageRank+', 'PageRank', 'absDimontRank', 'DimontRank', 'direct_node' or 'direct_edge'.")

    if ranking_alg != 'direct_edge':
        # All node-based algorithms yield one score per node; highest score ranks first.
        ranks = pd.Series(ranking_scores).sort_values(ascending=False).index.tolist()

    ranks_by_type = {key: [] for key in _NODE_TYPE_KEYS.values()}
    has_type_ranking = False
    if ranking_alg == 'direct_edge':
        logging.info('Ranking per type is not available for direct_edge ranking algorithm.')
    elif meta_file is None:
        logging.warning('No meta_file was provided. Rankings per data type cannot be computed.')
    else:
        ranks_by_type = _split_ranks_by_type(ranks, meta_file)
        has_type_ranking = True

    if path is not None:
        # The overall ranking is stored sorted by node name; the per-type files keep rank order.
        _ranking_frame(ranks).sort_values("node").to_csv(path, index=False)
        if has_type_ranking:
            stem = os.path.splitext(path)[0]
            for type_name, key in _NODE_TYPE_KEYS.items():
                _ranking_frame(ranks_by_type[key]).to_csv(f"{stem}_{type_name}.csv", index=False)

    return ranks, ranks_by_type


# Maps the 'type' values accepted in meta_file to the keys of the returned per-type dictionary.
_NODE_TYPE_KEYS = {'continuous': 'cont', 'ordinal': 'ord', 'nominal': 'nom', 'binary': 'bi'}


def _ranking_frame(ranked: list) -> pd.DataFrame:
    """Pair every ranked item with its 1-based position in the ranking."""
    return pd.DataFrame({"node": ranked, "rank": range(1, len(ranked) + 1)})


def _split_ranks_by_type(ranks: list, meta_file: pd.DataFrame) -> dict:
    """Split the ranked nodes into one list per data type, preserving the rank order."""
    node_types = meta_file.set_index('label')['type']
    ranks_by_type = {key: [] for key in _NODE_TYPE_KEYS.values()}
    for node in ranks:
        try:
            node_type = node_types.at[node]
        except KeyError:
            raise KeyError(f"Node '{node}' was ranked but has no entry in the 'label' column of meta_file.") from None
        if not pd.api.types.is_scalar(node_type):
            # A duplicated label yields several values instead of a single type.
            raise ValueError(f"Node '{node}' has more than one entry in meta_file.")
        key = _NODE_TYPE_KEYS.get(node_type)
        if key is None:
            raise ValueError(f"Invalid node type '{node_type}' for node '{node}' in meta_file.")
        ranks_by_type[key].append(node)
    return ranks_by_type
# PageRank algorithm
# DimontRank algorithm
def dimontrank(edges_diff, edge_metric, mode='abs'):
    """
    Score every node by the absolute value of the mean weight of its incident edges.

    Each row of ``edges_diff`` contributes its weight to both endpoints ('label1' and 'label2').

    :param edges_diff: Differential edge scores with the columns 'label1', 'label2' and the weight column.
    :param edge_metric: Name of the edge metric. In 'abs' mode this column is used as the weight; in 'signed'
        mode the column named ``edge_metric + '_signed'`` is used instead.
    :param mode: Either 'abs' or 'signed'.
    :return: A dictionary mapping each node label to its score, in order of first appearance.
    :raises ValueError: If ``mode`` is unknown, or if 'signed' mode is requested for an unsupported edge metric.
    """
    if mode not in ('abs', 'signed'):
        # Previously any other value silently fell back to the 'abs' behaviour.
        raise ValueError(f"Invalid mode '{mode}'. Choose from: 'abs' or 'signed'.")
    if mode == 'signed':
        # Check for valid edge metric
        if edge_metric not in ['pre-P', 'pre-E', 'pre-PE', 'pre-LS', 'int-IS']:
            raise ValueError(f"DimontRank can only be applied with edge metrics 'pre-P', 'pre-E', 'pre-PE', 'pre-LS', or 'int-IS'. But '{edge_metric}' was provided.")
        edge_metric = edge_metric + '_signed'

    sums = defaultdict(float)
    counts = defaultdict(int)
    for n1, n2, w in zip(edges_diff['label1'], edges_diff['label2'], edges_diff[edge_metric]):
        sums[n1] += w
        sums[n2] += w
        counts[n1] += 1
        counts[n2] += 1
    # Every node in sums was counted at least once, so the division is always defined.
    return {n: np.abs(sums[n] / counts[n]) for n in sums}