"""
General utility functions for the pymodulon package
"""
import json
import logging
import re
from itertools import combinations
import numpy as np
import pandas as pd
from scipy import stats
from scipy.special import digamma
from sklearn.neighbors import BallTree, KDTree
from pymodulon.enrichment import FDR
def _check_table(table, name, index=None, index_col=0):
    """Validate a table input, loading it from a file or JSON string if needed."""
    # Set as empty dataframe if no input given
    if table is None:
        return pd.DataFrame(index=index)

    # Load table if necessary
    elif isinstance(table, str):
        try:
            table = pd.read_json(table)
        except ValueError:
            sep = "\t" if table.endswith(".tsv") else ","
            table = pd.read_csv(table, index_col=index_col, sep=sep)

    # Check the type before touching .columns/.index, so bad inputs raise a
    # clear TypeError instead of an AttributeError
    if not isinstance(table, pd.DataFrame):
        raise TypeError(
            "{}_table must be a pandas DataFrame, a filename, or a valid "
            "JSON string".format(name)
        )

    # Coerce indices and columns to ints if possible
    newcols = []
    for col in table.columns:
        try:
            newcols.append(int(col))
        except ValueError:
            newcols.append(col)
    table.columns = newcols

    newrows = []
    for row in table.index:
        try:
            newrows.append(int(row))
        except ValueError:
            newrows.append(row)
    table.index = newrows

    # Replace empty strings with NaN
    table = table.replace("", np.nan)

    # Don't run _check_table_helper if no index is passed
    return table if index is None else _check_table_helper(table, index, name)

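# Example usage (a sketch; "gene_table.csv" and the locus tags are
# hypothetical):
#
#     >>> gene_table = _check_table("gene_table.csv", "gene",
#     ...                           index=["b0001", "b0002"])
#
# Integer-like labels (e.g. the string "0") are coerced back to ints, so
# labels stay consistent after a JSON round trip.
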
def _check_table_helper(table, index, name):
    if table.shape == (0, 0):
        return pd.DataFrame(index=index)

    # Check if all indices are in table
    missing_index = list(set(index) - set(table.index))
    if len(missing_index) > 0:
        logging.warning(
            "Some {} are missing from the {} table: {}".format(
                name, name, missing_index
            )
        )

    # Remove extra indices from table
    table = table.reindex(index)
    return table

def _check_dict(table, index_col=0):
    """Parse a dictionary from a JSON-like string or a two-column CSV/TSV file."""
    try:
        table = json.loads(table.replace("'", '"'))
    except ValueError:
        sep = "\t" if table.endswith(".tsv") else ","
        table = pd.read_csv(table, index_col=index_col, header=None, sep=sep)
        table = table.to_dict()[1]
    return table

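# Example usage (a sketch; the iModulon names are hypothetical):
#
#     >>> _check_dict("{'GlpR': 550, 'CysB': 600}")
#     {'GlpR': 550, 'CysB': 600}
#
# A path to a two-column CSV/TSV file (key, value) is parsed the same way.
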
def compute_threshold(ic, dagostino_cutoff):
    """
    Computes the D'Agostino-test-based threshold for a component of an M matrix

    Parameters
    ----------
    ic: ~pandas.Series
        Pandas Series containing an independent component
    dagostino_cutoff: int
        Minimum D'Agostino test statistic value used to determine the threshold

    Returns
    -------
    threshold: float
        Gene weight threshold for the component
    """
    i = 0

    # Sort genes based on absolute value
    ordered_genes = abs(ic).sort_values()

    # Compute k2-statistic
    k_square, p = stats.normaltest(ic)

    # Iteratively remove the gene with the largest weight until the
    # k2-statistic drops below the cutoff
    while k_square > dagostino_cutoff:
        i -= 1
        k_square, p = stats.normaltest(ic.loc[ordered_genes.index[:i]])

    # Select genes in the iModulon
    comp_genes = ordered_genes.iloc[i:]

    # Slightly modify the threshold to improve plotting visibility
    if len(comp_genes) == len(ic.index):
        return max(comp_genes) + 0.05
    else:
        return np.mean([ordered_genes.iloc[i], ordered_genes.iloc[i - 1]])

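# Example usage (a sketch; assumes `ica_data` is a loaded
# :class:`~pymodulon.core.IcaData` object and "GlpR" is one of its iModulons;
# both names are hypothetical):
#
#     >>> thresh = compute_threshold(ica_data.M["GlpR"], dagostino_cutoff=550)
#     >>> in_imodulon = abs(ica_data.M["GlpR"]) > thresh
#
# Genes whose absolute weights exceed the returned threshold are the members
# of the iModulon.
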
def dima(ica_data, sample1, sample2, threshold=5, fdr=0.1, alternate_A=None):
    """
    Creates a DiMA table of differentially activated iModulons

    Parameters
    ----------
    ica_data: ~pymodulon.core.IcaData
        :class:`~pymodulon.core.IcaData` data object
    sample1: str or list
        List of sample IDs or name of "project:condition"
    sample2: str or list
        List of sample IDs or name of "project:condition"
    threshold: float
        Minimum activity difference to determine DiMAs (default: 5)
    fdr: float
        False discovery rate (default: 0.1)
    alternate_A: ~pandas.DataFrame
        Alternate A matrix to use (default: None)

    Returns
    -------
    results: ~pandas.DataFrame
        Table of differentially activated iModulons
    """
    # The alternate_A option allows a custom-built DIMCA activity matrix to
    # be used in lieu of the standard activity matrix
    if alternate_A is not None:
        A_to_use = alternate_A
    else:
        A_to_use = ica_data.A

    _diff = pd.DataFrame()
    sample1_list = _parse_sample(ica_data, sample1)
    sample2_list = _parse_sample(ica_data, sample2)

    # Estimate a null distribution of activity differences from replicate
    # pairs within each condition
    for name, group in ica_data.sample_table.groupby(["project", "condition"]):
        for i1, i2 in combinations(group.index, 2):
            _diff[":".join(name)] = abs(A_to_use[i1] - A_to_use[i2])

    # Fit a log-normal distribution to each iModulon's replicate differences
    dist = {}
    for k in A_to_use.index:
        dist[k] = stats.lognorm(*stats.lognorm.fit(_diff.loc[k].values)).cdf

    res = pd.DataFrame(index=A_to_use.index)
    for k in res.index:
        a1 = A_to_use.loc[k, sample1_list].mean()
        a2 = A_to_use.loc[k, sample2_list].mean()
        res.loc[k, "difference"] = a2 - a1
        res.loc[k, "pvalue"] = 1 - dist[k](abs(a1 - a2))

    result = FDR(res, fdr)
    return result[(abs(result.difference) > threshold)].sort_values(
        "difference", ascending=False
    )

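# Example usage (a sketch; the "project:condition" names are hypothetical and
# assume `ica_data.sample_table` has "project" and "condition" columns):
#
#     >>> dima_table = dima(ica_data, "fur:wt_dpd", "fur:delfur_dpd")
#     >>> dima_table.head()
#
# Rows are iModulons whose mean activity differs between the two sample
# groups by more than `threshold` at the given FDR.
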
def _parse_sample(ica_data, sample):
    """
    Parses a sample input into a list of sample IDs

    Parameters
    ----------
    ica_data: ~pymodulon.core.IcaData
        :class:`~pymodulon.core.IcaData` data object
    sample: str or list
        Sequence of sample IDs or a "project:condition" string

    Returns
    -------
    samples: list
        List of sample IDs
    """
    sample_table = ica_data.sample_table
    if isinstance(sample, str):
        proj, cond = re.search("(.*):(.*)", sample).groups()
        samples = sample_table[
            (sample_table.project == proj) & (sample_table.condition == cond)
        ].index
        if len(samples) == 0:
            raise ValueError(
                f"No samples exist for project={proj} condition={cond}"
            )
        else:
            return samples
    else:
        return sample

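# Example usage (a sketch with hypothetical project/condition names):
#
#     >>> _parse_sample(ica_data, "fur:wt_dpd")   # expands to sample IDs
#     >>> _parse_sample(ica_data, ["s1", "s2"])   # returned unchanged
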
def explained_variance(
    ica_data, genes=None, samples=None, imodulons=None, reference=None
):
    """
    Computes the fraction of variance explained by iModulons (from 0 to 1)

    Parameters
    ----------
    ica_data: ~pymodulon.core.IcaData
        :class:`~pymodulon.core.IcaData` data object
    genes: str or list, optional
        List of genes to use (default: all genes)
    samples: str or list, optional
        List of samples to use (default: all samples)
    imodulons: int or str or list, optional
        List of iModulons to use (default: all iModulons)
    reference: list, optional
        List of samples that represent the reference condition for the
        set. If none are provided, uses the dataset-specific reference
        condition.

    Returns
    -------
    float
        Fraction of variance explained by selected iModulons for selected
        genes/samples
    """
    # Check inputs
    if genes is None:
        genes = ica_data.X.index
    elif isinstance(genes, str):
        genes = [genes]

    # Convert gene names to locus tags where necessary
    gene_loci = set(genes) & set(ica_data.X.index)
    gene_names = set(genes) - set(ica_data.X.index)
    name_loci = [ica_data.name2num(gene) for gene in gene_names]
    genes = list(set(gene_loci) | set(name_loci))

    if samples is None:
        samples = ica_data.X.columns
    elif isinstance(samples, str):
        samples = [samples]

    if imodulons is None:
        imodulons = ica_data.M.columns
    elif isinstance(imodulons, (str, int)):
        imodulons = [imodulons]

    if reference is None:
        centered = ica_data.X
    else:
        centered = ica_data.X.subtract(ica_data.X[reference].mean(axis=1), axis=0)

    # Account for normalization procedures before ICA (X = SA - x_mean)
    baseline = centered.subtract(centered.mean(axis=0), axis=1)
    baseline = baseline.loc[genes, samples]

    # Initialize variables
    base_err = np.linalg.norm(baseline) ** 2
    MA = np.zeros(baseline.shape)
    rec_var = [0]
    ma_arrs = {}
    ma_weights = {}

    # Get individual iModulon contributions
    for k in imodulons:
        ma_arr = np.dot(
            ica_data.M.loc[genes, k].values.reshape(len(genes), 1),
            ica_data.A.loc[k, samples].values.reshape(1, len(samples)),
        )
        ma_arrs[k] = ma_arr
        ma_weights[k] = np.sum(ma_arr**2)

    # Sum components, most important component first
    sorted_mods = sorted(ma_weights, key=ma_weights.get, reverse=True)

    # Compute reconstructed variance
    for k in sorted_mods:
        MA = MA + ma_arrs[k]
        sa_err = np.linalg.norm(MA - baseline) ** 2
        rec_var.append(1 - sa_err / base_err)

    return np.clip(rec_var[-1], 0, 1)

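# Example usage (a sketch; "GlpR" and "CysB" are hypothetical iModulon names):
#
#     >>> explained_variance(ica_data)                            # full model
#     >>> explained_variance(ica_data, imodulons=["GlpR", "CysB"])
#
# The second call reports how much of the centered expression variance the
# two selected iModulons reconstruct on their own.
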
def infer_activities(ica_data, data):
    """
    Infer iModulon activities for external data

    Parameters
    ----------
    ica_data: ~pymodulon.core.IcaData
        :class:`~pymodulon.core.IcaData` data object
    data: ~pandas.DataFrame
        External expression profiles (must be centered to a reference)

    Returns
    -------
    new_activities: ~pandas.DataFrame
        Inferred activities for the expression profiles
    """
    # Project the external profiles onto the iModulon structure using the
    # pseudo-inverse of M, restricted to the genes shared by both datasets
    shared_genes = ica_data.M.index.intersection(data.index)
    x = data.loc[shared_genes].values
    m = ica_data.M.loc[shared_genes].values
    m_inv = np.linalg.pinv(m)
    a = np.dot(m_inv, x)
    return pd.DataFrame(a, index=ica_data.imodulon_names, columns=data.columns)

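# Example usage (a sketch; `external_X` is a hypothetical DataFrame of
# expression profiles centered to the same reference as `ica_data.X`):
#
#     >>> new_A = infer_activities(ica_data, external_X)
#
# Because the projection uses the Moore-Penrose pseudo-inverse over the
# shared genes, genes missing from `external_X` are simply dropped rather
# than raising an error.
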
def mutual_info_distance(x, y):
    """Mutual information distance between x and y: 1 - MI(x, y) / H(x, y)"""
    x = np.asarray(x).reshape(-1, 1)
    y = np.asarray(y).reshape(-1, 1)
    h = entropy(np.hstack([x, y]))
    if h == 0:
        return 1
    else:
        return 1 - mi(x, y) / h

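# Example usage (a sketch with synthetic data; strongly dependent pairs
# should score closer to 0, independent pairs closer to 1):
#
#     >>> rng = np.random.default_rng(0)
#     >>> a = rng.normal(size=500)
#     >>> b = a + 0.1 * rng.normal(size=500)   # strongly dependent
#     >>> c = rng.normal(size=500)             # independent
#     >>> mutual_info_distance(a, b) < mutual_info_distance(a, c)  # expect True
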
# the following code is taken from the NPEET package; it cannot be installed
# via pip, so the necessary functions are copied here; the package appears to
# be un-maintained, so updates are not very likely; this is the GitHub page:
# https://github.com/gregversteeg/NPEET
def mi(x, y, z=None, k=3, base=2, alpha=0):
    """Mutual information of x and y (conditioned on z if z is not None)

    x, y should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is a one-dimensional scalar and we have four samples
    """
    assert len(x) == len(y), "Arrays should have same length"
    assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
    x, y = np.asarray(x), np.asarray(y)
    x, y = x.reshape(x.shape[0], -1), y.reshape(y.shape[0], -1)
    x = add_noise(x)
    y = add_noise(y)
    points = [x, y]
    if z is not None:
        z = np.asarray(z)
        z = z.reshape(z.shape[0], -1)
        points.append(z)
    points = np.hstack(points)

    # Find nearest neighbors in joint space, p=inf means max-norm
    tree = build_tree(points)
    dvec = query_neighbors(tree, points, k)
    if z is None:
        a, b, c, d = (
            avgdigamma(x, dvec),
            avgdigamma(y, dvec),
            digamma(k),
            digamma(len(x)),
        )
        if alpha > 0:
            d += lnc_correction(tree, points, k, alpha)
    else:
        xz = np.c_[x, z]
        yz = np.c_[y, z]
        a, b, c, d = (
            avgdigamma(xz, dvec),
            avgdigamma(yz, dvec),
            avgdigamma(z, dvec),
            digamma(k),
        )
    return max(0, (-a - b + c + d) / np.log(base))

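# Example usage (a sketch; for linearly related Gaussian samples this
# nearest-neighbor estimator should report a few bits, and roughly zero for
# independent samples):
#
#     >>> rng = np.random.default_rng(0)
#     >>> x = rng.normal(size=(1000, 1))
#     >>> y = x + 0.1 * rng.normal(size=(1000, 1))
#     >>> mi(x, y)                              # high, a few bits
#     >>> mi(x, rng.normal(size=(1000, 1)))     # near 0
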
def entropy(x, k=3, base=2):
    """The classic K-L k-nearest neighbor continuous entropy estimator

    x should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is a one-dimensional scalar and we have four samples
    """
    assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
    x = np.asarray(x)
    n_elements, n_features = x.shape
    x = add_noise(x)
    tree = build_tree(x)
    nn = query_neighbors(tree, x, k)
    const = digamma(n_elements) - digamma(k) + n_features * np.log(2)
    return max(0, (const + n_features * np.log(nn).mean()) / np.log(base))

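# Example usage (a sketch; the differential entropy of Uniform(0, 1) is 0
# bits and of Uniform(0, 4) is log2(4) = 2 bits, so the estimates should be
# close to those values):
#
#     >>> rng = np.random.default_rng(0)
#     >>> entropy(rng.uniform(size=(1000, 1)))        # approximately 0
#     >>> entropy(rng.uniform(0, 4, size=(1000, 1)))  # approximately 2
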
def add_noise(x, intens=1e-10):
    """Small noise to break degeneracy, see doc."""
    return x + intens * np.random.random_sample(x.shape)

def build_tree(points):
    # KDTree performance degrades in high dimensions, so fall back to BallTree
    if points.shape[1] >= 20:
        return BallTree(points, metric="chebyshev")
    return KDTree(points, metric="chebyshev")

def query_neighbors(tree, x, k):
    # Each point is its own nearest neighbor, so query k+1 neighbors and take
    # the distance to the k-th true neighbor
    return tree.query(x, k=k + 1)[0][:, k]

def avgdigamma(points, dvec):
    """Finds the number of neighbors within some radius in the marginal space
    and returns the expectation value of <psi(nx)>"""
    tree = build_tree(points)
    dvec = dvec - 1e-15
    num_points = count_neighbors(tree, points, dvec)
    return np.mean(digamma(num_points))

def lnc_correction(tree, points, k, alpha):
    e = 0
    n_sample = points.shape[0]
    for point in points:
        # Find k-nearest neighbors in joint space, p=inf means max-norm
        knn = tree.query(point[None, :], k=k + 1, return_distance=False)[0]
        knn_points = points[knn]

        # Center the neighborhood on the query point (its own nearest neighbor)
        knn_points = knn_points - knn_points[0]

        # Calculate the covariance matrix of the k-nearest neighbor points
        # and obtain its eigenvectors
        covr = knn_points.T @ knn_points / k
        _, v = np.linalg.eig(covr)

        # Calculate the PCA-bounding box using the eigenvectors
        V_rect = np.log(np.abs(knn_points @ v).max(axis=0)).sum()

        # Calculate the volume of the original box
        log_knn_dist = np.log(np.abs(knn_points).max(axis=0)).sum()

        # Perform local non-uniformity checking and update the correction term
        if V_rect < log_knn_dist + np.log(alpha):
            e += (log_knn_dist - V_rect) / n_sample
    return e

def count_neighbors(tree, x, r):
    return tree.query_radius(x, r, count_only=True)