# Source code for modina.context_net_inference

# Adapted from https://github.com/DyHealthNet/DHN-backend.git

from modina.statistics_utils import _df_to_numpy, _separate_types

import numpy as np
import pandas as pd
import napypi as napy
import logging
from typing import Optional, Tuple

EXCLUDED_EFFECTS = {'chi2', 'phi', 't', 'F', 'U', 'H'}


def calculate_association_scores(ord_data, nom_data, cont_data, bi_data, test_type='nonparametric', num_workers=1, nan_value=-89) -> pd.DataFrame:
    """Compute pairwise association scores across every data-type combination.

    Runs the appropriate napy test for each pair of variable types
    (continuous, ordinal, nominal, binary) and merges all results into a
    single long-format score table.

    :param ord_data: DataFrame of ordinal variables.
    :param nom_data: DataFrame of nominal variables.
    :param cont_data: DataFrame of continuous variables (must be all-numeric).
    :param bi_data: DataFrame of binary variables.
    :param test_type: 'parametric' or 'nonparametric'; selects the test family.
    :param num_workers: Number of worker threads passed to napy.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Combined DataFrame of association scores.
    :raises ValueError: If cont_data contains non-numeric columns.
    """
    cont_data = cont_data.copy()
    n_numeric = cont_data.select_dtypes(include=[np.number]).shape[1]
    if n_numeric != cont_data.shape[1]:
        raise ValueError('Continuous data contains non-numeric columns.')
    # Remap categories so they start at 0 and are consecutive integers.
    nom_data = _order_categories(nom_data)
    bi_data = _order_categories(bi_data)
    cont_nom = napy_nom_cont(cont_data, nom_data, test=test_type, num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-nominal score calculation.")
    cont_ord = napy_ord_cont(cont_data, ord_data, num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-ordinal score calculation.")
    bi_cont = napy_bi_cont(cont_data, bi_data, test=test_type, num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-binary score calculation.")
    bi_nom = napy_bi_nom(nom_data, bi_data, num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished binary-nominal score calculation.")
    cont_cont = napy_cont_cont(cont_data, test=test_type, num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished continuous-continuous score calculation")
    bi_ord = napy_bi_ord(ord_data, bi_data, num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished binary-ordinal score calculation.")
    ord_nom = napy_ord_nom(ord_data, nom_data, num_workers=num_workers, nan_value=nan_value)
    logging.info("Finished ordinal-nominal score calculation.")
    return _combine_tests(cont_nom, cont_ord, bi_cont, bi_nom,
                          cont_cont, bi_ord, ord_nom)
def compute_context_scores(context_data: pd.DataFrame, meta_file: pd.DataFrame, test_type: str = 'nonparametric',
                           correction: str = 'bh', num_workers: int = 1, path: Optional[str] = None,
                           nan_value: Optional[int] = None) -> pd.DataFrame:
    """
    Compute association scores for a given context.

    :param context_data: The raw context data (rows: samples, columns: variables).
    :param meta_file: Metadata file containing a 'label' and 'type' column to specify
        the data type of each variable.
    :param test_type: Type of tests to use for network inference. Defaults to 'nonparametric'.
    :param correction: Correction method for multiple testing ('bh' or 'by'). Defaults to 'bh'.
    :param num_workers: Number of workers for parallel processing. Defaults to 1.
    :param path: Optional path to save the computed scores as a CSV file. Defaults to None.
    :param nan_value: Numerical value used for NaN values in the context data. If None,
        an error will be raised if such values are present. Defaults to None.
    :return: A pd.DataFrame containing the computed association scores.
    :raises ValueError: If the correction method is unknown or the input data is malformed.
    """
    # Check nan values and input format
    context_data, nan_value = _check_input_data(context=context_data, meta_file=meta_file, nan_value=nan_value)
    # Separate the data into categorical and continuous data
    # (renamed from `ord` to avoid shadowing the builtin)
    ord_df, nom_df, cont_df, bi_df = _separate_types(context_data, meta_file)
    # Calculate scores
    scores = calculate_association_scores(ord_data=ord_df, nom_data=nom_df, cont_data=cont_df, bi_data=bi_df,
                                          test_type=test_type, num_workers=num_workers, nan_value=nan_value)
    # Take the adjusted p-value and the corresponding effect size
    column_names = scores.iloc[:, 2:].columns
    if correction == 'bh':
        correction = 'benjamini_hb'
    elif correction == 'by':
        correction = 'benjamini_yek'
    else:
        # BUGFIX: the message used to advertise 'yek', but only 'by' is accepted above.
        raise ValueError(f"Invalid correction method '{correction}'. Choose from: 'bh' or 'by'.")
    p_adj = '_p_' + correction
    p_columns = [column for column in column_names if p_adj in column]
    e_columns = [column for column in column_names if '_e_' in column]
    scores_final = scores[['label1', 'label2']].copy()
    # For each pair, keep the first test that produced a (non-NaN) adjusted p-value,
    # together with its matching effect-size column.
    for i in scores.index:
        for p_type in p_columns:
            if pd.notna(scores.loc[i, p_type]):
                test = p_type.split('_')[0]
                e_type = [column for column in e_columns if test in column][0]
                scores_final.loc[i, 'raw-P'] = scores.loc[i, p_type]
                scores_final.loc[i, 'raw-E'] = scores.loc[i, e_type]
                scores_final.loc[i, 'test_type'] = test
                break
    scores_final = scores_final.drop_duplicates(subset=['label1', 'label2', 'test_type'], keep='first')
    scores_final = scores_final.sort_values(by=['label1', 'label2', 'test_type']).reset_index(drop=True)
    # Save scores
    if path is not None:
        scores_final.to_csv(path, index=False)
    return scores_final
def napy_bi_nom(nom_phenotypes: pd.DataFrame, bi_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89):
    """Chi-squared association tests among all nominal and binary variables.

    Nominal and binary columns are pooled into one discrete matrix so every
    pair (nominal-nominal, binary-binary, nominal-binary) is tested at once.

    :param nom_phenotypes: DataFrame of nominal variables.
    :param bi_phenotypes: DataFrame of binary variables.
    :param num_workers: Number of napy worker threads.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Single-element list with the formatted result table, or [None]
        if fewer than two discrete variables are available.
    """
    # Pool both discrete variable sets for a single chi-squared run.
    discrete = pd.concat([nom_phenotypes, bi_phenotypes], axis=1)
    discrete_arr, labels = _df_to_numpy(discrete)
    if discrete_arr.shape[1] < 2:
        return [None]
    stats = napy.chi_squared(discrete_arr, axis=1, threads=num_workers,
                             nan_value=nan_value, use_numba=False)
    table = _napy_formatting(stats, [labels], 'chi2')
    assert table is not None, "Results should not be None here."
    # Neutral fill-ins: missing effects become 0.0, missing p-values 1.0.
    for col in table.columns:
        if "_e_" in col:
            table[col] = table[col].fillna(0.0)
        elif "_p_" in col:
            table[col] = table[col].fillna(1.0)
    return [table]
def napy_nom_cont(cont_phenotypes: pd.DataFrame, nom_phenotypes: pd.DataFrame, test: str = 'nonparametric', num_workers=8, nan_value=-89):
    """Nominal-vs-continuous association tests (ANOVA or Kruskal-Wallis).

    :param cont_phenotypes: DataFrame of continuous variables.
    :param nom_phenotypes: DataFrame of nominal variables.
    :param test: 'parametric' selects ANOVA, 'nonparametric' Kruskal-Wallis.
    :param num_workers: Number of napy worker threads.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Single-element list with the formatted result table, or [None]
        if either variable set is empty.
    :raises ValueError: On an unknown test type.
    """
    if nom_phenotypes.shape[1] < 1 or cont_phenotypes.shape[1] < 1:
        return [None]
    cont_arr, cont_cols = _df_to_numpy(cont_phenotypes)
    nom_arr, nom_cols = _df_to_numpy(nom_phenotypes)
    if test == 'parametric':
        stats = napy.anova(cat_data=nom_arr, cont_data=cont_arr, axis=1,
                           threads=num_workers, nan_value=nan_value)
        performed = "anova"
    elif test == 'nonparametric':
        stats = napy.kruskal_wallis(cat_data=nom_arr, cont_data=cont_arr, axis=1,
                                    threads=num_workers, nan_value=nan_value)
        performed = "kruskal"
    else:
        raise ValueError(f"Invalid test type '{test}'. Specify 'parametric' or 'nonparametric' for nominal-continuous association testing.")
    return [_napy_formatting(stats, [nom_cols, cont_cols], performed)]
def napy_ord_nom(ord_phenotypes: pd.DataFrame, nom_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89):
    """Ordinal-vs-nominal association tests via Kruskal-Wallis.

    Ordinal variables are treated as the continuous side of the test.

    :param ord_phenotypes: DataFrame of ordinal variables.
    :param nom_phenotypes: DataFrame of nominal variables.
    :param num_workers: Number of napy worker threads.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Single-element list with the formatted result table, or [None]
        if either variable set is empty.
    """
    if nom_phenotypes.shape[1] < 1 or ord_phenotypes.shape[1] < 1:
        return [None]
    ord_arr, ord_cols = _df_to_numpy(ord_phenotypes)
    nom_arr, nom_cols = _df_to_numpy(nom_phenotypes)
    stats = napy.kruskal_wallis(cat_data=nom_arr, cont_data=ord_arr, axis=1,
                                threads=num_workers, nan_value=nan_value)
    return [_napy_formatting(stats, [nom_cols, ord_cols], "kruskal")]
def napy_bi_cont(cont_phenotypes: pd.DataFrame, bi_phenotypes: pd.DataFrame, test: str = 'nonparametric', num_workers=8, nan_value=-89):
    """Binary-vs-continuous association tests (t-test or Mann-Whitney U).

    Binary variables with only one observed category cannot be tested; for
    those, dummy rows with p-value 1.0 and effect size 0.0 are appended.

    :param cont_phenotypes: DataFrame of continuous variables.
    :param bi_phenotypes: DataFrame of binary variables (at most two categories each).
    :param test: 'parametric' selects the t-test, 'nonparametric' MWU.
    :param num_workers: Number of napy worker threads.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Single-element list with the formatted result table, or [None]
        if either variable set is empty.
    :raises ValueError: On an unknown test type or a binary variable with
        more than two categories.
    """
    if bi_phenotypes.shape[1] < 1 or cont_phenotypes.shape[1] < 1:
        return [None]
    if (bi_phenotypes.nunique() > 2).any():
        raise ValueError("All binary variables must not have more than two unique values.")
    cont_arr, cont_cols = _df_to_numpy(cont_phenotypes)
    bi_arr, bi_cols = _df_to_numpy(bi_phenotypes)
    if test == 'parametric':
        stats = napy.ttest(bin_data=bi_arr, cont_data=cont_arr, axis=1,
                           threads=num_workers, nan_value=nan_value)
        performed = "ttest"
    elif test == 'nonparametric':
        stats = napy.mwu(bin_data=bi_arr, cont_data=cont_arr, axis=1,
                         threads=num_workers, nan_value=nan_value)
        performed = "mwu"
    else:
        raise ValueError(f"Invalid test type '{test}'. Specify 'parametric' or 'nonparametric' for binary-continuous association testing.")
    formatted = [_napy_formatting(stats, [bi_cols, cont_cols], performed)]
    # Special case: binary variables with only one observed category.
    single_cat = bi_phenotypes.loc[:, bi_phenotypes.nunique() <= 1].copy()
    single_cat_arr, single_cols = _df_to_numpy(single_cat)
    if single_cat_arr.shape[1] > 0:
        logging.warning(
            f'There were binary variables found with only one category: {single_cols}. '
            'These will be added as dummy rows with p-value 1.0 and effect size 0.0.')
        # Append one dummy row per (single-category binary, continuous) pair.
        for idx, frame in enumerate(formatted):
            if frame is None:
                continue
            p_cols = [col for col in frame.columns if "_p_" in col]
            e_cols = [col for col in frame.columns if "_e_" in col]
            dummy_rows = []
            for bi_var in single_cols:
                for cont_var in cont_cols:
                    entry = {'label1': bi_var, 'label2': cont_var}
                    entry.update({col: 1.0 for col in p_cols})
                    entry.update({col: 0.0 for col in e_cols})
                    dummy_rows.append(entry)
            formatted[idx] = pd.concat([frame, pd.DataFrame(dummy_rows)], ignore_index=True)
    # Replace remaining NaNs with neutral values.
    cleaned = []
    for frame in formatted:
        if frame is None:
            cleaned.append(None)
            continue
        frame = frame.copy()
        p_cols = [col for col in frame.columns if "_p_" in col]
        e_cols = [col for col in frame.columns if "_e_" in col]
        if p_cols:
            frame[p_cols] = frame[p_cols].fillna(1.0)
        if e_cols:
            frame[e_cols] = frame[e_cols].fillna(0.0)
        cleaned.append(frame)
    return cleaned
def napy_bi_ord(ord_phenotypes: pd.DataFrame, bi_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89):
    """Binary-vs-ordinal association tests via Mann-Whitney U.

    Binary variables with only one observed category cannot be tested; for
    those, dummy rows with p-value 1.0 and effect size 0.0 are appended.

    :param ord_phenotypes: DataFrame of ordinal variables.
    :param bi_phenotypes: DataFrame of binary variables (at most two categories each).
    :param num_workers: Number of napy worker threads.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Single-element list with the formatted result table, or [None]
        if either variable set is empty.
    :raises ValueError: On a binary variable with more than two categories.
    """
    if bi_phenotypes.shape[1] < 1 or ord_phenotypes.shape[1] < 1:
        return [None]
    if (bi_phenotypes.nunique() > 2).any():
        raise ValueError("All binary variables must not have more than two unique values.")
    ord_arr, ord_cols = _df_to_numpy(ord_phenotypes)
    bi_arr, bi_cols = _df_to_numpy(bi_phenotypes)
    stats = napy.mwu(bin_data=bi_arr, cont_data=ord_arr, axis=1,
                     threads=num_workers, nan_value=nan_value)
    formatted = [_napy_formatting(stats, [bi_cols, ord_cols], "mwu")]
    # Special case: binary variables with only one observed category.
    single_cat = bi_phenotypes.loc[:, bi_phenotypes.nunique() <= 1].copy()
    single_cat_arr, single_cols = _df_to_numpy(single_cat)
    if single_cat_arr.shape[1] > 0:
        logging.warning(
            f'There were binary variables found with only one category: {single_cols}. '
            'These will be added as dummy rows with p-value 1.0 and effect size 0.0.')
        # Append one dummy row per (single-category binary, ordinal) pair.
        for idx, frame in enumerate(formatted):
            if frame is None:
                continue
            p_cols = [col for col in frame.columns if "_p_" in col]
            e_cols = [col for col in frame.columns if "_e_" in col]
            dummy_rows = []
            for bi_var in single_cols:
                for ord_var in ord_cols:
                    entry = {'label1': bi_var, 'label2': ord_var}
                    entry.update({col: 1.0 for col in p_cols})
                    entry.update({col: 0.0 for col in e_cols})
                    dummy_rows.append(entry)
            formatted[idx] = pd.concat([frame, pd.DataFrame(dummy_rows)], ignore_index=True)
    # Replace remaining NaNs with neutral values.
    cleaned = []
    for frame in formatted:
        if frame is None:
            cleaned.append(None)
            continue
        frame = frame.copy()
        p_cols = [col for col in frame.columns if "_p_" in col]
        e_cols = [col for col in frame.columns if "_e_" in col]
        if p_cols:
            frame[p_cols] = frame[p_cols].fillna(1.0)
        if e_cols:
            frame[e_cols] = frame[e_cols].fillna(0.0)
        cleaned.append(frame)
    return cleaned
def napy_cont_cont(cont_phenotypes: pd.DataFrame, test: str = 'nonparametric', num_workers=8, nan_value=-89):
    """Continuous-vs-continuous correlations (Pearson or Spearman).

    :param cont_phenotypes: DataFrame of continuous variables.
    :param test: 'parametric' selects Pearson, 'nonparametric' Spearman.
    :param num_workers: Number of napy worker threads.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Single-element list with the formatted result table, or [None]
        if fewer than two continuous variables are available.
    :raises ValueError: On an unknown test type.
    """
    if cont_phenotypes.shape[1] < 2:
        return [None]
    cont_arr, cont_cols = _df_to_numpy(cont_phenotypes)
    if test == 'parametric':
        stats = napy.pearsonr(cont_arr, nan_value=nan_value, threads=num_workers, axis=1)
        performed = "pearson"
    elif test == 'nonparametric':
        stats = napy.spearmanr(cont_arr, threads=num_workers, nan_value=nan_value, axis=1)
        performed = "spearman"
    else:
        raise ValueError(f"Invalid test type '{test}'. Specify 'parametric' or 'nonparametric' for continuous-continuous association testing.")
    return [_napy_formatting(stats, [cont_cols], performed)]
def napy_ord_cont(cont_phenotypes: pd.DataFrame, ord_phenotypes: pd.DataFrame, num_workers=8, nan_value=-89):
    """Ordinal-vs-continuous correlations via Spearman.

    Both variable sets are pooled into one Spearman run; `_napy_formatting`
    then filters the result down to pairs involving an ordinal variable
    (cont-cont pairs are handled by `napy_cont_cont`).

    :param cont_phenotypes: DataFrame of continuous variables.
    :param ord_phenotypes: DataFrame of ordinal variables.
    :param num_workers: Number of napy worker threads.
    :param nan_value: Sentinel value napy treats as missing.
    :return: Single-element list with the formatted result table, or [None]
        if either variable set is empty.
    """
    if cont_phenotypes.shape[1] < 1 or ord_phenotypes.shape[1] < 1:
        return [None]
    joint = pd.concat([cont_phenotypes, ord_phenotypes], axis=1)
    joint_arr, joint_cols = _df_to_numpy(joint)
    # Only the column name arrays are needed here, not the value matrices.
    _, cont_cols = _df_to_numpy(cont_phenotypes)
    _, ord_cols = _df_to_numpy(ord_phenotypes)
    stats = napy.spearmanr(joint_arr, threads=num_workers, nan_value=nan_value, axis=1)
    return [_napy_formatting(stats, [joint_cols], "spearman",
                             ord_cols=ord_cols.tolist(), cont_cols=cont_cols.tolist())]
# Check input format of context data
def _check_input_data(context: pd.DataFrame, meta_file: pd.DataFrame, nan_value: Optional[int] = None) -> Tuple[pd.DataFrame, int]:
    """
    Check if the input data is in the expected format and check for missing values.

    :param context: The context data to check.
    :param meta_file: Metadata file containing one row per variable in the context data.
    :param nan_value: Numerical value used for NaN values in the context data. If None,
        an error will be raised if such values are present.
    :return: Tuple of the checked (possibly NaN-filled) context data and the
        nan_value to use downstream (randomly generated if the data is clean
        and none was provided).
    :raises ValueError: On malformed inputs or un-replaced missing values.
    """
    # Check if context is a DataFrame
    if not isinstance(context, pd.DataFrame):
        raise ValueError('The context data should be provided as a pandas DataFrame.')
    # Check if meta_file is a DataFrame
    if not isinstance(meta_file, pd.DataFrame):
        raise ValueError('The meta_file should be provided as a pandas DataFrame.')
    # Check if meta_file contains required columns
    required_columns = {'label', 'type'}
    if not required_columns.issubset(set(meta_file.columns)):
        raise ValueError(f'The meta_file should contain the following columns: {required_columns}.')
    # Check if all variables in context are present in meta_file
    context_vars = set(context.columns)
    meta_vars = set(meta_file['label'].values)
    if not context_vars.issubset(meta_vars):
        missing_vars = context_vars - meta_vars
        raise ValueError(f'The following variables are missing in the meta_file: {missing_vars}.')
    # Search for non-numeric and NaN values
    if context.apply(lambda col: pd.to_numeric(col, errors="coerce").isna()).values.any():
        if nan_value is not None:
            logging.warning(f'The context data still contains non-numeric or NaN values. These will be replaced by the specified nan_value {nan_value}.')
            context = context.apply(pd.to_numeric, errors="coerce")
            context = context.fillna(nan_value)
        else:
            raise ValueError('The context data contains non-numeric or NaN values. Please clean the data and/or specify a nan_value to replace these values.')
    else:
        if nan_value is None:
            # Find a value that does not exist in the data to use as nan_value for napy
            existing = set(context.stack().values)
            while True:
                nan_value = np.random.randint(-10**5, -10**3)
                if nan_value not in existing:
                    break
            logging.warning(f'The context data does not contain any missing values. '
                            f'For statistical tests, the randomly generated value {nan_value} will be used as '
                            f'the NaN replacement, as this value does not occur in the data. '
                            f'If you want to specify a different value, please provide it via the \'nan_value\' argument.'
                            )
    return context, nan_value


def _combine_tests(*result_groups) -> pd.DataFrame:
    """Merge the per-test result lists into one long-format score table.

    Each group is a list of DataFrames (or None) sharing 'label1'/'label2'
    columns; DataFrames within a group are outer-merged on the label pair,
    and the merged groups are concatenated row-wise.

    :param result_groups: Lists of result DataFrames (entries may be None).
    :return: Combined DataFrame sorted stably by label pair.
    :raises ValueError: If every entry in every group is None.
    """
    all_results = []
    for results in result_groups:
        merged = None
        for test in results:
            if test is None:
                continue
            cols_to_convert = test.columns.difference(['label1', 'label2'])
            # astype(dict) returns a new frame — avoids mutating the caller's
            # DataFrame in place (the old column-assignment did).
            test = test.astype({col: 'float32' for col in cols_to_convert})
            if merged is None:
                merged = test
            else:
                merged = pd.merge(merged, test, on=['label1', 'label2'], how='outer')
        if merged is not None:
            all_results.append(merged)
    if not all_results:
        raise ValueError('No association results to combine.')
    out = pd.concat(all_results, ignore_index=True)
    # mergesort keeps the sort stable so equal label pairs retain their order.
    out = out.sort_values(by=['label1', 'label2'], kind='mergesort').reset_index(drop=True)
    return out


def _napy_formatting(assoc_out: dict[str, np.ndarray], labels: list, test: str, ord_cols: Optional[list] = None,
                     cont_cols: Optional[list] = None, file_name: Optional[str] = None) -> Optional[pd.DataFrame]:
    """Convert a napy result dict of matrices into a long-format DataFrame.

    :param assoc_out: Mapping of statistic name to a matrix of values; keys
        starting with 'p_' are p-values, others are effect sizes (keys in
        EXCLUDED_EFFECTS are dropped).
    :param labels: One label list (symmetric test on a single variable set)
        or two label lists (rows vs. columns of the result matrices).
    :param test: Test name used as the column-name prefix.
    :param ord_cols: Optional ordinal labels; together with cont_cols this
        filters the output to pairs involving an ordinal variable.
    :param cont_cols: Optional continuous labels (see ord_cols).
    :param file_name: Optional CSV path to additionally dump the table to.
    :return: Long-format DataFrame, or None if assoc_out is empty.
    """
    if not assoc_out:
        return None
    if len(labels) == 1:
        # Symmetric test: keep only the strict lower triangle (each pair once).
        rows_idx, cols_idx = np.tril_indices(assoc_out['p_unadjusted'].shape[0], k=-1)
        # Pre-format labels and values
        label1 = np.array(labels[0])[rows_idx]
        label2 = np.array(labels[0])[cols_idx]
    else:
        rows_idx, cols_idx = np.indices(assoc_out['p_unadjusted'].shape)
        label1 = np.array(labels[0])[rows_idx.ravel()]
        label2 = np.array(labels[1])[cols_idx.ravel()]
    p_values_raw = {key: assoc_out[key][rows_idx, cols_idx].ravel()
                    for key in assoc_out if key.startswith('p_')}
    effects_raw = {key: assoc_out[key][rows_idx, cols_idx].ravel()
                   for key in assoc_out
                   if not key.startswith('p_') and key not in EXCLUDED_EFFECTS}
    p_columns = [f"{test}_{key}" for key in p_values_raw.keys()]
    e_columns = [f"{test}_e_{key}" for key in effects_raw.keys()]
    df = pd.DataFrame({
        'label1': label1,
        'label2': label2,
        **{p_columns[i]: p_values_raw[key] for i, key in enumerate(p_values_raw)},
        **{e_columns[i]: effects_raw[key] for i, key in enumerate(effects_raw)},
    })
    if ord_cols is not None and cont_cols is not None:
        # Keep only pairs involving an ordinal variable; cont-cont pairs are
        # produced by the dedicated continuous-continuous test.
        mask = (
            (df["label1"].isin(ord_cols) & df["label2"].isin(cont_cols))
            | (df["label1"].isin(cont_cols) & df["label2"].isin(ord_cols))
            | (df["label1"].isin(ord_cols) & df["label2"].isin(ord_cols))
        )
        df = df[mask]
    if file_name:
        df.to_csv(file_name, sep=',', index=True, header=False, lineterminator='\n')
    return df


def _order_categories(data: pd.DataFrame):
    """
    Order categories in a dataframe such that they start at 0 and are consecutive integers.

    :param data: the dataframe to order
    :return: the ordered dataframe (a copy; NaNs stay NA via the nullable Int64 dtype)
    """
    data = data.copy()
    order_table = {col: {o: n for n, o in enumerate(sorted(data[col].dropna().unique()))}
                   for col in data.columns}
    for col, mapping in order_table.items():
        data[col] = data[col].map(mapping).astype(pd.Int64Dtype())
    return data