import warnings
import numpy as np
from numpy.linalg import LinAlgError
from scipy.linalg import LinAlgWarning
from scipy.optimize._numdiff import approx_derivative
import scipy.stats as stats
from refnx.util import ErrorProp as EP
from refnx._lib import flatten, approx_hess2
from refnx._lib import unique as f_unique
from refnx.dataset import Data1D
from refnx.analysis import (
is_parameter,
Parameter,
possibly_create_parameter,
is_parameters,
Parameters,
Interval,
PDF,
)
class BaseObjective:
    """
    Bare-bones objective function. It doesn't have to use a Parameters
    instance; a plain np.ndarray of parameter values works just as well.
    """
def __init__(
self,
p,
logl,
logp=None,
fcn_args=(),
fcn_kwds=None,
name=None,
weighted=True,
):
self.name = name
self.parameters = p
self.nvary = len(p)
self._logl = logl
self._logp = logp
self.fcn_args = fcn_args
self.fcn_kwds = {}
# give the BaseObjective a default value, so that it can be used in a
# GlobalObjective
self.weighted = weighted
if fcn_kwds is not None:
self.fcn_kwds = fcn_kwds
    def setp(self, pvals):
"""
Set the parameters from pvals
Parameters
----------
pvals : np.ndarray
Array containing the values to be tested.
"""
self.parameters[:] = pvals
    def nll(self, pvals=None):
"""
Negative log-likelihood function
Parameters
----------
pvals : np.ndarray
Array containing the values to be tested.
Returns
-------
nll : float
negative log-likelihood
"""
vals = self.parameters
if pvals is not None:
vals = pvals
return -self.logl(vals)
    def logp(self, pvals=None):
"""
Log-prior probability function
Parameters
----------
pvals : np.ndarray
Array containing the values to be tested.
Returns
-------
logp : float
log-prior probability
"""
vals = self.parameters
if pvals is not None:
vals = pvals
if callable(self._logp):
return self._logp(vals, *self.fcn_args, **self.fcn_kwds)
return 0
    def logl(self, pvals=None):
"""
Log-likelihood probability function
Parameters
----------
pvals : np.ndarray
Array containing the values to be tested.
Returns
-------
logl : float
log-likelihood probability.
"""
vals = self.parameters
if pvals is not None:
vals = pvals
return self._logl(vals, *self.fcn_args, **self.fcn_kwds)
    def logpost(self, pvals=None):
"""
Log-posterior probability function
Parameters
----------
pvals : np.ndarray
Array containing the values to be tested.
Returns
-------
logpost : float
log-probability.
Notes
-----
        The log-posterior probability is the sum of the log-prior and
        log-likelihood probabilities. Does not set the parameter attribute.
"""
vals = self.parameters
if pvals is not None:
vals = pvals
logpost = self.logp(vals)
if not np.isfinite(logpost):
return -np.inf
logpost += self.logl(vals)
return logpost
    def nlpost(self, pvals=None):
"""
Negative log-posterior function
Parameters
----------
pvals : np.ndarray
Array containing the values to be tested.
Returns
-------
nlpost : float
negative log-posterior
"""
return -self.logpost(pvals)
    def varying_parameters(self):
"""
Returns
-------
varying_parameters : np.ndarray
The parameters varying in this objective function.
"""
return self.parameters
    def covar(self, target="nll"):
"""
Estimates a covariance matrix based on numerical differentiation
of either the negative log-likelihood or negative log-posterior
probabilities.
Parameters
----------
target : str, {"nll", "nlpost"}
Returns
-------
covar : np.ndarray
The covariance matrix for the fitting system
Notes
-----
Estimation of a covariance matrix can be susceptible to numeric
instabilities. Critically evaluate the matrix before further use.
"""
_pvals = np.array(self.varying_parameters())
        if target == "nll":
            fn = self.nll
        elif target == "nlpost":
            fn = self.nlpost
        else:
            raise ValueError("target must be one of {'nll', 'nlpost'}")
try:
# from statsmodels
# the output from this for the test in test_objective.covar
# is very similar to numdifftools.Hessian, or a chained version
# of approx_derivative
hess = approx_hess2(_pvals, fn)
covar = np.linalg.inv(hess)
except LinAlgError:
sz = np.size(_pvals, 0)
covar = np.full((sz, sz), np.inf)
finally:
self.setp(_pvals)
return covar
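# A minimal usage sketch for BaseObjective (the straight-line data and the
# `line_logl` function below are made up for illustration). Only an array of
# parameter values and a log-likelihood callable are required:
#
#     import numpy as np
#     x = np.linspace(0, 10, 21)
#     y = 3.0 * x + 0.5
#     def line_logl(p):
#         # Gaussian log-likelihood with unit uncertainties
#         return -0.5 * np.sum((y - (p[0] * x + p[1])) ** 2)
#     obj = BaseObjective(np.array([3.0, 0.5]), line_logl)
#     obj.nll()     # 0.0 at the true parameter values
#     obj.covar()   # (2, 2) covariance estimate from the inverse Hessian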
class Objective(BaseObjective):
"""
Objective function for using with curvefitters such as
`refnx.analysis.curvefitter.CurveFitter`.
Parameters
----------
model : refnx.analysis.Model
the generative model function. One can also provide an object that
inherits `refnx.analysis.Model`.
data : refnx.dataset.Data1D
data to be analysed.
lnsigma : float or refnx.analysis.Parameter, optional
        Used if the experimental uncertainty (`data.y_err`) is underestimated
        by a constant fractional amount. The experimental uncertainty is
        modified as:
`s_n**2 = y_err**2 + exp(lnsigma * 2) * model**2`
See `Objective.logl` for more details.
use_weights : bool
        use experimental uncertainty in calculation of residuals and
        logl, if available. If this is set to False then you should also
        set `self.lnsigma.vary = False`, as it will have no effect on the
        fit.
transform : callable, optional
the model, data and data uncertainty are transformed by this
function before calculating the likelihood/residuals. Has the
signature `transform(data.x, y, y_err=None)`, returning the tuple
(`transformed_y, transformed_y_err`).
logp_extra : callable, optional
user specifiable log-probability term. This contribution is in
addition to the log-prior term of the `model` parameters, and
`model.logp`, as well as the log-likelihood of the `data`. Has
signature:
`logp_extra(model, data)`. The `model` will already possess
updated parameters. Beware of including the same log-probability
terms more than once.
auxiliary_params : {sequence, Parameters}, optional
Extra Parameter objects that are involved with curvefitting, but
aren't directly included as part of the `model`. See notes for more
details.
name : str
Name for the objective.
alpha : {float, Parameter, None}, optional
Multiplies the overall log-prior term for the parameters. Used to
balance log-prior and log-likelihood terms in the log-posterior.
Notes
-----
For parallelisation `logp_extra` needs to be picklable.
`auxiliary_params` are included in calculating the `Objective.logp`
term, are present in `Objective.varying_parameters()`, and are modified by
Curvefitter during an analysis. Their main purpose is to aid in making
constraints in models.
"""
def __init__(
self,
model,
data,
lnsigma=None,
use_weights=True,
transform=None,
logp_extra=None,
auxiliary_params=(),
name=None,
alpha=None,
):
self.model = model
# should be a Data1D instance
if isinstance(data, Data1D):
self.data = data
else:
self.data = Data1D(data=data)
self.lnsigma = lnsigma
if lnsigma is not None:
self.lnsigma = possibly_create_parameter(lnsigma, "lnsigma")
if isinstance(auxiliary_params, Parameters):
self.auxiliary_params = auxiliary_params
else:
self.auxiliary_params = Parameters(auxiliary_params)
self._use_weights = use_weights
self.transform = transform
self.logp_extra = logp_extra
self.name = name
if name is None:
self.name = id(self)
self.alpha = alpha
if alpha is not None:
self.alpha = possibly_create_parameter(alpha, name="alpha")
def __str__(self):
s = [f"{'':_>80}"]
s.append(f"Objective - {self.name}")
# dataset name
if self.data.name is None:
s.append(f"Dataset = {self.data}")
else:
s.append(f"Dataset = {self.data.name}")
s.append(f"datapoints = {self.npoints}")
s.append(f"chi2 = {self.chisqr()}")
s.append(f"Weighted = {self.weighted}")
s.append(f"Transform = {self.transform}")
s.append(str(self.parameters))
return "\n".join(s)
def __repr__(self):
return (
f"Objective({self.model!r}, {self.data!r},"
f" lnsigma={self.lnsigma!r},"
f" use_weights={self._use_weights},"
f" transform={self.transform!r},"
f" logp_extra={self.logp_extra!r},"
f" name={self.name!r},"
f" alpha={self.alpha!r})"
)
@property
def weighted(self):
"""
**bool** Does the data have weights (`data.y_err`), and is the
objective using them?
"""
return self.data.weighted and self._use_weights
@weighted.setter
def weighted(self, use_weights):
self._use_weights = bool(use_weights)
@property
def npoints(self):
"""
**int** the number of points in the dataset.
"""
return self.data.y.size
    def varying_parameters(self):
"""
Returns
-------
varying_parameters : refnx.analysis.Parameters
The varying Parameter objects allowed to vary during the fit.
"""
# create and return a Parameters object because it has the
# __array__ method, which allows one to quickly get numerical values.
lst = []
for p in flatten(self.parameters):
if p.vary:
lst.append(p)
continue
if len(p._deps):
lst.extend([_p for _p in p.dependencies() if _p.vary])
# should already be totally flattened by this point
return Parameters(f_unique(lst))
def _data_transform(self, model=None):
"""
Returns
-------
y, y_err, model: tuple of np.ndarray
The y data, its uncertainties, and the model, all put through the
transform.
"""
x = self.data.x
y = self.data.y
y_err = 1.0
if self.weighted:
y_err = self.data.y_err
if self.transform is None:
return y, y_err, model
else:
if model is not None:
model, _ = self.transform(x, model)
y, y_err = self.transform(x, y, y_err)
if self.weighted:
return y, y_err, model
else:
return y, 1, model
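    # A sketch of a user-supplied `transform` callable with the documented
    # signature `transform(x, y, y_err=None)`; this log10 example with
    # first-order error propagation is illustrative only (`model` and `data`
    # are hypothetical):
    #
    #     import numpy as np
    #     def log10_transform(x, y, y_err=None):
    #         if y_err is None:
    #             return np.log10(y), None
    #         # d(log10(y))/dy = 1 / (y * ln(10))
    #         return np.log10(y), y_err / (y * np.log(10))
    #     objective = Objective(model, data, transform=log10_transform)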
    def generative(self, pvals=None):
"""
Calculate the generative (dependent variable) function associated with
the model.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
model : np.ndarray
"""
self.setp(pvals)
return self.model(self.data.x, x_err=self.data.x_err)
    def residuals(self, pvals=None):
"""
Calculates the residuals for a given fitting system.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
residuals : np.ndarray
Residuals, `(data.y - model) / y_err`.
"""
self.setp(pvals)
model = self.model(self.data.x, x_err=self.data.x_err)
# TODO add in varying parameter residuals? (z-scores...)
y, y_err, model = self._data_transform(model)
if self.lnsigma is not None:
s_n = np.sqrt(
y_err * y_err + np.exp(2 * float(self.lnsigma)) * model * model
)
else:
s_n = y_err
return (y - model) / s_n
    def chisqr(self, pvals=None):
"""
Calculates the chi-squared value for a given fitting system.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
        chisqr : float
            Chi-squared value, `np.sum(residuals**2)`.
"""
# TODO reduced chisqr? include z-scores for parameters? DOF?
self.setp(pvals)
res = self.residuals(None)
return np.sum(res * res)
@property
def parameters(self):
"""
:class:`refnx.analysis.Parameters`, all the Parameters contained in the
fitting system.
"""
pars = self.model.parameters
if is_parameter(self.alpha):
pars = pars | self.alpha
if is_parameter(self.lnsigma):
pars = pars | self.lnsigma
if len(self.auxiliary_params):
pars = pars | self.auxiliary_params
return pars
    def setp(self, pvals):
"""
Set the parameters from pvals.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
"""
if pvals is None:
return
# set here rather than delegating to a Parameters
# object, because it may not necessarily be a
# Parameters object
_varying_parameters = self.varying_parameters()
if len(pvals) == len(_varying_parameters):
for idx, param in enumerate(_varying_parameters):
param.value = pvals[idx]
return
# values supplied are enough to specify all parameter values
# even those that are repeated
flattened_parameters = list(flatten(self.parameters))
if len(pvals) == len(flattened_parameters):
for idx, param in enumerate(flattened_parameters):
param.value = pvals[idx]
return
        raise ValueError(
            f"Incorrect number of values supplied ({len(pvals)}),"
            f" supply either the full number of parameters"
            f" ({len(flattened_parameters)}), or only the varying"
            f" parameters ({len(_varying_parameters)})."
        )
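    # A hedged sketch of the two accepted `setp` inputs (`objective` is
    # hypothetical): one value per varying parameter, or one value per
    # flattened parameter.
    #
    #     vp = objective.varying_parameters()
    #     objective.setp(np.array(vp) * 1.01)            # varying subset
    #     fp = list(flatten(objective.parameters))
    #     objective.setp([param.value for param in fp])  # full flattened set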
    def logp(self, pvals=None):
"""
Calculate the log-prior of the system
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
logp : float
log-prior probability
Notes
-----
The log-prior is calculated as:
.. code-block:: python
            logp = np.sum([param.logp() for param in
                           self.varying_parameters()])
"""
self.setp(pvals)
logp = np.sum(
[
param.logp()
for param in f_unique(
p for p in flatten(self.parameters) if p.vary
)
]
)
if not np.isfinite(logp):
return -np.inf
return logp
    def logl(self, pvals=None):
        """
        Calculate the log-likelihood of the system.
        The major component of the log-likelihood probability is from the
        data. Extra potential terms are added on from the Model,
        `self.model.logp`, and the user specifiable `logp_extra` function.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
logl : float
log-likelihood probability
Notes
-----
The log-likelihood is calculated as:
.. code-block:: python
logl = -0.5 * np.sum(((y - model) / s_n)**2
+ np.log(2 * pi * s_n**2))
            logl += self.model.logp()
            logl += self.logp_extra(self.model, self.data)
where
.. code-block:: python
s_n**2 = y_err**2 + exp(2 * lnsigma) * model**2
"""
self.setp(pvals)
model = self.model(self.data.x, x_err=self.data.x_err)
logl = 0.0
y, y_err, model = self._data_transform(model)
if self.lnsigma is not None:
var_y = (
y_err * y_err + np.exp(2 * float(self.lnsigma)) * model * model
)
else:
var_y = y_err**2
# TODO do something sensible if data isn't weighted
if self.weighted:
logl += np.log(2 * np.pi * var_y)
logl += (y - model) ** 2 / var_y
# nans play havoc
if np.isnan(logl).any():
raise RuntimeError("Objective.logl encountered a NaN.")
# add on extra 'potential' terms from the model.
extra_potential = self.model.logp()
if self.logp_extra is not None:
extra_potential += self.logp_extra(self.model, self.data)
return -0.5 * np.sum(logl) + extra_potential
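    # The weighted Gaussian log-likelihood computed above can be checked
    # against a direct numpy evaluation. A sketch with made-up numbers:
    #
    #     import numpy as np
    #     y = np.array([1.0, 2.0, 3.0])
    #     model = np.array([1.1, 1.9, 3.2])
    #     var_y = np.array([0.01, 0.04, 0.09])
    #     logl = -0.5 * np.sum((y - model) ** 2 / var_y
    #                          + np.log(2 * np.pi * var_y))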
    def nll(self, pvals=None):
"""
Negative log-likelihood function
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
nll : float
negative log-likelihood
"""
self.setp(pvals)
return -self.logl()
    def logpost(self, pvals=None):
"""
Calculate the log-probability of the curvefitting system
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
logpost : float
log-probability
Notes
-----
The overall log-probability is the sum of the log-prior and
log-likelihood. The log-likelihood is not calculated if the log-prior
is impossible (`logp == -np.inf`).
"""
self.setp(pvals)
alpha = 1.0
if is_parameter(self.alpha):
alpha = self.alpha.value
logpost = alpha * self.logp()
# only calculate the probability if the parameters have finite
# log-prior
if not np.isfinite(logpost):
return -np.inf
logpost += self.logl()
return logpost
    def covar(self, target="residuals"):
"""
Estimates the covariance matrix of the Objective.
Parameters
----------
target : {"residuals", "nll", "nlpost"}
Specifies what approach should be used to estimate covariance.
Returns
-------
covar : np.ndarray
Covariance matrix
Notes
-----
For most purposes the Jacobian of the `'residuals'` should be used to
calculate the covariance matrix, estimated as J.T x J.
If an Objective cannot calculate residuals then the covariance matrix
can be estimated by inverting a Hessian matrix created from either the
`'nll'` or `'nlpost'` methods.
        The default `'residuals'` approach is preferred, as Hessian
        estimation by numerical differentiation can sometimes be unstable.
        If a problem is experienced with the `'residuals'` approach it
        automatically falls back to `'nll'`.
"""
if target == "residuals":
try:
covar = self._covar_from_residuals()
except Exception:
# fallback to "nll"
target = "nll"
if target in ["nll", "nlpost"]:
covar = super().covar(target)
pvar = np.diagonal(covar).copy()
psingular = np.where(pvar == 0)[0]
if len(psingular) > 0:
var_params = self.varying_parameters()
singular_params = [var_params[ps] for ps in psingular]
warnings.warn(
"The following Parameters have no effect on"
" Objective.residuals, please consider fixing"
" them.\n" + repr(singular_params),
LinAlgWarning,
)
return covar
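    # One-sigma parameter uncertainties follow from the diagonal of the
    # covariance matrix. A sketch (`objective` is hypothetical):
    #
    #     covar = objective.covar()
    #     sd = np.sqrt(np.diag(covar))
    #     for param, err in zip(objective.varying_parameters(), sd):
    #         print(param.name, param.value, "+/-", err)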
def _covar_from_residuals(self):
_pvals = np.array(self.varying_parameters())
used_residuals_scaler = False
def fn_scaler(vals):
return np.squeeze(self.residuals(_pvals * vals))
try:
# we should be able to calculate a Jacobian for a parameter whose
# value is zero. However, the scaling approach won't work.
# This will force Jacobian calculation by unscaled parameters
if np.any(_pvals == 0):
raise FloatingPointError()
with np.errstate(invalid="raise"):
jac = approx_derivative(fn_scaler, np.ones_like(_pvals))
used_residuals_scaler = True
except FloatingPointError:
jac = approx_derivative(self.residuals, _pvals)
finally:
# using approx_derivative changes the state of the objective
# parameters have to make sure they're set at the end
self.setp(_pvals)
# need to create this because GlobalObjective may not have
# access to all the datapoints being fitted.
n_datapoints = np.size(jac, 0)
# covar = J.T x J
# from scipy.optimize.minpack.py
# eliminates singular parameters
_, s, VT = np.linalg.svd(jac, full_matrices=False)
threshold = np.finfo(float).eps * max(jac.shape) * s[0]
s = s[s > threshold]
VT = VT[: s.size]
covar = np.dot(VT.T / s**2, VT)
if used_residuals_scaler:
# unwind the scaling.
covar = covar * np.atleast_2d(_pvals) * np.atleast_2d(_pvals).T
scale = 1.0
# scale by reduced chi2 if experimental uncertainties weren't used.
if not (self.weighted):
scale = self.chisqr() / (
n_datapoints - len(self.varying_parameters())
)
return covar * scale
    def pgen(self, ngen=1000, nburn=0, nthin=1, random_state=None):
"""
Yield random parameter vectors from the MCMC samples. The objective
state is not altered.
Parameters
----------
ngen : int, optional
the number of samples to yield. The actual number of samples
yielded is `min(ngen, chain.size)`
nburn : int, optional
discard this many steps from the start of the chain
nthin : int, optional
only accept every `nthin` samples from the chain
random_state : {int, np.random.Generator, None}
random number generator that picks the samples
Yields
------
pvec : np.ndarray
A randomly chosen parameter vector
"""
yield from self.parameters.pgen(
ngen=ngen, nburn=nburn, nthin=nthin, random_state=random_state
)
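    # A sketch of collecting parameter vectors once the varying parameters
    # possess MCMC chains (e.g. after sampling with
    # refnx.analysis.CurveFitter; `objective` is hypothetical):
    #
    #     samples = np.array(
    #         list(objective.pgen(ngen=100, nburn=200, nthin=10))
    #     )
    #     samples.shape   # at most (100, n_varying_parameters)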
def _generate_generative_mcmc(
self, ngen=1000, nburn=0, nthin=1, random_state=None
):
"""
Yield generative curves from the MCMC samples. The objective state
is altered during generation, but should be restored when the
generator exits. If this generator is not exhausted then the
Objective state will not be restored!
Parameters
----------
ngen : int, optional
the number of samples to yield. The actual number of samples
yielded is `min(ngen, chain.size)`
nburn : int, optional
discard this many steps from the start of the chain
nthin : int, optional
only accept every `nthin` samples from the chain
random_state : {int, np.random.Generator, None}
random number generator that picks the samples
Yields
------
generative : np.ndarray
`Objective.generative` points for each of the samples.
"""
saved_params = np.array(self.varying_parameters())
_pgen = self.pgen(
ngen=ngen, nburn=nburn, nthin=nthin, random_state=random_state
)
try:
for pars in _pgen:
yield self.generative(pars)
finally:
self.setp(saved_params)
    def plot(self, pvals=None, samples=0, parameter=None, fig=None):
"""
Plot the data/model.
Requires matplotlib be installed.
Parameters
----------
pvals : np.ndarray, optional
Numeric values for the Parameter's that are varying
        samples : int, optional
If the objective has been sampled, how many samples you wish to
plot on the graph.
        parameter : refnx.analysis.Parameter, optional
Creates an interactive plot for the Parameter in Jupyter. Requires
ipywidgets be installed. Use with %matplotlib notebook/qt.
        fig : matplotlib.figure.Figure, optional
If `fig` is not supplied then a new figure is created. Otherwise
the graph is created on the current axes on the supplied figure.
Returns
-------
fig, ax : :class:`matplotlib.Figure`, :class:`matplotlib.Axes`
`matplotlib` figure and axes objects.
"""
self.setp(pvals)
if fig is None:
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
else:
ax = fig.gca()
y, y_err, model = self._data_transform(model=self.generative())
# add the data (in a transformed fashion)
if self.weighted:
ax.errorbar(
self.data.x,
y,
y_err,
color="blue",
label=self.data.name,
marker="o",
ms=3,
lw=0,
elinewidth=2,
)
else:
ax.scatter(self.data.x, y, color="blue", s=3, label=self.data.name)
if samples > 0:
# Get a number of chains, chosen randomly, set the objective,
# and plot the model.
for curve in self._generate_generative_mcmc(ngen=samples):
y, y_err, model = self._data_transform(model=curve)
ax.plot(self.data.x, model, color="k", alpha=0.01)
# add the fit
generative_plot = ax.plot(self.data.x, model, color="red", zorder=20)
if parameter is None:
return fig, ax
# create an interactive plot in a Jupyter notebook.
def f(val):
if parameter is not None:
parameter.value = float(val)
y, y_err, model = self._data_transform(model=self.generative())
generative_plot[0].set_data(self.data.x, model)
fig.canvas.draw()
import ipywidgets
return fig, ax, ipywidgets.interact(f, val=float(parameter))
    def corner(self, **kwds):
"""
Corner plot of the chains belonging to the Parameters.
Requires the `corner` and `matplotlib` packages.
Parameters
----------
        kwds : dict
passed directly to the `corner.corner` function
Returns
-------
fig : :class:`matplotlib.Figure` object.
"""
import corner
var_pars = self.varying_parameters()
chain = np.array([par.chain for par in var_pars])
labels = [par.name for par in var_pars]
chain = chain.reshape(len(chain), -1).T
kwds["labels"] = labels
kwds["quantiles"] = [0.16, 0.5, 0.84]
return corner.corner(chain, **kwds)
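# An end-to-end sketch of fitting with an Objective, assuming the
# refnx.analysis Model/CurveFitter API (the straight line model and the
# x_data/y_data/y_sd arrays are hypothetical):
#
#     import numpy as np
#     from refnx.analysis import Parameter, Parameters, Model, CurveFitter
#     def line(x, params, *args, **kwds):
#         m, c = np.array(params)   # parameter values as floats
#         return m * x + c
#     p = Parameters(
#         [Parameter(2.0, name="m", vary=True, bounds=(0, 5)),
#          Parameter(0.0, name="c", vary=True, bounds=(-5, 5))]
#     )
#     objective = Objective(Model(p, fitfunc=line), (x_data, y_data, y_sd))
#     fitter = CurveFitter(objective)
#     fitter.fit("differential_evolution")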
class GlobalObjective(Objective):
"""
Global Objective function for simultaneous fitting with
`refnx.analysis.CurveFitter`
Parameters
----------
objectives : list
list of :class:`refnx.analysis.Objective` objects
lambdas : array-like
Lagrangian multipliers for each of the objective terms that contribute
to the log-likelihood. Broadcast against the list of objectives. This
array-like *may* become a list of Parameters in the future.
alpha : {float, Parameter, None}, optional
Multiplies the overall log-prior term for the parameters. Used to
balance log-prior and log-likelihood terms in the log-posterior.
"""
def __init__(self, objectives, lambdas=None, alpha=1.0):
self.objectives = objectives
nobj = len(objectives)
if lambdas is not None:
self.lambdas = np.broadcast_to(lambdas, (nobj,)).astype(float)
else:
self.lambdas = np.ones(nobj)
weighted = [objective.weighted for objective in objectives]
self._weighted = np.array(weighted, dtype=bool)
if len(np.unique(self._weighted)) > 1:
raise ValueError(
"All the objectives must be either weighted or"
" unweighted, you cannot have a mixture."
)
self.alpha = alpha
if alpha is not None:
self.alpha = possibly_create_parameter(alpha, name="alpha")
def __str__(self):
s = [f"{'':_>80}", "\n"]
s.append("--Global Objective--")
for obj in self.objectives:
s.append(str(obj))
s.append("\n")
return "\n".join(s)
def __repr__(self):
return (
f"GlobalObjective({self.objectives!r},"
f" lambdas={list(self.lambdas)!r},"
f" alpha={self.alpha!r})"
)
@property
def weighted(self):
"""
**bool** do all the datasets have y_err, and are all the objectives
wanting to use weights?
"""
return self._weighted.all()
@property
def npoints(self):
"""
**int** number of data points in all the objectives.
"""
npoints = 0
for objective in self.objectives:
npoints += objective.npoints
return npoints
    def generative(self, pvals=None):
"""
        Concatenated generative curves from each of the objectives,
        via :meth:`refnx.analysis.Objective.generative`.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
generative : np.ndarray
Concatenated :meth:`refnx.analysis.Objective.generative`
"""
self.setp(pvals)
generative = np.hstack([o.generative() for o in self.objectives])
return generative
    def residuals(self, pvals=None):
"""
Concatenated residuals for each of the
:meth:`refnx.analysis.Objective.residuals`.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
residuals : np.ndarray
Concatenated :meth:`refnx.analysis.Objective.residuals`
Notes
-----
The Lagrangian multipliers contained in the ``lambdas`` attribute are
also included in the calculation of these residual arrays, to permit
least squares analyses. If you would like to view un-modified
residuals you should calculate them from the individual objectives.
"""
self.setp(pvals)
residuals = []
for objective, _lambda in zip(self.objectives, self.lambdas):
residual = _lambda * objective.residuals()
residuals.append(residual)
return np.concatenate(residuals)
@property
def parameters(self):
"""
:class:`refnx.analysis.Parameters` associated with all the objectives.
"""
# TODO this is probably going to be slow.
# cache and update strategy?
p = Parameters(name="global fitting parameters")
for objective in self.objectives:
p.append(objective.parameters)
return p
    def logp(self, pvals=None):
"""
Calculate the log-prior of the system
Parameters
----------
pvals : array-like or refnx.analysis.Parameters, optional
values for the varying or entire set of parameters
Returns
-------
logp : float
log-prior probability
"""
self.setp(pvals)
logp = 0.0
for objective in self.objectives:
logp += objective.logp()
# shortcut if one of the priors is impossible
if not np.isfinite(logp):
return -np.inf
return logp
    def logl(self, pvals=None):
        """
        Calculate the combined log-likelihood of the system.
Parameters
----------
pvals : array-like or refnx.analysis.Parameters
values for the varying or entire set of parameters
Returns
-------
logl : float
log-likelihood probability
Notes
-----
The log-likelihood of each of the objectives is multiplied by the
respective Lagrangian multiplier in ``GlobalObjective.lambdas``.
The purpose of this multiplier is to allow the user to weight certain
datasets more heavily (e.g. to make each of the contributions equal).
"""
self.setp(pvals)
logl = 0.0
for objective, _lambda in zip(self.objectives, self.lambdas):
logl += _lambda * objective.logl()
return logl
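    # A sketch of weighting two hypothetical objectives so that the second
    # dataset counts twice as heavily in the combined log-likelihood:
    #
    #     gobj = GlobalObjective([obj_a, obj_b], lambdas=[1.0, 2.0])
    #     gobj.logl()   # equals obj_a.logl() + 2.0 * obj_b.logl()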
    def plot(self, pvals=None, samples=0, parameter=None, fig=None):
"""
Plot the data/model for all the objectives in the GlobalObjective.
Matplotlib must be installed to use this method.
Parameters
----------
pvals : np.ndarray, optional
Numeric values for the Parameter's that are varying
        samples : int, optional
If the objective has been sampled, how many samples you wish to
plot on the graph.
        parameter : refnx.analysis.Parameter, optional
Creates an interactive plot for the Parameter in Jupyter. Requires
ipywidgets be installed. Use with %matplotlib notebook/qt.
        fig : matplotlib.figure.Figure, optional
If `fig` is not supplied then a new figure is created. Otherwise
the graph is created on the current axes on the supplied figure.
Returns
-------
fig, ax : :class:`matplotlib.Figure`, :class:`matplotlib.Axes`
`matplotlib` figure and axes objects.
"""
self.setp(pvals)
if fig is None:
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
else:
ax = fig.gca()
generative_plots = []
if samples > 0:
saved_params = np.array(self.parameters)
# Get a number of chains, chosen randomly, set the objectives,
# and plot the model.
for pvec in self.pgen(ngen=samples):
self.setp(pvec)
for objective in self.objectives:
y, y_err, model = objective._data_transform(
model=objective.generative()
)
ax.plot(objective.data.x, model, color="k", alpha=0.01)
# put back saved_params
self.setp(saved_params)
for objective in self.objectives:
# add the data (in a transformed fashion)
y, y_err, model = objective._data_transform(
model=objective.generative()
)
if objective.weighted:
ax.errorbar(
objective.data.x,
y,
y_err,
label=objective.data.name,
ms=3,
lw=0,
elinewidth=2,
marker="o",
)
else:
ax.scatter(objective.data.x, y, label=objective.data.name)
# add the fit
generative_plots.append(
ax.plot(objective.data.x, model, color="r", lw=1.5, zorder=20)[
0
]
)
if parameter is None:
return fig, ax
# create an interactive plot in a Jupyter notebook.
def f(val):
if parameter is not None:
parameter.value = float(val)
for i, objective in enumerate(self.objectives):
y, y_err, model = objective._data_transform(
model=objective.generative()
)
generative_plots[i].set_data(objective.data.x, model)
fig.canvas.draw()
import ipywidgets
return fig, ax, ipywidgets.interact(f, val=float(parameter))
def pymc_model(objective):
"""
Creates a pymc model from an Objective.
Requires aesara and pymc be installed. This is an experimental feature.
Parameters
----------
    objective : refnx.analysis.Objective
    Returns
    -------
    model : pymc.Model
Notes
-----
The varying parameters are renamed 'p0', 'p1', etc, as it's vital in pymc
that all parameters have their own unique name.
"""
import pymc as pm
import pytensor.tensor as pt
from refnx._lib._pymc import _LogLikeWithGrad
basic_model = pm.Model()
pars = objective.varying_parameters()
wrapped_pars = []
with basic_model:
# Priors for unknown model parameters
for i, par in enumerate(pars):
name = "p%d" % i
p = _to_pymc_distribution(name, par)
wrapped_pars.append(p)
# # Expected value of outcome
# try:
# # Likelihood (sampling distribution) of observations
# pm.Normal(
# "y_obs",
# mu=objective.generative,
# sigma=objective.data.y_err,
# observed=objective.data.y,
# )
# except Exception:
# # Falling back, theano autodiff won't work on function object
theta = pt.as_tensor_variable(wrapped_pars)
logl = _LogLikeWithGrad(objective.logl)
pm.Potential("log-likelihood", logl(theta))
return basic_model
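# A hedged usage sketch for this experimental feature (`objective` is
# hypothetical; pymc's standard sampling API is assumed):
#
#     import pymc as pm
#     model = pymc_model(objective)
#     with model:
#         idata = pm.sample(1000, tune=500)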
def _to_pymc_distribution(name, par):
"""
Create a pymc continuous distribution from a Bounds object.
Parameters
----------
name : str
Name of parameter
par : refnx.analysis.Parameter
The parameter to wrap
Returns
-------
d : pymc.Distribution
The pymc distribution
"""
import pymc as pm
from pytensor import tensor as pt
from pytensor.compile.ops import as_op
dist = par.bounds
# interval and both lb, ub are finite
if isinstance(dist, Interval) and np.isfinite([dist.lb, dist.ub]).all():
return pm.Uniform(name, dist.lb, dist.ub)
# no bounds
elif (
isinstance(dist, Interval)
and np.isneginf(dist.lb)
        and np.isposinf(dist.ub)
):
return pm.Flat(name)
# half open uniform
elif isinstance(dist, Interval) and not np.isfinite(dist.lb):
return dist.ub - pm.HalfFlat(name)
# half open uniform
elif isinstance(dist, Interval) and not np.isfinite(dist.ub):
return dist.lb + pm.HalfFlat(name)
# it's a PDF
if isinstance(dist, PDF):
dist_gen = getattr(dist.rv, "dist", None)
if isinstance(dist.rv, stats.rv_continuous):
dist_gen = dist.rv
if isinstance(dist_gen, type(stats.uniform)):
if hasattr(dist.rv, "args"):
p = pm.Uniform(
name,
dist.rv.args[0],
dist.rv.args[1] + dist.rv.args[0],
)
else:
p = pm.Uniform(name, 0, 1)
return p
# norm from scipy.stats
if isinstance(dist_gen, type(stats.norm)):
if hasattr(dist.rv, "args"):
p = pm.Normal(
name,
mu=dist.rv.args[0],
sigma=dist.rv.args[1],
)
else:
p = pm.Normal(name, mu=0, sigma=1)
return p
# not open, uniform, or normal, so fall back to DensityDist.
d = as_op(itypes=[pt.dscalar], otypes=[pt.dscalar])(dist.logp)
r = as_op(itypes=[pt.dscalar], otypes=[pt.dscalar])(dist.rvs)
p = pm.DensityDist(name, d, random=r)
return p