Source code for petab.measurements

"""Functions operating on the PEtab measurement table"""
# noqa: F405

import itertools
import math
import numbers
from pathlib import Path
from typing import Dict, List, Union

import numpy as np
import pandas as pd

from . import core, lint, observables
from .C import *  # noqa: F403

__all__ = [
    "assert_overrides_match_parameter_count",
    "create_measurement_df",
    "get_measurement_df",
    "get_measurement_parameter_ids",
    "get_rows_for_condition",
    "get_simulation_conditions",
    "measurements_have_replicates",
    "measurement_is_at_steady_state",
    "split_parameter_replacement_list",
    "write_measurement_df",
]


[docs] def get_measurement_df( measurement_file: Union[None, str, Path, pd.DataFrame] ) -> pd.DataFrame: """ Read the provided measurement file into a ``pandas.Dataframe``. Arguments: measurement_file: Name of file to read from or pandas.Dataframe Returns: Measurement DataFrame """ if measurement_file is None: return measurement_file if isinstance(measurement_file, (str, Path)): measurement_file = pd.read_csv( measurement_file, sep="\t", float_precision="round_trip" ) lint.assert_no_leading_trailing_whitespace( measurement_file.columns.values, MEASUREMENT ) return measurement_file
[docs] def write_measurement_df(df: pd.DataFrame, filename: Union[str, Path]) -> None: """Write PEtab measurement table Arguments: df: PEtab measurement table filename: Destination file name """ df = get_measurement_df(df) df.to_csv(filename, sep="\t", index=False)
[docs] def get_simulation_conditions(measurement_df: pd.DataFrame) -> pd.DataFrame: """ Create a table of separate simulation conditions. A simulation condition is a specific combination of simulationConditionId and preequilibrationConditionId. Arguments: measurement_df: PEtab measurement table Returns: Dataframe with columns 'simulationConditionId' and 'preequilibrationConditionId'. All-null columns will be omitted. Missing 'preequilibrationConditionId's will be set to '' (empty string). """ if measurement_df.empty: return pd.DataFrame(data={SIMULATION_CONDITION_ID: []}) # find columns to group by (i.e. if not all nans). # can be improved by checking for identical condition vectors grouping_cols = core.get_notnull_columns( measurement_df, [SIMULATION_CONDITION_ID, PREEQUILIBRATION_CONDITION_ID], ) # group by cols and return dataframe containing each combination # of those rows only once (and an additional counting row) # We require NaN-containing rows, but they are ignored by `groupby`, # therefore replace them before simulation_conditions = ( measurement_df.fillna("") .groupby(grouping_cols) .size() .reset_index()[grouping_cols] ) # sort to be really sure that we always get the same order return simulation_conditions.sort_values(grouping_cols, ignore_index=True)
[docs] def get_rows_for_condition( measurement_df: pd.DataFrame, condition: Union[pd.Series, pd.DataFrame, Dict], ) -> pd.DataFrame: """ Extract rows in `measurement_df` for `condition` according to 'preequilibrationConditionId' and 'simulationConditionId' in `condition`. Arguments: measurement_df: PEtab measurement DataFrame condition: DataFrame with single row (or Series) and columns 'preequilibrationConditionId' and 'simulationConditionId'. Or dictionary with those keys. Returns: The subselection of rows in ``measurement_df`` for the condition ``condition``. """ # filter rows for condition row_filter = 1 # check for equality in all grouping cols if PREEQUILIBRATION_CONDITION_ID in condition: row_filter = ( measurement_df[PREEQUILIBRATION_CONDITION_ID].fillna("") == condition[PREEQUILIBRATION_CONDITION_ID] ) & row_filter if SIMULATION_CONDITION_ID in condition: row_filter = ( measurement_df[SIMULATION_CONDITION_ID] == condition[SIMULATION_CONDITION_ID] ) & row_filter # apply filter cur_measurement_df = measurement_df.loc[row_filter, :] return cur_measurement_df
[docs] def get_measurement_parameter_ids(measurement_df: pd.DataFrame) -> List[str]: """ Return list of ID of parameters which occur in measurement table as observable or noise parameter overrides. Arguments: measurement_df: PEtab measurement DataFrame Returns: List of parameter IDs """ def get_unique_parameters(series): return core.unique_preserve_order( itertools.chain.from_iterable( series.apply(split_parameter_replacement_list) ) ) return core.unique_preserve_order( get_unique_parameters(measurement_df[OBSERVABLE_PARAMETERS]) + get_unique_parameters(measurement_df[NOISE_PARAMETERS]) )
[docs] def split_parameter_replacement_list( list_string: Union[str, numbers.Number], delim: str = PARAMETER_SEPARATOR ) -> List[Union[str, numbers.Number]]: """ Split values in observableParameters and noiseParameters in measurement table. Arguments: list_string: delim-separated stringified list delim: delimiter Returns: List of split values. Numeric values may be converted to `float`, and parameter IDs are kept as strings. """ if list_string is None or list_string == "": return [] if isinstance(list_string, numbers.Number): # Empty cells in pandas might be turned into nan # We might want to allow nan as replacement... if np.isnan(list_string): return [] return [list_string] result = [x.strip() for x in list_string.split(delim)] def convert_and_check(x): x = core.to_float_if_float(x) if isinstance(x, float): return x if lint.is_valid_identifier(x): return x raise ValueError( f"The value '{x}' in the parameter replacement list " f"'{list_string}' is neither a number, nor a valid parameter ID." ) return list(map(convert_and_check, result))
[docs] def create_measurement_df() -> pd.DataFrame: """Create empty measurement dataframe Returns: Created DataFrame """ return pd.DataFrame( data={ OBSERVABLE_ID: [], PREEQUILIBRATION_CONDITION_ID: [], SIMULATION_CONDITION_ID: [], MEASUREMENT: [], TIME: [], OBSERVABLE_PARAMETERS: [], NOISE_PARAMETERS: [], DATASET_ID: [], REPLICATE_ID: [], } )
[docs] def measurements_have_replicates(measurement_df: pd.DataFrame) -> bool: """Tests whether the measurements come with replicates Arguments: measurement_df: Measurement table Returns: ``True`` if there are replicates, ``False`` otherwise """ grouping_cols = core.get_notnull_columns( measurement_df, [ OBSERVABLE_ID, SIMULATION_CONDITION_ID, PREEQUILIBRATION_CONDITION_ID, TIME, ], ) return np.any( measurement_df.fillna("").groupby(grouping_cols).size().values - 1 )
[docs] def assert_overrides_match_parameter_count( measurement_df: pd.DataFrame, observable_df: pd.DataFrame ) -> None: """Ensure that number of parameters in the observable definition matches the number of overrides in ``measurement_df`` Arguments: measurement_df: PEtab measurement table observable_df: PEtab observable table """ # sympify only once and save number of parameters observable_parameters_count = { obs_id: len( observables.get_formula_placeholders(formula, obs_id, "observable") ) for obs_id, formula in zip( observable_df.index.values, observable_df[OBSERVABLE_FORMULA] ) } noise_parameters_count = { obs_id: len( observables.get_formula_placeholders(formula, obs_id, "noise") ) for obs_id, formula in zip( observable_df.index.values, observable_df[NOISE_FORMULA] ) } for _, row in measurement_df.iterrows(): # check observable parameters try: expected = observable_parameters_count[row[OBSERVABLE_ID]] except KeyError as e: raise ValueError( f"Observable {row[OBSERVABLE_ID]} used in measurement table " f"is not defined." ) from e actual = len( split_parameter_replacement_list( row.get(OBSERVABLE_PARAMETERS, None) ) ) # No overrides are also allowed if actual != expected: formula = observable_df.loc[row[OBSERVABLE_ID], OBSERVABLE_FORMULA] raise AssertionError( f"Mismatch of observable parameter overrides for " f"{row[OBSERVABLE_ID]} ({formula})" f"in:\n{row}\n" f"Expected {expected} but got {actual}" ) # check noise parameters replacements = split_parameter_replacement_list( row.get(NOISE_PARAMETERS, None) ) try: expected = noise_parameters_count[row[OBSERVABLE_ID]] # No overrides are also allowed if len(replacements) != expected: raise AssertionError( f"Mismatch of noise parameter overrides in:\n{row}\n" f"Expected {expected} but got {len(replacements)}" ) except KeyError: # no overrides defined, but a numerical sigma can be provided # anyways if len(replacements) != 1 or not isinstance( replacements[0], numbers.Number ): raise AssertionError( f"No placeholders have been specified in the noise model " f"for observable {row[OBSERVABLE_ID]}, but parameter ID " "or multiple overrides were specified in the " "noiseParameters column." )
[docs] def measurement_is_at_steady_state(time: float) -> bool: """Check whether a measurement is at steady state. Arguments: time: The time. Returns: Whether the measurement is at steady state. """ return math.isinf(time)