#!/usr/bin/python
# -*- coding: utf-8 -*-
##
# distributions.py: module for probability distributions.
##
# © 2017, Chris Ferrie (csferrie@gmail.com) and
# Christopher Granade (cgranade@cgranade.com).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
##
## IMPORTS ###################################################################
from __future__ import division
from __future__ import absolute_import
from builtins import range
from future.utils import with_metaclass
import numpy as np
import scipy.stats as st
import scipy.linalg as la
from scipy.interpolate import interp1d
from scipy.integrate import cumtrapz
from scipy.spatial import ConvexHull, Delaunay
from functools import partial
import abc
from qinfer import utils as u
from qinfer.metrics import rescaled_distance_mtx
from qinfer.clustering import particle_clusters
from qinfer._exceptions import ApproximationWarning
import warnings
## EXPORTS ###################################################################
__all__ = [
'Distribution',
'SingleSampleMixin',
'MixtureDistribution',
'ParticleDistribution',
'ProductDistribution',
'UniformDistribution',
'DiscreteUniformDistribution',
'MVUniformDistribution',
'ConstantDistribution',
'NormalDistribution',
'MultivariateNormalDistribution',
'SlantedNormalDistribution',
'LogNormalDistribution',
'BetaDistribution',
'DirichletDistribution',
'BetaBinomialDistribution',
'GammaDistribution',
'GinibreUniform',
'HaarUniform',
'HilbertSchmidtUniform',
'PostselectedDistribution',
'ConstrainedSumDistribution',
'InterpolatedUnivariateDistribution'
]
## FUNCTIONS #################################################################
def scipy_dist(name, *args, **kwargs):
"""
Wraps calling a scipy.stats distribution to allow for pickling.
See https://github.com/scipy/scipy/issues/3125.
"""
return getattr(st, name)(*args, **kwargs)
## ABSTRACT CLASSES AND MIXINS ###############################################
[docs]class Distribution(with_metaclass(abc.ABCMeta, object)):
"""
Abstract base class for probability distributions on one or more random
variables.
"""
@abc.abstractproperty
def n_rvs(self):
"""
The number of random variables that this distribution is over.
:type: `int`
"""
pass
[docs] @abc.abstractmethod
def sample(self, n=1):
"""
Returns one or more samples from this probability distribution.
:param int n: Number of samples to return.
:rtype: numpy.ndarray
:return: An array containing samples from the
distribution of shape ``(n, d)``, where ``d`` is the number of
random variables.
"""
pass
[docs]class SingleSampleMixin(with_metaclass(abc.ABCMeta, object)):
"""
Mixin class that extends a class so as to generate multiple samples
correctly, given a method ``_sample`` that generates one sample at a time.
"""
@abc.abstractmethod
def _sample(self):
pass
[docs] def sample(self, n=1):
samples = np.zeros((n, self.n_rvs))
for idx in range(n):
samples[idx, :] = self._sample()
return samples
## CLASSES ###################################################################
[docs]class MixtureDistribution(Distribution):
r"""
Samples from a weighted list of distributions.
:param weights: Length ``n_dist`` list or ``np.ndarray``
of probabilites summing to 1.
:param dist: Either a length ``n_dist`` list of ``Distribution`` instances,
or a ``Distribution`` class, for example, ``NormalDistribution``.
It is assumed that a list of ``Distribution``s all
have the same ``n_rvs``.
:param dist_args: If ``dist`` is a class, an array
of shape ``(n_dist, n_rvs)`` where ``dist_args[k,:]`` defines
the arguments of the k'th distribution. Use ``None`` if the distribution
has no arguments.
:param dist_kw_args: If ``dist`` is a class, a dictionary
where each key's value is an array
of shape ``(n_dist, n_rvs)`` where ``dist_kw_args[key][k,:]`` defines
the keyword argument corresponding to ``key`` of the k'th distribution.
Use ``None`` if the distribution needs no keyword arguments.
:param bool shuffle: Whether or not to shuffle result after sampling. Not shuffling
will result in variates being in the same order as
the distributions. Default is ``True``.
"""
def __init__(self, weights, dist, dist_args=None, dist_kw_args=None, shuffle=True):
super(MixtureDistribution, self).__init__()
self._weights = weights
self._n_dist = len(weights)
self._shuffle = shuffle
try:
self._example_dist = dist[0]
self._is_dist_list = True
self._dist_list = dist
assert(self._n_dist == len(self._dist_list))
except:
self._is_dist_list = False
self._dist = dist
self._dist_args = dist_args
self._dist_kw_args = dist_kw_args
assert(self._n_dist == self._dist_args.shape[0])
self._example_dist = self._dist(
*self._dist_arg(0),
**self._dist_kw_arg(0)
)
def _dist_arg(self, k):
"""
Returns the arguments for the k'th distribution.
:param int k: Index of distribution in question.
:rtype: ``np.ndarary``
"""
if self._dist_args is not None:
return self._dist_args[k,:]
else:
return []
def _dist_kw_arg(self, k):
"""
Returns a dictionary of keyword arguments
for the k'th distribution.
:param int k: Index of the distribution in question.
:rtype: ``dict``
"""
if self._dist_kw_args is not None:
return {
key:self._dist_kw_args[key][k,:]
for key in self._dist_kw_args.keys()
}
else:
return {}
@property
def n_rvs(self):
return self._example_dist.n_rvs
@property
def n_dist(self):
"""
The number of distributions in the mixture distribution.
"""
return self._n_dist
[docs] def sample(self, n=1):
# how many samples to take from each dist
ns = np.random.multinomial(n, self._weights)
idxs = np.arange(self.n_dist)[ns > 0]
if self._is_dist_list:
# sample from each distribution
samples = np.concatenate([
self._dist_list[k].sample(n=ns[k])
for k in idxs
])
else:
# instantiate each distribution and then sample
samples = np.concatenate([
self._dist(
*self._dist_arg(k),
**self._dist_kw_arg(k)
).sample(n=ns[k])
for k in idxs
])
# in-place shuffling
if self._shuffle:
np.random.shuffle(samples)
return samples
[docs]class ParticleDistribution(Distribution):
r"""
A distribution consisting of a list of weighted vectors.
Note that either `n_mps` or both (`particle_locations`, `particle_weights`)
must be specified, or an error will be raised.
:param numpy.ndarray particle_weights: Length ``n_particles`` list
of particle weights.
:param particle_locations: Shape ``(n_particles, n_mps)`` array of
particle locations.
:param int n_mps: Dimension of parameter space. This parameter should
only be set when `particle_weights` and `particle_locations` are
not set (and vice versa).
"""
def __init__(self, n_mps=None, particle_locations=None, particle_weights=None):
super(ParticleDistribution, self).__init__()
if particle_locations is None or particle_weights is None:
# Initialize with single particle at origin.
self.particle_locations = np.zeros((1, n_mps))
self.particle_weights = np.ones((1,))
elif n_mps is None:
self.particle_locations = particle_locations
self.particle_weights = np.abs(particle_weights)
self.particle_weights = self.particle_weights / np.sum(self.particle_weights)
else:
raise ValueError('Either the dimension of parameter space, `n_mps`, or the particles, `particle_locations` and `particle_weights` must be specified.')
@property
def n_particles(self):
"""
Returns the number of particles in the distribution
:type: `int`
"""
return self.particle_locations.shape[0]
@property
def n_ess(self):
"""
Returns the effective sample size (ESS) of the current particle
distribution.
:type: `float`
:return: The effective sample size, given by :math:`1/\sum_i w_i^2`.
"""
return 1 / (np.sum(self.particle_weights**2))
## DISTRIBUTION CONTRACT ##
@property
def n_rvs(self):
"""
Returns the dimension of each particle.
:type: `int`
"""
return self.particle_locations.shape[1]
[docs] def sample(self, n=1):
"""
Returns random samples from the current particle distribution according
to particle weights.
:param int n: The number of samples to draw.
:return: The sampled model parameter vectors.
:rtype: `~numpy.ndarray` of shape ``(n, updater.n_rvs)``.
"""
cumsum_weights = np.cumsum(self.particle_weights)
return self.particle_locations[np.minimum(cumsum_weights.searchsorted(
np.random.random((n,)),
side='right'
), len(cumsum_weights) - 1)]
## MOMENT FUNCTIONS ##
[docs] @staticmethod
def particle_mean(weights, locations):
r"""
Returns the arithmetic mean of the `locations` weighted by `weights`
:param numpy.ndarray weights: Weights of each particle in array of
shape ``(n_particles,)``.
:param numpy.ndarray locations: Locations of each particle in array
of shape ``(n_particles, n_modelparams)``
:rtype: :class:`numpy.ndarray`, shape ``(n_modelparams,)``.
:returns: An array containing the mean
"""
return np.dot(weights, locations)
[docs] @classmethod
def particle_covariance_mtx(cls, weights, locations):
"""
Returns an estimate of the covariance of a distribution
represented by a given set of SMC particle.
:param weights: An array of shape ``(n_particles,)`` containing
the weights of each particle.
:param location: An array of shape ``(n_particles, n_modelparams)``
containing the locations of each particle.
:rtype: :class:`numpy.ndarray`, shape
``(n_modelparams, n_modelparams)``.
:returns: An array containing the estimated covariance matrix.
"""
# Find the mean model vector, shape (n_modelparams, ).
mu = cls.particle_mean(weights, locations)
# Transpose the particle locations to have shape
# (n_modelparams, n_particles).
xs = locations.transpose([1, 0])
# Give a shorter name to the particle weights, shape (n_particles, ).
ws = weights
cov = (
# This sum is a reduction over the particle index, chosen to be
# axis=2. Thus, the sum represents an expectation value over the
# outer product $x . x^T$.
#
# All three factors have the particle index as the rightmost
# index, axis=2. Using the Einstein summation convention (ESC),
# we can reduce over the particle index easily while leaving
# the model parameter index to vary between the two factors
# of xs.
#
# This corresponds to evaluating A_{m,n} = w_{i} x_{m,i} x_{n,i}
# using the ESC, where A_{m,n} is the temporary array created.
np.einsum('i,mi,ni', ws, xs, xs)
# We finish by subracting from the above expectation value
# the outer product $mu . mu^T$.
- np.dot(mu[..., np.newaxis], mu[np.newaxis, ...])
)
# The SMC approximation is not guaranteed to produce a
# positive-semidefinite covariance matrix. If a negative eigenvalue
# is produced, we should warn the caller of this.
assert np.all(np.isfinite(cov))
if not np.all(la.eig(cov)[0] >= 0):
warnings.warn('Numerical error in covariance estimation causing positive semidefinite violation.', ApproximationWarning)
return cov
[docs] def est_mean(self):
"""
Returns the mean value of the current particle distribution.
:rtype: :class:`numpy.ndarray`, shape ``(n_mps,)``.
:returns: An array containing the an estimate of the mean model vector.
"""
return self.particle_mean(self.particle_weights,
self.particle_locations)
[docs] def est_meanfn(self, fn):
"""
Returns an the expectation value of a given function
:math:`f` over the current particle distribution.
Here, :math:`f` is represented by a function ``fn`` that is vectorized
over particles, such that ``f(modelparams)`` has shape
``(n_particles, k)``, where ``n_particles = modelparams.shape[0]``, and
where ``k`` is a positive integer.
:param callable fn: Function implementing :math:`f` in a vectorized
manner. (See above.)
:rtype: :class:`numpy.ndarray`, shape ``(k, )``.
:returns: An array containing the an estimate of the mean of :math:`f`.
"""
return np.einsum('i...,i...',
self.particle_weights, fn(self.particle_locations)
)
[docs] def est_covariance_mtx(self, corr=False):
"""
Returns the full-rank covariance matrix of the current particle
distribution.
:param bool corr: If `True`, the covariance matrix is normalized
by the outer product of the square root diagonal of the covariance matrix,
i.e. the correlation matrix is returned instead.
:rtype: :class:`numpy.ndarray`, shape
``(n_modelparams, n_modelparams)``.
:returns: An array containing the estimated covariance matrix.
"""
cov = self.particle_covariance_mtx(self.particle_weights,
self.particle_locations)
if corr:
dstd = np.sqrt(np.diag(cov))
cov /= (np.outer(dstd, dstd))
return cov
## INFORMATION QUANTITIES ##
[docs] def est_entropy(self):
r"""
Estimates the entropy of the current particle distribution
as :math:`-\sum_i w_i \log w_i` where :math:`\{w_i\}`
is the set of particles with nonzero weight.
"""
nz_weights = self.particle_weights[self.particle_weights > 0]
return -np.sum(np.log(nz_weights) * nz_weights)
def _kl_divergence(self, other_locs, other_weights, kernel=None, delta=1e-2):
"""
Finds the KL divergence between this and another particle
distribution by using a kernel density estimator to smooth over the
other distribution's particles.
"""
if kernel is None:
kernel = st.norm(loc=0, scale=1).pdf
dist = rescaled_distance_mtx(self, other_locs) / delta
K = kernel(dist)
return -self.est_entropy() - (1 / delta) * np.sum(
self.particle_weights *
np.log(
np.sum(
other_weights * K,
axis=1 # Sum over the particles of ``other``.
)
),
axis=0 # Sum over the particles of ``self``.
)
[docs] def est_kl_divergence(self, other, kernel=None, delta=1e-2):
"""
Finds the KL divergence between this and another particle
distribution by using a kernel density estimator to smooth over the
other distribution's particles.
:param SMCUpdater other:
"""
return self._kl_divergence(
other.particle_locations,
other.particle_weights,
kernel, delta
)
## CLUSTER ESTIMATION METHODS #############################################
[docs] def est_cluster_moments(self, cluster_opts=None):
# TODO: document
if cluster_opts is None:
cluster_opts = {}
for cluster_label, cluster_particles in particle_clusters(
self.particle_locations, self.particle_weights,
**cluster_opts
):
w = self.particle_weights[cluster_particles]
l = self.particle_locations[cluster_particles]
yield (
cluster_label,
sum(w), # The zeroth moment is very useful here!
self.particle_mean(w, l),
self.particle_covariance_mtx(w, l)
)
[docs] def est_cluster_covs(self, cluster_opts=None):
# TODO: document
cluster_moments = np.array(
list(self.est_cluster_moments(cluster_opts=cluster_opts)),
dtype=[
('label', 'int'),
('weight', 'float64'),
('mean', '{}float64'.format(self.n_rvs)),
('cov', '{0},{0}float64'.format(self.n_rvs)),
])
ws = cluster_moments['weight'][:, np.newaxis, np.newaxis]
within_cluster_var = np.sum(ws * cluster_moments['cov'], axis=0)
between_cluster_var = self.particle_covariance_mtx(
# Treat the cluster means as a new very small particle cloud.
cluster_moments['weight'], cluster_moments['mean']
)
total_var = within_cluster_var + between_cluster_var
return within_cluster_var, between_cluster_var, total_var
[docs] def est_cluster_metric(self, cluster_opts=None):
"""
Returns an estimate of how much of the variance in the current posterior
can be explained by a separation between *clusters*.
"""
wcv, bcv, tv = self.est_cluster_covs(cluster_opts)
return np.diag(bcv) / np.diag(tv)
## REGION ESTIMATION METHODS ##############################################
[docs] def est_credible_region(self, level=0.95, return_outside=False, modelparam_slice=None):
"""
Returns an array containing particles inside a credible region of a
given level, such that the described region has probability mass
no less than the desired level.
Particles in the returned region are selected by including the highest-
weight particles first until the desired credibility level is reached.
:param float level: Crediblity level to report.
:param bool return_outside: If `True`, the return value is a tuple
of the those particles within the credible region, and the rest
of the posterior particle cloud.
:param slice modelparam_slice: Slice over which model parameters
to consider.
:rtype: :class:`numpy.ndarray`, shape ``(n_credible, n_mps)``,
where ``n_credible`` is the number of particles in the credible
region and ``n_mps`` corresponds to the size of ``modelparam_slice``.
If ``return_outside`` is ``True``, this method instead
returns tuple ``(inside, outside)`` where ``inside`` is as
described above, and ``outside`` has shape ``(n_particles-n_credible, n_mps)``.
:return: An array of particles inside the estimated credible region. Or,
if ``return_outside`` is ``True``, both the particles inside and the
particles outside, as a tuple.
"""
# which slice of modelparams to take
s_ = np.s_[modelparam_slice] if modelparam_slice is not None else np.s_[:]
mps = self.particle_locations[:, s_]
# Start by sorting the particles by weight.
# We do so by obtaining an array of indices `id_sort` such that
# `particle_weights[id_sort]` is in descending order.
id_sort = np.argsort(self.particle_weights)[::-1]
# Find the cummulative sum of the sorted weights.
cumsum_weights = np.cumsum(self.particle_weights[id_sort])
# Find all the indices where the sum is less than level.
# We first find id_cred such that
# `all(cumsum_weights[id_cred] <= level)`.
id_cred = cumsum_weights <= level
# By construction, by adding the next particle to id_cred, it must be
# true that `cumsum_weights[id_cred] >= level`, as required.
id_cred[np.sum(id_cred)] = True
# We now return a slice onto the particle_locations by first permuting
# the particles according to the sort order, then by selecting the
# credible particles.
if return_outside:
return (
mps[id_sort][id_cred],
mps[id_sort][np.logical_not(id_cred)]
)
else:
return mps[id_sort][id_cred]
[docs] def region_est_hull(self, level=0.95, modelparam_slice=None):
"""
Estimates a credible region over models by taking the convex hull of
a credible subset of particles.
:param float level: The desired crediblity level (see
:meth:`SMCUpdater.est_credible_region`).
:param slice modelparam_slice: Slice over which model parameters
to consider.
:return: The tuple ``(faces, vertices)`` where ``faces`` describes all the
vertices of all of the faces on the exterior of the convex hull, and
``vertices`` is a list of all vertices on the exterior of the
convex hull.
:rtype: ``faces`` is a ``numpy.ndarray`` with shape
``(n_face, n_mps, n_mps)`` and indeces ``(idx_face, idx_vertex, idx_mps)``
where ``n_mps`` corresponds to the size of ``modelparam_slice``.
``vertices`` is an ``numpy.ndarray`` of shape ``(n_vertices, n_mps)``.
"""
points = self.est_credible_region(
level=level,
modelparam_slice=modelparam_slice
)
hull = ConvexHull(points)
return points[hull.simplices], points[u.uniquify(hull.vertices.flatten())]
[docs] def region_est_ellipsoid(self, level=0.95, tol=0.0001, modelparam_slice=None):
r"""
Estimates a credible region over models by finding the minimum volume
enclosing ellipse (MVEE) of a credible subset of particles.
:param float level: The desired crediblity level (see
:meth:`SMCUpdater.est_credible_region`).
:param float tol: The allowed error tolerance in the MVEE optimization
(see :meth:`~qinfer.utils.mvee`).
:param slice modelparam_slice: Slice over which model parameters
to consider.
:return: A tuple ``(A, c)`` where ``A`` is the covariance
matrix of the ellipsoid and ``c`` is the center.
A point :math:`\vec{x}` is in the ellipsoid whenever
:math:`(\vec{x}-\vec{c})^{T}A^{-1}(\vec{x}-\vec{c})\leq 1`.
:rtype: ``A`` is ``np.ndarray`` of shape ``(n_mps,n_mps)`` and
``centroid`` is ``np.ndarray`` of shape ``(n_mps)``.
``n_mps`` corresponds to the size of ``param_slice``.
"""
_, vertices = self.region_est_hull(level=level, modelparam_slice=modelparam_slice)
A, centroid = u.mvee(vertices, tol)
return A, centroid
[docs] def in_credible_region(self, points, level=0.95, modelparam_slice=None, method='hpd-hull', tol=0.0001):
"""
Decides whether each of the points lie within a credible region
of the current distribution.
If ``tol`` is ``None``, the particles are tested directly against
the convex hull object. If ``tol`` is a positive ``float``,
particles are tested to be in the interior of the smallest
enclosing ellipsoid of this convex hull, see
:meth:`SMCUpdater.region_est_ellipsoid`.
:param np.ndarray points: An ``np.ndarray`` of shape ``(n_mps)`` for
a single point, or of shape ``(n_points, n_mps)`` for multiple points,
where ``n_mps`` corresponds to the same dimensionality as ``param_slice``.
:param float level: The desired crediblity level (see
:meth:`SMCUpdater.est_credible_region`).
:param str method: A string specifying which credible region estimator to
use. One of ``'pce'``, ``'hpd-hull'`` or ``'hpd-mvee'`` (see below).
:param float tol: The allowed error tolerance for those methods
which require a tolerance (see :meth:`~qinfer.utils.mvee`).
:param slice modelparam_slice: A slice describing which model parameters
to consider in the credible region, effectively marginizing out the
remaining parameters. By default, all model parameters are included.
:return: A boolean array of shape ``(n_points, )`` specifying whether
each of the points lies inside the confidence region.
Methods
~~~~~~~
The following values are valid for the ``method`` argument.
- ``'pce'``: Posterior Covariance Ellipsoid.
Computes the covariance
matrix of the particle distribution marginalized over the excluded
slices and uses the :math:`\chi^2` distribution to determine
how to rescale it such the the corresponding ellipsoid has
the correct size. The ellipsoid is translated by the
mean of the particle distribution. It is determined which
of the ``points`` are on the interior.
- ``'hpd-hull'``: High Posterior Density Convex Hull.
See :meth:`SMCUpdater.region_est_hull`. Computes the
HPD region resulting from the particle approximation, computes
the convex hull of this, and it is determined which
of the ``points`` are on the interior.
- ``'hpd-mvee'``: High Posterior Density Minimum Volume Enclosing Ellipsoid.
See :meth:`SMCUpdater.region_est_ellipsoid`
and :meth:`~qinfer.utils.mvee`. Computes the
HPD region resulting from the particle approximation, computes
the convex hull of this, and determines the minimum enclosing
ellipsoid. Deterimines which
of the ``points`` are on the interior.
"""
if method == 'pce':
s_ = np.s_[modelparam_slice] if modelparam_slice is not None else np.s_[:]
A = self.est_covariance_mtx()[s_, s_]
c = self.est_mean()[s_]
# chi-squared distribution gives correct level curve conversion
mult = st.chi2.ppf(level, c.size)
results = u.in_ellipsoid(points, mult * A, c)
elif method == 'hpd-mvee':
tol = 0.0001 if tol is None else tol
A, c = self.region_est_ellipsoid(level=level, tol=tol, modelparam_slice=modelparam_slice)
results = u.in_ellipsoid(points, np.linalg.inv(A), c)
elif method == 'hpd-hull':
# it would be more natural to call region_est_hull,
# but that function uses ConvexHull which has no
# easy way of determining if a point is interior.
# Here, Delaunay gives us access to all of the
# necessary simplices.
# this fills the convex hull with (n_mps+1)-dimensional
# simplices; the convex hull is an almost-everywhere
# disjoint union of these simplices
hull = Delaunay(self.est_credible_region(level=level, modelparam_slice=modelparam_slice))
# now we just check whether each of the given points are in
# any of the simplices. (http://stackoverflow.com/a/16898636/1082565)
results = hull.find_simplex(points) >= 0
return results
[docs]class ProductDistribution(Distribution):
r"""
Takes a non-zero number of QInfer distributions :math:`D_k` as input
and returns their Cartesian product.
In other words, the returned distribution is
:math:`\Pr(D_1, \dots, D_N) = \prod_k \Pr(D_k)`.
:param Distribution factors:
Distribution objects representing :math:`D_k`.
Alternatively, one iterable argument can be given,
in which case the factors are the values drawn from that iterator.
"""
def __init__(self, *factors):
if len(factors) == 1:
try:
self._factors = list(factors[0])
except:
self._factors = factors
else:
self._factors = factors
@property
def n_rvs(self):
return sum([f.n_rvs for f in self._factors])
[docs] def sample(self, n=1):
return np.hstack([f.sample(n) for f in self._factors])
_DEFAULT_RANGES = np.array([[0, 1]])
_DEFAULT_RANGES.flags.writeable = False # Prevent anyone from modifying the
# default ranges.
## CLASSES ###################################################################
[docs]class ConstantDistribution(Distribution):
"""
Represents a determinstic variable; useful for combining with other
distributions, marginalizing, etc.
:param values: Shape ``(n,)`` array or list of values :math:`X_0` such that
:math:`\Pr(X) = \delta(X - X_0)`.
"""
def __init__(self, values):
self._values = np.array(values)[np.newaxis, :]
@property
def n_rvs(self):
return self._values.shape[1]
[docs] def sample(self, n=1):
return np.repeat(self._values, n, axis=0)
[docs]class NormalDistribution(Distribution):
"""
Normal or truncated normal distribution over a single random
variable.
:param float mean: Mean of the represented random variable.
:param float var: Variance of the represented random variable.
:param tuple trunc: Limits at which the PDF of this
distribution should be truncated, or ``None`` if
the distribution is to have infinite support.
"""
def __init__(self, mean, var, trunc=None):
self.mean = mean
self.var = var
if trunc is not None:
low, high = trunc
sigma = np.sqrt(var)
a = (low - mean) / sigma
b = (high - mean) / sigma
self.dist = partial(scipy_dist, 'truncnorm', a, b, loc=mean, scale=np.sqrt(var))
else:
self.dist = partial(scipy_dist, 'norm', mean, np.sqrt(var))
@property
def n_rvs(self):
return 1
[docs] def sample(self, n=1):
return self.dist().rvs(size=n)[:, np.newaxis]
[docs] def grad_log_pdf(self, x):
return -(x - self.mean) / self.var
[docs]class MultivariateNormalDistribution(Distribution):
"""
Multivariate (vector-valued) normal distribution.
:param np.ndarray mean: Array of shape ``(n_rvs, )``
representing the mean of the distribution.
:param np.ndarray cov: Array of shape ``(n_rvs, n_rvs)``
representing the covariance matrix of the distribution.
"""
def __init__(self, mean, cov):
# Flatten the mean first, so we have a strong guarantee about its
# shape.
self.mean = np.array(mean).flatten()
self.cov = cov
self.invcov = la.inv(cov)
@property
def n_rvs(self):
return self.mean.shape[0]
[docs] def sample(self, n=1):
return np.einsum("ij,nj->ni", la.sqrtm(self.cov), np.random.randn(n, self.n_rvs)) + self.mean
[docs] def grad_log_pdf(self, x):
return -np.dot(self.invcov, (x - self.mean).transpose()).transpose()
[docs]class SlantedNormalDistribution(Distribution):
r"""
Uniform distribution on a given rectangular region with
additive noise. Random variates from this distribution
follow :math:`X+Y` where :math:`X` is drawn uniformly
with respect to the rectangular region defined by ranges, and
:math:`Y` is normally distributed about 0 with variance
``weight**2``.
:param numpy.ndarray ranges: Array of shape ``(n_rvs, 2)``, where ``n_rvs``
is the number of random variables, specifying the upper and lower limits
for each variable.
:param float weight: Number specifying the inverse variance
of the additive noise term.
"""
def __init__(self, ranges=_DEFAULT_RANGES, weight=0.01):
if not isinstance(ranges, np.ndarray):
ranges = np.array(ranges)
if len(ranges.shape) == 1:
ranges = ranges[np.newaxis, ...]
self._ranges = ranges
self._n_rvs = ranges.shape[0]
self._delta = ranges[:, 1] - ranges[:, 0]
self._weight = weight
@property
def n_rvs(self):
return self._n_rvs
[docs] def sample(self, n=1):
shape = (n, self._n_rvs)# if n == 1 else (self._n_rvs, n)
z = np.random.randn(n, self._n_rvs)
return self._ranges[:, 0] + \
self._weight*z + \
np.random.rand(n, self._n_rvs)*self._delta[np.newaxis,:]
[docs]class LogNormalDistribution(Distribution):
"""
Log-normal distribution.
:param mu: Location parameter (numeric), set to 0 by default.
:param sigma: Scale parameter (numeric), set to 1 by default.
Must be strictly greater than zero.
"""
def __init__(self, mu=0, sigma=1):
self.mu = mu # lognormal location parameter
self.sigma = sigma # lognormal scale parameter
self.dist = partial(scipy_dist, 'lognorm', 1, mu, sigma) # scipy distribution location = 0
@property
def n_rvs(self):
return 1
[docs] def sample(self, n=1):
return self.dist().rvs(size=n)[:, np.newaxis]
[docs]class BetaDistribution(Distribution):
r"""
The beta distribution, whose pdf at :math:`x` is proportional to
:math:`x^{\alpha-1}(1-x)^{\beta-1}`.
Note that either ``alpha`` and ``beta``, or ``mean`` and ``var``, must be
specified as inputs;
either case uniquely determines the distribution.
:param float alpha: The alpha shape parameter of the beta distribution.
:param float beta: The beta shape parameter of the beta distribution.
:param float mean: The desired mean value of the beta distribution.
:param float var: The desired variance of the beta distribution.
"""
def __init__(self, alpha=None, beta=None, mean=None, var=None):
if alpha is not None and beta is not None:
self.alpha = alpha
self.beta = beta
self.mean = alpha / (alpha + beta)
self.var = alpha * beta / ((alpha + beta) ** 2 * (alpha + beta + 1))
elif mean is not None and var is not None:
self.mean = mean
self.var = var
self.alpha = mean ** 2 * (1 - mean) / var - mean
self.beta = (1 - mean) ** 2 * mean / var - (1 - mean)
else:
raise ValueError(
"BetaDistribution requires either (alpha and beta) "
"or (mean and var)."
)
self.dist = st.beta(a=self.alpha, b=self.beta)
@property
def n_rvs(self):
return 1
[docs] def sample(self, n=1):
return self.dist.rvs(size=n)[:, np.newaxis]
[docs]class DirichletDistribution(Distribution):
r"""
The dirichlet distribution, whose pdf at :math:`x` is proportional to
:math:`\prod_i x_i^{\alpha_i-1}`.
:param alpha: The list of concentration parameters.
"""
def __init__(self, alpha):
self._alpha = np.array(alpha)
if self.alpha.ndim != 1:
raise ValueError('The input alpha must be a 1D list of concentration parameters.')
self._dist = st.dirichlet(alpha=self.alpha)
@property
def alpha(self):
return self._alpha
@property
def n_rvs(self):
return self._alpha.size
[docs] def sample(self, n=1):
return self._dist.rvs(size=n)
[docs]class BetaBinomialDistribution(Distribution):
r"""
The beta-binomial distribution, whose pmf at the non-negative
integer :math:`k` is equal to
:math:`\binom{n}{k}\frac{B(k+\alpha,n-k+\beta)}{B(\alpha,\beta)}`
with :math:`B(\cdot,\cdot)` the beta function.
This is the compound distribution whose variates are binomial distributed
with a bias chosen from a beta distribution.
Note that either ``alpha`` and ``beta``, or ``mean`` and ``var``, must be
specified as inputs;
either case uniquely determines the distribution.
:param int n: The :math:`n` parameter of the beta-binomial distribution.
:param float alpha: The alpha shape parameter of the beta-binomial distribution.
:param float beta: The beta shape parameter of the beta-binomial distribution.
:param float mean: The desired mean value of the beta-binomial distribution.
:param float var: The desired variance of the beta-binomial distribution.
"""
def __init__(self, n, alpha=None, beta=None, mean=None, var=None):
self.n = n
if alpha is not None and beta is not None:
self.alpha = alpha
self.beta = beta
self.mean = n * alpha / (alpha + beta)
self.var = n * alpha * beta * (alpha + beta + n) / ((alpha + beta) ** 2 * (alpha + beta + 1))
elif mean is not None and var is not None:
self.mean = mean
self.var = var
self.alpha = - mean * (var + mean **2 - n * mean) / (mean ** 2 + n * (var - mean))
self.beta = (n - mean) * (var + mean ** 2 - n * mean) / ((n - mean) * mean - n * var)
else:
raise ValueError("BetaBinomialDistribution requires either (alpha and beta) or (mean and var).")
# Beta-binomial is a compound distribution, drawing binomial
# RVs off of a beta-distrubuted bias.
self._p_dist = st.beta(a=self.alpha, b=self.beta)
@property
def n_rvs(self):
return 1
[docs] def sample(self, n=1):
p_vals = self._p_dist.rvs(size=n)[:, np.newaxis]
# numpy.random.binomial supports sampling using different p values,
# whereas scipy does not.
return np.random.binomial(self.n, p_vals)
[docs]class GammaDistribution(Distribution):
r"""
The gamma distribution, whose pdf at :math:`x` is proportional to
:math:`x^{-\alpha-1}e^{-x\beta}`.
Note that either alpha and beta, or mean and var, must be
specified as inputs;
either case uniquely determines the distribution.
:param float alpha: The alpha shape parameter of the gamma distribution.
:param float beta: The beta shape parameter of the gamma distribution.
:param float mean: The desired mean value of the gamma distribution.
:param float var: The desired variance of the gamma distribution.
"""
def __init__(self, alpha=None, beta=None, mean=None, var=None):
if alpha is not None and beta is not None:
self.alpha = alpha
self.beta = beta
self.mean = alpha / beta
self.var = alpha / beta ** 2
elif mean is not None and var is not None:
self.mean = mean
self.var = var
self.alpha = mean ** 2 / var
self.beta = mean / var
else:
raise ValueError("GammaDistribution requires either (alpha and beta) or (mean and var).")
# This is the distribution we want up to a scale factor of beta
self._dist = st.gamma(self.alpha)
@property
def n_rvs(self):
return 1
[docs] def sample(self, n=1):
return self._dist.rvs(size=n)[:, np.newaxis] / self.beta
[docs]class PostselectedDistribution(Distribution):
"""
Postselects a distribution based on validity within a given model.
"""
# TODO: rewrite LiuWestResampler in terms of this and a
# new MixtureDistribution.
def __init__(self, distribution, model, maxiters=100):
self._dist = distribution
self._model = model
self._maxiters = maxiters
@property
def n_rvs(self):
return self._dist.n_rvs
[docs] def sample(self, n=1):
"""
Returns one or more samples from this probability distribution.
:param int n: Number of samples to return.
:return numpy.ndarray: An array containing samples from the
distribution of shape ``(n, d)``, where ``d`` is the number of
random variables.
"""
samples = np.empty((n, self.n_rvs))
idxs_to_sample = np.arange(n)
iters = 0
while idxs_to_sample.size and iters < self._maxiters:
samples[idxs_to_sample] = self._dist.sample(len(idxs_to_sample))
idxs_to_sample = idxs_to_sample[np.nonzero(np.logical_not(
self._model.are_models_valid(samples[idxs_to_sample, :])
))[0]]
iters += 1
if idxs_to_sample.size:
raise RuntimeError("Did not successfully postselect within {} iterations.".format(self._maxiters))
return samples
[docs] def grad_log_pdf(self, x):
return self._dist.grad_log_pdf(x)
[docs]class InterpolatedUnivariateDistribution(Distribution):
"""
Samples from a single-variable distribution specified by its PDF. The
samples are drawn by first drawing uniform samples over the interval
``[0, 1]``, and then using an interpolation of the inverse-CDF
corresponding to the given PDF to transform these samples into the
desired distribution.
:param callable pdf: Vectorized single-argument function that evaluates
the PDF of the desired distribution.
:param float compactification_scale: Scale of the compactified coordinates
used to interpolate the given PDF.
:param int n_interp_points: The number of points at which to sample the
given PDF.
"""
def __init__(self, pdf, compactification_scale=1, n_interp_points=1500):
self._pdf = pdf
self._xs = u.compactspace(compactification_scale, n_interp_points)
self._generate_interp()
def _generate_interp(self):
xs = self._xs
pdfs = self._pdf(xs)
norm_factor = np.trapz(pdfs, xs)
self._cdfs = cumtrapz(pdfs / norm_factor, xs, initial=0)
self._interp_inv_cdf = interp1d(self._cdfs, xs, bounds_error=False)
@property
def n_rvs(self):
return 1
[docs] def sample(self, n=1):
return self._interp_inv_cdf(np.random.random(n))[:, np.newaxis]
[docs]class ConstrainedSumDistribution(Distribution):
"""
Samples from an underlying distribution and then
enforces that all samples must sum to some given
value by normalizing each sample.
:param Distribution underlying_distribution: Underlying probability distribution.
:param float desired_total: Desired sum of each sample.
"""
def __init__(self, underlying_distribution, desired_total=1):
super(ConstrainedSumDistribution, self).__init__()
self._ud = underlying_distribution
self.desired_total = desired_total
@property
def underlying_distribution(self):
return self._ud
@property
def n_rvs(self):
return self.underlying_distribution.n_rvs
[docs] def sample(self, n=1):
s = self.underlying_distribution.sample(n)
totals = np.sum(s, axis=1)[:,np.newaxis]
return self.desired_total * np.sign(totals) * s / totals