from collections import namedtuple
import sys
import re
import warnings
import array
import numpy as np
from scipy._lib._util import check_random_state
from scipy.optimize import minimize, differential_evolution, least_squares
import scipy.optimize as sciopt
from refnx.analysis import Objective, Interval, PDF, is_parameter
from refnx._lib import (
unique as f_unique,
MapWrapper,
possibly_open_file,
flatten,
)
from refnx._lib.util import getargspec
from refnx._lib._qmc import LatinHypercube
from refnx._lib import emcee
from refnx._lib.emcee.state import State
from refnx._lib.emcee.pbar import get_progress_bar
MCMCResult = namedtuple(
"MCMCResult", ["name", "param", "stderr", "chain", "median"]
)
class PTSampler:
def __init__(self, ntemps, nwalkers, ndim, logl, logp, **kwargs):
"""
Shim class for a ptemcee.sampler.Sampler, presenting an emcee-like interface.
Parameters
----------
ntemps: int or np.ndarray
Specifies the number of parallel tempering temperatures. If an
array, it specifies a ladder of beta values.
nwalkers: int
Number of walkers
ndim: int
Dimensionality of the problem space
logl: callable
log-likelihood function
logp: callable
log-prior function
kwargs:
Other keyword arguments supplied to construct the ptemcee.Sampler.
"""
from refnx._lib.ptemcee import Sampler as _PTSampler
self.ntemps = ntemps
self.nwalkers = nwalkers
self.ndim = ndim
self.logl = logl
self.logp = logp
self.kwargs = kwargs
sig = {
"betas": ntemps,
"nwalkers": nwalkers,
"ndim": ndim,
"logl": logl,
"logp": logp,
}
sig.update(kwargs)
self.sampler = _PTSampler(**sig)
# chain stepper
self._ptchain = None
self._state = None
def sample(
self,
initial_state,
iterations=1,
thin_by=1,
progress=False,
mapper=None,
**kwds,
):
"""
Runs the PTSampler for a given number of iterations.
Parameters
----------
initial_state: emcee.state.State
Holds the coordinates of the initial state
iterations: int
Number of steps to save into the chain
thin_by: int
The saved steps are separated by this many discarded steps.
progress: bool
Display a progress bar.
mapper: map-like callable
For parallelisation
kwds: dict
Additional keywords; accepted for compatibility with the emcee
sampler interface, but otherwise ignored.
Yields
------
state: emcee.state.State
The coordinates of the current state
"""
if isinstance(initial_state, State):
init_x = initial_state.coords
rstate0 = initial_state.random_state
else:
init_x = initial_state
rstate0 = np.random.RandomState().get_state()
if self._ptchain is None:
self._ptchain = self.sampler.chain(init_x)
else:
self._ptchain.ensemble.x = init_x
# set random state of stateful chain
self.random_state = rstate0
self._ptchain.thin_by = thin_by
if mapper is not None:
self._ptchain.ensemble._mapper = mapper
try:
with get_progress_bar(progress, iterations * thin_by) as pbar:
for e in self._ptchain.iterate(iterations):
self._state = State(e.x, log_prob=e.logl + e.logP)
yield self._state
pbar.update(thin_by)
finally:
self._ptchain.ensemble._mapper = map
def thermodynamic_integration_log_evidence(self, fburnin=0.1):
if self._ptchain is not None:
return self._ptchain.log_evidence_estimate(fburnin)
return None, None
def reset(self):
if self._state is not None:
self._ptchain = self.sampler.chain(self._state.coords)
def get_chain(self):
if self._ptchain is not None:
return self._ptchain.x
return None
@property
def chain(self):
if self._ptchain is not None:
return self._ptchain.x
return None
def get_log_prob(self):
if self._ptchain is not None:
return self._ptchain.logP
return None
@property
def random_state(self):
if self._ptchain is not None:
return self._ptchain.ensemble._random.get_state()
@random_state.setter
def random_state(self, rstate0):
if self._ptchain is not None:
self._ptchain.ensemble._random.set_state(rstate0)
class CurveFitter:
"""
Analyse a curvefitting system (with MCMC sampling)
Parameters
----------
objective : refnx.analysis.Objective
The :class:`refnx.analysis.Objective` to be analysed.
nwalkers : int, optional
How many walkers you would like the sampler to have. Must be an
even number. The more walkers the better.
ntemps : int or None, optional
If `ntemps == -1`, then an :class:`emcee.EnsembleSampler` is used
during the `sample` method.
Otherwise parallel tempering is used, with a
:class:`ptemcee.sampler.Sampler` object during the `sample`
method, and `ntemps` specifying the number of temperatures. Can be
`None`, in which case the `Tmax` keyword argument sets the maximum
temperature. Parallel tempering is useful if you expect your
posterior distribution to be multi-modal.
mcmc_kws : dict
Keywords used to create the :class:`emcee.EnsembleSampler` or
:class:`ptemcee.sampler.Sampler` objects.
Notes
-----
See the documentation at http://dan.iel.fm/emcee/current/api/ for
further details on what keywords are permitted, and for further
information on Parallel Tempering. The `pool` and `threads` keywords
are ignored here. Specification of parallel threading is done with the
`pool` argument in the `sample` method.
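Examples
--------
A minimal sketch, assuming `objective` is a pre-built
:class:`refnx.analysis.Objective`:
>>> fitter = CurveFitter(objective, nwalkers=100)
>>> fitter.initialise("jitter")
>>> fitter.sample(100)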
"""
def __init__(self, objective, nwalkers=200, ntemps=-1, **mcmc_kws):
"""
Parameters
----------
objective : refnx.analysis.Objective
The :class:`refnx.analysis.Objective` to be analysed.
nwalkers : int, optional
How many walkers you would like the sampler to have. Must be an
even number. The more walkers the better.
ntemps : int or None, optional
If `ntemps == -1`, then an :class:`emcee.EnsembleSampler` is used
during the `sample` method.
Otherwise parallel tempering is used, with a
:class:`ptemcee.sampler.Sampler` object during the
`sample` method, and `ntemps` specifying the number of
temperatures. Can be `None`, in which case the `Tmax` keyword
argument sets the maximum temperature. Parallel tempering is
useful if you expect your posterior distribution to be multi-modal.
mcmc_kws : dict
Keywords used to create the :class:`emcee.EnsembleSampler` or
:class:`ptemcee.sampler.Sampler` objects.
Notes
-----
See the documentation at http://dan.iel.fm/emcee/current/api/ for
further details on what keywords are permitted. The `pool` and
`threads` keywords are ignored here. Specification of parallel
threading is done with the `pool` argument in the `sample` method.
Parallel tempering uses the ``ptemcee`` sampler bundled in
``refnx._lib``.
"""
self.objective = objective
self._varying_parameters = []
self.__var_id = []
self.mcmc_kws = dict(mcmc_kws)
# parallelisation is specified via the `pool` argument of the
# `sample` method, not at construction time
self.mcmc_kws.pop("pool", None)
self.mcmc_kws.pop("threads", None)
self._nwalkers = nwalkers
self._ntemps = ntemps
self.make_sampler()
self._state = None
def __setstate__(self, state):
self.__dict__.update(state)
self.__var_id = [
id(obj) for obj in self.objective.varying_parameters()
]
@property
def nvary(self):
return len(self._varying_parameters)
def __repr__(self):
# attempt to get a minimum repr for a CurveFitter. However,
# it has so much state when the sampling has been done, that
# will be ignored.
return (
f"CurveFitter({self.objective!r},"
f" nwalkers={self._nwalkers},"
f" ntemps={self._ntemps},"
f" {self.mcmc_kws!r})"
)
def make_sampler(self):
"""
Make the samplers for the Objective.
Use this method if the number of varying parameters changes.
"""
self._varying_parameters = self.objective.varying_parameters()
self.__var_id = [id(obj) for obj in self._varying_parameters]
if not self.nvary:
raise ValueError("No parameters are being fitted")
if self._ntemps == -1:
self.sampler = emcee.EnsembleSampler(
self._nwalkers,
self.nvary,
self.objective.logpost,
**self.mcmc_kws,
)
# Parallel Tempering was requested.
else:
sig = {
"ntemps": self._ntemps,
"nwalkers": self._nwalkers,
"ndim": self.nvary,
"logl": self.objective.logl,
"logp": self.objective.logp,
}
sig.update(self.mcmc_kws)
self.sampler = PTSampler(**sig)
self._state = None
def _check_vars_unchanged(self):
"""
Keep track of whether the varying parameters have changed after
construction of CurveFitter object.
"""
var_ids = [id(obj) for obj in self.objective.varying_parameters()]
if not (np.array_equal(var_ids, self.__var_id)):
raise RuntimeError(
"The Objective.varying_parameters() have"
" changed since the CurveFitter was created."
" To keep on using the CurveFitter call"
" the CurveFitter.make_samplers() method."
)
def initialise(self, pos="covar", random_state=None):
"""
Initialise the emcee walkers.
Parameters
----------
pos : str or np.ndarray
Method for initialising the emcee walkers. One of:
- 'covar', use the estimated covariance of the system.
- 'jitter', add a small amount of Gaussian noise to each parameter.
- 'prior', sample random locations from the prior using Latin
Hypercube sampling.
- pos, an array that specifies a snapshot of the walkers. Has shape
`(nwalkers, ndim)`, or `(ntemps, nwalkers, ndim)` if parallel
tempering is employed. You can also provide a previously
created chain.
random_state : {int, `np.random.RandomState`, `np.random.Generator`}
If `random_state` is not specified the `~np.random.RandomState`
singleton is used.
If `random_state` is an int, a new ``RandomState`` instance is
used, seeded with random_state.
If `random_state` is already a ``RandomState`` or a ``Generator``
instance, then that object is used.
Specify `random_state` for repeatable initialisations.
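Examples
--------
A sketch, assuming `fitter` is an existing CurveFitter:
>>> fitter.initialise(pos="prior", random_state=1)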
"""
nwalkers = self._nwalkers
nvary = self.nvary
# acquire a random number generator
rng = check_random_state(random_state)
# account for parallel tempering
_ntemps = self._ntemps
# If you're not doing parallel tempering, temporarily set the number of
# temperatures to be created to 1, thereby producing initial positions
# of (1, nwalkers, nvary), this first dimension should be removed at
# the end of the method
if self._ntemps == -1:
_ntemps = 1
# position is specified with array (no parallel tempering)
if (
isinstance(pos, np.ndarray)
and self._ntemps == -1
and pos.shape == (nwalkers, nvary)
):
init_walkers = np.copy(pos)[np.newaxis]
# position is specified with array (with parallel tempering)
elif (
isinstance(pos, np.ndarray)
and self._ntemps > -1
and pos.shape == (_ntemps, nwalkers, nvary)
):
init_walkers = np.copy(pos)
# position is specified with existing chain
elif isinstance(pos, np.ndarray):
self.initialise_with_chain(pos)
return
# position is to be created from covariance matrix
elif pos == "covar":
p0 = np.array(self._varying_parameters)
cov = self.objective.covar()
init_walkers = rng.multivariate_normal(
np.atleast_1d(p0), np.atleast_2d(cov), size=(_ntemps, nwalkers)
)
# position is specified by jittering the parameters with gaussian noise
elif pos == "jitter":
var_arr = np.array(self._varying_parameters)
pos = 1 + rng.standard_normal((_ntemps, nwalkers, nvary)) * 1.0e-4
pos *= var_arr
init_walkers = pos
# use the prior to initialise position
elif pos == "prior":
arr = np.zeros((_ntemps, nwalkers, nvary))
LHC = LatinHypercube(nvary, seed=random_state)
samples = LHC.random(n=_ntemps * nwalkers).reshape(
_ntemps, nwalkers, nvary
)
for i, param in enumerate(self._varying_parameters):
# bounds are not a closed interval, just jitter it.
if (
isinstance(param.bounds, Interval)
and not param.bounds._closed_bounds
):
vals = (
1 + rng.standard_normal((_ntemps, nwalkers)) * 1.0e-1
)
vals *= param.value
arr[..., i] = vals
else:
sample_arr = samples[..., i]
transformed = param.bounds.invcdf(sample_arr)
arr[..., i] = transformed
init_walkers = arr
else:
raise RuntimeError(
"Didn't use any known method for CurveFitter.initialise"
)
# if you're not doing parallel tempering then remove the first
# dimension
if self._ntemps == -1:
init_walkers = init_walkers[0]
# now validate initialisation, ensuring all init pos have finite
# logpost
for i, param in enumerate(self._varying_parameters):
init_walkers[..., i] = param.valid(init_walkers[..., i])
rstate0 = None
if isinstance(rng, np.random.RandomState):
rstate0 = rng.get_state()
self._state = State(init_walkers, random_state=rstate0)
# finally reset the sampler to reset the chain
# you have to do this at the end, not at the start because resetting
# makes self.sampler.chain == None and the PTSampler creation doesn't
# work
self.sampler.reset()
def initialise_with_chain(self, chain):
"""
Initialise sampler with a pre-existing chain
Parameters
----------
chain : array
Array of size `(steps, ntemps, nwalkers, ndim)` or
`(steps, nwalkers, ndim)`, containing a chain from a previous
sampling run.
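Examples
--------
A sketch resuming from a saved chain ("steps.chain" is an assumed
filename):
>>> chain = load_chain("steps.chain")
>>> fitter.initialise_with_chain(chain)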
"""
# we should be left with (nwalkers, ndim) or (ntemp, nwalkers, ndim)
if self._ntemps == -1:
required_shape = (self._nwalkers, self.nvary)
else:
required_shape = (self._ntemps, self._nwalkers, self.nvary)
chain_shape = chain.shape[1:]
# if the shapes are the same, then we can initialise
if required_shape == chain_shape:
self.initialise(pos=chain[-1])
else:
raise ValueError(
"You tried to initialise with a chain, but it was"
" the wrong shape"
)
@property
def chain(self):
"""
MCMC chain belonging to CurveFitter.sampler
Returns
-------
chain : array
The MCMC chain with shape `(steps, nwalkers, ndim)` or
`(steps, ntemps, nwalkers, ndim)`.
"""
return self.sampler.get_chain()
@property
def logpost(self):
"""
Log-probability for each of the entries in `self.chain`
"""
return self.sampler.get_log_prob()
@property
def index_max_prob(self):
"""
The index of the highest log-probability for the samples
"""
log_probs = self.sampler.get_log_prob()
if isinstance(self.sampler, PTSampler):
log_probs = log_probs[:, 0]
loc = np.argmax(log_probs)
idx = np.unravel_index(loc, log_probs.shape)
if isinstance(self.sampler, PTSampler):
idx = list(idx)
idx.insert(1, 0)
return tuple(idx)
return idx
def reset(self):
"""
Reset the sampled chain.
Typically used on a sampler after a burn-in period.
"""
self.sampler.reset()
def acf(self, nburn=0, nthin=1):
"""
Calculate the autocorrelation function
Parameters
----------
nburn : int, optional
discard this many steps from the start of the chain
nthin : int, optional
only accept every `nthin` samples from the chain
Returns
-------
acfs : np.ndarray
The autocorrelation function, acfs.shape=(lags, nvary)
"""
return autocorrelation_chain(self.chain, nburn=nburn, nthin=nthin)
def sample(
self,
steps,
nthin=1,
random_state=None,
f=None,
callback=None,
verbose=True,
pool=-1,
):
"""
Performs sampling from the objective.
Parameters
----------
steps : int
Collect `steps` samples into the chain. The sampler will run a
total of `steps * nthin` moves.
nthin : int, optional
Each chain sample is separated by `nthin` iterations.
random_state : {int, `np.random.RandomState`, `np.random.Generator`}
If `random_state` is not specified the `~np.random.RandomState`
singleton is used.
If `random_state` is an int, a new ``RandomState`` instance is
used, seeded with random_state.
If `random_state` is already a ``RandomState`` or a ``Generator``
instance, then that object is used.
Specify `random_state` for repeatable minimizations.
f : file-like or str
File to incrementally save chain progress to. Each row in the file
is a flattened array of size `(nwalkers, ndim)` or
`(ntemps, nwalkers, ndim)`. There are `steps` rows in the
file.
callback : callable
callback function to be called at each iteration step. Has the
signature `callback(coords, logprob)`.
verbose : bool, optional
Gives updates on the sampling progress
pool : int or map-like object, optional
If `pool` is an `int` then it specifies the number of threads to
use for parallelisation. If `pool == -1`, then all CPUs are used.
If pool is a map-like callable that follows the same calling
sequence as the built-in map function, then this pool is used for
parallelisation.
Notes
-----
Please see :class:`emcee.EnsembleSampler` for its detailed behaviour.
>>> # we'll burn the first 500 steps
>>> fitter.sample(500)
>>> # after you've run those, then discard them by resetting the
>>> # sampler.
>>> fitter.sampler.reset()
>>> # Now collect 40 steps, each step separated by 50 sampler
>>> # generations.
>>> fitter.sample(40, nthin=50)
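>>> # To checkpoint progress to a file and parallelise over all
>>> # CPUs (a sketch; "steps.chain" is an assumed filename):
>>> fitter.sample(40, nthin=50, f="steps.chain", pool=-1)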
One can also burn and thin afterwards, using the `process_chain` function.
"""
self._check_vars_unchanged()
# setup a random number generator
rng = check_random_state(random_state)
if self._state is None:
self.initialise(random_state=rng)
# for saving progress to file
def _callback_wrapper(state, h=None):
if callback is not None:
callback(state.coords, state.log_prob)
if h is not None:
h.write(" ".join(map(str, state.coords.ravel())))
h.write("\n")
# remove chains from each of the parameters because they slow down
# pickling but only if they are parameter objects.
flat_params = f_unique(flatten(self.objective.parameters))
flat_params = [param for param in flat_params if is_parameter(param)]
# zero out all the old parameter stderrs
for param in flat_params:
param.stderr = None
param.chain = None
# make sure the checkpoint file exists
if f is not None:
with possibly_open_file(f, "w") as h:
# write the shape of each step of the chain
h.write("# ")
shape = self._state.coords.shape
h.write(", ".join(map(str, shape)))
h.write("\n")
# set the random state of the sampler
# normally one could give this as an argument to the sample method
# but PTSampler didn't historically accept that...
if isinstance(rng, np.random.RandomState):
rstate0 = rng.get_state()
self._state.random_state = rstate0
self.sampler.random_state = rstate0
# using context manager means we kill off zombie pool objects
# but does mean that the pool has to be specified each time.
with MapWrapper(pool) as g, possibly_open_file(f, "a") as h:
# these kwargs are provided to the sampler.sample method
kwargs = {"iterations": steps, "thin": nthin}
# if you're not creating more than 1 thread, then don't bother with
# a pool.
if isinstance(self.sampler, emcee.EnsembleSampler):
if pool == 1:
self.sampler.pool = None
else:
self.sampler.pool = g
else:
kwargs["mapper"] = g
# new emcee arguments
sampler_args = getargspec(self.sampler.sample).args
if "progress" in sampler_args and verbose:
kwargs["progress"] = True
verbose = False
if "thin_by" in sampler_args:
kwargs["thin_by"] = nthin
kwargs.pop("thin", 0)
# perform the sampling
for state in self.sampler.sample(self._state, **kwargs):
self._state = state
_callback_wrapper(state, h=h)
if isinstance(self.sampler, emcee.EnsembleSampler):
self.sampler.pool = None
# sets parameter value and stderr
return process_chain(self.objective, self.chain)
def fit(self, method="L-BFGS-B", target="nll", verbose=True, **kws):
"""
Obtain the maximum log-likelihood, or log-posterior, estimate (mode)
of the objective. Maximising the log-likelihood is equivalent to
minimising chi2 in a least squares fit.
Parameters
----------
method : str
which method to use for the optimisation. One of:
- `'least_squares'`: :func:`scipy.optimize.least_squares`.
- `'L-BFGS-B'`: L-BFGS-B.
- `'differential_evolution'`:
:func:`scipy.optimize.differential_evolution`
- `'dual_annealing'`:
:func:`scipy.optimize.dual_annealing` (SciPy >= 1.2.0)
- `'shgo'`: :func:`scipy.optimize.shgo` (SciPy >= 1.2.0)
You can also choose many of the minimizers from
:func:`scipy.optimize.minimize`.
target : {'nll', 'nlpost'}, optional
Minimize the negative log-likelihood (`'nll'`) or the negative
log-posterior (`'nlpost'`). This is equivalent to maximising the
likelihood or posterior probabilities respectively.
Maximising the likelihood is equivalent to minimising chi^2 in a
least-squares fit.
This option only applies to the `differential_evolution`, `shgo`,
`dual_annealing` or `L-BFGS-B` methods.
These optimisers require lower and upper (box) bounds for each
parameter. If the `Bounds` on a parameter are not an `Interval`,
but a `PDF` specifying a statistical distribution, then the lower
and upper bounds are approximated as
``PDF.rv.ppf([0.005, 0.995])``, covering 99% of the statistical
distribution.
verbose : bool, optional
Gives fitting progress. To see a progress bar tqdm has to be
installed.
kws : dict
Additional arguments are passed to the underlying minimization
method.
Returns
-------
result : :class:`scipy.optimize.OptimizeResult`
`result.x` contains the best fit parameters.
`result.covar` is the covariance matrix for the fit.
`result.stderr` is the uncertainties on each of the fit parameters.
Notes
-----
If the `objective` supplies a `residuals` method then `least_squares`
can be used. Otherwise the `nll` method of the `objective` is
minimised. Use this method just before a sampling run.
If `self.objective.parameters` is a `Parameters` instance, then each
of the varying parameters has its value updated by the fit, and each
`Parameter` has a `stderr` attribute which represents the uncertainty
on the fit parameter.
The use of `dual_annealing` and `shgo` requires that `scipy >= 1.2.0`
be installed.
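Examples
--------
A sketch, assuming `fitter` is an existing CurveFitter:
>>> res = fitter.fit("differential_evolution", target="nlpost")
>>> # best fit values are in res.x, uncertainties in res.stderr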
"""
_varying_parameters = self.objective.varying_parameters()
init_pars = np.array(_varying_parameters)
_min_kws = {}
_min_kws.update(kws)
_bounds = bounds_list(self.objective.varying_parameters())
_min_kws["bounds"] = _bounds
# setup callback default
_min_kws.setdefault("callback", None)
cost = self.objective.nll
if target == "nlpost":
cost = self.objective.nlpost
# a wrapper that updates the progress bar, then invokes any user callback
def _callback_wrapper(callback_func, pbar):
def callback(intermediate_result, *args, **kwds):
pbar.update(1)
if hasattr(pbar, "set_description"):
if hasattr(intermediate_result, "fun"):
_stat = intermediate_result.fun
elif isinstance(intermediate_result, np.ndarray):
_stat = cost(intermediate_result)
else:
# guard against a NameError if neither form is recognised
_stat = None
if _stat is not None:
pbar.set_description(f"{_stat}")
if callback_func is None:
return None
else:
return callback_func(intermediate_result, *args, **kwds)
return callback
# least_squares Trust Region Reflective by default
if method == "least_squares":
b = np.array(_bounds)
_min_kws["bounds"] = (b[..., 0], b[..., 1])
# least_squares doesn't have a callback
_min_kws.pop("callback", None)
res = least_squares(
self.objective.residuals, init_pars, **_min_kws
)
# differential_evolution, dual_annealing, shgo require lower and upper
# bounds
elif method in ["differential_evolution", "dual_annealing", "shgo"]:
mini = getattr(sciopt, method)
if method == "shgo":
if "n" not in _min_kws:
_min_kws["n"] = 100
if "iters" not in kws:
_min_kws["iters"] = 5
with get_progress_bar(verbose, None) as pbar:
_min_kws["callback"] = _callback_wrapper(
_min_kws["callback"], pbar
)
res = mini(cost, **_min_kws)
else:
# otherwise stick it to minimizer. Default being L-BFGS-B
_min_kws["method"] = method
_min_kws["bounds"] = _bounds
with get_progress_bar(verbose, None) as pbar:
_min_kws["callback"] = _callback_wrapper(
_min_kws["callback"], pbar
)
res = minimize(cost, init_pars, **_min_kws)
# OptimizeResult.success may not be present (dual annealing)
if hasattr(res, "success") and res.success:
self.objective.setp(res.x)
# Covariance matrix estimation
covar = self.objective.covar()
errors = np.sqrt(np.diag(covar))
res["covar"] = covar
res["stderr"] = errors
# check if the parameters are all Parameter instances.
flat_params = list(f_unique(flatten(self.objective.parameters)))
if np.all([is_parameter(param) for param in flat_params]):
# zero out all the old parameter stderrs
for param in flat_params:
param.stderr = None
param.chain = None
for i, param in enumerate(_varying_parameters):
param.stderr = errors[i]
# need to touch up the output to check we leave
# parameters as we found them
self.objective.setp(res.x)
return res
def load_chain(f):
"""
Loads a chain from disk. Does not change the state of a CurveFitter
object.
Parameters
----------
f : str or file-like
File containing the chain.
Returns
-------
chain : array
The loaded chain - `(nsteps, nwalkers, ndim)` or
`(nsteps, ntemps, nwalkers, ndim)`
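Examples
--------
A sketch ("steps.chain" is an assumed filename, created via the
`f` keyword of `CurveFitter.sample`):
>>> chain = load_chain("steps.chain")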
"""
with possibly_open_file(f, "r") as g:
# read header
header = g.readline()
expr = re.compile(r"(\d+)")
matches = expr.findall(header)
if len(matches) == 3:
ntemps, nwalkers, ndim = map(int, matches)
elif len(matches) == 2:
ntemps = None
nwalkers, ndim = map(int, matches)
else:
raise ValueError("Couldn't read header line of chain file")
chain = np.loadtxt(f)
if ntemps is not None:
chain = np.reshape(chain, (-1, ntemps, nwalkers, ndim))
else:
chain = np.reshape(chain, (-1, nwalkers, ndim))
return chain
def process_chain(objective, chain, nburn=0, nthin=1, flatchain=False):
"""
Process the chain produced by a sampler for a given Objective
Parameters
----------
objective : refnx.analysis.Objective
The Objective function that the Posterior was sampled for
chain : array
The MCMC chain
nburn : int, optional
discard this many steps from the start of the chain
nthin : int, optional
only accept every `nthin` samples from the chain
flatchain : bool, optional
collapse the walkers down into a single dimension.
Returns
-------
result_list : list
A list of ``MCMCResult`` namedtuples, with fields
``(name, param, stderr, chain, median)``.
If `isinstance(objective.parameters, Parameters)` then `param` is a
`Parameter` instance. `param.value`, `param.stderr` and
`param.chain` will contain the median, stderr and chain samples,
respectively. Otherwise `param` will be a float representing the
median of the chain samples.
`stderr` is the half width of the [15.87, 84.13] spread (similar to
standard deviation) and `chain` is an array containing the MCMC
samples for that parameter.
Notes
-----
The chain should have the shape `(iterations, nwalkers, nvary)` or
`(iterations, ntemps, nwalkers, nvary)` if parallel tempering was
employed.
The burned and thinned chain is created via:
`chain[nburn::nthin]`.
Note, if parallel tempering is employed, then only the lowest temperature
of the parallel tempering chain is processed and returned as it
corresponds to the (lowest energy) target distribution.
If `flatchain is True` then the burned/thinned chain is reshaped and
`arr.reshape(-1, nvary)` is returned.
This function has the effect of setting the parameter stderr's.
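Examples
--------
A sketch, assuming sampling has already been carried out with
`fitter`:
>>> results = process_chain(objective, fitter.chain, nburn=400, nthin=50)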
"""
chain = chain[nburn::nthin]
shape = chain.shape
nvary = shape[-1]
# nwalkers = shape[1]
if len(shape) == 4:
ntemps = shape[1]
elif len(shape) == 3:
ntemps = -1
else:
raise ValueError(
"chain must have shape (steps, nwalkers, nvary) or"
" (steps, ntemps, nwalkers, nvary)"
)
if ntemps != -1:
# PTSampler, we require the target distribution in the first row.
chain = chain[:, 0]
_flatchain = chain.reshape((-1, nvary))
if flatchain:
chain = _flatchain
flat_params = list(f_unique(flatten(objective.parameters)))
varying_parameters = objective.varying_parameters()
# set the stderr of each of the Parameters
result_list = []
if np.all([is_parameter(param) for param in flat_params]):
# zero out all the old parameter stderrs
for param in flat_params:
param.stderr = None
param.chain = None
# do the error calcn for the varying parameters and set the chain
quantiles = np.percentile(_flatchain, [15.87, 50, 84.13], axis=0)
for i, param in enumerate(varying_parameters):
std_l, median, std_u = quantiles[:, i]
param.value = median
param.stderr = 0.5 * (std_u - std_l)
# copy in the chain
param.chain = np.copy(chain[..., i])
res = MCMCResult(
name=param.name,
param=param,
median=param.value,
stderr=param.stderr,
chain=param.chain,
)
result_list.append(res)
fitted_values = np.array(varying_parameters)
# give each constrained param a chain (to be reshaped later)
constrained_params = [
param for param in flat_params if param.constraint is not None
]
for constrain_param in constrained_params:
constrain_param.chain = np.empty(chain.shape[:-1], float)
# now iterate through the varying parameters, set the values, thereby
# setting the constraint value
if len(constrained_params):
for index in np.ndindex(chain.shape[:-1]):
# iterate over parameter vectors
pvals = chain[index]
objective.setp(pvals)
for constrain_param in constrained_params:
constrain_param.chain[index] = constrain_param.value
for constrain_param in constrained_params:
quantiles = np.percentile(
constrain_param.chain, [15.87, 50, 84.13]
)
std_l, median, std_u = quantiles
constrain_param.value = median
constrain_param.stderr = 0.5 * (std_u - std_l)
# now reset the fitted parameter values (they would've been changed
# by the constraints calculations)
objective.setp(fitted_values)
# the parameter set are not Parameter objects, an array was probably
# being used with BaseObjective.
else:
for i in range(nvary):
c = np.copy(chain[..., i])
median, stderr = uncertainty_from_chain(c)
res = MCMCResult(
name="", param=median, median=median, stderr=stderr, chain=c
)
result_list.append(res)
return result_list
def uncertainty_from_chain(chain):
"""
Calculates the median and uncertainty of MC samples.
Parameters
----------
chain : array-like
The MC samples to summarise.
Returns
-------
median, stderr : float, float
`median` of the chain samples. `stderr` is half the width of the
[15.87, 84.13] spread.
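Examples
--------
A sketch with synthetic, normally distributed samples (the numbers
are illustrative only):
>>> rng = np.random.default_rng(1)
>>> samples = rng.normal(loc=10.0, scale=0.5, size=(1000, 200))
>>> median, stderr = uncertainty_from_chain(samples)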
"""
flatchain = chain.flatten()
std_l, median, std_u = np.percentile(flatchain, [15.87, 50, 84.13])
return median, 0.5 * (std_u - std_l)
def autocorrelation_chain(chain, nburn=0, nthin=1):
"""
Calculate the autocorrelation function
Parameters
----------
chain : np.ndarray
The MCMC chain - `(nsteps, nwalkers, ndim)` or
`(nsteps, ntemps, nwalkers, ndim)`
nburn : int, optional
discard this many steps from the start of the chain
nthin : int, optional
only accept every `nthin` samples from the chain
Returns
-------
acfs : np.ndarray
The autocorrelation function, acfs.shape=(lags, nvary)
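Examples
--------
A sketch, assuming sampling has already been carried out with
`fitter`:
>>> acfs = autocorrelation_chain(fitter.chain, nburn=100)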
"""
lchain = chain
# parallel tempered chain
if len(chain.shape) == 4:
lchain = lchain[:, 0]
lchain = lchain[nburn::nthin]
# (iterations, walkers, vary) -> (vary, walkers, iterations)
lchain = np.swapaxes(lchain, 0, 2)
shape = lchain.shape[:-1]
acfs = np.zeros_like(lchain)
# iterate over each parameter/walker
for index in np.ndindex(*shape):
s = _function_1d(lchain[index])
acfs[index] = s
# now average over walkers
acfs = np.mean(acfs, axis=1)
return np.transpose(acfs)
def bounds_list(parameters):
"""
Approximates interval bounds for a parameter set.
Parameters
----------
parameters : sequence
A sequence containing individual parameters
Returns
-------
bounds : list
``(min, max)`` pairs that define the finite lower and upper bounds
for every element in ``parameters``.
If the `Bounds` applied by a parameter are a `PDF` instance then the upper
and lower bound are approximated by ``PDF.rv.ppf([0.005, 0.995])``, which
covers 99% of the statistical distribution.
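Examples
--------
A sketch using a refnx `Parameter` (assumes `Parameter` accepts a
`bounds` tuple, which becomes an `Interval`):
>>> from refnx.analysis import Parameter
>>> p = Parameter(10.0, bounds=(1.0, 20.0))
>>> bnds = bounds_list([p])  # [(1.0, 20.0)]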
"""
bounds = []
for param in parameters:
if hasattr(param, "bounds") and isinstance(param.bounds, Interval):
bnd = param.bounds
bounds.append((bnd.lb, bnd.ub))
elif (
hasattr(param, "bounds")
and isinstance(param.bounds, PDF)
and hasattr(param.bounds.rv, "ppf")
):
bounds.append(param.bounds.rv.ppf([0.005, 0.995]))
else:
# We can't handle this bound
bounds.append((-np.inf, np.inf))
return bounds
# Following code is for autocorrelation analysis of chains and is taken from
# emcee.autocorr
def _next_pow_two(n):
"""Returns the next power of two greater than or equal to `n`"""
i = 1
while i < n:
i = i << 1
return i
def _function_1d(x):
"""Estimate the normalized autocorrelation function of a 1-D series
Args:
x: The series as a 1-D numpy array.
Returns:
array: The autocorrelation function of the time series.
"""
x = np.atleast_1d(x)
if len(x.shape) != 1:
raise ValueError("invalid dimensions for 1D autocorrelation function")
n = _next_pow_two(len(x))
# Compute the FFT and then (from that) the auto-correlation function
f = np.fft.fft(x - np.mean(x), n=2 * n)
acf = np.fft.ifft(f * np.conjugate(f))[: len(x)].real
acf /= acf[0]
return acf