Source code for scripts.metrics.corrs

import argparse
import logging
import pathlib as pl
from typing import List, Literal, Tuple

import anndata as ad
import numpy as np
import pandas as pd
import scanpy as sc

logging.basicConfig()
logging.root.setLevel(logging.INFO)

_LOGGER = logging.getLogger(__name__)

_PHASE = "phase"
_SIG_TYPE = "signature_type"
_META_SIG = "meta_signature"
_GROUND_TRUTH = "ground_truth_signature"
_G1 = "G1"



_NORMMETHOD = Literal["rank", "log_rank", "log", "custom"]
_SCORERNAME = Literal["ssgsea", "average"]


[docs] class AvgBulkScorer: """This class creates a scorer that takes the average of the (std or not) bulk gex as a proxy of signature score""" def __init__(self, std: bool): """ Args: name: name of the scorer std: if True, the bulk gex will be standardized before the average is computed Returns: None """ self.std = std
[docs] def score(self, bulk_values: pd.DataFrame, metasig: np.ndarray) -> pd.Series: """The main scoring function Args: bulk_values: a df of size (n_samples, n_genes) with the bulk gene expression metasig: a list of genes representing the signature to score Returns: a series with the score for each patient """ intersection = bulk_values.columns.intersection(metasig) _LOGGER.info(f"Found {intersection} overlapping genes.") if len(intersection) == 0: raise ValueError( "There is no common gene between the metasignature and the bulk DataFrame" ) else: df = bulk_values.loc[:, intersection] if self.std: df = (df - df.mean()) / df.std() return df.mean(axis=1)
[docs] def get_all_scores( bulk_values: pd.DataFrame, metasignatures: pd.DataFrame, truesignatures: pd.DataFrame ) -> pd.DataFrame: """Computes the scores for all metasignatures and true signatures using the appropriate scorer function Args: bulk_values: df of bulk gex of size (n_samples, n_genes) purity: series with the purity information per patient + cancer type of size (n_samples, 2) metasignatures: df with n_metasignatures columns, containing in each column the list of genes that constitute the metasignature truesignatures: df with n_true signatures columns, containing in each column the list of genes that constitute the true signature std: only used if scorer name is average, if True the bulk gex will be standardized before computing the average sample_norm_method: only used if scorer name is ssgsea, what method to use for sample norm in ssgsea (see ssgsea doc for more info) Returns: the dataframe of size (n_samples, n_metasignatures + n_true signatures + 1 + 1), containing all scores on the metasignatures, the true signatuers, the purity information, and TCGA the cancer type the scoring was performed on """ scorer = AvgBulkScorer(std=True) all_scores = {} for sig in truesignatures.columns: all_scores[sig] = scorer.score( bulk_values=bulk_values, metasig=truesignatures[sig].ravel() ) for sig in metasignatures.columns: all_scores[sig] = scorer.score( bulk_values=bulk_values, metasig=metasignatures[sig].ravel() ) all_scores = pd.concat(all_scores, axis=1) return all_scores
[docs] def score_dataset( bulk_data: pd.DataFrame, metasignature: pd.DataFrame, truesignature: pd.DataFrame ) -> Tuple[List[str], List[str], pd.DataFrame]: """Main function, computes the bulk score for metasignatures and reference signatures Args: bulk_file: path to file with the bulk gex metasignature_file: path to the file with the metasignature genes truesignature_file: path to the file with the true signature genes scorer_name: which scoring to use std: only used if scorer name is average, if True the bulk gex will be standardized before computing the average sample_norm_method: only used if scorer name is ssgsea, what method to use for sample norm in ssgsea (see ssgsea doc for more info) Returns: the dataframe of size (n_samples, n_metasignatures + n_true signatures + 1 + 1), containing all scores on the metasignatures, the true signatuers, the purity information, and TCGA the cancer type the scoring was performed on """ all_scores = get_all_scores( bulk_values=bulk_data, metasignatures=metasignature, truesignatures=truesignature, ) return truesignature.columns.to_list(), metasignature.columns.to_list(), all_scores
[docs] def get_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument("--input", "-i", type=str, help="Path to meta signatures stored in .csv.") parser.add_argument("--output", "-o", type=str, help="Final results.") parser.add_argument("--data-path", "-d", type=str, help="Path to the anndata.") parser.add_argument("--annotation-path", "-a", type=str, help="Path to the folder holding all the known signatures.") parser.add_argument("--type", type=str, help="Either scRNA or bulk.") args = parser.parse_args() return args
[docs] def get_adata(data_path: pl.Path) -> ad.AnnData: adata = ad.read_h5ad(data_path) _LOGGER.info("Subsetting to non-cycling cells.") adata = adata[adata.obs[_PHASE] == _G1].copy() return adata
[docs] def get_scores(adata: ad.AnnData, gt_sigs: pd.DataFrame, meta_sigs: pd.DataFrame): gt_names = score_signature(adata, gt_sigs) metasig_names = score_signature(adata, meta_sigs) return gt_names, metasig_names, adata.obs[gt_names+metasig_names]
[docs] def score_signature(adata, sigs: pd.DataFrame) -> List[str]: signatures = [] for sig_name in sigs.columns: gene_list = sigs[sig_name].dropna().values[:50] if len(adata.var_names.intersection(gene_list))==0: continue sc.tl.score_genes(adata, gene_list=gene_list, score_name=sig_name) signatures.append(sig_name) return signatures
[docs] def corr_signatures(df: pd.DataFrame, gt_names: List[str], meta_sigs_names: List[str]) -> pd.DataFrame: corr = df[gt_names + meta_sigs_names].corr(method="spearman") corr[_SIG_TYPE] = _META_SIG corr.loc[corr.index.isin(gt_names), _SIG_TYPE] = _GROUND_TRUTH return corr
[docs] def main() -> None: args = get_args() gt_sigs = pd.read_csv(args.annotation_path) meta_sigs = pd.read_csv(args.input) if args.type.lower() == "scrna": adata = get_adata(args.data_path) gt_names, meta_sigs_names, scores = get_scores(adata, gt_sigs, meta_sigs) elif args.type.lower() == "bulk": bulk_data = pd.read_csv(args.data_path, index_col=0) gt_names, meta_sigs_names, scores = score_dataset(bulk_data=bulk_data, truesignature=gt_sigs, metasignature=meta_sigs) else: raise ValueError(f"Unkown scoring type {args.type}.") corr = corr_signatures(scores, gt_names, meta_sigs_names) corr.to_csv(args.output)
if __name__ == "__main__": main()