# pylint: disable=too-few-public-methods, method-hidden
""" Contains Sampler-classes. """
import warnings
from copy import copy
import numpy as np
try:
import scipy.stats as ss
except ImportError:
pass
from .utils_random import make_rng
# Aliases of distribution names for NumPy, SciPy-Stats and TensorFlow samplers.
# Maps a short alias to the per-backend full name of the distribution.
ALIASES = {
    'n': {'np': 'normal', 'ss': 'norm', 'tf': 'Normal'},
    'u': {'np': 'uniform', 'ss': 'uniform', 'tf': 'Uniform'},
    'mvn': {'np': 'multivariate_normal', 'ss': 'multivariate_normal'},
    'e': {'np': 'exponential', 'ss': 'expon', 'tf': 'Exponential'},
    'g': {'np': 'gamma', 'ss': 'gamma', 'tf': 'Gamma'},
    'be': {'np': 'beta', 'ss': 'beta', 'tf': 'Beta'},
    'mnm': {'np': 'multinomial', 'ss': 'multinomial', 'tf': 'Multinomial'},
    'f': {'np': 'f', 'ss': 'f'},
    'p': {'np': 'poisson', 'ss': 'poisson'},
    'w': {'np': 'weibull', 'ss': 'dweibull'},
    'ln': {'np': 'lognormal', 'ss': 'lognorm'},
    'b': {'np': 'binomial', 'ss': 'binom'},
    'chi2': {'np': 'chisquare', 'ss': 'chi2'},
    'c': {'np': 'choice'}
}
def _get_method_by_alias(alias, module, tf_distributions=None):
    """ Fetch the full name of a randomizer from ``scipy.stats``, ``tensorflow`` or
    ``numpy`` by its alias or full name.

    Parameters
    ----------
    alias : str
        either a key of ``ALIASES`` or a full distribution name.
    module : str
        one of 'np', 'ss', 'tf' — the backend to look the distribution up in.
    tf_distributions : module, optional
        module containing TensorFlow distributions; only used when ``module == 'tf'``.

    Returns
    -------
    str
        full name of the distribution in the requested backend.

    Raises
    ------
    ValueError
        if the distribution has no implementation in the requested backend.
    """
    rnd_submodules = {'np': np.random,
                      'tf': tf_distributions,
                      'ss': ss}
    # fetch the full name; an alias unknown to ALIASES is assumed to already
    # be a full name valid for every backend
    fullname = ALIASES.get(alias, {mod: alias for mod in ['np', 'tf', 'ss']}).get(module, None)
    if fullname is None:
        raise ValueError(f"Distribution {alias} has no implementation in module {module}")
    # check that the randomizer is actually implemented in the corresponding module
    if not hasattr(rnd_submodules[module], fullname):
        raise ValueError(f"Distribution {fullname} has no implementation in module {module}")
    return fullname
def arithmetize(cls):
    """ Add arithmetic magic methods to a Sampler-class.

    Every generated method builds the corresponding operation-sampler
    (looked up in the module-level ``classes`` dict) from the two operands.
    """
    def _make_transform(oper):
        # factory closes over `oper`, avoiding the late-binding-closure pitfall
        def transform(self, other):
            """ Arithmetic operation on a couple of Samplers.

            Implemented via the corresponding operation on ndarrays.

            Parameters
            ----------
            other : Sampler
                second Sampler, the operation is applied to.

            Returns
            -------
            Sampler
                resulting sampler.
            """
            return classes[oper](self, other)
        return transform

    operations = ('__add__', '__mul__', '__truediv__', '__sub__', '__pow__', '__floordiv__', '__mod__',
                  '__radd__', '__rmul__', '__rtruediv__', '__rsub__', '__rpow__', '__rfloordiv__', '__rmod__')
    for oper in operations:
        setattr(cls, oper, _make_transform(oper))
    return cls
