Source code for petab.v1.core

"""PEtab core functions (or functions that don't fit anywhere else)"""

import logging
import os
import re
from collections.abc import Callable, Iterable, Sequence
from pathlib import Path
from typing import (
    Any,
)
from warnings import warn

import numpy as np
import pandas as pd
from pandas.api.types import is_string_dtype

from . import yaml
from .C import *  # noqa: F403

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Names exported by a star-import of this module. Functions that are defined
# in this module but not listed here (e.g. ``get_observable_replacement_id``,
# ``get_flattened_id_mappings``) are not star-exported.
__all__ = [
    "get_simulation_df",
    "write_simulation_df",
    "get_visualization_df",
    "write_visualization_df",
    "get_notnull_columns",
    "flatten_timepoint_specific_output_overrides",
    "concat_tables",
    "to_float_if_float",
    "is_empty",
    "create_combine_archive",
    "unique_preserve_order",
    "unflatten_simulation_df",
]

# Measurement-table columns that are candidates for telling observables apart
# when a PEtab problem is flattened. Only the columns that are not entirely
# null in the measurement table are actually used for grouping.
# The order of this list is also the order in which the column values are
# joined into an observable replacement ID.
POSSIBLE_GROUPVARS_FLATTENED_PROBLEM = [
    OBSERVABLE_ID,
    OBSERVABLE_PARAMETERS,
    NOISE_PARAMETERS,
    SIMULATION_CONDITION_ID,
    PREEQUILIBRATION_CONDITION_ID,
]


[docs] def get_simulation_df(simulation_file: str | Path) -> pd.DataFrame: """Read PEtab simulation table Arguments: simulation_file: URL or filename of PEtab simulation table Returns: Simulation DataFrame """ return pd.read_csv( simulation_file, sep="\t", index_col=None, float_precision="round_trip" )
[docs] def write_simulation_df(df: pd.DataFrame, filename: str | Path) -> None: """Write PEtab simulation table Arguments: df: PEtab simulation table filename: Destination file name. The parent directory will be created if necessary. """ Path(filename).parent.mkdir(parents=True, exist_ok=True) df.to_csv(filename, sep="\t", index=False)
[docs] def get_visualization_df( visualization_file: str | Path | pd.DataFrame | None, ) -> pd.DataFrame | None: """Read PEtab visualization table Arguments: visualization_file: URL or filename of PEtab visualization table to read from, or a DataFrame or None that will be returned as is. Returns: Visualization DataFrame """ if visualization_file is None: return None if isinstance(visualization_file, pd.DataFrame): return visualization_file try: types = {PLOT_NAME: str} vis_spec = pd.read_csv( visualization_file, sep="\t", index_col=None, converters=types, float_precision="round_trip", ) except pd.errors.EmptyDataError: warn( "Visualization table is empty. Defaults will be used. " "Refer to the documentation for details.", stacklevel=2, ) vis_spec = pd.DataFrame() return vis_spec
[docs] def write_visualization_df(df: pd.DataFrame, filename: str | Path) -> None: """Write PEtab visualization table Arguments: df: PEtab visualization table filename: Destination file name """ df.to_csv(filename, sep="\t", index=False)
def get_notnull_columns(df: pd.DataFrame, candidates: Iterable):
    """
    Select the ``candidates`` that are columns of ``df`` holding at least
    one non-null value.

    The result keeps the order of ``candidates`` and can e.g. be passed to
    ``pandas.DataFrame.groupby``.

    Arguments:
        df:
            Dataframe
        candidates:
            Columns of ``df`` to consider
    """
    populated = []
    for name in candidates:
        # skip names that are not columns of the table at all
        if name not in df:
            continue
        # skip columns that contain nothing but null/nan
        if np.all(df[name].isnull()):
            continue
        populated.append(name)
    return populated
def get_observable_replacement_id(
    groupvars: list[str], groupvar: Sequence
) -> str:
    """Build the replacement ID for one observable of a flattened problem.

    Arguments:
        groupvars:
            The columns of a PEtab measurement table that should be unique
            between observables in a flattened PEtab problem.
        groupvar:
            A specific grouping of `groupvars`. Same length and order as
            `groupvars`.

    Returns:
        The observable replacement ID: the non-empty, sanitized group values
        joined by a double underscore, in the order given by
        ``POSSIBLE_GROUPVARS_FLATTENED_PROBLEM``.
    """
    id_parts = []
    for field in POSSIBLE_GROUPVARS_FLATTENED_PROBLEM:
        if field not in groupvars:
            continue
        raw_value = str(groupvar[groupvars.index(field)])
        # parameter separators and dots are turned into underscores
        sanitized = raw_value.replace(PARAMETER_SEPARATOR, "_")
        id_parts.append(sanitized.replace(".", "_"))
    # empty values do not contribute a component to the ID
    return "__".join(part for part in id_parts if part != "")


