Source code for refnx.analysis.parameter

import operator
from types import MethodType
from collections import UserList
import copy


import numpy as np
from refnx._lib import flatten, unique as f_unique
from refnx.analysis import Interval, PDF, Bounds


# Functions for making Functors
def MAKE_BINARY(opfn):
    return lambda self, other: (_BinaryOp(self, asMagicNumber(other), opfn))


def MAKE_RBINARY(opfn):
    return lambda self, other: (_BinaryOp(asMagicNumber(other), self, opfn))


def MAKE_UNARY(opfn):
    return lambda val: _UnaryOp(val, opfn)


def asMagicNumber(x):
    return x if isinstance(x, BaseParameter) else Constant(x)


# a function that takes a function that returns a function
# MAKE_BINARY = lambda opfn: lambda self, other: (
#     _BinaryOp(self, asMagicNumber(other), opfn))
# MAKE_RBINARY = lambda opfn: lambda self, other: (
#     _BinaryOp(asMagicNumber(other), self, opfn))
# MAKE_UNARY = lambda opfn: lambda val: _UnaryOp(val, opfn)
# asMagicNumber = lambda x: x if isinstance(x, BaseParameter) else Constant(x)

# mathematical operations that we might want to use in constraints
ops = {
    "sin": np.sin,
    "cos": np.cos,
    "tan": np.tan,
    "arcsin": np.arcsin,
    "arccos": np.arccos,
    "arctan": np.arctan,
    "log": np.log,
    "log10": np.log10,
    "exp": np.exp,
    "sqrt": np.sqrt,
    "sum": np.sum,
}
math_ops = {k: MAKE_UNARY(v) for k, v in ops.items()}

binary = [
    operator.add,
    operator.sub,
    operator.mul,
    operator.truediv,
    operator.floordiv,
    np.power,
    operator.pow,
    operator.mod,
]

unary = [
    operator.neg,
    operator.abs,
    np.sin,
    np.tan,
    np.cos,
    np.arcsin,
    np.arctan,
    np.arccos,
    np.log10,
    np.log,
    np.sqrt,
    np.exp,
]


