"""
Core functions for the IcaData object
"""
import copy
import logging
import re
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.cluster import KMeans
from tqdm import tqdm_notebook as tqdm
from pymodulon.enrichment import (
compute_annotation_enrichment,
compute_regulon_enrichment,
compute_trn_enrichment,
contingency,
)
from pymodulon.util import _check_dict, _check_table, compute_threshold
[docs]class IcaData(object):
"""
Class representation of all iModulon-related data
"""
def __init__(
self,
M,
A,
X=None,
log_tpm=None,
gene_table=None,
sample_table=None,
imodulon_table=None,
trn=None,
dagostino_cutoff=None,
optimize_cutoff=False,
thresholds=None,
threshold_method="dagostino",
motif_info=None,
imodulondb_table=None,
gene_links=None,
tf_links=None,
):
"""
Initialize IcaData object
Parameters
----------
M : str or ~pandas.DataFrame
`M` matrix from ICA
A : str or ~pandas.DataFrame
`A` matrix from ICA
X : str or ~pandas.DataFrame, optional
log-TPM expression matrix, centered to reference condition(s) (
default: None)
log_tpm : str or ~pandas.DataFrame, optional
Raw `log-TPM` expression matrix (without centering) (default: None)
gene_table : str or ~pandas.DataFrame, optional
Table containing genome annotation (default: None)
sample_table : str or ~pandas.DataFrame, optional
Table containing sample metadata (default: None)
imodulon_table : str or ~pandas.DataFrame, optional
Table containing iModulon names, enrichments, and annotations
(default: None)
trn : str or ~pandas.DataFrame, optional
Table mapping transcriptional regulators to target genes (
default: None)
dagostino_cutoff : int, optional
Cutoff value to use for the D'agostino test for iModulon gene
thresholds. This option will be ignored if optimize_cutoff is True,
if threshold_method is "kmeans", or if custom thresholds are
provided.
optimize_cutoff : bool
If true, optimize the D'agostino cutoff for iModulon threshold using
the TRN (if provided). This option will be ignored if
threshold_method is "kmeans" or if custom thresholds are
provided. (default: False)
thresholds : dict, optional
Dictionary mapping custom thresholds to iModulons (default: None). If
numerical thresholds are supplied, this supercedes all related options (
i.e. `threshold_method`, `optimize_cutoff`, `dagostino_cutoff`).
threshold_method : str
Either "dagostino" (default with TRN) or "kmeans" (default if no TRN
provided)
imodulondb_table : dict, optional
Dictionary of general dataset information for the details box on the
dataset page of iModulonDB and other iModulonDB features (default: None)
gene_links : dict, optional
dictionary of genes to links in an external database (default: None)
tf_links : dict, optional
Dictionary of TFs (from the TRN) to links in a database (default:
None)
"""
#########################
# Load M and A matrices #
#########################
M = _check_table(M, "M")
A = _check_table(A, "A")
# Convert column names of M to int if possible
try:
M.columns = M.columns.astype(int)
except TypeError:
pass
# Check that M and A matrices have identical iModulon names
if M.columns.tolist() != A.index.tolist():
raise ValueError("M and A matrices have different iModulon names")
# Ensure that M and A matrices have unique indices/columns
if M.index.duplicated().any():
raise ValueError("M matrix contains duplicate gene names")
if A.columns.duplicated().any():
raise ValueError("A matrix contains duplicate sample names")
if M.columns.duplicated().any():
raise ValueError("M and A matrices contain " "duplicate iModulon names")
# Store M and A
self._m = M
self._a = A
#################
# Load X matrix #
#################
# Check X matrix
if X is None:
self._x = None
else:
self.X = X
if log_tpm is None:
self._log_tpm = None
else:
self.log_tpm = log_tpm
####################
# Load data tables #
####################
# Initialize sample and gene names
self._gene_names = M.index.tolist()
self.gene_table = gene_table
self._sample_names = A.columns.tolist()
self.sample_table = sample_table
self._imodulon_names = M.columns.tolist()
self.imodulon_table = imodulon_table
############
# Load TRN #
############
self.trn = trn
# Initialize thresholds either with or without optimization
self._cutoff_optimized = False
if thresholds is not None:
# Throw a warning if user was expecting d'agostino optimization
if optimize_cutoff:
logging.warning(
"Using manually input thresholds. D'agostino "
"optimization will not be performed."
)
self.thresholds = thresholds
self._dagostino_cutoff = None
# Use kmeans if TRN is empty, or kmeans is selected
elif self.trn.empty or threshold_method == "kmeans":
# Throw a warning if user was expecting d'agostino optimization
if optimize_cutoff:
logging.warning(
"Using Kmeans threshold method. D'agostino "
"optimization will not be performed."
)
self.compute_kmeans_thresholds()
self._dagostino_cutoff = None
# Else use D'agostino method
elif threshold_method == "dagostino":
if optimize_cutoff:
logging.warning(
"Optimizing iModulon thresholds, may take 2-3 minutes..."
)
# this function sets self.dagostino_cutoff internally
self.reoptimize_thresholds(progress=False, plot=False)
# also sets an attribute to tell us if we've done
# this optimization; only reasonable to try it
# again if the user uploads a new TRN
elif dagostino_cutoff is None:
logging.warning(
"Using the default dagostino_cutoff of 550. This may "
"not be optimal for your dataset. Use "
"ica_data.reoptimize_thresholds() to find the optimal "
"threshold."
)
self._dagostino_cutoff = 550
self.recompute_thresholds(self.dagostino_cutoff)
else:
self._dagostino_cutoff = dagostino_cutoff
self.recompute_thresholds(self.dagostino_cutoff)
# Capture improper threshold methods
else:
raise ValueError('Threshold method must either be "dagostino" or "kmeans"')
##############
# Motif Info #
##############
self.motif_info = motif_info
##############################
# Load iModulonDB Properties #
##############################
# initialize links
self.imodulondb_table = imodulondb_table
self.gene_links = gene_links
self.tf_links = tf_links
# Initialize COG colors
if "COG" in self.gene_table.columns:
self.gene_table.COG = self.gene_table.COG.fillna("No COG category")
cogs = sorted(self.gene_table.COG.unique())
self.cog_colors = dict(
zip(
cogs,
[
"red",
"pink",
"y",
"orchid",
"mediumvioletred",
"green",
"lightgray",
"lightgreen",
"slategray",
"blue",
"saddlebrown",
"turquoise",
"lightskyblue",
"c",
"skyblue",
"lightblue",
"fuchsia",
"dodgerblue",
"lime",
"sandybrown",
"black",
"goldenrod",
"chocolate",
"orange",
],
)
)
# Initialize motif info
self._motif_info = {}
if motif_info is not None:
for k1, v1 in motif_info.items():
params = {}
for k2, v2 in v1.items():
try:
params[k2] = pd.read_json(v2, orient="table")
except ValueError:
params[k2] = v2
self._motif_info[k1] = MotifInfo(**params)
@property
[docs] def M(self):
"""Get M matrix"""
return self._m
@property
[docs] def M_binarized(self):
"""Get binarized version of M matrix based on current thresholds"""
m_binarized = pd.DataFrame().reindex_like(self.M)
m_binarized[:] = 0
for imodulon in m_binarized.columns:
in_imodulon = abs(self.M[imodulon]) > self.thresholds[imodulon]
m_binarized.loc[in_imodulon, imodulon] = 1
return m_binarized
@property
[docs] def A(self):
"""Get A matrix"""
return self._a
@property
[docs] def X(self):
"""Get X matrix"""
return self._x
@X.setter
def X(self, x_matrix):
x = _check_table(x_matrix, "X")
# Check that gene and sample names conform to M and A matrices
if x.columns.tolist() != self.A.columns.tolist():
raise ValueError("X and A matrices have different sample names")
if x.index.tolist() != self.M.index.tolist():
raise ValueError("X and M matrices have different gene names")
# Set x matrix
self._x = x
@property
[docs] def log_tpm(self):
"""Get 'log_tpm' matrix"""
return self._log_tpm
@log_tpm.setter
def log_tpm(self, lt_matrix):
log_tpm = _check_table(lt_matrix, "log-TPM")
# Check that gene and sample names conform to M and A matrices
if log_tpm.columns.tolist() != self.A.columns.tolist():
raise ValueError("log-TPM and A matrices have different sample " "names")
if log_tpm.index.tolist() != self.M.index.tolist():
raise ValueError("log-TPM and M matrices have different gene names")
# Set log-tpm matrix
self._log_tpm = log_tpm
# Gene, sample and iModulon name properties
@property
[docs] def imodulon_names(self):
"""Get iModulon names"""
return self._imodulon_table.index.tolist()
@imodulon_names.setter
def imodulon_names(self, new_names):
self._update_imodulon_names(new_names)
@property
[docs] def sample_names(self):
"""Get sample names"""
return self._sample_table.index.tolist()
@property
[docs] def gene_names(self):
"""Get gene names"""
return self._gene_table.index.tolist()
# Gene, sample and iModulon tables
@property
[docs] def gene_table(self):
"""Get gene table"""
return self._gene_table
@gene_table.setter
def gene_table(self, new_table):
table = _check_table(new_table, "gene", self._gene_names)
self._gene_table = table
# Update gene names
names = table.index
self._gene_names = names
self._m.index = names
if self._x is not None:
self._x.index = names
@property
[docs] def sample_table(self):
"""Get sample table"""
return self._sample_table
@sample_table.setter
def sample_table(self, new_table):
table = _check_table(new_table, "sample", self._sample_names)
self._sample_table = table
# Update sample names
names = table.index
self._sample_names = names
self._a.columns = names
if self._x is not None:
self._x.columns = names
@property
[docs] def imodulon_table(self):
"""Get table of iModulons"""
return self._imodulon_table
@imodulon_table.setter
def imodulon_table(self, new_table):
table = _check_table(new_table, "imodulon", self._imodulon_names)
self._imodulon_table = table
# TRN
@property
[docs] def trn(self):
"""Get table with TRN information"""
return self._trn
@trn.setter
def trn(self, new_trn):
self._trn = _check_table(new_trn, "TRN", index_col=None)
if not self._trn.empty:
# Check that regulator and gene_id columns are filled in
if self._trn.regulator.isnull().any():
raise ValueError('Null value detected in "regulator" column ' "of TRN")
if self._trn.gene_id.isnull().any():
raise ValueError('Null value detected in "gene_id" column ' "of TRN")
# Make sure regulators do not contain / or + characters
for reg in self._trn.regulator.unique():
if "+" in reg or "/" in reg:
logging.warning(
"The characters '+' and '/' are used for combining "
"regulons and cannot be in regulator names. These "
"characters will be replaced with ';'"
)
break
self._trn.regulator = [
re.sub("\\+", ";", re.sub("/", ";", reg)) for reg in self._trn.regulator
]
# Only include genes that are in S/X matrix
extra_genes = set(self._trn.gene_id) - set(self.gene_names)
if len(extra_genes) > 0:
logging.warning(
"The following genes are in the TRN but not in "
"your M "
"matrix: {}".format(extra_genes)
)
self._trn = self._trn[self._trn.gene_id.isin(self.gene_names)]
# Save regulator information to gene table
reg_dict = {}
for name, group in self._trn.groupby("gene_id"):
reg_dict[name] = ",".join(group.regulator)
self._gene_table["regulator"] = pd.Series(reg_dict).reindex(self.gene_names)
# mark that our cutoffs are no longer optimized since the TRN
self._cutoff_optimized = False
# Motif information
@property
[docs] def motif_info(self):
"""Get motif info"""
return self._motif_info
@motif_info.setter
def motif_info(self, info):
self._motif_info = info
[docs] def _update_imodulon_names(self, new_names):
"""
Iterates and updates iModulon names
Parameters
----------
new_names: list
New names to update iModulons
Returns
-------
None: None
"""
name_series = pd.Series(new_names, index=self.imodulon_names)
# Check if new names are duplicates
dups = name_series[name_series.duplicated(keep=False)]
if len(dups) > 0:
seen = {}
# For duplicated names, add a "-1" or "-2" etc.
for key, val in dups.items():
if val in seen.keys():
name_series[key] = val + "-" + str(seen[val])
seen[val] += 1
else:
name_series[key] = val + "-1"
seen[val] = 2
logging.warning(
"Duplicate iModulon names detected. iModulon {} will "
"be renamed to {}".format(key, name_series[key])
)
# Update thresholds and motif info
for old_name, new_name in name_series.items():
self._thresholds[new_name] = self._thresholds.pop(old_name)
try:
self._motif_info[new_name] = self._motif_info.pop(old_name)
except KeyError:
pass
# Update iModulon names
final_names = name_series.values.tolist()
self._imodulon_names = final_names
self._a.index = final_names
self._m.columns = final_names
self._imodulon_table.index = final_names
[docs] def rename_imodulons(self, name_dict=None, column=None) -> None:
"""
Rename an iModulon.
Parameters
----------
name_dict : dict
Dictionary mapping old iModulon names to new names
(e.g. {old_name:new_name}) (default: None)
column : str
Uses a column from the iModulon table to rename iModulons (
default: None)
Returns
-------
None: None
"""
# Rename using the column parameter if given
if column is not None:
raise DeprecationWarning(
"column paramter will be removed soon. Please "
"use 'ica_data.imodulon_names = "
"ica_data.imodulon_table[column]'"
)
else:
new_names = [
name_dict[name] if name in name_dict.keys() else name
for name in self.imodulon_names
]
self._update_imodulon_names(new_names)
# Show enriched
[docs] def view_imodulon(self, imodulon):
"""
View genes in an iModulon and show relevant information about each gene.
Parameters
----------
imodulon : int or str
Name of iModulon
Returns
-------
final_rows: ~pandas.DataFrame
Table showing iModulon gene information
"""
# Find genes in iModulon
in_imodulon = abs(self.M[imodulon]) > self.thresholds[imodulon]
# Get gene weights information
gene_weights = self.M.loc[in_imodulon, imodulon]
gene_weights.name = "gene_weight"
gene_rows = self.gene_table.loc[in_imodulon]
final_rows = pd.concat([gene_weights, gene_rows], axis=1)
return final_rows
[docs] def find_single_gene_imodulons(self, save=False):
"""
A simple function that returns the names of all likely single-gene
iModulons. Checks if the largest iModulon gene weight is more
than twice the weight of the second highest iModulon gene weight.
Parameters
----------
save : bool
If true, save output to imodulon_table (default: False)
Returns
-------
single_genes_imodulons: list
List of single-gene iModulons
"""
single_genes_imodulons = []
for imodulon in self.imodulon_names:
sorted_weights = abs(self.M[imodulon]).sort_values(ascending=False)
if sorted_weights.iloc[0] > 2 * sorted_weights.iloc[1]:
single_genes_imodulons.append(imodulon)
if save:
self.imodulon_table.loc[imodulon, "single_gene"] = True
return single_genes_imodulons
###############
# Enrichments #
###############
[docs] def _update_imodulon_table(self, enrichment):
"""
Update iModulon table given new iModulon enrichments
Parameters
----------
enrichment : ~pandas.Series or ~pandas.DataFrame iModulon enrichment
Returns
-------
None: None
"""
if isinstance(enrichment, pd.Series):
enrichment = pd.DataFrame(enrichment)
keep_rows = self.imodulon_table[
~self.imodulon_table.index.isin(enrichment.index)
]
keep_cols = self.imodulon_table.loc[
enrichment.index, set(self.imodulon_table.columns) - set(enrichment.columns)
]
df_top_enrich = pd.concat([enrichment, keep_cols], axis=1)
new_table = pd.concat([keep_rows, df_top_enrich], sort=False)
# Reorder columns
col_order = enrichment.columns.tolist() + keep_cols.columns.tolist()
new_table = new_table[col_order]
# Reorder rows
new_table = new_table.reindex(self.imodulon_names)
self.imodulon_table = new_table
[docs] def compute_regulon_enrichment(
self, imodulon, regulator, save=False, evidence=None
):
"""
Compare an iModulon against a regulon. (Note: q-values cannot be
computed for single enrichments)
Parameters
----------
imodulon: int or str
Name of 'iModulon'
regulator: str
TF name, or complex regulon, where "/" uses genes in any regulon
and "+" uses genes in all regulons
save: bool
If true, save enrichment score to the imodulon_table (default: True)
evidence: list or str
'Evidence' level of TRN interactions to include during TRN
enrichment (default: None)
Returns
-------
enrich: ~pandas.Series
Table containing enrichment statistics
"""
if evidence is not None:
if isinstance(evidence, str):
evidences_to_use = [evidence]
else:
evidences_to_use = evidence
if "evidence" in self.trn.columns:
trn_to_use = self.trn[self.trn["evidence"].isin(evidences_to_use)]
else:
logging.warning(
'TRN does not contain an "evidence" column. Ignoring '
"evidence argument."
)
trn_to_use = self.trn
else:
trn_to_use = self.trn
imod_genes = self.view_imodulon(imodulon).index
enrich = compute_regulon_enrichment(
set(imod_genes), regulator, set(self.gene_names), trn_to_use
)
enrich.rename({"gene_set_size": "imodulon_size"}, inplace=True)
if save:
table = self.imodulon_table
for key, value in enrich.items():
table.loc[imodulon, key] = value
table.loc[imodulon, "regulator"] = enrich.name
self.imodulon_table = table
return enrich
[docs] def compute_trn_enrichment(
self,
imodulons=None,
fdr=1e-5,
max_regs=1,
save=False,
method="both",
force=False,
evidence=None,
) -> pd.DataFrame:
"""
Compare iModulons against all regulons in the TRN
Parameters
----------
imodulons: int or str
Name of iModulon(s). If none given, compute enrichments for all
'iModulons' (default: None)
fdr : float
False detection rate (default: 1e-5)
max_regs : int
Maximum number of regulators to include in complex regulon (
default: 1)
save : bool
Save regulons with highest enrichment scores to the
imodulon_table (default: False)
method : str
How to combine multiple regulators (default: 'both').
'or' computes enrichment against union of regulons,
'and' computes enrichment against intersection of regulons, and
'both' performs both tests
force : bool
If false, prevents computation of >2 regulators (default: False)
evidence: list or str
Evidence level of TRN interactions to include during TRN enrichment
Returns
-------
df_enriched: ~pandas.DataFrame
Table of statistically significant enrichments
"""
enrichments = []
if imodulons is None:
imodulon_list = self.imodulon_names
elif isinstance(imodulons, str) or isinstance(imodulons, int):
imodulon_list = [imodulons]
else:
imodulon_list = imodulons
if evidence is not None:
if isinstance(evidence, str):
evidences_to_use = [evidence]
else:
evidences_to_use = evidence
trn_to_use = self.trn[self.trn["evidence"].isin(evidences_to_use)]
else:
trn_to_use = self.trn
for imodulon in imodulon_list:
gene_list = self.view_imodulon(imodulon).index
df_enriched = compute_trn_enrichment(
set(gene_list),
set(self.gene_names),
trn_to_use,
max_regs=max_regs,
fdr=fdr,
method=method,
force=force,
)
df_enriched["imodulon"] = imodulon
enrichments.append(df_enriched)
df_enriched = pd.concat(enrichments, axis=0, sort=True)
# Set regulator as column instead of axis
df_enriched.index.name = "regulator"
df_enriched.reset_index(inplace=True)
# Reorder columns
df_enriched.rename({"gene_set_size": "imodulon_size"}, inplace=True, axis=1)
col_order = [
"imodulon",
"regulator",
"pvalue",
"qvalue",
"precision",
"recall",
"f1score",
"TP",
"regulon_size",
"imodulon_size",
"n_regs",
]
df_enriched = df_enriched[col_order]
# Sort by q-value
df_enriched.sort_values(["imodulon", "qvalue", "n_regs"])
if save:
df_top_enrich = df_enriched.drop_duplicates("imodulon")
self._update_imodulon_table(df_top_enrich.set_index("imodulon"))
return df_enriched
[docs] def compute_annotation_enrichment(
self, annotation, column, imodulons=None, fdr=0.1
) -> pd.DataFrame:
"""
Compare iModulons against a gene annotation table
Parameters
----------
annotation : ~pandas.DataFrame
Table containing two columns: the gene locus tag, and its
appropriate annotation
column : str
Name of the column containing the annotation
imodulons : list or str or int
Name of iModulon(s). If none given, compute enrichments for all
iModulons (default: None)
fdr : float
False detection rate (default: 0.1)
Returns
-------
DF_enriched: ~pandas.DataFrame
Table of statistically significant enrichments
"""
# TODO: write test function
# TODO: Figure out save function
enrichments = []
if imodulons is None:
imodulon_list = self.imodulon_names
elif isinstance(imodulons, str):
imodulon_list = [imodulons]
else:
imodulon_list = imodulons
for imodulon in imodulon_list:
gene_list = self.view_imodulon(imodulon).index
df_enriched = compute_annotation_enrichment(
set(gene_list),
set(self.gene_names),
column=column,
annotation=annotation,
fdr=fdr,
)
df_enriched["imodulon"] = imodulon
enrichments.append(df_enriched)
DF_enriched = pd.concat(enrichments, axis=0, sort=True)
# Set annotation name as column instead of axis
DF_enriched.index.name = column
DF_enriched.reset_index(inplace=True)
# Rename column
DF_enriched.rename({"gene_set_size": "imodulon_size"}, inplace=True, axis=1)
enrich_col = DF_enriched.columns[0]
col_order = [
"imodulon",
enrich_col,
"pvalue",
"qvalue",
"precision",
"recall",
"f1score",
"TP",
"target_set_size",
"imodulon_size",
]
DF_enriched = DF_enriched[col_order]
return DF_enriched
######################################
# Threshold properties and functions #
######################################
@property
[docs] def dagostino_cutoff(self):
"""Get D'agostino cutoff"""
return self._dagostino_cutoff
@property
[docs] def cutoff_optimized(self):
return self._cutoff_optimized
@property
[docs] def thresholds(self):
"""Get thresholds"""
return self._thresholds
@thresholds.setter
def thresholds(self, new_thresholds):
"""Set thresholds"""
new_thresh_len = len(new_thresholds)
imod_names_len = len(self._imodulon_names)
if new_thresh_len != imod_names_len:
raise ValueError(
"new_threshold has {:d} elements, but should "
"have {:d} elements".format(new_thresh_len, imod_names_len)
)
# fix json peculiarity of saving int dict keys as string
thresh_copy = new_thresholds.copy()
for key in thresh_copy.keys():
# Could this be replaced with a try/except clause?
if isinstance(key, str) and all([char.isdigit() for char in key]):
new_thresholds.update({int(key): new_thresholds.pop(key)})
self._thresholds = new_thresholds
self._cutoff_optimized = False
[docs] def change_threshold(self, imodulon, value):
"""
Set threshold for an iModulon
Parameters
----------
imodulon : int or str
Name of iModulon
value : float
New threshold
Returns
-------
None: None
"""
self._thresholds[imodulon] = value
self._cutoff_optimized = False
[docs] def recompute_thresholds(self, dagostino_cutoff):
"""
Re-computes iModulon thresholds using a new D'Agostino cutoff
Parameters
----------
dagostino_cutoff : int
New D'agostino cutoff statistic
Returns
-------
None: None
"""
self._update_thresholds(dagostino_cutoff)
self._cutoff_optimized = False
[docs] def _update_thresholds(self, dagostino_cutoff: int):
self._thresholds = {
k: compute_threshold(self._m[k], dagostino_cutoff)
for k in self._imodulon_names
}
self._dagostino_cutoff = dagostino_cutoff
[docs] def reoptimize_thresholds(self, progress=True, plot=True):
"""
Re-optimizes the D'Agostino statistic cutoff for defining iModulon
thresholds if the TRN has been updated
Parameters
----------
progress : bool
Show a progress bar (default: True)
plot : bool
Show the sensitivity analysis plot (default: True)
Returns
-------
int
New D'agostino cutoff
"""
if self.trn.empty:
raise ValueError(
"D'agostino cutoff cannot be optimized if no TRN is "
"provided. Use ica_data.compute_kmeans_thresholds() "
"instead."
)
if not self._cutoff_optimized:
new_cutoff = self._optimize_dagostino_cutoff(progress, plot)
self._update_thresholds(new_cutoff)
self._cutoff_optimized = True
else:
print(
"Cutoff already optimized, and no new TRN data provided. "
"Re-optimization will return same cutoff."
)
return self.dagostino_cutoff
[docs] def _optimize_dagostino_cutoff(self, progress, plot):
"""
Computes an abridged version of the TRN enrichments for the 20
highest-weighted genes in order to determine a global minimum
for the D'Agostino cutoff ultimately used to threshold and
define the genes "in" an iModulon
Parameters
----------
progress : bool
Show a progress bar (default: True)
plot : bool
Show the sensitivity analysis plot (default: True)
Returns
-------
int
New D'agostino cutoff
"""
# prepare a DataFrame of the best single-TF enrichments for the
# top 20 genes in each component
top_enrichments = []
all_genes = list(self.M.index)
for imod in self.M.columns:
genes_top20 = list(abs(self.M[imod]).sort_values().iloc[-20:].index)
imod_enrichment_df = compute_trn_enrichment(
set(genes_top20), set(all_genes), self.trn, max_regs=1
)
# compute_trn_enrichment is being hijacked a bit; we want
# the index to be components, not the enriched TFs
imod_enrichment_df["TF"] = imod_enrichment_df.index
imod_enrichment_df["component"] = imod
if not imod_enrichment_df.empty:
# take the best single-TF enrichment row (by q-value)
top_enrichment = imod_enrichment_df.sort_values(by="qvalue").iloc[0, :]
top_enrichments.append(top_enrichment)
# perform a sensitivity analysis to determine threshold effects
# on precision/recall overall
cutoffs_to_try = np.arange(50, 2000, 50)
f1_scores = []
if progress:
iterator = tqdm(cutoffs_to_try)
else:
iterator = cutoffs_to_try
for cutoff in iterator:
cutoff_f1_scores = []
for enrich_row in top_enrichments:
# for this enrichment row, get all the genes regulated
# by the regulator chosen above
regulon_genes = list(
self.trn[self.trn["regulator"] == enrich_row["TF"]].gene_id
)
# compute the weighting threshold based on this cutoff to try
thresh = compute_threshold(self.M[enrich_row["component"]], cutoff)
component_genes = self.M[
abs(self.M[enrich_row["component"]]) > thresh
].index.tolist()
# Compute the contingency table (aka confusion matrix)
# for overlap between the regulon and iM genes
((tp, fp), (fn, tn)) = contingency(
set(regulon_genes), component_genes, set(all_genes)
)
# Calculate F1 score for one regulator-component pair
# and add it to the running list for this cutoff
precision = np.true_divide(tp, tp + fp) if tp > 0 else 0
recall = np.true_divide(tp, tp + fn) if tp > 0 else 0
f1_score = (
(2 * precision * recall) / (precision + recall) if tp > 0 else 0
)
cutoff_f1_scores.append(f1_score)
# Get mean of F1 score for this potential cutoff
f1_scores.append(np.mean(cutoff_f1_scores))
# extract the best cutoff
best_cutoff = cutoffs_to_try[np.argmax(f1_scores)]
if plot:
fig, ax = plt.subplots(figsize=(4, 4))
ax.set_xlabel("D'agostino Test Statistic", fontsize=14)
ax.set_ylabel("Mean F1 score")
ax.plot(cutoffs_to_try, f1_scores)
ax.scatter([best_cutoff], [max(f1_scores)], color="r")
return int(best_cutoff)
[docs] def _kmeans_cluster(self, imodulon):
data = self.M[imodulon]
model = KMeans(n_clusters=3, random_state=1)
model.fit(abs(data).values.reshape(-1, 1))
df = pd.DataFrame(abs(data))
df["cluster"] = model.labels_
# Get top two clusters
counts = df.cluster.value_counts().sort_values(ascending=True)
idx1 = counts.index[0]
idx2 = counts.index[1]
clust1 = df[df.cluster == idx1]
clust2 = df[df.cluster == idx2]
# Get midpoint between lowest iModulon gene and highest insignificant
# gene
threshold = np.mean([clust1[imodulon].min(), clust2[imodulon].max()])
return threshold
[docs] def compute_kmeans_thresholds(self):
"""
Computes iModulon thresholds using K-means clustering
Returns
-------
None: None
"""
self._thresholds = {k: self._kmeans_cluster(k) for k in self._imodulon_names}
[docs] def copy(self):
"""
Make a deep copy of an IcaData object
Returns
-------
IcaData: ~pymodulon.core.IcaData
Copy of IcaData object
"""
return copy.deepcopy(self)
[docs] def imodulons_with(self, gene):
"""
Lists the iModulons containing a gene
Parameters
----------
gene : str
Gene name or locus tag
Returns
-------
list
List of iModulons containing the gene
"""
# Check that gene exists
if gene not in self.gene_names:
gene = self.name2num(gene)
return self.M.columns[self.M_binarized.loc[gene] == 1].to_list()
[docs] def name2num(self, gene):
"""
Convert a gene name to the locus tag
Parameters
----------
gene : list or str
Gene name or list of gene names
Returns
-------
final_list : list or str
Locus tag or list of locus tags
"""
gene_table = self.gene_table
if "gene_name" not in gene_table.columns:
raise ValueError('Gene table does not contain "gene_name" column.')
if isinstance(gene, str):
gene_list = [gene]
else:
gene_list = gene
final_list = []
for g in gene_list:
g_names = gene_table.gene_name.str.casefold()
loci = gene_table[g_names == g.casefold()].index
# Ensure only one locus maps to this gene
if len(loci) == 0:
raise ValueError("Gene does not exist: {}".format(g))
elif len(loci) > 1:
logging.warning(
"Found multiple genes named {}. Only "
"reporting first locus tag".format(g)
)
final_list.append(loci[0])
# Return string if string was given as input
if isinstance(gene, str):
return final_list[0]
else:
return final_list
[docs] def num2name(self, gene):
"""
Get the name of a gene from its locus tag
Parameters
----------
gene : list or str
Locus tag or list of locus tags
Returns
-------
result : list or str
Gene name or list of gene names
"""
result = self.gene_table.loc[gene].gene_name
if isinstance(gene, list):
return result.tolist()
else:
return result
#########################
# iModulonDB Properties #
#########################
@property
[docs] def imodulondb_table(self):
return self._imodulondb_table
@imodulondb_table.setter
def imodulondb_table(self, new_imdb):
if new_imdb is None:
new_imdb = dict()
if isinstance(new_imdb, str):
new_imdb = _check_dict(new_imdb)
self._imodulondb_table = new_imdb
default_imdb_table = {
"organism": "New Organism",
"dataset": "New Dataset",
"strain": "Unspecified",
"publication_name": "Unpublished Study",
"publication_link": "",
"gene_link_db": "External Database",
"organism_folder": "new_organism",
"dataset_folder": "new_dataset",
}
for k, v in default_imdb_table.items():
if k not in new_imdb: # use what is provided, default for
# what isn't
self._imodulondb_table[k] = v
@property
[docs] def gene_links(self):
return self._gene_links
@gene_links.setter
def gene_links(self, new_links):
if new_links is None:
new_links = dict()
if isinstance(new_links, str):
new_links = _check_dict(new_links)
"""
# uncomment this to be warned for unused gene links
for gene in new_links.keys():
if not(gene in self._m.index):
warnings.warn('The gene %s has a link
but is not in the M matrix.'%(gene))
"""
self._gene_links = new_links
for gene in set(self._m.index) - set(new_links.keys()):
self._gene_links[gene] = np.nan
@property
[docs] def tf_links(self):
return self._tf_links
@tf_links.setter
def tf_links(self, new_links):
if new_links is None:
new_links = dict()
if isinstance(new_links, str):
new_links = _check_dict(new_links)
self._tf_links = new_links
##########
# Motifs #
##########
[docs]class MotifInfo:
def __init__(self, motifs, sites, cmd, file, matches=None):
self._motifs = motifs
self._sites = sites
self._cmd = cmd
self._file = file
self._matches = matches
[docs] def __repr__(self):
if len(self.motifs) == 1:
motif_str = "motif"
else:
motif_str = "motifs"
if len(self.sites) == 1:
site_str = "site"
else:
site_str = "sites"
return (
f"<MotifInfo with {len(self.motifs)} {motif_str} across "
f"{sum(self.sites.site_seq.notnull())} {site_str}>"
)
@property
[docs] def motifs(self):
return self._motifs
@property
[docs] def sites(self):
return self._sites
@property
[docs] def cmd(self):
return self._cmd
@property
[docs] def file(self):
return self._file
@property
[docs] def matches(self):
return self._matches
@matches.setter
def matches(self, matches):
self._matches = matches