@arithmetize
class Sampler():
    """ Base class implementing the algebra of Samplers.

    Attributes
    ----------
    weight : float
        weight of this Sampler when used inside mixtures.
    """
    def __init__(self, *args, **kwargs):
        # high priority makes NumPy defer binary ops to the Sampler operand
        self.__array_priority__ = 100
        self.weight = 1.0

        # when `dim` is supplied, build a dim-dimensional sampler by
        # &-stacking `dim` copies of this sampler and adopt its sampling routine
        if 'dim' in kwargs:
            dim = kwargs.pop('dim')
            stacked = type(self)(*args, **kwargs)
            for _ in range(dim - 1):
                stacked = type(self)(*args, **kwargs) & stacked
            self.sample = stacked.sample

    def sample(self, size):
        """ Sampling method of a sampler.

        Parameters
        ----------
        size : int
            length of the sample to be generated.

        Returns
        -------
        np.ndarray
            array of shape (size, Sampler's dimension).
        """
        raise NotImplementedError('The method should be implemented in child-classes!')

    def __or__(self, other):
        """ Implementation of the '|' operation for two Sampler-instances.

        The result is a mixture of the two samplers; mixing probabilities are
        proportional to the samplers' `weight`-attributes.

        Parameters
        ----------
        other : Sampler
            the sampler to be mixed with self.

        Returns
        -------
        Sampler
            resulting mixture of the two samplers.
        """
        return OrSampler(self, other)

    def __and__(self, other):
        """ Implementation of the '&' operation for a Sampler-instance.

        Two cases are possible: when ``other`` is numeric, '&' rescales the
        weight of the sampler. When ``other`` is a Sampler, the result is a
        multidimensional sampler whose leading coordinates are drawn from
        ``self`` and trailing coordinates from ``other``.

        Parameters
        ----------
        other : int or float or Sampler
            the sampler/weight for multiplication.

        Returns
        -------
        Sampler
            result of the multiplication.
        """
        if not isinstance(other, (float, int)):
            return AndSampler(self, other)
        self.weight *= other
        return self

    def __rand__(self, other):
        """ Implementation of the '&' operation on a weight for a Sampler-instance.

        See the docstring of `Sampler.__and__`.
        """
        return self & other

    def apply(self, transform):
        """ Apply a transformation to the sampler.

        Builds a new sampler whose sampling function is
        ``transform(self.sample(size))``.

        Parameters
        ----------
        transform : callable
            function that takes an ndarray of shape (size, dim_sampler) and
            produces an ndarray of shape (size, new_dim_sampler).

        Returns
        -------
        Sampler
            instance of class Sampler with redefined method `sample`.
        """
        return ApplySampler(self, transform)

    def truncate(self, high=None, low=None, expr=None, prob=0.5, max_iters=None, sample_anyways=False):
        """ Truncate a sampler. The resulting sampler produces points satisfying
        ``low <= pts <= high`` (or ``low <= expr(pts) <= high`` when ``expr``
        is supplied).

        Uses rejection sampling in a while-loop; the loop behaviour is
        controlled by ``max_iters`` and ``sample_anyways``.

        Parameters
        ----------
        high : ndarray, list, float
            upper truncation-bound.
        low : ndarray, list, float
            lower truncation-bound.
        expr : callable, optional
            vectorized function; accepts points of the sampler, returns either
            bool or float. In the float case either `high` or `low` should
            also be supplied.
        prob : float, optional
            estimate of P(truncation-condition is satisfied); a good estimate
            improves the performance of the truncated sampler.
        max_iters : float, optional
            if the number of iterations needed to obtain the sample exceeds
            this number, either a warning or an error is raised. Defaults to
            1e7 (class constant of TruncateSampler).
        sample_anyways : bool, optional
            if True, exceeding `max_iters` iterations only throws a warning
            and sampling continues; if False, a ValueError is raised.

        Returns
        -------
        Sampler
            new Sampler-instance, truncated version of self.
        """
        return TruncateSampler(self, high, low, expr, prob, max_iters, sample_anyways)