def get_hyperparameter_replacement_id(
    hyperparameter_type,
    observable_replacement_id,
):
    """Build the replacement template for a hyperparameter ID.

    Arguments:
        hyperparameter_type:
            The type of hyperparameter, e.g. `noiseParameter`.
        observable_replacement_id:
            The observable replacement ID, e.g. the output of
            `get_observable_replacement_id`.

    Returns:
        The hyperparameter replacement ID. It contains the backreference
        ``\\1``, which a regex substitution replaces by the first matched
        group.
    """
    return "{}\\1_{}".format(hyperparameter_type, observable_replacement_id)


def get_flattened_id_mappings(
    petab_problem: "petab.problem.Problem",
) -> dict[str, dict[str, str]]:
    """Collect the ID mappings between a flattened and the original problem.

    Arguments:
        petab_problem:
            The unflattened PEtab problem.

    Returns:
        A dictionary with one inner dictionary each for observable IDs,
        noise parameter IDs and observable parameter IDs.
        For observable IDs, the keys are the flattened (replacement) IDs and
        the values are the original observable IDs.
        For the two parameter kinds, the keys are replacement templates as
        produced by `get_hyperparameter_replacement_id` and the values are
        regex patterns that match the original parameter placeholders.

    Raises:
        RuntimeError: If a replacement ID is already used as an ID in the
            observable table.
    """
    measurement_df = petab_problem.measurement_df
    groupvars = get_notnull_columns(
        measurement_df, POSSIBLE_GROUPVARS_FLATTENED_PROBLEM
    )
    hyperparameter_fields = (
        (NOISE_PARAMETERS, "noiseParameter"),
        (OBSERVABLE_PARAMETERS, "observableParameter"),
    )
    mappings = {
        OBSERVABLE_ID: {},
        NOISE_PARAMETERS: {},
        OBSERVABLE_PARAMETERS: {},
    }

    for groupvar, measurements in measurement_df.groupby(
        groupvars, dropna=False
    ):
        observable_id = groupvar[groupvars.index(OBSERVABLE_ID)]
        observable_replacement_id = get_observable_replacement_id(
            groupvars, groupvar
        )

        logger.debug(f"Creating synthetic observable {observable_id}")
        if observable_replacement_id in petab_problem.observable_df.index:
            raise RuntimeError(
                "could not create synthetic observables "
                f"since {observable_replacement_id} was "
                "already present in observable table"
            )

        mappings[OBSERVABLE_ID][observable_replacement_id] = observable_id

        for field, hyperparameter_type in hyperparameter_fields:
            # only columns present in the measurement table are mapped
            if field not in measurements:
                continue
            template = get_hyperparameter_replacement_id(
                hyperparameter_type=hyperparameter_type,
                observable_replacement_id=observable_replacement_id,
            )
            mappings[field][template] = (
                rf"{hyperparameter_type}([0-9]+)_{observable_id}"
            )

    return mappings
