import numpy as np
import pandas as pd
from scipy import stats
from typing import Tuple
# Convert Cohen's d to the point-biserial correlation r for 'ttest' edges.
# The group sizes n1 and n2 enter the conversion so unequal groups are handled:
#   general case:           r = d / sqrt(d² + (n1 + n2)² / (n1 * n2))
#   equal sizes (n1 == n2): the size term reduces to 4, i.e. r = d / sqrt(d² + 4)
def cohens_d_to_r(scores1, scores2, n1: int, n2: int):
    """Convert Cohen's d to point-biserial r on the 'ttest' rows of both frames.

    Rows whose ``test_type`` equals ``'ttest'`` have their ``raw-E`` value
    replaced by ``d / sqrt(d**2 + (n1 + n2)**2 / (n1 * n2))``; all other rows
    are left as they are. The inputs are not modified.

    :param scores1: DataFrame with ``test_type`` and ``raw-E`` columns
    :param scores2: DataFrame with ``test_type`` and ``raw-E`` columns
    :param n1: size of the first group
    :param n2: size of the second group
    :return: tuple ``(scores1, scores2)`` of converted copies
    """
    converted1, converted2 = scores1.copy(), scores2.copy()

    # Sample-size term of the conversion; it equals 4 when n1 == n2.
    size_term = (n1 + n2) ** 2 / (n1 * n2)

    for frame in (converted1, converted2):
        is_ttest = frame['test_type'] == 'ttest'
        if not is_ttest.any():
            continue
        effect = frame.loc[is_ttest, 'raw-E'].to_numpy()
        denominator = np.sqrt(effect ** 2 + size_term)
        frame.loc[is_ttest, 'raw-E'] = effect / denominator

    return converted1, converted2
# Std rescaling: divide raw-E by the pooled standard deviation of each test type.
def std_rescaling(scores1: pd.DataFrame, scores2: pd.DataFrame,
                  metric='std-E') -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Rescale ``raw-E`` by the pooled standard deviation of each test type.

    For every distinct ``test_type`` the ``raw-E`` values of both frames are
    pooled, their population standard deviation (``np.std``, ddof=0) is
    computed, and each value is divided by it. The result is written to a new
    ``metric`` column; the inputs are not modified.

    Missing ``raw-E`` values are ignored when computing the pooled standard
    deviation and stay NaN in the output, so one missing score no longer turns
    the whole test type into NaN. If the pooled standard deviation is zero,
    the observed rows of that test type are set to 0.

    :param scores1: DataFrame with ``test_type`` and ``raw-E`` columns
    :param scores2: DataFrame whose ``test_type`` column equals that of scores1
    :param metric: name of the output column; only ``'std-E'`` is accepted
    :return: tuple ``(scores1, scores2)`` of copies with the ``metric`` column
    :raises ValueError: if ``metric`` is not ``'std-E'`` or the ``test_type``
        columns of the two frames differ
    """
    # Validate before copying so bad input fails without doing any work.
    if metric != 'std-E':
        raise ValueError(f"Invalid metric '{metric}'. Only 'std-E' is supported.")
    if not scores1['test_type'].equals(scores2['test_type']):
        raise ValueError("scores1 and scores2 must have identical 'test_type' columns.")

    scores1 = scores1.copy()
    scores2 = scores2.copy()
    metric_raw = 'raw-E'
    scores1[metric] = np.nan
    scores2[metric] = np.nan

    for test in np.unique(scores1['test_type']):
        mask1 = scores1['test_type'] == test
        mask2 = scores2['test_type'] == test
        raw1 = scores1.loc[mask1, metric_raw].to_numpy(dtype=float, na_value=np.nan)
        raw2 = scores2.loc[mask2, metric_raw].to_numpy(dtype=float, na_value=np.nan)

        pooled = np.concatenate([raw1, raw2])
        observed = pooled[~np.isnan(pooled)]
        if observed.size == 0:
            # Nothing to scale for this test type; the column stays NaN.
            continue

        std = np.std(observed)
        if std == 0:
            # No spread among observed values: map them to 0, keep NaN as NaN.
            rescaled1 = np.where(np.isnan(raw1), np.nan, 0.0)
            rescaled2 = np.where(np.isnan(raw2), np.nan, 0.0)
        else:
            rescaled1 = raw1 / std
            rescaled2 = raw2 / std

        # Positional assignment of plain arrays, independent of index labels.
        scores1.loc[mask1, metric] = rescaled1
        scores2.loc[mask2, metric] = rescaled2

    return scores1, scores2
# Probit rescaling: rank-based normalization of raw-E within each test type.
def probit_rescaling(scores1: pd.DataFrame, scores2: pd.DataFrame,
                     metric='probit-E') -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Apply a folded, rank-based probit transform to ``raw-E`` per test type.

    For every distinct ``test_type`` the ``raw-E`` values of both frames are
    pooled and ranked by absolute value. Ranks are mapped to percentiles in
    (0.5, 1), passed through the standard-normal quantile function and given
    the sign of the original value. The result is written to a new ``metric``
    column; the inputs are not modified.

    Missing ``raw-E`` values are left out of the ranking and stay NaN in the
    output, so they neither inflate the rank denominator nor turn the other
    scores of that test type into NaN.

    :param scores1: DataFrame with ``test_type`` and ``raw-E`` columns
    :param scores2: DataFrame whose ``test_type`` column equals that of scores1
    :param metric: name of the output column; only ``'probit-E'`` is accepted
    :return: tuple ``(scores1, scores2)`` of copies with the ``metric`` column
    :raises ValueError: if ``metric`` is not ``'probit-E'`` or the
        ``test_type`` columns of the two frames differ
    """
    # Validate before copying so bad input fails without doing any work.
    if metric != 'probit-E':
        raise ValueError(f"Invalid metric '{metric}'. Only 'probit-E' is supported.")
    if not scores1['test_type'].equals(scores2['test_type']):
        raise ValueError("scores1 and scores2 must have identical 'test_type' columns.")

    scores1 = scores1.copy()
    scores2 = scores2.copy()
    metric_raw = 'raw-E'
    scores1[metric] = np.nan
    scores2[metric] = np.nan

    for test in np.unique(scores1['test_type']):
        idx1 = scores1['test_type'] == test
        idx2 = scores2['test_type'] == test
        v1 = scores1.loc[idx1, metric_raw].to_numpy(dtype=float, na_value=np.nan)
        v2 = scores2.loc[idx2, metric_raw].to_numpy(dtype=float, na_value=np.nan)
        combined = np.concatenate([v1, v2])

        # Only observed values take part in the ranking; NaN rows stay NaN.
        observed = ~np.isnan(combined)
        n = int(observed.sum())
        probit_vals = np.full(combined.shape, np.nan)
        if n:
            present = combined[observed]
            # Folded probit: rank |raw-E| so association strength (not sign)
            # determines the rank. Percentiles lie in (0.5, 1), so norm.ppf
            # yields magnitudes in (0, +inf); the sign is restored afterwards,
            # which makes strong negative associations rank like strong
            # positive ones. An exact zero keeps the value 0 (sign is 0).
            ranks = stats.rankdata(np.abs(present))
            percentiles = 0.5 + ranks / (n + 1) * 0.5
            probit_vals[observed] = np.sign(present) * stats.norm.ppf(percentiles)

        scores1.loc[idx1, metric] = probit_vals[:len(v1)]
        scores2.loc[idx2, metric] = probit_vals[len(v1):]

    return scores1, scores2
