Source code for modina.edge_filtering

from modina.statistics_utils import pre_rescaling

import logging
import math
import os
from typing import Tuple, Optional
import pandas as pd
import numpy as np


# Edge filtering
def _n_edges_to_keep(filter_method: str, filter_param: float, n_nodes: int, n_edges: int) -> int:
    """Translate the filtering method and its parameter into the number of edges to keep per network."""
    if filter_method == 'quantile':
        if filter_param <= 0.0 or filter_param > 1.0:
            raise ValueError("For 'quantile' filtering, 'filter_param' must be between 0 and 1.")
        n_keep = math.ceil(filter_param * n_edges)
    elif filter_method == 'degree':
        if filter_param < 1 or filter_param >= n_nodes:
            raise ValueError(f"For 'degree' filtering, 'filter_param' must be between 1 and {n_nodes - 1}.")
        # Every edge contributes to the degree of two nodes.
        n_keep = math.ceil(filter_param * n_nodes / 2)
    elif filter_method == 'density':
        if filter_param <= 0.0 or filter_param > 1.0:
            raise ValueError("For 'density' filtering, 'filter_param' must be between 0 and 1.")
        possible_edges = n_nodes * (n_nodes - 1) / 2
        n_keep = math.ceil(filter_param * possible_edges)
    else:
        raise ValueError(f"Invalid filtering method '{filter_method}'. Choose from: 'quantile', 'degree' or 'density'")

    # The score tables may hold fewer edges than the requested degree/density implies;
    # never ask for more edges than are available.
    return min(n_keep, n_edges)


def _passes_threshold(values: pd.Series, threshold, ascending: bool) -> pd.Series:
    """Return a boolean mask of the values that are at least as good as the threshold."""
    return values <= threshold if ascending else values >= threshold


