Source code for qinfer.expdesign

#!/usr/bin/python
# -*- coding: utf-8 -*-
##
# expdesign.py: Adaptive experimental design algorithms.
##
# © 2017, Chris Ferrie ([email protected]) and
#         Christopher Granade ([email protected]).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     1. Redistributions of source code must retain the above copyright
#        notice, this list of conditions and the following disclaimer.
#
#     2. Redistributions in binary form must reproduce the above copyright
#        notice, this list of conditions and the following disclaimer in the
#        documentation and/or other materials provided with the distribution.
#
#     3. Neither the name of the copyright holder nor the names of its
#        contributors may be used to endorse or promote products derived from
#        this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
##

## FEATURES ###################################################################

from __future__ import absolute_import
from __future__ import division

## ALL ########################################################################

# We use __all__ to restrict what globals are visible to external modules.
__all__ = [
    'ExperimentDesigner',
    'Heuristic',
    'EnsembleHeuristic',
    'ExpSparseHeuristic',
    'PGH',
    'OptimizationAlgorithms'
]

## IMPORTS ####################################################################

from future.utils import with_metaclass

import numpy as np

# for BCRB and BED classes
import scipy.optimize as opt
from qinfer._lib import enum # <- TODO: replace with flufl.enum!

from abc import ABCMeta, abstractmethod
import warnings

from qinfer.finite_difference import *

## FUNCTIONS ###################################################################

def identity(arg): return arg

## CLASSES #####################################################################

OptimizationAlgorithms = enum.enum("NULL", "CG", "NCG", "NELDER_MEAD")

