"""Functions for working with the PEtab observables table"""
import re
from collections import OrderedDict
from pathlib import Path
from typing import Literal
import pandas as pd
from . import core, lint
from .C import * # noqa: F403
from .math import sympify_petab
from .models import Model
# Names exported when this module is star-imported
# (``from <module> import *``).
__all__ = [
"create_observable_df",
"get_formula_placeholders",
"get_observable_df",
"get_output_parameters",
"get_placeholders",
"write_observable_df",
]
[docs]
def get_observable_df(
observable_file: str | pd.DataFrame | Path | None,
) -> pd.DataFrame | None:
"""
Read the provided observable file into a ``pandas.Dataframe``.
Arguments:
observable_file: Name of the file to read from or pandas.Dataframe.
Returns:
Observable DataFrame
"""
if observable_file is None:
return observable_file
if isinstance(observable_file, str | Path):
observable_file = pd.read_csv(
observable_file, sep="\t", float_precision="round_trip"
)
lint.assert_no_leading_trailing_whitespace(
observable_file.columns.values, "observable"
)
if not isinstance(observable_file.index, pd.RangeIndex):
observable_file.reset_index(
drop=observable_file.index.name != OBSERVABLE_ID,
inplace=True,
)
try:
observable_file.set_index([OBSERVABLE_ID], inplace=True)
except KeyError:
raise KeyError(
f"Observable table missing mandatory field {OBSERVABLE_ID}."
) from None
return observable_file
[docs]
def write_observable_df(df: pd.DataFrame, filename: str | Path) -> None:
    """Write a PEtab observable table to a tab-separated file.

    The table is first passed through :func:`get_observable_df` and the
    result is written including its index.

    Arguments:
        df: PEtab observable table
        filename: Destination file name
    """
    normalized_df = get_observable_df(df)
    normalized_df.to_csv(filename, index=True, sep="\t")
[docs]
def get_output_parameters(
    observable_df: pd.DataFrame,
    model: Model,
    observables: bool = True,
    noise: bool = True,
    mapping_df: pd.DataFrame = None,
) -> list[str]:
    """Collect the output parameters of an observable table.

    Output parameters are the symbols occurring in the selected formula
    columns which the model does not accept in an observable formula,
    neither directly nor through an entry of ``mapping_df``.

    Arguments:
        observable_df: PEtab observable table
        model: The underlying model
        observables: Include parameters from observableFormulas
        noise: Include parameters from noiseFormulas (the column is
            skipped if the table does not have it)
        mapping_df: PEtab mapping table

    Returns:
        List of output parameter IDs, in order of first occurrence.
        Symbols of one formula are visited sorted by name.
    """
    selected_columns = []
    if observables:
        selected_columns.append(OBSERVABLE_FORMULA)
    if noise and NOISE_FORMULA in observable_df:
        selected_columns.append(NOISE_FORMULA)

    expressions = [
        expression
        for column in selected_columns
        for expression in observable_df[column]
    ]

    def _known_to_model(symbol_id):
        """Whether the symbol, or the entity it maps to, is a model symbol."""
        if model.symbol_allowed_in_observable_formula(symbol_id):
            return True
        if mapping_df is None or symbol_id not in mapping_df.index:
            return False
        # The symbol is an alias; check the model entity it maps to.
        return model.symbol_allowed_in_observable_formula(
            mapping_df.loc[symbol_id, MODEL_ENTITY_ID]
        )

    # A dict serves as an insertion-ordered set of the IDs found so far.
    found = {}
    for expression in expressions:
        symbols = list(sympify_petab(expression).free_symbols)
        symbols.sort(key=lambda s: s.name)
        for symbol in symbols:
            symbol_id = str(symbol)
            if not _known_to_model(symbol_id):
                found[symbol_id] = None

    return list(found)
[docs]
def get_placeholders(
    observable_df: pd.DataFrame,
    observables: bool = True,
    noise: bool = True,
) -> list[str]:
    """Gather the placeholder parameters used in the observable table.

    Placeholders are the parameters that get overwritten by
    {observable,noise}Parameters. Each selected formula of each row is
    handed to :func:`get_formula_placeholders`; the combined result is
    passed through ``core.unique_preserve_order``.

    Arguments:
        observable_df: PEtab observable table
        observables: Include parameters from observableFormulas
        noise: Include parameters from noiseFormulas

    Returns:
        List of placeholder parameters from observable table
        observableFormulas and noiseFormulas.
    """
    # (placeholder type, formula column) pairs to inspect in every row
    selection = []
    if observables:
        selection.append(("observable", OBSERVABLE_FORMULA))
    if noise:
        selection.append(("noise", NOISE_FORMULA))

    collected = []
    for observable_id, record in observable_df.iterrows():
        for kind, column in selection:
            # Columns absent from the table are silently skipped.
            if column in record:
                collected.extend(
                    get_formula_placeholders(
                        record[column], observable_id, kind
                    )
                )

    return core.unique_preserve_order(collected)
[docs]
def create_observable_df() -> pd.DataFrame:
    """Build an observable table without any rows.

    Returns:
        DataFrame with one empty column per entry of
        ``OBSERVABLE_DF_COLS``
    """
    empty_columns = {}
    for column_name in OBSERVABLE_DF_COLS:
        empty_columns[column_name] = []
    return pd.DataFrame(data=empty_columns)