"""Functions performing various calculations."""
import numbers
from functools import reduce
from typing import Dict, List, Union
import numpy as np
import pandas as pd
import sympy
from sympy.abc import _clash
import petab
from .C import *
__all__ = [
"calculate_residuals",
"calculate_residuals_for_table",
"get_symbolic_noise_formulas",
"evaluate_noise_formula",
"calculate_chi2",
"calculate_chi2_for_table_from_residuals",
"calculate_llh",
"calculate_llh_for_table",
"calculate_single_llh",
]
[docs]
def calculate_residuals(
measurement_dfs: Union[List[pd.DataFrame], pd.DataFrame],
simulation_dfs: Union[List[pd.DataFrame], pd.DataFrame],
observable_dfs: Union[List[pd.DataFrame], pd.DataFrame],
parameter_dfs: Union[List[pd.DataFrame], pd.DataFrame],
normalize: bool = True,
scale: bool = True,
) -> List[pd.DataFrame]:
"""Calculate residuals.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
List of DataFrames in the same structure as `measurement_dfs`
with a field `residual` instead of measurement.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
residual_dfs = []
for measurement_df, simulation_df, observable_df, parameter_df in zip(
measurement_dfs, simulation_dfs, observable_dfs, parameter_dfs
):
residual_df = calculate_residuals_for_table(
measurement_df,
simulation_df,
observable_df,
parameter_df,
normalize,
scale,
)
residual_dfs.append(residual_df)
return residual_dfs
[docs]
def calculate_residuals_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame,
normalize: bool = True,
scale: bool = True,
) -> pd.DataFrame:
"""
Calculate residuals for a single measurement table.
For the arguments, see `calculate_residuals`.
"""
# create residual df as copy of measurement df, change column
residual_df = measurement_df.copy(deep=True).rename(
columns={MEASUREMENT: RESIDUAL}
)
residual_df[RESIDUAL] = residual_df[RESIDUAL].astype("float64")
# matching columns
compared_cols = set(MEASUREMENT_DF_COLS)
compared_cols -= {MEASUREMENT}
compared_cols &= set(measurement_df.columns)
compared_cols &= set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for irow, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [
(simulation_df[col] == row[col]) | petab.is_empty(row[col])
for col in compared_cols
]
mask = reduce(lambda x, y: x & y, masks)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
if scale:
# apply scaling
observable = observable_df.loc[row[OBSERVABLE_ID]]
trafo = observable.get(OBSERVABLE_TRANSFORMATION, LIN)
simulation = petab.scale(simulation, trafo)
measurement = petab.scale(measurement, trafo)
# non-normalized residual is just the difference
residual = simulation - measurement
noise_value = 1
if normalize:
# look up noise standard deviation
noise_value = evaluate_noise_formula(
row, noise_formulas, parameter_df, simulation
)
residual /= noise_value
# fill in value
residual_df.loc[irow, RESIDUAL] = residual
return residual_df
[docs]
def calculate_chi2(
measurement_dfs: Union[List[pd.DataFrame], pd.DataFrame],
simulation_dfs: Union[List[pd.DataFrame], pd.DataFrame],
observable_dfs: Union[List[pd.DataFrame], pd.DataFrame],
parameter_dfs: Union[List[pd.DataFrame], pd.DataFrame],
normalize: bool = True,
scale: bool = True,
) -> float:
"""Calculate the chi2 value.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
normalize:
Whether to normalize residuals by the noise standard deviation
terms.
scale:
Whether to calculate residuals of scaled values.
Returns:
The aggregated chi2 value.
"""
residual_dfs = calculate_residuals(
measurement_dfs,
simulation_dfs,
observable_dfs,
parameter_dfs,
normalize,
scale,
)
chi2s = [
calculate_chi2_for_table_from_residuals(df) for df in residual_dfs
]
return sum(chi2s)
[docs]
def calculate_chi2_for_table_from_residuals(
residual_df: pd.DataFrame,
) -> float:
"""Compute chi2 value for a single residual table."""
return (np.array(residual_df[RESIDUAL]) ** 2).sum()
[docs]
def calculate_llh(
measurement_dfs: Union[List[pd.DataFrame], pd.DataFrame],
simulation_dfs: Union[List[pd.DataFrame], pd.DataFrame],
observable_dfs: Union[List[pd.DataFrame], pd.DataFrame],
parameter_dfs: Union[List[pd.DataFrame], pd.DataFrame],
) -> float:
"""Calculate total log likelihood.
Arguments:
measurement_dfs:
The problem measurement tables.
simulation_dfs:
Simulation tables corresponding to the measurement tables.
observable_dfs:
The problem observable tables.
parameter_dfs:
The problem parameter tables.
Returns:
The log-likelihood.
"""
# convenience
if isinstance(measurement_dfs, pd.DataFrame):
measurement_dfs = [measurement_dfs]
if isinstance(simulation_dfs, pd.DataFrame):
simulation_dfs = [simulation_dfs]
if isinstance(observable_dfs, pd.DataFrame):
observable_dfs = [observable_dfs]
if isinstance(parameter_dfs, pd.DataFrame):
parameter_dfs = [parameter_dfs]
# iterate over data frames
llhs = []
for measurement_df, simulation_df, observable_df, parameter_df in zip(
measurement_dfs, simulation_dfs, observable_dfs, parameter_dfs
):
_llh = calculate_llh_for_table(
measurement_df, simulation_df, observable_df, parameter_df
)
llhs.append(_llh)
return sum(llhs)
[docs]
def calculate_llh_for_table(
measurement_df: pd.DataFrame,
simulation_df: pd.DataFrame,
observable_df: pd.DataFrame,
parameter_df: pd.DataFrame,
) -> float:
"""Calculate log-likelihood for one set of tables. For the arguments, see
`calculate_llh`."""
llhs = []
# matching columns
compared_cols = set(MEASUREMENT_DF_COLS)
compared_cols -= {MEASUREMENT}
compared_cols &= set(measurement_df.columns)
compared_cols &= set(simulation_df.columns)
# compute noise formulas for observables
noise_formulas = get_symbolic_noise_formulas(observable_df)
# iterate over measurements, find corresponding simulations
for irow, row in measurement_df.iterrows():
measurement = row[MEASUREMENT]
# look up in simulation df
masks = [
(simulation_df[col] == row[col]) | petab.is_empty(row[col])
for col in compared_cols
]
mask = reduce(lambda x, y: x & y, masks)
simulation = simulation_df.loc[mask][SIMULATION].iloc[0]
observable = observable_df.loc[row[OBSERVABLE_ID]]
# get scale
scale = observable.get(OBSERVABLE_TRANSFORMATION, LIN)
# get noise standard deviation
noise_value = evaluate_noise_formula(
row, noise_formulas, parameter_df, petab.scale(simulation, scale)
)
# get noise distribution
noise_distribution = observable.get(NOISE_DISTRIBUTION, NORMAL)
llh = calculate_single_llh(
measurement, simulation, scale, noise_distribution, noise_value
)
llhs.append(llh)
return sum(llhs)
[docs]
def calculate_single_llh(
measurement: float,
simulation: float,
scale: str,
noise_distribution: str,
noise_value: float,
) -> float:
"""Calculate a single log likelihood.
Arguments:
measurement: The measurement value.
simulation: The simulated value.
scale: The scale on which the noise model is to be applied.
noise_distribution: The noise distribution.
noise_value: The considered noise models possess a single noise
parameter, e.g. the normal standard deviation.
Returns:
The computed likelihood for the given values.
"""
# short-hand
m, s, sigma = measurement, simulation, noise_value
pi, log, log10 = np.pi, np.log, np.log10
# go over the possible cases
if noise_distribution == NORMAL and scale == LIN:
nllh = 0.5 * log(2 * pi * sigma**2) + 0.5 * ((s - m) / sigma) ** 2
elif noise_distribution == NORMAL and scale == LOG:
nllh = (
0.5 * log(2 * pi * sigma**2 * m**2)
+ 0.5 * ((log(s) - log(m)) / sigma) ** 2
)
elif noise_distribution == NORMAL and scale == LOG10:
nllh = (
0.5 * log(2 * pi * sigma**2 * m**2 * log(10) ** 2)
+ 0.5 * ((log10(s) - log10(m)) / sigma) ** 2
)
elif noise_distribution == LAPLACE and scale == LIN:
nllh = log(2 * sigma) + abs((s - m) / sigma)
elif noise_distribution == LAPLACE and scale == LOG:
nllh = log(2 * sigma * m) + abs((log(s) - log(m)) / sigma)
elif noise_distribution == LAPLACE and scale == LOG10:
nllh = log(2 * sigma * m * log(10)) + abs(
(log10(s) - log10(m)) / sigma
)
else:
raise NotImplementedError(
"Unsupported combination of noise_distribution and scale "
f"specified: {noise_distribution}, {scale}."
)
return -nllh