def _separate_types(all_data, meta_file) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split the columns of ``all_data`` by the variable type declared in ``meta_file``.

    Types are compared case-insensitively. A column of ``all_data`` is selected
    for a type when its name appears in ``meta_file['label']`` on a row of that type.

    :param all_data: DataFrame with all data
    :param meta_file: DataFrame with ``label`` and ``type`` columns describing the variables
    :return: tuple of copies ``(ordinal, nominal, continuous, binary)``
    :raises ValueError: if any entry of ``meta_file['type']`` is not one of the allowed types
    """
    # Lower-case once and reuse for both validation and selection.
    kinds = meta_file['type'].str.lower()
    allowed = ['ordinal', 'nominal', 'binary', 'continuous']
    if not kinds.isin(allowed).all():
        raise ValueError("Invalid type found in meta_file. Allowed types are 'ordinal', 'nominal', 'binary', and 'continuous'.")

    def _columns_of(kind):
        # Columns of all_data whose name is a label of the requested kind.
        labels = meta_file.loc[kinds == kind, 'label']
        return all_data.iloc[:, all_data.columns.isin(labels)].copy()

    ord_data = _columns_of('ordinal')
    nom_data = _columns_of('nominal')
    bi_data = _columns_of('binary')
    cont_data = _columns_of('continuous')

    # Note the return order: continuous comes before binary.
    return ord_data, nom_data, cont_data, bi_data
def _df_to_numpy(df: pd.DataFrame):
    """
    Convert a DataFrame to a float64 NumPy array, keeping its column index.

    :param df: DataFrame whose values can be converted to float64
    :return: tuple ``(values, columns)`` with an independent float64 copy of
        the data and ``df.columns``
    """
    as_float = df.to_numpy(dtype=np.float64)
    # Explicit copy so the returned array never aliases the frame's data.
    values = as_float.copy()
    return values, df.columns