"""Functions performing various calculations."""
import numbers
import operator
from functools import reduce
import numpy as np
import pandas as pd
import sympy as sp
from petab.v1 import is_empty, split_parameter_replacement_list
from .C import *
from .math import sympify_petab
__all__ = [
"calculate_residuals",
"calculate_residuals_for_table",
"get_symbolic_noise_formulas",
"evaluate_noise_formula",
"calculate_chi2",
"calculate_chi2_for_table_from_residuals",
"calculate_llh",
"calculate_llh_for_table",
"calculate_single_llh",
]
[docs]
def calculate_residuals(
measurement_dfs: list[pd.DataFrame] | pd.DataFrame,
simulation_dfs: list[pd.DataFrame] | pd.DataFrame,
observable_dfs: list[pd.DataFrame] | pd.DataFrame,
parameter_dfs: list[pd.DataFrame] | pd.DataFrame,
normalize: bool = True,
scale: bool = True,
) -> list[pd.DataFrame]:
"""Calculate residuals.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
List of DataFrames in the same structure as `measurement_dfs`
with a field `residual` instead of measurement.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
residual_dfs = []
for measurement_df, simulation_df, observable_df, parameter_df in zip(
measurement_dfs,
simulation_dfs,
observable_dfs,
parameter_dfs,
strict=True,
):
residual_df = calculate_residuals_for_table(
measurement_df,
simulation_df,
observable_df,
parameter_df,
normalize,
scale,
)
residual_dfs.append(residual_df)
return residual_dfs
[docs]
def calculate_residuals_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame,
normalize: bool = True,
scale: bool = True,
) -> pd.DataFrame:
"""
Calculate residuals for a single measurement table.
For the arguments, see `calculate_residuals`.
"""
from petab.v1 import scale
# below, we rely on a unique index
measurement_df = measurement_df.reset_index(drop=True)
# create residual df as copy of measurement df, change column
residual_df = measurement_df.copy(deep=True).rename(
columns={MEASUREMENT: RESIDUAL}
)
residual_df[RESIDUAL] = residual_df[RESIDUAL].astype("float64")
# matching columns
compared_cols = set(measurement_df.columns) & set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for irow, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [
(simulation_df[col] == row[col]) | is_empty(row[col])
for col in compared_cols
]
mask = reduce(operator.and_, masks)
if mask.sum() == 0:
raise ValueError(
f"Could not find simulation for measurement {row}."
)
# if we have multiple matches, check that the rows are all identical
elif (
mask.sum() > 1
and simulation_df.loc[mask].drop_duplicates().shape[0] > 1
):
raise ValueError(
f"Multiple different simulations found for measurement "
f"{row}:\n{simulation_df.loc[mask]}"
)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
if scale:
# apply scaling
observable = observable_df.loc[row[OBSERVABLE_ID]]
# for v2, the transformation is part of the noise distribution
noise_distr = observable.get(NOISE_DISTRIBUTION, NORMAL)
if noise_distr.startswith("log-"):
trafo = LOG
elif noise_distr.startswith("log10-"):
trafo = LOG10
else:
trafo = LIN
# scale simulation and measurement
scaled_simulation = scale(simulation, trafo)
scaled_measurement = scale(measurement, trafo)
# non-normalized residual is just the difference
residual = scaled_measurement - scaled_simulation
if normalize:
# divide by standard deviation
residual /= evaluate_noise_formula(
row, noise_formulas, parameter_df, simulation, observable
)
# fill in value
residual_df.loc[irow, RESIDUAL] = residual
return residual_df
[docs]
def calculate_chi2(
measurement_dfs: list[pd.DataFrame] | pd.DataFrame,
simulation_dfs: list[pd.DataFrame] | pd.DataFrame,
observable_dfs: list[pd.DataFrame] | pd.DataFrame,
parameter_dfs: list[pd.DataFrame] | pd.DataFrame,
normalize: bool = True,
scale: bool = True,
) -> float:
"""Calculate the chi2 value.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
The aggregated chi2 value.
"""
residual_dfs = calculate_residuals(
measurement_dfs,
simulation_dfs,
observable_dfs,
parameter_dfs,
normalize,
scale,
)
chi2s = [
calculate_chi2_for_table_from_residuals(df) for df in residual_dfs
]
return float(sum(chi2s))
[docs]
def calculate_chi2_for_table_from_residuals(
residual_df: pd.DataFrame,
) -> float:
"""Compute chi2 value for a single residual table."""
return float((np.array(residual_df[RESIDUAL]) ** 2).sum())
[docs]
def calculate_llh(
measurement_dfs: list[pd.DataFrame] | pd.DataFrame,
simulation_dfs: list[pd.DataFrame] | pd.DataFrame,
observable_dfs: list[pd.DataFrame] | pd.DataFrame,
parameter_dfs: list[pd.DataFrame] | pd.DataFrame,
) -> float:
"""Calculate total log likelihood.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
Returns:
The log-likelihood.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
llhs = []
for measurement_df, simulation_df, observable_df, parameter_df in zip(
measurement_dfs,
simulation_dfs,
observable_dfs,
parameter_dfs,
strict=True,
):
_llh = calculate_llh_for_table(
measurement_df, simulation_df, observable_df, parameter_df
)
llhs.append(_llh)
return float(sum(llhs))
[docs]
def calculate_llh_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame,
) -> float:
"""Calculate log-likelihood for one set of tables. For the arguments, see
`calculate_llh`.
"""
llhs = []
# matching columns
compared_cols = set(measurement_df.columns) & set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for _, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [
(simulation_df[col] == row[col]) | is_empty(row[col])
for col in compared_cols
]
mask = reduce(lambda x, y: x & y, masks)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
observable = observable_df.loc[row[OBSERVABLE_ID]]
# get noise distribution
noise_distr = observable.get(NOISE_DISTRIBUTION, NORMAL)
if noise_distr.startswith("log-"):
obs_scale = LOG
noise_distr = noise_distr.removeprefix("log-")
elif noise_distr.startswith("log10-"):
obs_scale = LOG10
noise_distr = noise_distr.removeprefix("log10-")
else:
obs_scale = LIN
# get noise standard deviation
noise_value = evaluate_noise_formula(
row,
noise_formulas,
parameter_df,
simulation,
observable,
)
llh = calculate_single_llh(
measurement, simulation, obs_scale, noise_distr, noise_value
)
llhs.append(llh)
return float(sum(llhs))
[docs]
def calculate_single_llh(
measurement: float,
simulation: float,
scale: str,
noise_distribution: str,
noise_value: float,
) -> float:
"""Calculate a single log likelihood.
Arguments:
measurement: The measurement value.
simulation: The simulated value.
scale: The scale on which the noise model is to be applied.
noise_distribution: The noise distribution.
noise_value: The considered noise models possess a single noise
parameter, e.g. the normal standard deviation.
Returns:
The computed likelihood for the given values.
"""
# PEtab v2:
if noise_distribution == LOG_NORMAL and scale == LIN:
noise_distribution = NORMAL
scale = LOG
# short-hand
m, s, sigma = measurement, simulation, noise_value
pi, log, log10 = np.pi, np.log, np.log10
# go over the possible cases
if noise_distribution == NORMAL and scale == LIN:
nllh = 0.5 * log(2 * pi * sigma**2) + 0.5 * ((s - m) / sigma) ** 2
elif noise_distribution == NORMAL and scale == LOG:
nllh = (
0.5 * log(2 * pi * sigma**2 * m**2)
+ 0.5 * ((log(s) - log(m)) / sigma) ** 2
)
elif noise_distribution == NORMAL and scale == LOG10:
nllh = (
0.5 * log(2 * pi * sigma**2 * m**2 * log(10) ** 2)
+ 0.5 * ((log10(s) - log10(m)) / sigma) ** 2
)
elif noise_distribution == LAPLACE and scale == LIN:
nllh = log(2 * sigma) + abs((s - m) / sigma)
elif noise_distribution == LAPLACE and scale == LOG:
nllh = log(2 * sigma * m) + abs((log(s) - log(m)) / sigma)
elif noise_distribution == LAPLACE and scale == LOG10:
nllh = log(2 * sigma * m * log(10)) + abs(
(log10(s) - log10(m)) / sigma
)
else:
raise NotImplementedError(
"Unsupported combination of noise_distribution and scale "
f"specified: {noise_distribution}, {scale}."
)
return -nllh