class OrSampler(Sampler):
    """ Implements the `|` (mixture) operation on `Sampler`-instances. """
    def __init__(self, left, right, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bases = [left, right]
        # mixture probabilities are proportional to the components' weights
        weights = np.array([left.weight, right.weight])
        self.weight = np.sum(weights)
        self.normed = weights / np.sum(weights)

    def sample(self, size):
        """ Sampling procedure of a mixture of two samplers.

        Draws a binomially-distributed number of points from the first base
        sampler and the rest from the second, then shuffles the union into
        one sample of the needed size.
        """
        # NOTE(review): uses the global np.random state rather than a seeded
        # generator — unlike NumpySampler/ScipySampler; confirm if intended.
        first_size = np.random.binomial(size, self.normed[0])
        parts = [self.bases[0].sample(size=first_size),
                 self.bases[1].sample(size=size - first_size)]
        mixed = np.concatenate(parts)
        # random permutation interleaves points of the two components
        return mixed[np.random.permutation(size)]
class AndSampler(Sampler):
    """ Implements the `&` (coordinate stacking) operation on `Sampler`-instances. """
    def __init__(self, left, right, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bases = [left, right]

    def sample(self, size):
        """ Sampling procedure of a product of two samplers.

        See the docstring of `Sampler.__and__` for details.
        """
        coords = [base.sample(size) for base in self.bases]
        # stack coordinates column-wise: left sampler first, then right
        return np.concatenate(coords, axis=1)
class ApplySampler(Sampler):
    """ Implements the `apply` (adding a transform) operation on `Sampler`-instances. """
    def __init__(self, sampler, transform, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bases = [sampler]
        self.transform = transform

    def sample(self, size):
        """ Sampling procedure of a sampler subjected to a transform.

        See the docstring of `Sampler.apply` for details.
        """
        points = self.bases[0].sample(size)
        return self.transform(points)
class TruncateSampler(Sampler):
    """ Implements the `truncate` (truncation by a condition) operation on `Sampler`-instances. """
    # Used when truncating a Sampler. If we cannot obtain the needed amount of
    # points from the region of interest within this number of iterations,
    # a Warning or a ValueError is thrown.
    max_iters = 1e7

    def __init__(self, sampler, high=None, low=None, expr=None, prob=0.5, max_iters=None,
                 sample_anyways=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bases = [sampler]
        self.high = high
        self.low = low
        self.expr = expr
        self.prob = prob
        self.sample_anyways = sample_anyways
        # instance-level override of the class-level default
        self.max_iters = max_iters or self.max_iters

    def sample(self, size):
        """ Sampling method of a sampler subjected to truncation.

        Rejection sampling: repeatedly draws batches from the base sampler and
        keeps only the points satisfying the truncation condition, until `size`
        points are accumulated. See the docstring of `Sampler.truncate` for
        the meaning of the condition parameters.

        Parameters
        ----------
        size : int
            the size of the sample to be generated.

        Returns
        -------
        np.ndarray
            array of shape (size, Sampler's dimension).
        """
        if size == 0:
            return self.bases[0].sample(size=0)
        high, low, expr, prob = self.high, self.low, self.expr, self.prob

        # batch size = expected number of draws needed plus two standard
        # deviations; the number of accepted points in a batch is ~ Binomial(batch, prob)
        expectation = size / prob
        sigma = np.sqrt(size * (1 - prob) / (prob**2))
        batch_size = int(expectation + 2 * sigma)

        # sample, filter out, concat
        ctr = 0
        cumulated = 0
        samples = []
        while cumulated < size:
            # sample points and compute the condition-vector
            sample = self.bases[0].sample(size=batch_size)
            # FIX: `np.bool` was deprecated in NumPy 1.20 and removed in 1.24;
            # the builtin `bool` dtype is the supported spelling.
            cond = np.ones(batch_size, dtype=bool)
            if low is not None:
                if expr is not None:
                    cond &= np.greater_equal(expr(sample).reshape(batch_size, -1), low).all(axis=1)
                else:
                    cond &= np.greater_equal(sample, low).all(axis=1)
            if high is not None:
                if expr is not None:
                    cond &= np.less_equal(expr(sample).reshape(batch_size, -1), high).all(axis=1)
                else:
                    cond &= np.less_equal(sample, high).all(axis=1)
            if high is None and low is None:
                # assumes expr returns a 2d boolean array here — TODO confirm
                cond &= expr(sample).all(axis=1)

            # check whether we reached the max_iters-number of iterations
            if ctr > self.max_iters:
                if self.sample_anyways:
                    msg = f"Already took {self.max_iters} number of iteration to make a sample. "\
                          "Yet, `sample_anyways` is set to true, so going on. Kill the process manually if needed."
                    warnings.warn(msg)
                else:
                    msg = f"The number of iterations needed to obtain the sample exceeds {self.max_iters}. "\
                          "Stopping the process."
                    raise ValueError(msg)

            # keep points from the region of interest
            samples.append(sample[cond])
            cumulated += np.sum(cond)
            ctr += 1

        return np.concatenate(samples)[:size]
class BaseOperationSampler(Sampler):
    """ Base class for all arithmetic operations on `Sampler`-instances.

    Child classes set `operation` to the name of the ndarray magic method
    used to combine samples of the two operands.
    """
    operation = None

    def __init__(self, left, right, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bases = [left, right]

    def sample(self, size):
        """ Sample both operands and combine them via `self.operation`. """
        left, right = self.bases
        combine = getattr(left.sample(size), self.operation)
        if isinstance(right, Sampler):
            return combine(right.sample(size))
        # a non-Sampler operand is treated as a constant
        return combine(np.array(right))
class AddSampler(BaseOperationSampler):
    operation = '__add__'

class MulSampler(BaseOperationSampler):
    operation = '__mul__'

class TruedivSampler(BaseOperationSampler):
    operation = '__truediv__'

class SubSampler(BaseOperationSampler):
    operation = '__sub__'

class PowSampler(BaseOperationSampler):
    operation = '__pow__'

class FloordivSampler(BaseOperationSampler):
    operation = '__floordiv__'

class ModSampler(BaseOperationSampler):
    operation = '__mod__'

class RAddSampler(BaseOperationSampler):
    operation = '__radd__'

class RMulSampler(BaseOperationSampler):
    operation = '__rmul__'

class RTruedivSampler(BaseOperationSampler):
    operation = '__rtruediv__'

class RSubSampler(BaseOperationSampler):
    operation = '__rsub__'

class RPowSampler(BaseOperationSampler):
    operation = '__rpow__'

class RFloordivSampler(BaseOperationSampler):
    operation = '__rfloordiv__'

class RModSampler(BaseOperationSampler):
    operation = '__rmod__'

# dispatch table used by `arithmetize`: magic-method name -> operation-sampler
classes = {
    '__add__': AddSampler,
    '__mul__': MulSampler,
    '__truediv__': TruedivSampler,
    '__sub__': SubSampler,
    '__pow__': PowSampler,
    '__floordiv__': FloordivSampler,
    '__mod__': ModSampler,
    '__radd__': RAddSampler,
    '__rmul__': RMulSampler,
    '__rtruediv__': RTruedivSampler,
    '__rsub__': RSubSampler,
    '__rpow__': RPowSampler,
    '__rfloordiv__': RFloordivSampler,
    '__rmod__': RModSampler,
}
class ConstantSampler(Sampler):
    """ Sampler of a constant.

    Parameters
    ----------
    constant : int, str, float, list
        constant associated with the Sampler. Can be multidimensional,
        e.g. a list or np.ndarray.

    Attributes
    ----------
    constant : np.array
        vectorized constant associated with the Sampler.
    """
    def __init__(self, constant, **kwargs):
        # store the constant as a (1, dim) row so `sample` can tile it
        self.constant = np.array(constant).reshape(1, -1)
        super().__init__(constant, **kwargs)

    def sample(self, size):
        """ Sampling method of ``ConstantSampler``.

        Repeats the sampler's constant ``size`` times.

        Parameters
        ----------
        size : int
            the size of the sample to be generated.

        Returns
        -------
        np.ndarray
            array of shape (size, 1) containing the Sampler's constant.
        """
        return np.tile(self.constant, (size, 1))
class NumpySampler(Sampler):
    """ Sampler based on a distribution from `numpy random`.

    Parameters
    ----------
    name : str
        a distribution name (a method of `numpy random`) or its alias.
    seed : int
        random seed for setting up the sampler's state (see :func:`~.make_rng`).
    **kwargs
        additional keyword-arguments defining properties of the specific
        distribution (e.g. ``loc`` for 'normal').

    Attributes
    ----------
    name : str
        a distribution name (a method of `numpy random`).
    state : numpy.random.Generator
        a random number generator
    _params : dict
        dict of args for the Sampler's distribution.
    """
    def __init__(self, name, seed=None, **kwargs):
        super().__init__(name, seed, **kwargs)
        self.name = _get_method_by_alias(name, 'np')
        self._params = copy(kwargs)
        self.state = make_rng(seed)

    def sample(self, size):
        """ Generate random samples from the distribution ``self.name``.

        Parameters
        ----------
        size : int
            the size of the sample to be generated.

        Returns
        -------
        np.ndarray
            array of shape (size, Sampler's dimension).
        """
        sampler = getattr(self.state, self.name)
        sample = sampler(size=size, **self._params)
        # 1d draws are returned as a single-column 2d array
        if sample.ndim == 1:
            sample = sample.reshape(-1, 1)
        return sample
class ScipySampler(Sampler):
    """ Sampler based on a distribution from `scipy.stats`.

    Parameters
    ----------
    name : str
        a distribution name, a class from `scipy.stats`, or its alias.
    seed : int
        random seed for setting up the sampler's state (see :func:`~.make_rng`).
    **kwargs
        additional parameters specifying the distribution,
        e.g. `scale` for name='gamma'.

    Attributes
    ----------
    name : str
        a distribution name (a class from `scipy.stats`).
    state : numpy.random.Generator
        a random number generator
    distr
        a frozen distribution instance
    """
    def __init__(self, name, seed=None, **kwargs):
        super().__init__(name, seed, **kwargs)
        self.name = _get_method_by_alias(name, 'ss')
        self.state = make_rng(seed)
        # freeze the distribution with the supplied parameters
        self.distr = getattr(ss, self.name)(**kwargs)

    def sample(self, size):
        """ Sampling method of ``ScipySampler``.

        Generates random samples from the distribution ``self.name``.

        Parameters
        ----------
        size : int
            the size of the sample to be generated.

        Returns
        -------
        np.ndarray
            array of shape (size, Sampler's dimension).
        """
        sample = self.distr.rvs(size=size, random_state=self.state)
        # 1d draws are returned as a single-column 2d array
        if sample.ndim == 1:
            sample = sample.reshape(-1, 1)
        return sample
class HistoSampler(Sampler):
    """ Sampler based on a histogram, the output of `np.histogramdd`.

    Parameters
    ----------
    histo : tuple
        histogram on which the sampler is based.
        Make sure that it is unnormalized (`normed=False` in `np.histogramdd`).
    edges : list
        list of len=histo_dimension containing the edges of bins along axes.
    seed : int
        random seed for setting up the sampler's state (see :func:`~.make_rng`).

    Attributes
    ----------
    bins : np.ndarray
        bins of the base histogram (see `np.histogramdd`).
    edges : list
        edges of the base histogram.

    Notes
    -----
    The sampler should be based on an unnormalized histogram.
    If the `histo`-arg is supplied, it is used for histo-initialization.
    Otherwise, `edges` should be supplied; in that case all bins start empty.
    """
    def __init__(self, histo=None, edges=None, seed=None, **kwargs):
        super().__init__(histo, edges, seed, **kwargs)
        if histo is not None:
            self.bins, self.edges = histo[0], histo[1]
        elif edges is not None:
            self.edges = edges
            bins_shape = [len(axis_edges) - 1 for axis_edges in edges]
            self.bins = np.zeros(shape=bins_shape, dtype=np.float32)
        else:
            raise ValueError('Either `histo` or `edges` should be specified.')

        # lower and upper corners of every bin, flattened in lexicographic order
        self.l_all = cart_prod(*(axis_edges[:-1] for axis_edges in self.edges))
        self.h_all = cart_prod(*(axis_edges[1:] for axis_edges in self.edges))

        # bin probabilities; only bins with non-zero mass take part in sampling
        self.probs = (self.bins / np.sum(self.bins)).reshape(-1)
        self.nonzero_probs_idx = np.asarray(self.probs != 0.0).nonzero()[0]
        self.nonzero_probs = self.probs[self.nonzero_probs_idx]

        self.state = make_rng(seed)
        self.state_sampler = self.state.uniform

    def sample(self, size):
        """ Sampling method of ``HistoSampler``.

        Generates random samples from the distribution represented by the
        histogram (self.bins, self.edges).

        Parameters
        ----------
        size : int
            the size of the sample to be generated.

        Returns
        -------
        np.ndarray
            array of shape (size, histo dimension).
        """
        # pick bins proportionally to their (non-zero) probabilities
        bin_nums = self.state.choice(self.nonzero_probs_idx, p=self.nonzero_probs, size=size)
        # then sample uniformly inside each selected box
        low, high = self.l_all[bin_nums], self.h_all[bin_nums]
        return self.state_sampler(low=low, high=high)

    def update(self, points):
        """ Update the bins of the sampler's histogram by throwing in additional points.

        Parameters
        ----------
        points : np.ndarray
            array of points of shape (n_points, histo_dimension).
        """
        histo_update = np.histogramdd(sample=points, bins=self.edges)
        self.bins += histo_update[0]
def cart_prod(*arrs):
    """ Build an array of cartesian tuples from an arbitrary number of arrays.

    A faster version of itertools.product; the order of tuples is lexicographic.

    Parameters
    ----------
    arrs : tuple, list or ndarray
        any sequence of ndarrays.

    Returns
    -------
    ndarray
        2d-array with rows (arr[0][i], arr[1][j], ..., arr[n][k]).
    """
    # 'ij' indexing keeps the first array varying slowest => lexicographic rows
    grids = np.meshgrid(*arrs, indexing='ij')
    stacked = np.stack(grids, axis=-1)
    return stacked.reshape(-1, len(arrs))