[docs]class Parameters(UserList): """ A sequence of Parameters Parameters ---------- data : sequence A sequence of :class:`Parameter` or :class:`Parameters` name : str Name of this :class:`Parameters` instance """ def __init__(self, data=(), name=None): super().__init__() self.name = name self.data.extend(data) def __getitem__(self, i): if type(i) is str: try: _names = [param.name for param in self.data] idx = _names.index(i) except ValueError: return None finally: return self.data[idx] else: return self.data[i] def __setitem__(self, i, v): # i can be index or string # v has to be Parameters or Parameter if type(i) is str: _names = [param.name for param in self.data] # this will raise a KeyError if the key isn't in there. idx = _names.index(i) self.data[idx] = v else: self.data[i] = v def __repr__(self): return f"Parameters(data={self.data!r}, name={self.name!r})" def __str__(self): s = list() s.append(f"{'':_>80}") s.append(f"Parameters: {self.name!r: ^15}") for el in self._pprint(): s.append(el) return "\n".join(list(flatten(s))) def _pprint(self): for el in self.data: if is_parameters(el): yield str(el) else: yield str(el) def __contains__(self, item): """ Does this instance contain a given :class:`Parameter` """ return id(item) in [id(p) for p in f_unique(flatten(self.data))] def __ior__(self, other): """ Concatenate Parameter(s). You can concatenate :class:`Parameters` with :class:`Parameter` or :class:`Parameters` instances. """ # self |= other if not (is_parameter(other) or is_parameters(other)): raise ValueError( "Can only concatenate a Parameter with another" " Parameter or Parameters instance" ) self.append(other) return self def __or__(self, other): """ concatenate Parameter(s). You can concatenate :class:`Parameters` with :class:`Parameter` or :class:`Parameters` instances. """ # c = self | other if not (is_parameter(other) or is_parameters(other)): raise ValueError( "Can only concatenate a Parameter with another" " Parameter or Parameters instance" ) c = Parameters(self.data, self.name) c.append(other) return c
[docs] def logp(self): """ Calculates logp for all the parameters Returns ------- logp : float Log probability for all the parameters """ # logp for all the parameters return np.sum( [ param.logp() for param in f_unique(flatten(self.data)) if param.vary ] )
def __array__(self): """ Convert Parameters to an array containing their values. """ return np.array([float(p) for p in flatten(self.data)]) @property def pvals(self): """ An array containing the values of all the :class:`Parameter` in this object. """ return np.array(self) @pvals.setter def pvals(self, pvals): varying = [ param for param in f_unique(flatten(self.data)) if param.vary ] if np.size(pvals) == len(varying): [ setattr(param, "value", pvals[i]) for i, param in enumerate(varying) ] return flattened_parameters = list(flatten(self.data)) if np.size(pvals) == len(flattened_parameters): [ setattr(param, "value", pvals[i]) for i, param in enumerate(flattened_parameters) ] return else: raise ValueError( "You supplied the wrong number of values %d when " "setting this Parameters.pvals attribute" % len(pvals) ) @property def parameters(self): return self @property def nparam(self): return len(self.flattened())
[docs] def flattened(self, unique=False): """ A list of all the :class:`Parameter` contained in this object, including those contained within :class:`Parameters` at any depth. Parameters ---------- unique : bool The list will only contain unique objects. Returns ------- params : list A list of :class:`Parameter` contained in this object. """ if unique: return list(f_unique(flatten(self.data))) else: return list(flatten(self.data))
[docs] def names(self): """ Returns ------- names : list A list of all the names of all the :class:`Parameter` contained in this object. """ return [param.name for param in flatten(self.data)]
[docs] def nvary(self): """ Returns ------- nvary : int The number of :class:`Parameter` contained in this object that are allowed to vary. """ return len([1 for param in f_unique(flatten(self.data)) if param.vary])
[docs] def constrained_parameters(self): """ Returns ------- constrained_parameters : list A list of unique :class:`Parameter` contained in this object that have constraints. """ return [ param for param in f_unique(flatten(self.data)) if param.constraint is not None ]
[docs] def varying_parameters(self): """ Unique list of varying parameters Returns ------- p : list Unique list of varying parameters """ lst = [] for p in flatten(self.data): if p.vary: lst.append(p) continue if len(p._deps): lst.extend([_p for _p in p.dependencies() if _p.vary]) # should already be totally flattened by this point return Parameters(f_unique(lst))
[docs] def pgen(self, ngen=1000, nburn=0, nthin=1, random_state=None): """ Yield random parameter vectors from MCMC samples. Parameters ---------- ngen : int, optional the number of samples to yield. The actual number of samples yielded is `min(ngen, chain.size)` nburn : int, optional discard this many steps from the start of the chain nthin : int, optional only accept every `nthin` samples from the chain random_state : {int, np.random.Generator, None} random number generator that picks the samples Yields ------ pvec : np.ndarray A randomly chosen parameter vector Notes ----- The entire parameter vector is yielded, not only the varying parameters. The reason for this is that some parameters may possess a chain if they are not varying, because they are controlled by a constraint. """ # it's still possible to have chains, even if there are no varying # parameters, if there are parameters that have constraints # generate for all params that have chains. chain_pars = [ i for i, p in enumerate(self.flattened()) if p.chain is not None ] chains = np.array( [ np.ravel(param.chain[..., nburn::nthin]) for param in self.flattened() if param.chain is not None ] ) if len(chains) == 0 or np.size(chains, 1) == 0: raise ValueError("There were no chains to sample from") samples = np.arange(np.size(chains, 1)) rng = np.random.default_rng(random_state) choices = rng.choice( samples, size=(min(ngen, samples.size),), replace=False ) template_array = np.array(self.flattened()) for choice in choices: template_array[chain_pars] = chains[..., choice] yield np.asarray(template_array).astype(float, copy=False)
class BaseParameter: __add__ = MAKE_BINARY(operator.add) __sub__ = MAKE_BINARY(operator.sub) __mul__ = MAKE_BINARY(operator.mul) __truediv__ = MAKE_BINARY(operator.truediv) __pow__ = MAKE_BINARY(operator.pow) __radd__ = MAKE_RBINARY(operator.add) __rsub__ = MAKE_RBINARY(operator.sub) __rmul__ = MAKE_RBINARY(operator.mul) __rtruediv__ = MAKE_RBINARY(operator.truediv) __abs__ = MAKE_UNARY(operator.abs) __mod__ = MAKE_BINARY(operator.mod) __rmod__ = MAKE_RBINARY(operator.mod) __lt__ = MAKE_BINARY(operator.lt) __le__ = MAKE_BINARY(operator.le) __ge__ = MAKE_BINARY(operator.ge) __gt__ = MAKE_BINARY(operator.gt) __neg__ = MAKE_UNARY(operator.neg) def __init__(self): # add mathematical operations as methods to this object for k, v in math_ops.items(): setattr(self, k, MethodType(v, self)) self._vary = False self.name = None self._value = None self._bounds = None self._deps = [] self._stderr = None self.chain = None def __getstate__(self): # the mathematical ops aren't unpickleable, so lets pop them. They'll # be reinstated when the constructor is called anyway. # we need to retain the mathops in this object, but not in the pickle # dict. Perform a shallow copy. d = self.__dict__.copy() for k in ops: d.pop(k, None) return d def __setstate__(self, state): # the mathematical ops aren't unpickleable, so lets pop them. They'll # be reinstated when the constructor is called anyway. for k, v in math_ops.items(): setattr(self, k, MethodType(v, self)) self.__dict__.update(state) def __or__(self, other): # concatenate parameter with parameter or parameters if not (is_parameter(other) or is_parameters(other)): raise ValueError( "Can only concatenate a Parameter with another" " Parameter or Parameters instance" ) p = Parameters() p.append(self) p.append(other) return p @property def vary(self): return self._vary def logp(self): raise NotImplementedError( "Subclass of BaseParameter should override this method" ) def __float__(self): return float(self.value) def __bool__(self): return bool(self.value) def __numpy_ufunc__(self): return np.array(self.value) @property def value(self): return self._eval() @value.setter def value(self, v): self._value = v @property def constraint(self): return None @property def bounds(self): return None @property def stderr(self): return self._stderr @stderr.setter def stderr(self, val): try: self._stderr = float(val) except TypeError: self._stderr = None def dependencies(self): dep_list = [] for _dep in self._deps: if isinstance(_dep, Parameter): if _dep.constraint is not None: dep_list.append(_dep.dependencies()) else: dep_list.append(_dep) if isinstance(_dep, (_UnaryOp, _BinaryOp)): dep_list.append(_dep.dependencies()) return list(f_unique(flatten(dep_list))) def has_dependencies(self): return len(self._deps) > 0 def __str__(self): """Returns printable representation of a Parameter object.""" constraint = "" if self.constraint is not None: constraint = f", constraint={self.constraint}" fixed = "" if not self.vary and self.constraint is None: fixed = " (fixed)" elif self.stderr is not None: fixed = f" +/- {self.stderr:0.3g}" s = ( f"<Parameter:{self.name!r:^15}" f", value={self.value:g}{fixed: ^10}" f", bounds={str(self.bounds)}" f"{constraint}>" ) return s class Constant(BaseParameter): def __init__(self, value=0.0, name=None): super().__init__() self.name = name self.value = value self._vary = False def __repr__(self): return f"Constant(value={self.value}, name={self.name!r})" def _eval(self): return self._value
[docs]def possibly_create_parameter( value, name="", bounds=None, vary=False, constraint=None, units=None ): """ If supplied with a Parameter return it. If supplied with float, wrap it in a Parameter instance. Parameters ---------- value : float, optional Numerical Parameter value. name : str, optional Name of the parameter. bounds: `refnx.analysis.Bounds`, tuple, optional Sets the bounds for the parameter. Either supply a `refnx.analysis.Bounds` object (or one of its subclasses), or a `(lower_bound, upper_bound)` tuple. vary : bool, optional Whether the Parameter is fixed during a fit. constraint : expression, optional Python expression used to constrain the value during the fit. units : str Units for the Parameter. Returns ------- parameter : refnx.analysis.Parameter """ if is_parameter(value): return value else: return Parameter( value, name=name, bounds=bounds, vary=vary, constraint=constraint, units=units, )
[docs]def sequence_to_parameters(seq): """ Flattens and converts sequence of float/Parameter to a Parameters instance. Parameters ---------- seq: sequence Sequence (possibly nested) of float/Parameter Returns ------- params: `refnx.analysis.Parameters` """ flat_seq = flatten(seq) pars = [possibly_create_parameter(p) for p in flat_seq] return Parameters(pars)
[docs]class Parameter(BaseParameter): """ Class for specifying a variable. Parameters ---------- value : float, optional Numerical Parameter value. name : str, optional Name of the parameter. bounds: `refnx.analysis.Bounds`, tuple, optional Sets the bounds for the parameter. Either supply a `refnx.analysis.Bounds` object (or one of its subclasses), or a `(lower_bound, upper_bound)` tuple. vary : bool, optional Whether the Parameter is fixed during a fit. constraint : expression, optional Python expression used to constrain the value during the fit. """ def __init__( self, value=0.0, name=None, bounds=None, vary=False, constraint=None, units=None, ): """ Class for specifying a variable. Parameters ---------- value : float, optional Numerical Parameter value. name : str, optional Name of the parameter. bounds: `refnx.analysis.Bounds`, tuple, optional Sets the bounds for the parameter. Either supply a `refnx.analysis.Bounds` object (or one of its subclasses), or a `(lower_bound, upper_bound)` tuple. vary : bool, optional Whether the Parameter is fixed during a fit. constraint : expression, optional Python expression used to constrain the value during the fit. units : str, optional units for the Parameter """ super().__init__() self.name = name # set bounds before setting value because the value may not be valid # for the bounds self._bounds = Interval() if bounds is not None: self.bounds = bounds self.value = value self._vary = vary self._constraint = None self._constraint_args = None self.constraint = constraint self.units = units def __repr__(self): # repr does not include stderr because that can't be used to # create a Parameter s = ( f"{self.__class__.__name__}(" f"value={float(self.value)}," f" name={self.name!r}," f" vary={self.vary!r}," f" bounds={self._bounds!r}," f" constraint={self._constraint!r})" ) return s
[docs] def logp(self, pval=None): """ Calculate the log probability of the parameter Returns ------- prob : float log probability of the parameter """ if pval is not None: val = pval else: val = self.value if self.bounds is not None: return self.bounds.logp(val) else: return 0.0
@property def value(self): """ The numeric value of the :class:`Parameter` """ if self._constraint is not None: if callable(self._constraint): retval = float(self._constraint(*self._constraint_args)) else: retval = self._constraint._eval() else: retval = self._value return retval @value.setter def value(self, v): value = np.float64(v) self._value = value def _eval(self): if self._constraint is not None: return self._constraint._eval() else: return self._value @property def bounds(self): """ The bounds placed on this :class:`Parameter`. """ return self._bounds @bounds.setter def bounds(self, bounds): if isinstance(bounds, Bounds): self._bounds = bounds elif bounds is None: self._bounds = Interval() elif hasattr(bounds, "__len__") and len(bounds) == 2: self.range(*bounds) else: rv = PDF(bounds) self._bounds = rv
[docs] def valid(self, val): """ The truth of whether a value would satisfy the bounds for this parameter. Parameters ---------- val : float A proposed value Returns ------- valid : bool `np.isfinite(Parameter.logp(val))` """ return self.bounds.valid(val)
[docs] def range(self, lower, upper): """ Sets the lower and upper limits on the Parameter Parameters ---------- lower : float lower bound upper : float upper bound Returns ------- None """ self.bounds = Interval(lower, upper)
@property def vary(self): """ Whether this :class:`Parameter` is allowed to vary """ return self._vary @vary.setter def vary(self, vary): if self.constraint is not None: raise RuntimeError("cannot vary a Parameter which is constrained") else: self._vary = vary @property def constraint(self): return self._constraint @constraint.setter def constraint(self, expr): self._deps = [] if expr is None: value = self.value self._constraint = None self._constraint_args = None self.value = value return _expr = asMagicNumber(expr) if id(self) in [id(dep) for dep in flatten(_expr.dependencies())]: raise ValueError("Your constraint contains a circular dependency") self._constraint = _expr self._constraint_args = None if isinstance(expr, Parameter): self._deps.append(expr) self._deps.extend(flatten(_expr.dependencies())) self._vary = False
[docs] def set_constraint(self, constraint, args=()): """ Constrains the Parameter. Parameters ---------- constraint : {None, expression, callable} One of: - None, remove all constraints on this Parameter. - expression, an algebraic Python expression used to constrain the Parameter value. - callable, a Python function, ``constraint(*args)`` that returns a float value for the Parameter value. The callable should not use this Parameter in any of its calculations; nor should the callable use any Parameter in its calculations that possesses a constraint that would eventually lead back to this Parameter. If these conditions aren't met then circular dependencies with undefined side effects will be created. *A Parameter cannot ultimately depend on itself*. args : tuple a sequence of arguments given to `constraint` if it is a callable. This sequence can contain other Parameters, numbers, arrays, objects, etc. It is *strongly recommended* that this sequence is not nested. This is because ``args`` is searched for other Parameter objects, which are stored internally within this object as dependencies. Any constraints that these dependencies may have are evaluated before they are provided to the callable. If the callable uses Parameters that are not immediately retrievable from ``args`` (e.g. stored as attributes in an object), and those Parameters have constraints themselves, then those Parameters will likely have stale values, resulting in undefined behaviour. Examples -------- >>> from refnx.analysis import Parameter >>> a = Parameter(1) >>> b = Parameter(2) >>> a.set_constraint(np.sin(2*b)) >>> print(a.value) -0.7568024953079282 >>> def c(*args): ... return np.sin(args[0] * args[1]) >>> a.set_constraint(c, args=(2, b)) >>> print(a.value) -0.7568024953079282 """ if constraint is None: # clear the constraint self.constraint = None elif callable(constraint): deps = [] for arg in flatten(args): if id(arg) == id(self): raise ValueError( "Your constraint contains a circular dependency" ) if isinstance(arg, BaseParameter): expr = asMagicNumber(arg) if id(self) in [ id(dep) for dep in flatten(expr.dependencies()) ]: raise ValueError( "Your constraint contains a circular dependency" ) if isinstance(expr, Parameter): deps.append(expr) deps.extend(flatten(expr.dependencies())) # check return value can be coerced to a float # an Exception will probably be raised if that's the case float(constraint(*args)) # at this point the constraint function should be ok. self._constraint = constraint self._constraint_args = args self._deps = deps self._vary = False else: self.constraint = constraint
[docs] def setp(self, value=None, vary=None, bounds=None, constraint=None): """ Set several attributes of the parameter at once. Parameters ---------- value : float, optional Numerical Parameter value. vary : bool, optional Whether the Parameter is fixed during a fit. bounds: `refnx.analysis.Bounds`, tuple, optional Sets the bounds for the parameter. Either supply a `refnx.analysis.Bounds` object (or one of its subclasses), or a `(lower_bound, upper_bound)` tuple. constraint : expression, optional Python expression used to constrain the value during the fit. """ if value is not None: self.value = value if vary is not None: self.vary = vary if bounds is not None: self.bounds = bounds if constraint is not None: self.constraint = constraint
[docs] def corner(self): """ Plots a histogram of the Parameter posterior. Requires matplotlib be installed. Returns ------- fig, ax : :class:`matplotlib.Figure`, :class:`matplotlib.Axes` `matplotlib` figure and axes objects. """ if self.chain is None: raise RuntimeError( "Parameter.chain is not present try sampling first" ) import matplotlib.pyplot as plt fig = plt.figure() ax = fig.add_subplot(111) lb, med, ub = np.percentile(self.chain, [15.87, 50, 84.13]) sigma = 0.5 * (ub - lb) s = ax.hist( self.chain.flatten(), bins=100, density=True, histtype="step" ) ax.vlines( [med - sigma, med, med + sigma], 0, 1.1 * np.max(s[0]), colors="r", linestyle=["dashed", "solid", "dashed"], ) ax.set_xlabel(f"{self.name} / {self.units}") ax.annotate(f"{med:.6g} $\\pm$ {sigma:.6g}", (med, np.max(s[0]))) return fig, ax
class _BinaryOp(BaseParameter): def __init__(self, op1, op2, operation): super().__init__() self.op1 = op1 self.op2 = op2 self.opn = operation self._deps = [] self._deps.append(op1) self._deps.append(op2) def _eval(self): return self.opn(self.op1._eval(), self.op2._eval()) class _UnaryOp(BaseParameter): def __init__(self, op1, operation): super().__init__() self.op1 = op1 self.opn = operation self._deps = [] self._deps.append(op1) def _eval(self): return self.opn(self.op1._eval())
[docs]def is_parameter(x): """Test for Parameter-ness.""" return isinstance(x, BaseParameter)
[docs]def is_parameters(x): """Test for Parameter-ness.""" return isinstance(x, Parameters)
def _constraint_tree_helper(expr): t = [] if isinstance(expr, Parameter): return expr if isinstance(expr, Constant): return expr if isinstance(expr, _BinaryOp): t.append( [ _constraint_tree_helper(expr.op1), _constraint_tree_helper(expr.op2), expr.opn, ] ) if isinstance(expr, _UnaryOp): t.append([_constraint_tree_helper(expr.op1), expr.opn]) return t def constraint_tree(expr): """ builds a mathematical tree of a constraint expression this can be fed into build_constraint_from_tree to reconstitute a constraint """ if isinstance(expr, Parameter): return [expr] if isinstance(expr, Constant): return [expr] return list(flatten(_constraint_tree_helper(expr))) def build_constraint_from_tree(tree): """ A calculator for a constraint tree. It's essentially a reverse Polish notation calculator. """ v = [] for t in tree: if callable(t): if t in binary: o1 = v.pop() o2 = v.pop() v.append(t(o1, o2)) elif t in unary: o1 = v.pop() v.append(t(o1)) else: v.append(t) return v[0]