def flatten_timepoint_specific_output_overrides(
    petab_problem: "petab.problem.Problem",
) -> None:
    """Flatten timepoint-specific output parameter overrides.

    If the PEtab problem definition has timepoint-specific
    `observableParameters` or `noiseParameters` for the same observable,
    replace those by replicating the respective observable.

    This is a helper function for some tools which may not support such
    timepoint-specific mappings. The observable table and measurement table
    are modified in place.

    If the problem has a visualization table whose ``yValues`` column refers
    to an ID that is not an observable of the flattened problem, the
    visualization table is removed and a warning is logged. Empty ``yValues``
    entries are ignored for this check, and a visualization table without a
    ``yValues`` column is kept.

    Arguments:
        petab_problem:
            PEtab problem to work on. Modified in place.
    """
    new_measurement_dfs = []
    new_observable_dfs = []
    groupvars = get_notnull_columns(
        petab_problem.measurement_df, POSSIBLE_GROUPVARS_FLATTENED_PROBLEM
    )

    mappings = get_flattened_id_mappings(petab_problem)

    for groupvar, measurements in petab_problem.measurement_df.groupby(
        groupvars, dropna=False
    ):
        obs_id = groupvar[groupvars.index(OBSERVABLE_ID)]
        observable_replacement_id = get_observable_replacement_id(
            groupvars, groupvar
        )

        # replicate the original observable under its replacement ID
        observable = petab_problem.observable_df.loc[obs_id].copy()
        observable.name = observable_replacement_id

        # Observable parameters are substituted in both formulas, noise
        # parameters only in the noise formula.
        for field, hyperparameter_type, target in [
            (NOISE_PARAMETERS, "noiseParameter", NOISE_FORMULA),
            (OBSERVABLE_PARAMETERS, "observableParameter", OBSERVABLE_FORMULA),
            (OBSERVABLE_PARAMETERS, "observableParameter", NOISE_FORMULA),
        ]:
            if field not in measurements:
                continue

            if not isinstance(observable[target], str):
                # if not a string, we don't have to substitute anything
                continue

            hyperparameter_replacement_id = get_hyperparameter_replacement_id(
                hyperparameter_type=hyperparameter_type,
                observable_replacement_id=observable_replacement_id,
            )
            # regex matching the placeholders of the original observable
            hyperparameter_id = mappings[field][hyperparameter_replacement_id]
            observable[target] = re.sub(
                hyperparameter_id,
                hyperparameter_replacement_id,
                observable[target],
            )

        measurements[OBSERVABLE_ID] = observable_replacement_id
        new_measurement_dfs.append(measurements)
        new_observable_dfs.append(observable)

    # each replicated observable is a Series; stack them as table rows
    petab_problem.observable_df = pd.concat(new_observable_dfs, axis=1).T
    petab_problem.observable_df.index.name = OBSERVABLE_ID
    petab_problem.measurement_df = pd.concat(new_measurement_dfs)

    # remove visualization df if it uses observables that are not in the
    # flattened PEtab problem
    visualization_df = petab_problem.visualization_df
    # A table without a yValues column names no observables explicitly, so
    # there is nothing that could have become stale.
    if visualization_df is not None and Y_VALUES in visualization_df:
        referenced_ids = visualization_df[Y_VALUES].dropna()
        all_known = referenced_ids.isin(petab_problem.observable_df.index)
        if not all_known.all():
            petab_problem.visualization_df = None
            logger.warning(
                "Removing visualization table from flattened PEtab problem."
            )
def unflatten_simulation_df(
    simulation_df: pd.DataFrame,
    petab_problem: "petab.problem.Problem",
) -> pd.DataFrame:
    """Map simulations of a flattened PEtab problem back to the original IDs.

    A flattened PEtab problem is the output of applying
    :func:`flatten_timepoint_specific_output_overrides` to a PEtab problem.

    Arguments:
        simulation_df:
            The simulation dataframe. A dataframe in the same format as a
            PEtab measurements table, but with the ``measurement`` column
            switched with a ``simulation`` column.
        petab_problem:
            The unflattened PEtab problem.

    Returns:
        A new simulation dataframe in which the flattened observable IDs are
        replaced by the observable IDs of the unflattened PEtab problem.
    """
    # keys: flattened observable IDs, values: original observable IDs
    flattened_to_original = get_flattened_id_mappings(petab_problem)[
        OBSERVABLE_ID
    ]
    restored_ids = simulation_df[OBSERVABLE_ID].replace(flattened_to_original)
    return simulation_df.assign(**{OBSERVABLE_ID: restored_ids})
[docs] def concat_tables( tables: str | Path | pd.DataFrame | Iterable[pd.DataFrame | str | Path], file_parser: Callable | None = None, ) -> pd.DataFrame: """Concatenate DataFrames provided as DataFrames or filenames, and a parser Arguments: tables: Iterable of tables to join, as DataFrame or filename. file_parser: Function used to read the table in case filenames are provided, accepting a filename as only argument. Returns: The concatenated DataFrames """ if isinstance(tables, pd.DataFrame): return tables if isinstance(tables, str | Path): return file_parser(tables) df = pd.DataFrame() for tmp_df in tables: # load from file, if necessary if isinstance(tmp_df, str | Path): tmp_df = file_parser(tmp_df) df = pd.concat( [df, tmp_df], sort=False, ignore_index=isinstance(tmp_df.index, pd.RangeIndex), ) return df
def to_float_if_float(x: Any) -> Any:
    """Convert the input to float where that works, else pass it through.

    Arguments:
        x: Anything

    Returns:
        ``float(x)`` if the conversion succeeds, otherwise ``x`` unchanged
    """
    try:
        converted = float(x)
    except (ValueError, TypeError):
        # not convertible: hand back the original object
        return x
    return converted
