Source code for pymodulon.motif

# copy/pasted from the ICA version below

import os
import re
import subprocess
from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd
from Bio import SeqIO
from bs4 import BeautifulSoup

import pymodulon.data.meme
from pymodulon.core import IcaData, MotifInfo


[docs]def _get_upstream_seqs( ica_data: IcaData, imodulon: Union[str, int], seq_dict: Dict, upstream: int, downstream: int, ): """ Get upstream sequences for a table of operons Parameters ---------- ica_data: IcaData IcaData object imodulon: Union[str,int] Name of iModulon seq_dict: Dict Dictionary mapping accession numbers to Biopython SeqRecords upstream: int Number of basepairs upstream from first gene in operon to include in motif search downstream: int Number of basepairs upstream from first gene in operon to include in motif search Returns ------- pd.DataFrame DataFrame containing operon information List[SeqRecord] List of SeqRecords containing upstream sequences """ # Get list of operons in component enriched_genes = ica_data.view_imodulon(imodulon).index enriched_operons = ica_data.gene_table.loc[enriched_genes] # Get upstream sequences list2struct = [] seq_records = [] for name, group in enriched_operons.groupby("operon"): genes = ",".join(group.gene_name) ids = ",".join(group.index) genome = seq_dict[group.accession[0]] if all(group.strand == "+"): pos = min(group.start) start_pos = max(0, pos - upstream) sequence = genome[start_pos : pos + downstream] seq = SeqIO.SeqRecord(seq=sequence, id=name) list2struct.append([name, genes, ids, start_pos, "+", str(seq.seq)]) seq_records.append(seq) elif all(group.strand == "-"): pos = max(group.end) start_pos = max(0, pos - downstream) sequence = genome[start_pos : pos + upstream] seq = SeqIO.SeqRecord(seq=sequence, id=name) list2struct.append([name, genes, ids, start_pos, "-", str(seq.seq)]) seq_records.append(seq) else: raise ValueError("Operon contains genes on both strands:", name) DF_seqs = pd.DataFrame( list2struct, columns=["operon", "genes", "locus_tags", "start_pos", "strand", "seq"], ).set_index("operon") return DF_seqs, seq_records
[docs]def find_motifs( ica_data: IcaData, imodulon: Union[int, str], fasta_file: Union[os.PathLike, List[os.PathLike]], outdir: Optional[os.PathLike] = None, palindrome: bool = False, nmotifs: int = 5, upstream: int = 500, downstream: int = 100, verbose: bool = True, force: bool = False, evt: float = 0.001, cores: int = 8, minw: int = 6, maxw: int = 40, minsites: Optional[int] = None, ): """ Finds motifs upstream of genes in an iModulon Parameters ---------- ica_data: IcaData IcaData object imodulon: Union[int, str] iModulon name fasta_file: Union[os.PathLike, List[os.PathLike]] Path or list of paths to fasta file(s) for organism outdir: os.PathLike Path to output directory palindrome: bool If True, limit search to palindromic motifs (default: False) nmotifs: int Number of motifs to search for (default: 5) upstream: int Number of basepairs upstream from first gene in operon to include in motif search (default: 500) downstream: int Number of basepairs upstream from first gene in operon to include in motif search (default: 100) verbose: bool Show steps in verbose output (default: True) force: bool Force execution of MEME even if output already exists (default: False) evt: float E-value threshold (default: 0.001) cores: int Number of cores to use (default: 8) minw: int Minimum motif width in basepairs (default: 6) maxw: int Maximum motif width in basepairs (default: 40) minsites: Optional[int] Minimum number of sites required for a motif. Default is the number of operons divided by 3. Returns ------- # TODO: add documentation of return """ # Handle output directory if outdir is None: outdir = "motifs" if not os.path.isdir(outdir): os.mkdir(outdir) # Read in fasta sequence from file seq_dict = {} try: for record in SeqIO.parse(fasta_file, "fasta"): seq_dict[record.id] = record.seq except AttributeError: for file in fasta_file: for record in SeqIO.parse(file, "fasta"): seq_dict[record.id] = record.seq # Ensure that all genome and plasmids have been loaded missing_acc = set(ica_data.gene_table.accession) - set(seq_dict.keys()) if len(missing_acc) > 0: raise ValueError(f"Missing FASTA file for {missing_acc}") DF_seqs, seq_records = _get_upstream_seqs( ica_data, imodulon, seq_dict, upstream, downstream ) n_seqs = len(DF_seqs) if n_seqs < 2: if verbose: print("Less than two sequences found for iModulon: {}".format(imodulon)) return None # Handle options if verbose: print("Finding motifs for {:d} sequences".format(n_seqs)) def full_path(name): return os.path.join(outdir, re.sub("/| ", "_", name)) if palindrome: comp_dir = full_path("{}_pal".format(imodulon)) else: comp_dir = full_path(str(imodulon)) if minsites is None: minsites = max(2, int(n_seqs / 3)) # Populate command fasta = full_path("{}.fasta".format(imodulon)) cmd = [ "meme", fasta, "-oc", comp_dir, "-dna", "-mod", "zoops", "-p", str(cores), "-nmotifs", str(nmotifs), "-evt", str(evt), "-minw", str(minw), "-maxw", str(maxw), "-allw", "-minsites", str(minsites), ] if palindrome: cmd.append("-pal") # Skip intensive tasks on rerun if force or not os.path.isdir(comp_dir): # Write sequence to file SeqIO.write(seq_records, fasta, "fasta") # print MEME command if verbose: print("Running command:", " ".join(cmd)) # Run MEME subprocess.call(cmd) # Save results DF_motifs, DF_sites = _parse_meme(comp_dir, DF_seqs, verbose=verbose, evt=evt) if DF_motifs.empty: return None else: result = MotifInfo( DF_motifs, DF_sites, " ".join(cmd), os.path.join(comp_dir, "meme.txt") ) ica_data.motif_info[imodulon] = result return result
[docs]def _parse_meme(directory, DF_seqs, verbose, evt): # Read MEME results with open(os.path.join(directory, "meme.xml"), "r") as f: result_file = BeautifulSoup(f.read(), "lxml") # Convert to motif XML file to dataframes: (overall,[individual_motif]) DF_motifs = pd.DataFrame(columns=["e_value", "sites", "width", "consensus"]) dfs = [] for motif in result_file.find_all("motif"): try: idx = motif["alt"] except KeyError: idx = motif["id"] # Motif statistics DF_motifs.loc[idx, "e_value"] = np.float64(motif["e_value"]) DF_motifs.loc[idx, "sites"] = motif["sites"] DF_motifs.loc[idx, "width"] = motif["width"] DF_motifs.loc[idx, "consensus"] = motif["name"] # Map Sequence to name list_to_struct = [] for seq in result_file.find_all("sequence"): list_to_struct.append([seq["id"], seq["name"]]) df_names = pd.DataFrame(list_to_struct, columns=["seq_id", "operon"]) # Get motif sites list_to_struct = [] for site in motif.find_all("contributing_site"): site_seq = "".join( [letter["letter_id"] for letter in site.find_all("letter_ref")] ) data = [site["position"], site["pvalue"], site["sequence_id"], site_seq] list_to_struct.append(data) tmp_df = pd.DataFrame( list_to_struct, columns=["rel_position", "pvalue", "seq_id", "site_seq"] ) # Combine motif sites with sequence to name mapper DF_meme = pd.merge(tmp_df, df_names) DF_meme = DF_meme.set_index("operon").sort_index().drop("seq_id", axis=1) DF_meme = pd.concat([DF_meme, DF_seqs], axis=1, sort=True) DF_meme.index.name = idx # Report number of sequences with motif DF_motifs.loc[idx, "motif_frac"] = np.true_divide( sum(DF_meme.rel_position.notnull()), len(DF_meme) ) # Remove full upstream sequence DF_meme.drop(columns="seq", inplace=True) dfs.append(DF_meme) if len(dfs) == 0: if verbose: print("No motif found with E-value < {0:.1e}".format(evt)) idx1 = pd.Series([], name="motif") idx2 = pd.MultiIndex.from_tuples([], names=["motif", "operon"]) return ( pd.DataFrame( columns=[ "e_value", "sites", "width", "consensus", "motif_name", "motif_frac", ], index=idx1, ), pd.DataFrame( columns=[ "rel_position", "pvalue", "site_seq", "genes", "locus_tags", "start_pos", "strand", ], index=idx2, ), ) DF_sites = pd.concat({df.index.name: df for df in dfs}) # Add index labels DF_sites.index.names = ["motif", "operon"] DF_motifs.index.name = "motif" if verbose: if len(DF_motifs) == 1: motif_str = "motif" else: motif_str = "motifs" if len(DF_sites) == 1: site_str = "site" else: site_str = "sites" print( f"Found {len(DF_motifs)} {motif_str} across " f"{sum(DF_sites.site_seq.notnull())} {site_str}" ) return DF_motifs, DF_sites
[docs]def _parse_tomtom(tomtom_dir): DF_tomtom = pd.read_csv( os.path.join(tomtom_dir, "tomtom.tsv"), sep="\t", skipfooter=3, engine="python" ) with open(os.path.join(tomtom_dir, "tomtom.xml"), "r") as f: result = BeautifulSoup(f.read(), "lxml") # Convert ID to names (for the PRODORIC database) id2name = {} for motif in result.find_all("motif"): try: id2name[motif["id"]] = motif["alt"] except KeyError: id2name[motif["id"]] = motif["id"] # Switch to correct name DF_tomtom["motif"] = DF_tomtom["Query_ID"].apply(lambda x: id2name[x]) DF_tomtom["target"] = DF_tomtom["Target_ID"].apply(lambda x: id2name[x]) # Get database names db_names = [db["name"] for db in result.find("target_dbs").find_all("db")] id2db = {} for motif in result.find("targets").find_all("motif"): id2db[motif["id"]] = db_names[int(motif["db"])] DF_tomtom["database"] = DF_tomtom["Target_ID"].apply(lambda x: id2db[x]) # Reorder columns DF_tomtom = DF_tomtom[ [ "motif", "target", "Target_ID", "p-value", "E-value", "q-value", "Overlap", "Optimal_offset", "Orientation", "database", ] ] DF_tomtom.columns = [ "motif", "target", "target_id", "pvalue", "Evalue", "qvalue", "overlap", "optimal_offset", "orientation", "database", ] return DF_tomtom
[docs]def compare_motifs( motif_info: Optional[MotifInfo] = None, motif_file: Optional[str] = None, motif_db: Optional[str] = None, outdir: Optional[str] = None, force=False, verbose=True, evt=0.001, ): """ Compare a MEME motif against external motif databases Parameters ---------- motif_info: Optional[MotifInfo] MotifInfo object. Either 'motif_info' or 'motif_file' must be provided. 'motif_info' takes precedence over 'motif_file'. motif_file: Optional[os.PathLike] Txt file generated by MEME from a motif search. motif_db: Optional[Union[List,str]] Name or path to MEME formatted databases outdir: Optional[os.PathLike] Output directory for TOMTOM comparisons force: bool Force execution of TOMTOM even if output already exists (default: False) verbose: bool Show steps in verbose output (default: True) evt: float E-value threshold (default: 0.001) Returns ------- """ if motif_info: motif_file = motif_info.file elif motif_file is None: raise ValueError("Either 'motif_info' or 'motif_file' must be provided") # Ensure motif file exists if not os.path.isfile(motif_file): raise FileNotFoundError(f"File does not exist: '{motif_file}'") # Get motif DB locations motif_db_dir = pymodulon.data.meme.__path__[0] default_dbs = [ "collectf", "dpinteract", "prodoric", "regtransbase", "SwissRegulon_e_coli", ] if motif_db is None: motif_db = default_dbs motif_db = [os.path.join(motif_db_dir, db + ".meme") for db in motif_db] elif isinstance(motif_db, str): motif_db = [motif_db] for db in motif_db: if db in default_dbs: motif_db = [os.path.join(motif_db_dir, db + ".meme") for db in motif_db] elif not os.path.isfile(db): raise FileNotFoundError(f"File does not exist: '{db}'") # Handle output directory if outdir is None: outdir = os.path.join(os.path.dirname(motif_file), "tomtom") # Run TOMTOM if not os.path.isdir(outdir) or force: cmd = [ "tomtom", "-oc", outdir, "-thresh", str(evt), "-incomplete-scores", "-png", motif_file, ] + motif_db if verbose: print("Running command:", " ".join(cmd)) res = subprocess.check_call(cmd) if res != 0: cmd_str = " ".join(cmd) raise RuntimeError( f"TOMTOM failed to execute the following command: {cmd_str}" ) result = _parse_tomtom(outdir) # Save result if motif_info: motif_info.matches = result return result