Source code for modina.context_net_inference

# Adapted from https://github.com/DyHealthNet/DHN-backend.git

from modina.statistics_utils import _df_to_numpy, _separate_types

import os
import numpy as np
import pandas as pd
import napypi as napy
import logging
import itertools
from typing import Optional, Tuple

# Raw test-statistic keys returned by napy that are NOT effect sizes;
# _napy_formatting skips these when collecting effect-size columns.
EXCLUDED_EFFECTS = {'chi2', 'phi', 't', 'F', 'U', 'H'}


def calculate_association_scores(ord_data, nom_data, cont_data, bi_data,
                                 test_type='nonparametric', num_workers=1,
                                 nan_value=-89.0, correction='bh') -> pd.DataFrame:
    """Compute pairwise association scores across all variable-type combinations.

    :param ord_data: Ordinal variables (rows: samples, columns: variables).
    :param nom_data: Nominal variables.
    :param cont_data: Continuous variables.
    :param bi_data: Binary variables.
    :param test_type: 'parametric' or 'nonparametric'. Defaults to 'nonparametric'.
    :param num_workers: Number of napy worker threads. Defaults to 1.
    :param nan_value: Numerical placeholder for missing values. Defaults to -89.0.
    :param correction: Multiple-testing correction: 'bh' (Benjamini-Hochberg)
        or 'by' (Benjamini-Yekutieli). Defaults to 'bh'.
    :return: DataFrame with columns label1, label2, raw-P, raw-E, test_type.
    :raises ValueError: If continuous data contains non-numeric columns or the
        correction method is unknown.
    """
    cont_data = cont_data.copy()
    if not cont_data.select_dtypes(include=[np.number]).shape[1] == cont_data.shape[1]:
        raise ValueError('Continuous data contains non-numeric columns.')

    # Remap categories to start at 0 and be consecutive integers.
    # NOTE(review): if the categorical frames still contain the numeric
    # `nan_value` placeholder, _order_categories renumbers it like a regular
    # category, so napy would no longer recognize missing values -- confirm
    # upstream guarantees missing entries are real NaN at this point.
    nom_data = _order_categories(nom_data)
    bi_data = _order_categories(bi_data)

    cont_nom_results = napy_nom_cont(cont_data, nom_data, test=test_type,
                                     num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-nominal score calculation.")
    cont_ord_results = napy_ord_cont(cont_data, ord_data,
                                     num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-ordinal score calculation.")
    bi_cont_results = napy_bi_cont(cont_data, bi_data, test=test_type,
                                   num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-binary score calculation.")
    bi_nom_results = napy_bi_nom(nom_data, bi_data,
                                 num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished binary-nominal score calculation.")
    cont_cont_results = napy_cont_cont(cont_data, test=test_type,
                                       num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-continuous score calculation")
    bi_ord_results = napy_bi_ord(ord_data, bi_data,
                                 num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished binary-ordinal score calculation.")
    ord_nom_results = napy_ord_nom(ord_data, nom_data,
                                   num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished ordinal-nominal score calculation.")

    scores = _combine_tests(cont_nom_results, cont_ord_results, bi_cont_results,
                            bi_nom_results, cont_cont_results, bi_ord_results,
                            ord_nom_results)

    # Take the adjusted p-value and the corresponding effect size
    column_names = scores.iloc[:, 2:].columns
    if correction == 'bh':
        correction = 'benjamini_hb'
    elif correction == 'by':
        correction = 'benjamini_yek'
    else:
        # BUG FIX: the message previously offered "'bh' or 'yek'", but the
        # accepted argument values are 'bh' and 'by'.
        raise ValueError(f"Invalid correction method '{correction}'. Choose from: 'bh' or 'by'.")
    p_adj = '_p_' + correction
    p_columns = [column for column in column_names if p_adj in column]
    e_columns = [column for column in column_names if '_e_' in column]

    scores_final = scores[['label1', 'label2']].copy()
    for i in scores.index:
        # For each pair, take the first test with a non-NaN adjusted p-value
        # and the matching effect-size column of that test.
        for p_type in p_columns:
            if pd.notna(scores.loc[i, p_type]):
                test = p_type.split('_')[0]
                e_type = [column for column in e_columns if test in column][0]
                scores_final.loc[i, 'raw-P'] = scores.loc[i, p_type]
                scores_final.loc[i, 'raw-E'] = scores.loc[i, e_type]
                scores_final.loc[i, 'test_type'] = test
                break
    scores_final = scores_final.drop_duplicates(subset=['label1', 'label2', 'test_type'], keep='first')
    return scores_final
def compute_context_scores(context_data: pd.DataFrame, meta_file: pd.DataFrame,
                           test_type: str = 'nonparametric', correction: str = 'bh',
                           num_workers: int = 1, path: Optional[str] = None,
                           nan_value: Optional[float] = None,
                           name: str = 'context1') -> pd.DataFrame:
    """
    Compute association scores for a given context.

    :param context_data: Raw context data (rows: samples, columns: variables).
    :param meta_file: Metadata file containing a 'label' and 'type' column to specify the data type of each variable.
    :param test_type: Type of tests to use for network inference. Defaults to 'nonparametric'.
    :param correction: Correction method for multiple testing. Defaults to 'bh'.
    :param num_workers: Number of workers for parallel processing. Defaults to 1.
    :param path: Optional path to save the computed scores as a CSV file. Defaults to None.
    :param nan_value: Numerical value used for NaN values in the context data. If None, an error will be raised if such values are present. Defaults to None.
    :param name: Name of the context. Used for saving files. Defaults to 'context'.
    :return: A pd.DataFrame containing the computed association scores.
    :raises ValueError: If test_type is neither 'parametric' nor 'nonparametric'.
    """
    # Check nan values and input format
    context, nan_value = _check_input_data(context=context_data, meta_file=meta_file, nan_value=nan_value)

    # Separate the data into categorical and continuous data.
    # (Locals renamed from `ord`/`map` to stop shadowing the builtins.)
    ord_data, nom_data, cont_data, bi_data = _separate_types(context, meta_file)

    # Handle categorical variables with only one category by adding dummy rows
    # with p-value 1.0 and effect size 0.0
    ord_data, nom_data, bi_data, cont_data, dummy = _create_dummy_associations(
        ord=ord_data, nom=nom_data, bi=bi_data, cont=cont_data,
        meta_file=meta_file, test_type=test_type, nan_value=nan_value)

    # Calculate scores
    scores = calculate_association_scores(ord_data=ord_data, nom_data=nom_data,
                                          cont_data=cont_data, bi_data=bi_data,
                                          test_type=test_type, num_workers=num_workers,
                                          nan_value=nan_value, correction=correction)
    scores = pd.concat([scores, dummy], ignore_index=True)

    # Sort: order labels within each pair, then sort rows for determinism
    l1 = scores[['label1', 'label2']].min(axis=1)
    l2 = scores[['label1', 'label2']].max(axis=1)
    scores['label1'] = l1
    scores['label2'] = l2
    scores = scores.sort_values(by=['label1', 'label2', 'test_type']).reset_index(drop=True)

    # Replace NaN values in p-values with 1.0 and in effect sizes with 0.0;
    # assign correct test type (keys are sorted (type, type) pairs)
    if test_type == 'parametric':
        test_map = {
            ('continuous', 'continuous'): 'pearson',
            ('continuous', 'nominal'): 'anova',
            ('continuous', 'ordinal'): 'spearman',
            ('binary', 'continuous'): 'ttest',
            ('binary', 'binary'): 'chi2',
            ('binary', 'nominal'): 'chi2',
            ('binary', 'ordinal'): 'mwu',
            ('ordinal', 'ordinal'): 'spearman',
            ('nominal', 'ordinal'): 'kruskal',
            ('nominal', 'nominal'): 'chi2',
        }
    elif test_type == 'nonparametric':
        test_map = {
            ('continuous', 'continuous'): 'spearman',
            ('continuous', 'nominal'): 'kruskal',
            ('continuous', 'ordinal'): 'spearman',
            ('binary', 'continuous'): 'mwu',
            ('binary', 'binary'): 'chi2',
            ('binary', 'nominal'): 'chi2',
            ('binary', 'ordinal'): 'mwu',
            ('ordinal', 'ordinal'): 'spearman',
            ('nominal', 'ordinal'): 'kruskal',
            ('nominal', 'nominal'): 'chi2',
        }
    else:
        raise ValueError(f"Invalid test type '{test_type}'. Specify 'parametric' or 'nonparametric' for association testing.")
    meta = meta_file.set_index('label')['type'].to_dict()
    for col in scores.columns:
        if col == 'raw-E':
            scores[col] = scores[col].fillna(0.0)
        elif col == 'raw-P':
            scores[col] = scores[col].fillna(1.0)
        elif col == 'test_type':
            scores[col] = scores[col].fillna(
                scores.apply(lambda row: test_map.get(
                    tuple(sorted((meta[row['label1']], meta[row['label2']]))), 'unknown'),
                    axis=1))

    # Save scores
    if path is not None:
        file = os.path.join(path, f"{name}_scores.csv")
        scores.to_csv(file, index=False)
    return scores
def napy_bi_nom(nom_phenotypes: pd.DataFrame, bi_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89.0):
    """Run chi-squared tests over all pairs of discrete (nominal + binary) variables.

    Nominal and binary columns are pooled into a single discrete matrix before
    testing. Returns a one-element list with the formatted result frame, or
    [None] when fewer than two discrete variables exist.
    """
    discrete = pd.concat([nom_phenotypes, bi_phenotypes], axis=1)
    discrete, labels = _df_to_numpy(discrete)
    if discrete.shape[1] < 2:
        # Nothing to pair up.
        return [None]
    raw = napy.chi_squared(discrete, axis=1, threads=num_workers,
                           nan_value=nan_value, use_numba=False)
    formatted = _napy_formatting(raw, [labels], 'chi2')
    assert formatted is not None, "Results should not be None here."
    return [formatted]
def napy_nom_cont(cont_phenotypes: pd.DataFrame, nom_phenotypes: pd.DataFrame, test: str='nonparametric', num_workers=8, nan_value=-89.0):
    """Test nominal vs. continuous variables via ANOVA (parametric) or
    Kruskal-Wallis (nonparametric).

    Returns a one-element list with the formatted result frame, or [None]
    when either variable set is empty.
    """
    if nom_phenotypes.shape[1] < 1 or cont_phenotypes.shape[1] < 1:
        return [None]
    cont_arr, cont_cols = _df_to_numpy(cont_phenotypes)
    nom_arr, nom_cols = _df_to_numpy(nom_phenotypes)
    if test == 'parametric':
        done_test = "anova"
        raw = napy.anova(cat_data=nom_arr, cont_data=cont_arr, axis=1,
                         threads=num_workers, nan_value=nan_value)
    elif test == 'nonparametric':
        done_test = "kruskal"
        raw = napy.kruskal_wallis(cat_data=nom_arr, cont_data=cont_arr, axis=1,
                                  threads=num_workers, nan_value=nan_value)
    else:
        raise ValueError(f"Invalid test type '{test}'. Specify 'parametric' or 'nonparametric' for nominal-continuous association testing.")
    return [_napy_formatting(raw, [nom_cols, cont_cols], done_test)]
def napy_ord_nom(ord_phenotypes: pd.DataFrame, nom_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89.0):
    """Test ordinal vs. nominal variables with the Kruskal-Wallis test.

    Ordinal values are passed as the continuous side of the test. Returns a
    one-element list with the formatted result, or [None] if either set is empty.
    """
    if nom_phenotypes.shape[1] < 1 or ord_phenotypes.shape[1] < 1:
        return [None]
    ord_arr, ord_cols = _df_to_numpy(ord_phenotypes)
    nom_arr, nom_cols = _df_to_numpy(nom_phenotypes)
    raw = napy.kruskal_wallis(cat_data=nom_arr, cont_data=ord_arr, axis=1,
                              threads=num_workers, nan_value=nan_value)
    return [_napy_formatting(raw, [nom_cols, ord_cols], "kruskal")]
def napy_bi_cont(cont_phenotypes: pd.DataFrame, bi_phenotypes: pd.DataFrame, test: str='nonparametric', num_workers=8, nan_value=-89.0):
    """Test binary vs. continuous variables via t-test (parametric) or
    Mann-Whitney U (nonparametric).

    Returns a one-element list with the formatted result frame, or [None]
    when either variable set is empty.
    """
    if bi_phenotypes.shape[1] < 1 or cont_phenotypes.shape[1] < 1:
        return [None]
    cont_arr, cont_cols = _df_to_numpy(cont_phenotypes)
    bi_arr, bi_cols = _df_to_numpy(bi_phenotypes)
    if test == 'parametric':
        done_test = "ttest"
        raw = napy.ttest(bin_data=bi_arr, cont_data=cont_arr, axis=1,
                         threads=num_workers, nan_value=nan_value)
    elif test == 'nonparametric':
        done_test = "mwu"
        raw = napy.mwu(bin_data=bi_arr, cont_data=cont_arr, axis=1,
                       threads=num_workers, nan_value=nan_value)
    else:
        raise ValueError(f"Invalid test type '{test}'. Specify 'parametric' or 'nonparametric' for binary-continuous association testing.")
    return [_napy_formatting(raw, [bi_cols, cont_cols], done_test)]
def napy_bi_ord(ord_phenotypes: pd.DataFrame, bi_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89.0):
    """Test binary vs. ordinal variables with the Mann-Whitney U test.

    Ordinal values are passed as the continuous side. Returns a one-element
    list with the formatted result, or [None] if either set is empty.
    """
    if bi_phenotypes.shape[1] < 1 or ord_phenotypes.shape[1] < 1:
        return [None]
    ord_arr, ord_cols = _df_to_numpy(ord_phenotypes)
    bi_arr, bi_cols = _df_to_numpy(bi_phenotypes)
    raw = napy.mwu(bin_data=bi_arr, cont_data=ord_arr, axis=1,
                   threads=num_workers, nan_value=nan_value)
    return [_napy_formatting(raw, [bi_cols, ord_cols], "mwu")]
def napy_cont_cont(cont_phenotypes: pd.DataFrame, test: str='nonparametric', num_workers=8, nan_value=-89.0):
    """Correlate all pairs of continuous variables via Pearson (parametric)
    or Spearman (nonparametric).

    Returns a one-element list with the formatted result frame, or [None]
    when fewer than two continuous variables exist.
    """
    if cont_phenotypes.shape[1] < 2:
        return [None]
    data, cols = _df_to_numpy(cont_phenotypes)
    if test == 'parametric':
        done_test = "pearson"
        raw = napy.pearsonr(data, nan_value=nan_value, threads=num_workers, axis=1)
    elif test == 'nonparametric':
        done_test = "spearman"
        raw = napy.spearmanr(data, threads=num_workers, nan_value=nan_value, axis=1)
    else:
        raise ValueError(f"Invalid test type '{test}'. Specify 'parametric' or 'nonparametric' for continuous-continuous association testing.")
    return [_napy_formatting(raw, [cols], done_test)]
def napy_ord_cont(cont_phenotypes: pd.DataFrame, ord_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89.0):
    """Spearman-correlate ordinal against continuous variables.

    One joint correlation matrix is computed over both variable sets; the
    formatter then keeps only pairs that involve at least one ordinal column
    (continuous-continuous pairs are handled by napy_cont_cont). Returns a
    one-element list with the formatted result, or [None] if either set is empty.
    """
    if cont_phenotypes.shape[1] < 1 or ord_phenotypes.shape[1] < 1:
        return [None]
    combined = pd.concat([cont_phenotypes, ord_phenotypes], axis=1)
    combined_arr, combined_cols = _df_to_numpy(combined)
    _, cont_cols = _df_to_numpy(cont_phenotypes)
    _, ord_cols = _df_to_numpy(ord_phenotypes)
    raw = napy.spearmanr(combined_arr, threads=num_workers, nan_value=nan_value, axis=1)
    return [_napy_formatting(raw, [combined_cols], "spearman",
                             ord_cols=ord_cols.tolist(), cont_cols=cont_cols.tolist())]
# Check input format of context data def _check_input_data(context: pd.DataFrame, meta_file: pd.DataFrame, nan_value: Optional[float] = None) -> Tuple[pd.DataFrame, float]: """ Check if the input data is in the expected format. Check for missing values and categorical variables that only have one category. :param context: Context data. :param meta_file: Metadata file containing one row per variable in the context data. :param nan_value: Numerical value used for NaN values in the context data. If None, an error will be raised if such values are present. :return: The checked context data. """ # Check if context is a DataFrame if not isinstance(context, pd.DataFrame): raise ValueError('The context data should be provided as a pandas DataFrame.') # Check if meta_file is a DataFrame if not isinstance(meta_file, pd.DataFrame): raise ValueError('The meta_file should be provided as a pandas DataFrame.') # Check if meta_file contains required columns required_columns = {'label', 'type'} if not required_columns.issubset(set(meta_file.columns)): raise ValueError(f'The meta_file should contain the following columns: {required_columns}.') # Check if all variables in context are present in meta_file context_vars = set(context.columns) meta_vars = set(meta_file['label'].values) if not context_vars.issubset(meta_vars): missing_vars = context_vars - meta_vars raise ValueError(f'The following variables are missing in the meta_file: {missing_vars}.') # Search for non-numeric and NaN values if context.apply(lambda col: pd.to_numeric(col, errors="coerce").isna()).values.any() > 0: if nan_value is not None: logging.warning(f'The context data still contains non-numeric or NaN values. These will be replaced by the specified nan_value {nan_value}.') context = context.apply(pd.to_numeric, errors="coerce") context = context.fillna(nan_value) else: raise ValueError('The context data contains non-numeric or NaN values. 
Please clean the data and/or specify a nan_value to replace these values.') else: if nan_value is None: # Find a value that does not exist in the data to use as nan_value for napy existing = set(context.stack().values) nan_value = -999.0 while True: if nan_value not in existing: break else: nan_value -= 1.0 logging.info(f'The context data does not contain any missing values. ' f'For statistical tests, {nan_value} will be used as ' f'the NaN replacement, as this value does not occur in the data. ' f'If you want to specify a different value, please provide it via the \'nan_value\' argument.') # Check if binary varialbes only have two unique values (excluding NaN) bi_phenotypes = context[meta_file[meta_file['type'] == 'binary']['label']] invalid_cols = [col for col in bi_phenotypes.columns if bi_phenotypes[col][bi_phenotypes[col] != nan_value].nunique() > 2] if invalid_cols: raise ValueError(f"These variables are not binary, but were specified as such: {invalid_cols}") return context, nan_value def _combine_tests(*result_groups) -> pd.DataFrame: all_results = [] for results in result_groups: merged = None for test in results: if test is None: continue # Convert all columns except 'label1' and 'label2' to float32 cols_to_convert = test.columns.difference(['label1', 'label2']) test[cols_to_convert] = test[cols_to_convert].astype('float32') if merged is None: merged = test continue merged = pd.merge(merged, test, on=['label1', 'label2'], how='outer') all_results.append(merged) out = pd.concat(all_results, ignore_index=True) out = out.sort_values(by=['label1', 'label2'], kind='mergesort').reset_index(drop=True) return out def _create_dummy_associations(ord, nom, bi, cont, meta_file, test_type, nan_value): all_data = pd.concat([ord, nom, bi, cont], axis=1) const_vars = [] # Find all variables that have only one observed category/value (excluding NaN) for df in [ord, nom, bi, cont]: for col in df.columns: if df[col][df[col] != nan_value].nunique() <= 1: 
const_vars.append(col) if const_vars: logging.warning(f'The following variables have only one observed value/category or entirely missing values in one of the contexts: {const_vars}. For these variables, all related association scores in that context will be set to p-value 1.0 and effect size 0.0.') ord = ord.drop(columns=const_vars, errors='ignore') nom = nom.drop(columns=const_vars, errors='ignore') bi = bi.drop(columns=const_vars, errors='ignore') cont = cont.drop(columns=const_vars, errors='ignore') other_vars = all_data.columns.difference(const_vars) meta = meta_file.set_index('label')['type'].to_dict() if test_type == 'parametric': map = { ('continuous', 'continuous'): 'pearson', ('continuous', 'nominal'): 'anova', ('continuous', 'ordinal'): 'spearman', ('binary', 'continuous'): 'ttest', ('binary', 'binary'): 'chi2', ('binary', 'nominal'): 'chi2', ('binary', 'ordinal'): 'mwu', ('ordinal', 'ordinal'): 'spearman', ('nominal', 'ordinal'): 'kruskal', ('nominal', 'nominal'): 'chi2' } elif test_type == 'nonparametric': map = { ('continuous', 'continuous'): 'spearman', ('continuous', 'nominal'): 'kruskal', ('continuous', 'ordinal'): 'spearman', ('binary', 'continuous'): 'mwu', ('binary', 'binary'): 'chi2', ('binary', 'nominal'): 'chi2', ('binary', 'ordinal'): 'mwu', ('ordinal', 'ordinal'): 'spearman', ('nominal', 'ordinal'): 'kruskal', ('nominal', 'nominal'): 'chi2' } else: raise ValueError(f"Invalid test type '{test_type}'. 
Specify 'parametric' or 'nonparametric' for association testing.") rows = [] for var1 in const_vars: for var2 in other_vars: pair = tuple(sorted((meta[var1], meta[var2]))) test = map.get(pair) row = {'label1': var1, 'label2': var2, 'raw-P': 1.0, 'raw-E': 0.0, 'test_type': test} rows.append(row) for var1, var2 in itertools.combinations(const_vars, 2): pair = tuple(sorted((meta[var1], meta[var2]))) test = map.get(pair) row = {'label1': var1, 'label2': var2, 'raw-P': 1.0, 'raw-E': 0.0, 'test_type': test} rows.append(row) return ord, nom, bi, cont, pd.DataFrame(rows) return ord, nom, bi, cont, pd.DataFrame() def _napy_formatting(assoc_out: dict[np.array], labels: list, test: str, ord_cols: Optional[list] = None, cont_cols: Optional[list] = None, file_name: Optional[str] = None) -> Optional[pd.DataFrame]: if not assoc_out: return None if len(labels) == 1: rows_idx, cols_idx = np.tril_indices(assoc_out['p_unadjusted'].shape[0], k=-1) # Pre-format labels and values label1 = np.array(labels[0])[rows_idx] label2 = np.array(labels[0])[cols_idx] else: rows_idx, cols_idx = np.indices(assoc_out['p_unadjusted'].shape) label1 = np.array(labels[0])[rows_idx.ravel()] label2 = np.array(labels[1])[cols_idx.ravel()] p_values_raw = {key: assoc_out[key][rows_idx, cols_idx].ravel() for key in assoc_out if key.startswith('p_')} effects_raw = {key: assoc_out[key][rows_idx, cols_idx].ravel() for key in assoc_out if not key.startswith('p_') and key not in EXCLUDED_EFFECTS} p_columns = [f"{test}_{key}" for key in p_values_raw.keys()] e_columns = [f"{test}_e_{key}" for key in effects_raw.keys()] df = pd.DataFrame({ 'label1': label1, 'label2': label2, **{p_columns[i]: p_values_raw[key] for i, key in enumerate(p_values_raw)}, **{e_columns[i]: effects_raw[key] for i, key in enumerate(effects_raw)}, }) if ord_cols is not None and cont_cols is not None: mask = ( (df["label1"].isin(ord_cols) & df["label2"].isin(cont_cols)) | (df["label1"].isin(cont_cols) & df["label2"].isin(ord_cols)) | 
(df["label1"].isin(ord_cols) & df["label2"].isin(ord_cols)) ) df = df[mask] if file_name: df.to_csv(file_name, sep=',', index=True, header=False, lineterminator='\n') return df def _order_categories(data: pd.DataFrame): """ Order categories in a dataframe such that they start at 0 and are consecutive integers. :param data: the dataframe to order :return: the ordered dataframe """ data = data.copy() order_table = {col: {o: n for n, o in enumerate(sorted(data[col].dropna().unique()))} for col in data.columns} for col, mapping in order_table.items(): data[col] = data[col].map(mapping).astype(pd.Int64Dtype()) return data