Source code for refnx._lib.emcee.autocorr

# -*- coding: utf-8 -*-

import logging

import numpy as np

__all__ = ["function_1d", "integrated_time", "AutocorrError"]

logger = logging.getLogger(__name__)


def next_pow_two(n):
    """Returns the next power of two greater than or equal to `n`"""
    i = 1
    while i < n:
        i = i << 1
    return i


def function_1d(x):
    """Estimate the normalized autocorrelation function of a 1-D series

    Args:
        x: The series as a 1-D numpy array.

    Returns:
        array: The autocorrelation function of the time series.

    """
    x = np.atleast_1d(x)
    if len(x.shape) != 1:
        raise ValueError("invalid dimensions for 1D autocorrelation function")
    n = next_pow_two(len(x))

    # Compute the FFT and then (from that) the auto-correlation function
    f = np.fft.fft(x - np.mean(x), n=2 * n)
    acf = np.fft.ifft(f * np.conjugate(f))[: len(x)].real
    acf /= acf[0]
    return acf


def auto_window(taus, c):
    m = np.arange(len(taus)) < c * taus
    if np.any(m):
        return np.argmin(m)
    return len(taus) - 1


[docs]def integrated_time(x, c=5, tol=50, quiet=False, has_walkers=True): """Estimate the integrated autocorrelation time of a time series. This estimate uses the iterative procedure described on page 16 of `Sokal's notes <https://www.semanticscholar.org/paper/Monte-Carlo-Methods-in-Statistical-Mechanics%3A-and-Sokal/0bfe9e3db30605fe2d4d26e1a288a5e2997e7225>`_ to determine a reasonable window size. Args: x (numpy.ndarray): The time series. If 2-dimensional, the array dimesions are interpreted as ``(n_step, n_walker)`` unless ``has_walkers==False``, in which case they are interpreted as ``(n_step, n_param)``. If 3-dimensional, the dimensions are interperted as ``(n_step, n_walker, n_param)``. c (Optional[float]): The step size for the window search. (default: ``5``) tol (Optional[float]): The minimum number of autocorrelation times needed to trust the estimate. (default: ``50``) quiet (Optional[bool]): This argument controls the behavior when the chain is too short. If ``True``, give a warning instead of raising an :class:`AutocorrError`. (default: ``False``) has_walkers (Optional[bool]): Whether the last axis should be interpreted as walkers or parameters if ``x`` has 2 dimensions. (default: ``True``) Returns: float or array: An estimate of the integrated autocorrelation time of the time series ``x``. Raises AutocorrError: If the autocorrelation time can't be reliably estimated from the chain and ``quiet`` is ``False``. This normally means that the chain is too short. """ x = np.atleast_1d(x) if len(x.shape) == 1: x = x[:, np.newaxis, np.newaxis] if len(x.shape) == 2: if not has_walkers: x = x[:, np.newaxis, :] else: x = x[:, :, np.newaxis] if len(x.shape) != 3: raise ValueError("invalid dimensions") n_t, n_w, n_d = x.shape tau_est = np.empty(n_d) windows = np.empty(n_d, dtype=int) # Loop over parameters for d in range(n_d): f = np.zeros(n_t) for k in range(n_w): f += function_1d(x[:, k, d]) f /= n_w taus = 2.0 * np.cumsum(f) - 1.0 windows[d] = auto_window(taus, c) tau_est[d] = taus[windows[d]] # Check convergence flag = tol * tau_est > n_t # Warn or raise in the case of non-convergence if np.any(flag): msg = ( "The chain is shorter than {0} times the integrated " "autocorrelation time for {1} parameter(s). Use this estimate " "with caution and run a longer chain!\n" ).format(tol, np.sum(flag)) msg += "N/{0} = {1:.0f};\ntau: {2}".format(tol, n_t / tol, tau_est) if not quiet: raise AutocorrError(tau_est, msg) logger.warning(msg) return tau_est
class AutocorrError(Exception): """Raised if the chain is too short to estimate an autocorrelation time. The current estimate of the autocorrelation time can be accessed via the ``tau`` attribute of this exception. """ def __init__(self, tau, *args, **kwargs): self.tau = tau super(AutocorrError, self).__init__(*args, **kwargs)