[docs]class Heuristic(with_metaclass(ABCMeta, object)): r""" Defines a heuristic used for selecting new experiments without explicit optimization of the risk. As an example, the :math:`t_k = (9/8)^k` heuristic discussed by [FGC12]_ does not make explicit reference to the risk, and so would be appropriate as a `Heuristic` subclass. In particular, the [FGC12]_ heuristic is implemented by the :class:`ExpSparseHeuristic` class. """ def __init__(self, updater): self._updater = updater @abstractmethod def __call__(self, *args): raise NotImplementedError("Not yet implemented.")
class EnsembleHeuristic(Heuristic): r""" Heuristic that randomly chooses one of several other heuristics. :param list ensemble: List of tuples ``(heuristic, pr)`` specifying the probability of choosing each member heuristic. """ def __init__(self, ensemble): self._pr = np.array([pr for heuristic, pr in ensemble]) self._heuristics = ([heuristic for heuristic, pr in ensemble]) def __call__(self, *args): idx_heuristic = np.random.choice(len(self._heuristics), p=self._pr) return self._heuristics[idx_heuristic](*args)
[docs]class ExpSparseHeuristic(Heuristic): r""" Implements the exponentially-sparse time evolution heuristic of [FGC12]_, under which :math:`t_k = A b^k`, where :math:`A` and :math:`b` are parameters of the heuristic. :param qinfer.smc.SMCUpdater updater: Posterior updater for which experiments should be heuristicly designed. :param float scale: The value of :math:`A`, implicitly setting the frequency scale for the problem. :param float base: The base of the exponent; in general, should be closer to 1 for higher-dimensional models. :param str t_field: Name of the expparams field representing time. If None, then the generated expparams are taken to be scalar, and not a record. :param dict other_fields: Values of the other fields to be used in designed experiments. """ def __init__(self, updater, scale=1, base=9/8, t_field=None, other_fields=None ): super(ExpSparseHeuristic, self).__init__(updater) self._scale = scale self._base = base self._t_field = t_field self._other_fields = other_fields def __call__(self): n_exps = len(self._updater.data_record) t = self._scale * (self._base ** n_exps) dtype = self._updater.model.expparams_dtype if self._t_field is None: return np.array([t], dtype=dtype) else: eps = np.empty((1,), dtype=dtype) for field, value in self._other_fields.items(): eps[field] = value eps[self._t_field] = t return eps
[docs]class PGH(Heuristic): """ Implements the *particle guess heuristic* (PGH) of [WGFC13a]_, which selects two particles from the current posterior, selects one as an inversion hypothesis and sets the time parameter to be the inverse of the distance between the particles. In this way, the PGH adapts to the current uncertianty without additional simulation resources. :param qinfer.smc.SMCUpdater updater: Posterior updater for which experiments should be heuristicly designed. :param str inv_field: Name of the ``expparams`` field corresponding to the inversion hypothesis. :param str t_field: Name of the ``expparams`` field corresponding to the evolution time. :param callable inv_func: Function to be applied to modelparameter vectors to produce an inversion field ``x_``. :param callable t_func: Function to be applied to the evolution time to produce a time field ``t``. :param int maxiters: Number of times to try and choose distinct particles before giving up. :param dict other_fields: Values to set for fields not given by the PGH. Once initialized, a ``PGH`` object can be called to generate a new experiment parameter vector: >>> pgh = PGH(updater) # doctest: +SKIP >>> expparams = pgh() # doctest: +SKIP If the posterior weights are very highly peaked (that is, if the effective sample size is too small, as measured by :attr:`~qinfer.smc.SMCUpdater.n_ess`), then it may be the case that the two particles chosen by the PGH are identical, such that the time would be determined by ``1 / 0``. In this case, the `PGH` class will instead draw new pairs of particles until they are not identical, up to ``maxiters`` attempts. If that limit is reached, a `RuntimeError` will be raised. """ def __init__(self, updater, inv_field='x_', t_field='t', inv_func=identity, t_func=identity, maxiters=10, other_fields=None ): super(PGH, self).__init__(updater) self._x_ = inv_field self._t = t_field self._inv_func = inv_func self._t_func = t_func self._maxiters = maxiters self._other_fields = other_fields if other_fields is not None else {} def __call__(self): idx_iter = 0 while idx_iter < self._maxiters: x, xp = self._updater.sample(n=2)[:, np.newaxis, :] if self._updater.model.distance(x, xp) > 0: break else: idx_iter += 1 if self._updater.model.distance(x, xp) == 0: raise RuntimeError("PGH did not find distinct particles in {} iterations.".format(self._maxiters)) eps = np.empty((1,), dtype=self._updater.model.expparams_dtype) eps[self._x_] = self._inv_func(x) eps[self._t] = self._t_func(1 / self._updater.model.distance(x, xp)) for field, value in self._other_fields.items(): eps[field] = value return eps
[docs]class ExperimentDesigner(object): """ Designs new experiments using the current best information provided by a Bayesian updater. :param qinfer.smc.SMCUpdater updater: A Bayesian updater to design experiments for. :param OptimizationAlgorithms opt_algo: Algorithm to be used to perform local optimization. """ def __init__(self, updater, opt_algo=OptimizationAlgorithms.CG): if opt_algo not in OptimizationAlgorithms.reverse_mapping: raise ValueError("Unsupported or unknown optimization algorithm.") self._updater = updater self._opt_algo = opt_algo # Set everything up for the first experiment. self.new_exp() ## METHODS ################################################################
[docs] def new_exp(self): """ Resets this `ExperimentDesigner` instance and prepares for designing the next experiment. """ self.__best_cost = None self.__best_ep = None
[docs] def design_expparams_field(self, guess, field, cost_scale_k=1.0, disp=False, maxiter=None, maxfun=None, store_guess=False, grad_h=None, cost_mult=False ): r""" Designs a new experiment by varying a single field of a shape ``(1,)`` record array and minimizing the objective function .. math:: O(\vec{e}) = r(\vec{e}) + k \$(\vec{e}), where :math:`r` is the Bayes risk as calculated by the updater, and where :math:`\$` is the cost function specified by the model. Here, :math:`k` is a parameter specified to relate the units of the risk and the cost. See :ref:`expdesign` for more details. :param guess: Either a record array with a single guess, or a callable function that generates guesses. :type guess: Instance of :class:`~Heuristic`, `callable` or :class:`~numpy.ndarray` of ``dtype`` :attr:`~qinfer.abstract_model.Simulatable.expparams_dtype` :param str field: The name of the ``expparams`` field to be optimized. All other fields of ``guess`` will be held constant. :param float cost_scale_k: A scale parameter :math:`k` relating the Bayes risk to the experiment cost. See :ref:`expdesign`. :param bool disp: If `True`, the optimization will print additional information as it proceeds. :param int maxiter: For those optimization algorithms which support it (currently, only CG and NELDER_MEAD), limits the number of optimization iterations used for each guess. :param int maxfun: For those optimization algorithms which support it (currently, only NCG and NELDER_MEAD), limits the number of objective calls that can be made. :param bool store_guess: If ``True``, will compare the outcome of this guess to previous guesses and then either store the optimization of this experiment, or the previous best-known experiment design. :param float grad_h: Step size to use in estimating gradients. Used only if ``opt_algo`` is NCG. :return: An array representing the best experiment design found so far for the current experiment. """ # Define some short names for commonly used properties. up = self._updater m = up.model # Generate a new guess or use a guess provided, depending on the # type of the guess argument. if isinstance(guess, Heuristic): raise NotImplementedError("Not yet implemented.") elif callable(guess): # Generate a new guess by calling the guess function provided. ep = guess( idx_exp=len(up.data_record), mean=up.est_mean(), cov=up.est_covariance_mtx() ) else: # Make a copy of the guess that we can manipulate, but otherwise # use it as-is. ep = np.copy(guess) # Define an objective function that wraps a vector of scalars into # an appropriate record array. if (cost_mult==False): def objective_function(x): """ Used internally by design_expparams_field. If you see this, something probably went wrong. """ ep[field] = x return up.bayes_risk(ep) + cost_scale_k * m.experiment_cost(ep) else: def objective_function(x): """ Used internally by design_expparams_field. If you see this, something probably went wrong. """ ep[field] = x return up.bayes_risk(ep)* m.experiment_cost(ep)**cost_scale_k # Some optimizers require gradients of the objective function. # Here, we create a FiniteDifference object to compute that for # us. d_dx_objective = FiniteDifference(objective_function, ep[field].size) # Allocate a variable to hold the local optimum value found. # This way, if an optimization algorithm doesn't support returning # the value as well as the location, we can find it manually. f_opt = None # Here's the core, where we break out and call the various optimization # routines provided by SciPy. if self._opt_algo == OptimizationAlgorithms.NULL: # This optimization algorithm does nothing locally, but only # exists to leverage the store_guess functionality below. x_opt = guess[0][field] elif self._opt_algo == OptimizationAlgorithms.CG: # Prepare any additional options. opt_options = {} if maxiter is not None: opt_options['maxiter'] = maxiter # Actually call fmin_cg, gathering all outputs we can. x_opt, f_opt, func_calls, grad_calls, warnflag = opt.fmin_cg( objective_function, guess[0][field], disp=disp, full_output=True, **opt_options ) elif self._opt_algo == OptimizationAlgorithms.NCG: # Prepare any additional options. opt_options = {} if maxfun is not None: opt_options['maxfun'] = maxfun if grad_h is not None: opt_options['epsilon'] = grad_h # Actually call fmin_tnc, gathering all outputs we can. # We use fmin_tnc in preference to fmin_ncg, as they implement the # same algorithm, but fmin_tnc seems better behaved with respect # to very flat gradient regions, due to respecting maxfun. # By contrast, fmin_ncg can get stuck in an infinite loop in # versions of SciPy < 0.11. # # Note that in some versions of SciPy, there was a bug in # fmin_ncg and fmin_tnc that can propagate outward if the gradient # is too flat. We catch it here and return the initial guess in that # case, since by hypothesis, it's too flat to make much difference # anyway. try: x_opt, f_opt, func_calls, grad_calls, h_calls, warnflag = opt.fmin_tnc( objective_function, guess[0][field], fprime=None, bounds=None, approx_grad=True, disp=disp, full_output=True, **opt_options ) except TypeError: warnings.warn( "Gradient function too flat for NCG.", RuntimeWarning) x_opt = guess[0][field] f_opt = None elif self._opt_algo == OptimizationAlgorithms.NELDER_MEAD: opt_options = {} if maxfun is not None: opt_options['maxfun'] = maxfun if maxiter is not None: opt_options['maxiter'] = maxiter x_opt, f_opt, iters, func_calls, warnflag = opt.fmin( objective_function, guess[0][field], disp=disp, full_output=True, **opt_options ) # Optionally compare the result to previous guesses. if store_guess: # Possibly compute the objective function value at the local optimum # if we don't already know it. if f_opt is None: guess_qual = objective_function(x_opt) # Compare to the known best cost so far. if self.__best_cost is None or (self.__best_cost > f_opt): # No known best yet, or we're better than the previous best, # so record this guess. ep[field] = x_opt self.__best_cost = f_opt self.__best_ep = ep else: ep = self.__best_ep # Guess is bad, return current best guess else: # We aren't using guess recording, so just pack the local optima # into ep for returning. ep[field] = x_opt # In any case, return the optimized guess. return ep