def filter(scores1: pd.DataFrame, scores2: pd.DataFrame, context1: pd.DataFrame, context2: pd.DataFrame,
           filter_method: Optional[str] = None, filter_param: float = 0.0, filter_metric: Optional[str] = None,
           filter_rule: Optional[str]=None,
           path: Optional[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Filter association scores and context data based on the specified filtering configurations.

    A per-network threshold is taken from the n-th best value of ``filter_metric``, where n is derived
    from ``filter_method`` and ``filter_param`` (and capped at the number of available edges). Edges
    passing the threshold are then integrated across both networks according to ``filter_rule``, and the
    context data is reduced to the columns (nodes) that still occur in the filtered edges.

    :param scores1: Statistical association scores of Context 1. Needs the columns 'label1', 'label2'
        and the column named by ``filter_metric``.
    :param scores2: Statistical association scores of Context 2. Must list the same edges in the same
        order as ``scores1``.
    :param context1: The first context for the differential network analysis (one column per node).
    :param context2: The second context for the differential network analysis (one column per node).
    :param filter_method: Method used for filtering: 'quantile' (fraction of the edges in the score
        tables), 'degree' (target average node degree) or 'density' (fraction of all possible edges).
        Defaults to None.
    :param filter_param: Parameter for the specified filtering method. Defaults to 0.0.
    :param filter_metric: Edge metric (column of the score tables) used for filtering. Defaults to None.
    :param filter_rule: Rule to integrate the networks during filtering. 'union' keeps an edge in both
        networks if it passes in at least one of them; 'zero' also keeps the union of passing edges, but
        an edge that did not pass in a network gets neutral values there ('raw-P'/'pre-P' = 1.0,
        'raw-E'/'pre-E' = 0.0; other columns stay missing, except 'test_type' which is taken from the
        other network). Defaults to None.
    :param path: Optional path to save the filtered scores and context data as CSV files. Defaults to None.
    :return: A tuple containing the filtered scores and context data
        (scores1, scores2, context1, context2).
    :raises ValueError: If the score tables do not describe the same edges in the same order, contain no
        edges, a required parameter is missing, or a parameter value is invalid.
    """
    # Masks from both networks are combined row by row, so a mismatch in EITHER label column
    # would silently mix up unrelated edges.
    same_edges = scores1['label1'].equals(scores2['label1']) and scores1['label2'].equals(scores2['label2'])
    if not same_edges:
        raise ValueError('scores1 and scores2 need to have the same structure and order of edges.')

    # Check input parameters (before any rescaling work is done)
    if filter_method is None:
        raise ValueError("Please provide a 'filter_method'.")
    if filter_metric is None:
        raise ValueError("Please provide a 'filter_metric'.")
    if filter_rule is None:
        raise ValueError("Please provide a 'filter_rule'.")
    if filter_param is None:
        raise ValueError("Please provide a 'filter_param'.")

    # Rescaling
    # TODO: change pre-E and pre-P to E and P (since the metric names should only correspond to differential scores)
    for rescaled_metric in ('pre-E', 'pre-P'):
        if rescaled_metric not in scores1.columns or rescaled_metric not in scores2.columns:
            scores1, scores2 = pre_rescaling(scores1=scores1, scores2=scores2, metric=rescaled_metric)

    # Set sorting order based on filter metric: for 'pre-P' smaller values rank first,
    # for every other metric larger values rank first.
    # NOTE(review): 'raw-P' is therefore ranked descending - confirm this is intended.
    ascending = filter_metric == 'pre-P'

    # Compute number of edges according to the specified method
    n_nodes = context1.shape[1]
    n_edges_before = scores1.shape[0]
    n_filtered_edges = _n_edges_to_keep(filter_method, filter_param, n_nodes, n_edges_before)
    if n_filtered_edges < 1:
        raise ValueError('scores1 and scores2 do not contain any edges to filter.')

    # Set threshold: the metric value of the n-th best edge in each network
    threshold1 = scores1[filter_metric].sort_values(ascending=ascending).iloc[n_filtered_edges - 1]
    threshold2 = scores2[filter_metric].sort_values(ascending=ascending).iloc[n_filtered_edges - 1]

    mask1 = _passes_threshold(scores1[filter_metric], threshold1, ascending)
    mask2 = _passes_threshold(scores2[filter_metric], threshold2, ascending)

    # Apply the filtering threshold to the scores
    if filter_rule == 'union':
        mask = mask1 | mask2
        scores1_filtered = scores1[mask].copy()
        scores2_filtered = scores2[mask].copy()

    elif filter_rule == 'zero':
        filtered1 = scores1[mask1].copy().set_index(['label1', 'label2'])
        filtered2 = scores2[mask2].copy().set_index(['label1', 'label2'])

        # Unify indices; edges that passed in only one network become all-NaN rows in the other
        indices = filtered1.index.union(filtered2.index)
        scores1_filtered = filtered1.reindex(indices)
        scores2_filtered = filtered2.reindex(indices)

        # Replace the missing values of known metrics by their neutral value
        fill_values = {
            'raw-P': 1.0,
            'pre-P': 1.0,
            'raw-E': 0.0,
            'pre-E': 0.0,
        }
        for filtered in (scores1_filtered, scores2_filtered):
            for metric, value in fill_values.items():
                if metric in filtered.columns:
                    filtered[metric] = filtered[metric].fillna(value)

        if 'test_type' in scores1_filtered.columns and 'test_type' in scores2_filtered.columns:
            # Prefer the value of network 1 and fall back to network 2 where it is missing
            merged_test_type = scores1_filtered['test_type'].combine_first(scores2_filtered['test_type'])
            scores1_filtered['test_type'] = merged_test_type
            scores2_filtered['test_type'] = merged_test_type

        scores1_filtered = scores1_filtered.reset_index()
        scores2_filtered = scores2_filtered.reset_index()

    else:
        raise ValueError(f"Invalid filtering rule '{filter_rule}'.")

    # Filter context data to only include nodes present in the filtered scores
    # (both filtered score tables contain the same edges at this point)
    filtered_nodes = np.concatenate((scores1_filtered['label1'].values, scores1_filtered['label2'].values))
    filtered_nodes = pd.unique(filtered_nodes)

    context1_filtered = context1[filtered_nodes].copy()
    context2_filtered = context2[filtered_nodes].copy()

    n_edges_after = scores1_filtered.shape[0]
    logging.info('Reduced the number of edges from %s to %s.', n_edges_before, n_edges_after)

    if path is not None:
        scores1_filtered.to_csv(os.path.join(path, 'scores1_filtered.csv'), index=False)
        scores2_filtered.to_csv(os.path.join(path, 'scores2_filtered.csv'), index=False)
        context1_filtered.to_csv(os.path.join(path, 'context1_filtered.csv'), index=False)
        context2_filtered.to_csv(os.path.join(path, 'context2_filtered.csv'), index=False)

    return scores1_filtered, scores2_filtered, context1_filtered, context2_filtered