Source code for petab.core

"""PEtab core functions (or functions that don't fit anywhere else)"""
import logging
import os
import re
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    Union,
)
from warnings import warn

import numpy as np
import pandas as pd
from pandas.api.types import is_string_dtype

from . import yaml
from .C import *  # noqa: F403

logger = logging.getLogger(__name__)
__all__ = [
    "get_simulation_df",
    "write_simulation_df",
    "get_visualization_df",
    "write_visualization_df",
    "get_notnull_columns",
    "flatten_timepoint_specific_output_overrides",
    "concat_tables",
    "to_float_if_float",
    "is_empty",
    "create_combine_archive",
    "unique_preserve_order",
    "unflatten_simulation_df",
]

POSSIBLE_GROUPVARS_FLATTENED_PROBLEM = [
    OBSERVABLE_ID,
    OBSERVABLE_PARAMETERS,
    NOISE_PARAMETERS,
    SIMULATION_CONDITION_ID,
    PREEQUILIBRATION_CONDITION_ID,
]


def get_simulation_df(simulation_file: Union[str, Path]) -> pd.DataFrame:
    """Read PEtab simulation table

    Arguments:
        simulation_file: URL or filename of PEtab simulation table

    Returns:
        Simulation DataFrame
    """
    return pd.read_csv(
        simulation_file,
        sep="\t",
        index_col=None,
        float_precision="round_trip",
    )


def write_simulation_df(
    df: pd.DataFrame, filename: Union[str, Path]
) -> None:
    """Write PEtab simulation table

    Arguments:
        df: PEtab simulation table
        filename: Destination file name
    """
    df.to_csv(filename, sep="\t", index=False)
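

# A minimal usage sketch for the two helpers above (hypothetical file names,
# not part of the original module):
#
#     sim_df = get_simulation_df("simulations.tsv")
#     write_simulation_df(sim_df, "simulations_roundtrip.tsv")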


def get_visualization_df(
    visualization_file: Union[str, Path, pd.DataFrame, None]
) -> Union[pd.DataFrame, None]:
    """Read PEtab visualization table

    Arguments:
        visualization_file:
            URL or filename of PEtab visualization table to read from,
            or a DataFrame or None that will be returned as is.

    Returns:
        Visualization DataFrame
    """
    if visualization_file is None:
        return None

    if isinstance(visualization_file, pd.DataFrame):
        return visualization_file

    try:
        types = {PLOT_NAME: str}
        vis_spec = pd.read_csv(
            visualization_file,
            sep="\t",
            index_col=None,
            converters=types,
            float_precision="round_trip",
        )
    except pd.errors.EmptyDataError:
        warn(
            "Visualization table is empty. Defaults will be used. "
            "Refer to the documentation for details."
        )
        vis_spec = pd.DataFrame()
    return vis_spec
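

# A minimal usage sketch (hypothetical file name, not part of the original
# module). DataFrames and ``None`` pass through unchanged:
#
#     vis_df = get_visualization_df("visualization.tsv")
#     get_visualization_df(None)    # -> None
#     get_visualization_df(vis_df)  # -> vis_df, unchanged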


def write_visualization_df(
    df: pd.DataFrame, filename: Union[str, Path]
) -> None:
    """Write PEtab visualization table

    Arguments:
        df: PEtab visualization table
        filename: Destination file name
    """
    df.to_csv(filename, sep="\t", index=False)


def get_notnull_columns(df: pd.DataFrame, candidates: Iterable):
    """
    Return list of ``df``-columns in ``candidates`` which are not all
    null/nan.

    The output can e.g. be used as input for ``pandas.DataFrame.groupby``.

    Arguments:
        df: Dataframe
        candidates: Columns of ``df`` to consider
    """
    return [
        col
        for col in candidates
        if col in df and not np.all(df[col].isnull())
    ]
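

# A minimal sketch of how ``get_notnull_columns`` feeds ``groupby``
# (illustrative data, not part of the original module):
#
#     df = pd.DataFrame({
#         "observableId": ["obs1", "obs2"],
#         "noiseParameters": [np.nan, np.nan],
#     })
#     get_notnull_columns(df, ["observableId", "noiseParameters"])
#     # -> ["observableId"]; the all-NaN column is dropped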


def get_observable_replacement_id(groupvars, groupvar) -> str:
    """Get the replacement ID for an observable.

    Arguments:
        groupvars:
            The columns of a PEtab measurement table that should be unique
            between observables in a flattened PEtab problem.
        groupvar:
            A specific grouping of ``groupvars``.

    Returns:
        The observable replacement ID.
    """
    replacement_id = ""
    for field in POSSIBLE_GROUPVARS_FLATTENED_PROBLEM:
        if field in groupvars:
            val = (
                str(groupvar[groupvars.index(field)])
                .replace(PARAMETER_SEPARATOR, "_")
                .replace(".", "_")
            )
            if replacement_id == "":
                replacement_id = val
            elif val != "":
                replacement_id += f"__{val}"
    return replacement_id


def get_hyperparameter_replacement_id(
    hyperparameter_type,
    observable_replacement_id,
):
    """Get the full ID for a replaced hyperparameter.

    Arguments:
        hyperparameter_type:
            The type of hyperparameter, e.g. ``noiseParameter``.
        observable_replacement_id:
            The observable replacement ID, e.g. the output of
            ``get_observable_replacement_id``.

    Returns:
        The hyperparameter replacement ID, with a field that will be replaced
        by the first matched substring in a regex substitution.
    """
    return f"{hyperparameter_type}\\1_{observable_replacement_id}"


def get_flattened_id_mappings(
    petab_problem: "petab.problem.Problem",
) -> Dict[str, Dict[str, str]]:
    """Get mapping from unflattened to flattened observable IDs.

    Arguments:
        petab_problem:
            The unflattened PEtab problem.

    Returns:
        A dictionary of dictionaries. Each inner dictionary is a mapping
        from original ID to flattened ID. Each outer dictionary is the
        mapping for either: observable IDs; noise parameter IDs; or,
        observable parameter IDs.
    """
    groupvars = get_notnull_columns(
        petab_problem.measurement_df, POSSIBLE_GROUPVARS_FLATTENED_PROBLEM
    )
    mappings = {
        OBSERVABLE_ID: {},
        NOISE_PARAMETERS: {},
        OBSERVABLE_PARAMETERS: {},
    }
    for groupvar, measurements in petab_problem.measurement_df.groupby(
        groupvars, dropna=False
    ):
        observable_id = groupvar[groupvars.index(OBSERVABLE_ID)]
        observable_replacement_id = get_observable_replacement_id(
            groupvars, groupvar
        )

        logger.debug(f"Creating synthetic observable {observable_id}")
        if observable_replacement_id in petab_problem.observable_df.index:
            raise RuntimeError(
                "could not create synthetic observables "
                f"since {observable_replacement_id} was "
                "already present in observable table"
            )
        mappings[OBSERVABLE_ID][observable_replacement_id] = observable_id

        for field, hyperparameter_type in [
            (NOISE_PARAMETERS, "noiseParameter"),
            (OBSERVABLE_PARAMETERS, "observableParameter"),
        ]:
            if field in measurements:
                mappings[field][
                    get_hyperparameter_replacement_id(
                        hyperparameter_type=hyperparameter_type,
                        observable_replacement_id=observable_replacement_id,
                    )
                ] = rf"{hyperparameter_type}([0-9]+)_{observable_id}"
    return mappings
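

# A sketch of the flattened ID scheme implemented above (illustrative values,
# not from the original module). PARAMETER_SEPARATOR (";" in PEtab) and "."
# are both mapped to "_":
#
#     groupvars = [OBSERVABLE_ID, NOISE_PARAMETERS]
#     get_observable_replacement_id(groupvars, ("obs1", "3.0;4.0"))
#     # -> "obs1__3_0_4_0"
#     get_hyperparameter_replacement_id("noiseParameter", "obs1__3_0_4_0")
#     # -> r"noiseParameter\1_obs1__3_0_4_0" (the "\1" is the backreference
#     # that re.sub fills in during flattening)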


def flatten_timepoint_specific_output_overrides(
    petab_problem: "petab.problem.Problem",
) -> None:
    """Flatten timepoint-specific output parameter overrides.

    If the PEtab problem definition has timepoint-specific
    ``observableParameters`` or ``noiseParameters`` for the same observable,
    replace those by replicating the respective observable.

    This is a helper function for some tools which may not support such
    timepoint-specific mappings. The observable table and measurement table
    are modified in place.

    Arguments:
        petab_problem:
            PEtab problem to work on. Modified in place.
    """
    new_measurement_dfs = []
    new_observable_dfs = []
    groupvars = get_notnull_columns(
        petab_problem.measurement_df, POSSIBLE_GROUPVARS_FLATTENED_PROBLEM
    )

    mappings = get_flattened_id_mappings(petab_problem)

    for groupvar, measurements in petab_problem.measurement_df.groupby(
        groupvars, dropna=False
    ):
        obs_id = groupvar[groupvars.index(OBSERVABLE_ID)]
        observable_replacement_id = get_observable_replacement_id(
            groupvars, groupvar
        )

        observable = petab_problem.observable_df.loc[obs_id].copy()
        observable.name = observable_replacement_id
        for field, hyperparameter_type, target in [
            (NOISE_PARAMETERS, "noiseParameter", NOISE_FORMULA),
            (OBSERVABLE_PARAMETERS, "observableParameter", OBSERVABLE_FORMULA),
            (OBSERVABLE_PARAMETERS, "observableParameter", NOISE_FORMULA),
        ]:
            if field not in measurements:
                continue

            if not is_string_dtype(type(observable[target])):
                # if not a string, we don't have to substitute anything
                continue

            hyperparameter_replacement_id = get_hyperparameter_replacement_id(
                hyperparameter_type=hyperparameter_type,
                observable_replacement_id=observable_replacement_id,
            )
            hyperparameter_id = mappings[field][hyperparameter_replacement_id]
            observable[target] = re.sub(
                hyperparameter_id,
                hyperparameter_replacement_id,
                observable[target],
            )

        measurements[OBSERVABLE_ID] = observable_replacement_id
        new_measurement_dfs.append(measurements)
        new_observable_dfs.append(observable)

    petab_problem.observable_df = pd.concat(new_observable_dfs, axis=1).T
    petab_problem.observable_df.index.name = OBSERVABLE_ID
    petab_problem.measurement_df = pd.concat(new_measurement_dfs)


def unflatten_simulation_df(
    simulation_df: pd.DataFrame,
    petab_problem: "petab.problem.Problem",
) -> pd.DataFrame:
    """Unflatten simulations from a flattened PEtab problem.

    A flattened PEtab problem is the output of applying
    :func:`flatten_timepoint_specific_output_overrides` to a PEtab problem.

    Arguments:
        simulation_df:
            The simulation dataframe. A dataframe in the same format as a
            PEtab measurements table, but with the ``measurement`` column
            replaced by a ``simulation`` column.
        petab_problem:
            The unflattened PEtab problem.

    Returns:
        The simulation dataframe for the unflattened PEtab problem.
    """
    mappings = get_flattened_id_mappings(petab_problem)
    original_observable_ids = simulation_df[OBSERVABLE_ID].replace(
        mappings[OBSERVABLE_ID]
    )
    unflattened_simulation_df = simulation_df.assign(
        **{
            OBSERVABLE_ID: original_observable_ids,
        }
    )
    return unflattened_simulation_df
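

# A minimal round-trip sketch (hypothetical paths, not part of the original
# module). Since ``flatten_timepoint_specific_output_overrides`` modifies the
# problem in place, the unflattened problem is reloaded before the simulations
# are unflattened:
#
#     import petab
#
#     problem = petab.Problem.from_yaml("problem.yaml")
#     flatten_timepoint_specific_output_overrides(problem)
#     # ... simulate the flattened problem, obtaining ``sim_df`` ...
#     original_problem = petab.Problem.from_yaml("problem.yaml")
#     sim_df_unflat = unflatten_simulation_df(sim_df, original_problem)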


def concat_tables(
    tables: Union[
        str, Path, pd.DataFrame, Iterable[Union[pd.DataFrame, str, Path]]
    ],
    file_parser: Optional[Callable] = None,
) -> pd.DataFrame:
    """Concatenate DataFrames provided as DataFrames or filenames, using
    ``file_parser`` to read any files.

    Arguments:
        tables: Iterable of tables to join, as DataFrame or filename.
        file_parser: Function used to read the table in case filenames are
            provided, accepting a filename as only argument.

    Returns:
        The concatenated DataFrames
    """
    if isinstance(tables, pd.DataFrame):
        return tables

    if isinstance(tables, (str, Path)):
        return file_parser(tables)

    df = pd.DataFrame()
    for tmp_df in tables:
        # load from file, if necessary
        if isinstance(tmp_df, (str, Path)):
            tmp_df = file_parser(tmp_df)

        df = pd.concat(
            [df, tmp_df],
            sort=False,
            ignore_index=isinstance(tmp_df.index, pd.RangeIndex),
        )

    return df
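

# A minimal usage sketch (hypothetical file names, not part of the original
# module), using ``petab.measurements.get_measurement_df`` as the parser:
#
#     from petab.measurements import get_measurement_df
#
#     measurement_df = concat_tables(
#         ["measurements_batch1.tsv", "measurements_batch2.tsv"],
#         file_parser=get_measurement_df,
#     )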


def to_float_if_float(x: Any) -> Any:
    """Return input as float if possible, otherwise return as is

    Arguments:
        x: Anything

    Returns:
        ``x`` as float if possible, otherwise ``x``
    """
    try:
        return float(x)
    except (ValueError, TypeError):
        return x
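

# Illustrative behaviour (not part of the original module):
#
#     to_float_if_float("1.5")   # -> 1.5
#     to_float_if_float("1e-3")  # -> 0.001
#     to_float_if_float("k1")    # -> "k1" (ValueError is caught)
#     to_float_if_float(None)    # -> None (TypeError is caught)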


def is_empty(val) -> bool:
    """Check if the value ``val``, e.g. a table entry, is empty.

    Arguments:
        val: The value to check.

    Returns:
        Whether the field is to be considered empty.
    """
    return val == "" or pd.isnull(val)
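

# Illustrative behaviour (not part of the original module):
#
#     is_empty("")            # -> True
#     is_empty(None)          # -> True
#     is_empty(float("nan"))  # -> True
#     is_empty("0")           # -> False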


def create_combine_archive(
    yaml_file: Union[str, Path],
    filename: Union[str, Path],
    family_name: Optional[str] = None,
    given_name: Optional[str] = None,
    email: Optional[str] = None,
    organization: Optional[str] = None,
) -> None:
    """Create COMBINE archive (https://co.mbine.org/documents/archive) based
    on PEtab YAML file.

    Arguments:
        yaml_file: Path to PEtab YAML file
        filename: Destination file name
        family_name: Family name of archive creator
        given_name: Given name of archive creator
        email: E-mail address of archive creator
        organization: Organization of archive creator
    """
    path_prefix = os.path.dirname(str(yaml_file))
    yaml_config = yaml.load_yaml(yaml_file)

    # function-level import, because module-level import interfered with
    # other SWIG interfaces
    try:
        import libcombine
    except ImportError:
        raise ImportError(
            "To use PEtab's COMBINE functionality, libcombine "
            "(python-libcombine) must be installed."
        )

    def _add_file_metadata(location: str, description: str = ""):
        """Add metadata to the added file"""
        omex_description = libcombine.OmexDescription()
        omex_description.setAbout(location)
        omex_description.setDescription(description)
        omex_description.setCreated(
            libcombine.OmexDescription.getCurrentDateAndTime()
        )
        archive.addMetadata(location, omex_description)

    archive = libcombine.CombineArchive()

    # Add PEtab files and metadata
    archive.addFile(
        str(yaml_file),
        os.path.basename(yaml_file),
        "http://identifiers.org/combine.specifications/petab.version-1",
        True,
    )
    _add_file_metadata(
        location=os.path.basename(yaml_file), description="PEtab YAML file"
    )

    # Add parameter file(s) that describe a single parameter table.
    # Works for a single file name, or a list of file names.
    for parameter_subset_file in list(
        np.array(yaml_config[PARAMETER_FILE]).flat
    ):
        archive.addFile(
            os.path.join(path_prefix, parameter_subset_file),
            parameter_subset_file,
            libcombine.KnownFormats.lookupFormat("tsv"),
            False,
        )
        _add_file_metadata(
            location=parameter_subset_file,
            description="PEtab parameter file",
        )

    for problem in yaml_config[PROBLEMS]:
        for sbml_file in problem[SBML_FILES]:
            archive.addFile(
                os.path.join(path_prefix, sbml_file),
                sbml_file,
                libcombine.KnownFormats.lookupFormat("sbml"),
                False,
            )
            _add_file_metadata(location=sbml_file, description="SBML model")

        for field in [
            MEASUREMENT_FILES,
            OBSERVABLE_FILES,
            VISUALIZATION_FILES,
            CONDITION_FILES,
        ]:
            if field not in problem:
                continue

            for file in problem[field]:
                archive.addFile(
                    os.path.join(path_prefix, file),
                    file,
                    libcombine.KnownFormats.lookupFormat("tsv"),
                    False,
                )
                desc = field.split("_")[0]
                _add_file_metadata(
                    location=file, description=f"PEtab {desc} file"
                )

    # Add archive metadata
    description = libcombine.OmexDescription()
    description.setAbout(".")
    description.setDescription("PEtab archive")
    description.setCreated(libcombine.OmexDescription.getCurrentDateAndTime())

    # Add creator info
    creator = libcombine.VCard()
    if family_name:
        creator.setFamilyName(family_name)
    if given_name:
        creator.setGivenName(given_name)
    if email:
        creator.setEmail(email)
    if organization:
        creator.setOrganization(organization)
    description.addCreator(creator)

    archive.addMetadata(".", description)
    archive.writeToFile(str(filename))
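

# A minimal usage sketch (hypothetical paths and creator details, not part of
# the original module):
#
#     create_combine_archive(
#         "problem.yaml",
#         "problem.omex",
#         family_name="Doe",
#         given_name="Jane",
#         email="jane.doe@example.com",
#     )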


def unique_preserve_order(seq: Sequence) -> List:
    """Return a list of unique elements in Sequence, keeping only the first
    occurrence of each element

    Parameters:
        seq: Sequence to prune

    Returns:
        List of unique elements in ``seq``
    """
    seen = set()
    seen_add = seen.add
    return [x for x in seq if not (x in seen or seen_add(x))]
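

# Illustrative behaviour (not part of the original module):
#
#     unique_preserve_order([3, 1, 3, 2, 1])  # -> [3, 1, 2]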