"""Functions performing various calculations."""
import numbers
import operator
from functools import reduce
import numpy as np
import pandas as pd
import sympy as sp
import petab.v1 as petab
from .C import *
from .math import sympify_petab
__all__ = [
"calculate_residuals",
"calculate_residuals_for_table",
"get_symbolic_noise_formulas",
"evaluate_noise_formula",
"calculate_chi2",
"calculate_chi2_for_table_from_residuals",
"calculate_llh",
"calculate_llh_for_table",
"calculate_single_llh",
]
[docs]
def calculate_residuals(
measurement_dfs: list[pd.DataFrame] | pd.DataFrame,
simulation_dfs: list[pd.DataFrame] | pd.DataFrame,
observable_dfs: list[pd.DataFrame] | pd.DataFrame,
parameter_dfs: list[pd.DataFrame] | pd.DataFrame,
normalize: bool = True,
scale: bool = True,
) -> list[pd.DataFrame]:
"""Calculate residuals.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
List of DataFrames in the same structure as `measurement_dfs`
with a field `residual` instead of measurement.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
residual_dfs = []
for measurement_df, simulation_df, observable_df, parameter_df in zip(
measurement_dfs,
simulation_dfs,
observable_dfs,
parameter_dfs,
strict=True,
):
residual_df = calculate_residuals_for_table(
measurement_df,
simulation_df,
observable_df,
parameter_df,
normalize,
scale,
)
residual_dfs.append(residual_df)
return residual_dfs
[docs]
def calculate_residuals_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame,
normalize: bool = True,
scale: bool = True,
) -> pd.DataFrame:
"""
Calculate residuals for a single measurement table.
For the arguments, see `calculate_residuals`.
"""
# below, we rely on a unique index
measurement_df = measurement_df.reset_index(drop=True)
# create residual df as copy of measurement df, change column
residual_df = measurement_df.copy(deep=True).rename(
columns={MEASUREMENT: RESIDUAL}
)
residual_df[RESIDUAL] = residual_df[RESIDUAL].astype("float64")
# matching columns
compared_cols = set(measurement_df.columns) & set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for irow, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [
(simulation_df[col] == row[col]) | petab.is_empty(row[col])
for col in compared_cols
]
mask = reduce(lambda x, y: x & y, masks)
if mask.sum() == 0:
raise ValueError(
f"Could not find simulation for measurement {row}."
)
# if we have multiple matches, check that the rows are all identical
elif (
mask.sum() > 1
and simulation_df.loc[mask].drop_duplicates().shape[0] > 1
):
raise ValueError(
f"Multiple different simulations found for measurement "
f"{row}:\n{simulation_df.loc[mask]}"
)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
if scale:
# apply scaling
observable = observable_df.loc[row[OBSERVABLE_ID]]
trafo = observable.get(OBSERVABLE_TRANSFORMATION, LIN)
scaled_simulation = petab.scale(simulation, trafo)
scaled_measurement = petab.scale(measurement, trafo)
# non-normalized residual is just the difference
residual = scaled_measurement - scaled_simulation
if normalize:
# divide by standard deviation
residual /= evaluate_noise_formula(
row, noise_formulas, parameter_df, simulation
)
# fill in value
residual_df.loc[irow, RESIDUAL] = residual
return residual_df
[docs]
def calculate_chi2(
measurement_dfs: list[pd.DataFrame] | pd.DataFrame,
simulation_dfs: list[pd.DataFrame] | pd.DataFrame,
observable_dfs: list[pd.DataFrame] | pd.DataFrame,
parameter_dfs: list[pd.DataFrame] | pd.DataFrame,
normalize: bool = True,
scale: bool = True,
) -> float:
"""Calculate the chi2 value.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
The aggregated chi2 value.
"""
residual_dfs = calculate_residuals(
measurement_dfs,
simulation_dfs,
observable_dfs,
parameter_dfs,
normalize,
scale,
)
chi2s = [
calculate_chi2_for_table_from_residuals(df) for df in residual_dfs
]
return sum(chi2s)
[docs]
def calculate_chi2_for_table_from_residuals(
residual_df: pd.DataFrame,
) -> float:
"""Compute chi2 value for a single residual table."""
return (np.array(residual_df[RESIDUAL]) ** 2).sum()
[docs]
def calculate_llh(
measurement_dfs: list[pd.DataFrame] | pd.DataFrame,
simulation_dfs: list[pd.DataFrame] | pd.DataFrame,
observable_dfs: list[pd.DataFrame] | pd.DataFrame,
parameter_dfs: list[pd.DataFrame] | pd.DataFrame,
) -> float:
"""Calculate total log likelihood.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
Returns:
The log-likelihood.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
llhs = []
for measurement_df, simulation_df, observable_df, parameter_df in zip(
measurement_dfs,
simulation_dfs,
observable_dfs,
parameter_dfs,
strict=True,
):
_llh = calculate_llh_for_table(
measurement_df, simulation_df, observable_df, parameter_df
)
llhs.append(_llh)
return sum(llhs)
[docs]
def calculate_llh_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame,
) -> float:
"""Calculate log-likelihood for one set of tables. For the arguments, see
`calculate_llh`.
"""
llhs = []
# matching columns
compared_cols = set(measurement_df.columns) & set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for _, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [
(simulation_df[col] == row[col]) | petab.is_empty(row[col])
for col in compared_cols
]
mask = reduce(operator.and_, masks)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
observable = observable_df.loc[row[OBSERVABLE_ID]]
# get scale
scale = observable.get(OBSERVABLE_TRANSFORMATION, LIN)
# get noise standard deviation
noise_value = evaluate_noise_formula(
row, noise_formulas, parameter_df, simulation
)
# get noise distribution
noise_distribution = observable.get(NOISE_DISTRIBUTION, NORMAL)
llh = calculate_single_llh(
measurement, simulation, scale, noise_distribution, noise_value
)
llhs.append(llh)
return sum(llhs)
[docs]
def calculate_single_llh(
measurement: float,
simulation: float,
scale: str,
noise_distribution: str,
noise_value: float,
) -> float:
"""Calculate a single log likelihood.
Arguments:
measurement: The measurement value.
simulation: The simulated value.
scale: The scale on which the noise model is to be applied.
noise_distribution: The noise distribution.
noise_value: The considered noise models possess a single noise
parameter, e.g. the normal standard deviation.
Returns:
The computed likelihood for the given values.
"""
# short-hand
m, s, sigma = measurement, simulation, noise_value
pi, log, log10 = np.pi, np.log, np.log10
# go over the possible cases
if noise_distribution == NORMAL and scale == LIN:
nllh = 0.5 * log(2 * pi * sigma**2) + 0.5 * ((s - m) / sigma) ** 2
elif noise_distribution == NORMAL and scale == LOG:
nllh = (
0.5 * log(2 * pi * sigma**2 * m**2)
+ 0.5 * ((log(s) - log(m)) / sigma) ** 2
)
elif noise_distribution == NORMAL and scale == LOG10:
nllh = (
0.5 * log(2 * pi * sigma**2 * m**2 * log(10) ** 2)
+ 0.5 * ((log10(s) - log10(m)) / sigma) ** 2
)
elif noise_distribution == LAPLACE and scale == LIN:
nllh = log(2 * sigma) + abs((s - m) / sigma)
elif noise_distribution == LAPLACE and scale == LOG:
nllh = log(2 * sigma * m) + abs((log(s) - log(m)) / sigma)
elif noise_distribution == LAPLACE and scale == LOG10:
nllh = log(2 * sigma * m * log(10)) + abs(
(log10(s) - log10(m)) / sigma
)
else:
raise NotImplementedError(
"Unsupported combination of noise_distribution and scale "
f"specified: {noise_distribution}, {scale}."
)
return -nllh