def is_empty(val) -> bool:
    """Tell whether the value `val`, e.g. a table entry, counts as empty.

    A value is empty if it equals the empty string or if pandas regards it
    as null.

    Arguments:
        val: The value to check.

    Returns:
        Whether the field is to be considered empty.
    """
    equals_empty_string = val == ""
    if equals_empty_string:
        return equals_empty_string
    return pd.isnull(val)
def create_combine_archive(
    yaml_file: str | Path,
    filename: str | Path,
    family_name: str | None = None,
    given_name: str | None = None,
    email: str | None = None,
    organization: str | None = None,
) -> None:
    """Bundle a PEtab problem into a COMBINE archive
    (https://co.mbine.org/documents/archive), starting from its PEtab YAML
    file.

    The YAML file itself and the parameter, SBML, measurement, observable,
    visualization and condition files it lists are added to the archive,
    each with a short description. The listed files are looked up relative
    to the directory of the YAML file.

    Arguments:
        yaml_file: Path to PEtab YAML file
        filename: Destination file name
        family_name: Family name of archive creator
        given_name: Given name of archive creator
        email: E-mail address of archive creator
        organization: Organization of archive creator

    Raises:
        ImportError: If libcombine is not installed.
    """
    path_prefix = os.path.dirname(str(yaml_file))
    yaml_config = yaml.load_yaml(yaml_file)

    # function-level import, because module-level import interfered with
    # other SWIG interfaces
    try:
        import libcombine
    except ImportError as err:
        raise ImportError(
            "To use PEtab's COMBINE functionality, libcombine "
            "(python-libcombine) must be installed."
        ) from err

    archive = libcombine.CombineArchive()

    def _add_file_metadata(location: str, description: str = ""):
        """Attach a description and creation time to an archive entry"""
        entry_description = libcombine.OmexDescription()
        entry_description.setAbout(location)
        entry_description.setDescription(description)
        entry_description.setCreated(
            libcombine.OmexDescription.getCurrentDateAndTime()
        )
        archive.addMetadata(location, entry_description)

    def _add_problem_file(relative_path, format_key: str, description: str):
        """Add a file given relative to the YAML file, plus its metadata"""
        archive.addFile(
            os.path.join(path_prefix, relative_path),
            relative_path,
            libcombine.KnownFormats.lookupFormat(format_key),
            False,
        )
        _add_file_metadata(location=relative_path, description=description)

    # Add PEtab files and metadata
    yaml_location = os.path.basename(yaml_file)
    archive.addFile(
        str(yaml_file),
        yaml_location,
        "http://identifiers.org/combine.specifications/petab.version-1",
        True,
    )
    _add_file_metadata(
        location=os.path.basename(yaml_file), description="PEtab YAML file"
    )

    # Add parameter file(s) that describe a single parameter table.
    # Works for a single file name, or a list of file names.
    parameter_files = list(np.array(yaml_config[PARAMETER_FILE]).flat)
    for parameter_subset_file in parameter_files:
        _add_problem_file(
            parameter_subset_file, "tsv", "PEtab parameter file"
        )

    table_fields = [
        MEASUREMENT_FILES,
        OBSERVABLE_FILES,
        VISUALIZATION_FILES,
        CONDITION_FILES,
    ]
    for problem in yaml_config[PROBLEMS]:
        for sbml_file in problem[SBML_FILES]:
            _add_problem_file(sbml_file, "sbml", "SBML model")

        for field in table_fields:
            if field in problem:
                for table_file in problem[field]:
                    # the description uses the part of the field name
                    # before the first underscore
                    desc = field.split("_")[0]
                    _add_problem_file(
                        table_file, "tsv", f"PEtab {desc} file"
                    )

    # Add archive metadata
    archive_description = libcombine.OmexDescription()
    archive_description.setAbout(".")
    archive_description.setDescription("PEtab archive")
    archive_description.setCreated(
        libcombine.OmexDescription.getCurrentDateAndTime()
    )

    # Add creator info; only the fields that were provided are set
    creator = libcombine.VCard()
    if family_name:
        creator.setFamilyName(family_name)
    if given_name:
        creator.setGivenName(given_name)
    if email:
        creator.setEmail(email)
    if organization:
        creator.setOrganization(organization)
    archive_description.addCreator(creator)

    archive.addMetadata(".", archive_description)
    archive.writeToFile(str(filename))
def unique_preserve_order(seq: Sequence) -> list:
    """Drop duplicates from a Sequence, keeping the first occurrence of each
    element and the original order

    Parameters:
        seq: Sequence to prune

    Returns:
        List of unique elements in ``seq``
    """
    # dict keys are unique and keep their insertion order
    first_occurrences = dict.fromkeys(seq)
    return list(first_occurrences)