# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2021 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.
import numpy as np
from .. import random
from ..domain_tuple import DomainTuple
from ..linearization import Linearization
from ..minimization.energy import Energy
from ..multi_domain import MultiDomain
from ..sugar import from_random
from ..utilities import (allreduce_sum, get_MPI_params_from_comm, myassert,
shareRange)
class EnergyAdapter(Energy):
"""Helper class which provides the traditional Nifty Energy interface to
Nifty operators with a scalar target domain.
Parameters
-----------
position : :class:`nifty8.field.Field` or :class:`nifty8.multi_field.MultiField`
The position where the minimization process is started.
op : EnergyOperator
The expression computing the energy from the input data.
    constants : list of strings
        The component names of the operator's input domain which are assumed
        to be constant during the minimization process.
        If the operator's input domain is not a `MultiDomain`, this list must
        be empty. Default: [].
want_metric : bool
If True, the class will provide a `metric` property. This should only
be enabled if it is required, because it will most likely consume
additional resources. Default: False.
    nanisinf : bool
        If True, NaN energies, which can happen due to overflows in the
        forward model, are interpreted as inf. Thereby the code does not
        crash on these occasions; instead the minimizer is told that the
        position it has tried is not sensible.
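
    Examples
    --------
    A minimal sketch, assuming that `GaussianEnergy` with only a `domain`
    argument yields the standard-normal energy; any operator with scalar
    target works::

        import nifty8 as ift

        dom = ift.RGSpace(8)
        op = ift.GaussianEnergy(domain=dom)   # scalar-target energy operator
        pos = ift.from_random(op.domain)      # random starting position
        energy = ift.EnergyAdapter(pos, op, want_metric=True)
        val, grad = energy.value, energy.gradient
        energy = energy.at(ift.from_random(op.domain))  # same op, new position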
"""
def __init__(self, position, op, constants=[], want_metric=False,
nanisinf=False):
        if len(constants) > 0:
            # Fix the constant part of the input in `op` and restrict the
            # position to the remaining (variable) keys.
            cstpos = position.extract_by_keys(constants)
            _, op = op.simplify_for_constant_input(cstpos)
            varkeys = set(op.domain.keys()) - set(constants)
            position = position.extract_by_keys(varkeys)
super(EnergyAdapter, self).__init__(position)
self._op = op
self._want_metric = want_metric
lin = Linearization.make_var(position, want_metric)
tmp = self._op(lin)
self._val = tmp.val.val[()]
self._grad = tmp.gradient
self._metric = tmp._metric
self._nanisinf = bool(nanisinf)
if self._nanisinf and np.isnan(self._val):
self._val = np.inf
def at(self, position):
return EnergyAdapter(position, self._op, want_metric=self._want_metric,
nanisinf=self._nanisinf)
@property
def value(self):
return self._val
@property
def gradient(self):
return self._grad
@property
def metric(self):
return self._metric
def apply_metric(self, x):
return self._metric(x)
class StochasticEnergyAdapter(Energy):
"""Provide the energy interface for an energy operator where parts of the
input are averaged instead of optimized.
Specifically, a set of standard normal distributed samples are drawn for
the input corresponding to `keys` and each sample is inserted partially
into `op`. The resulting operators are then averaged. The subdomain that
is not sampled is left a stochastic average of an energy with the remaining
subdomain being the DOFs that are considered to be optimization parameters.
Notes
-----
    `StochasticEnergyAdapter` should never be created using the constructor,
    but rather via the factory function :meth:`make`.
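
    Examples
    --------
    A sketch of working with an adapter `energy` built via :meth:`make`;
    `newpos` stands for any field on `energy.position.domain`::

        val = energy.value                  # stochastic estimate of the energy
        grad = energy.gradient              # sample-averaged gradient
        e1 = energy.at(newpos)              # same samples, new position
        e2 = energy.resample_at(newpos)     # fresh samples, new position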
"""
    def __init__(self, position, op, keys, local_ops, n_samples, comm,
                 nanisinf, noise, mirror_samples, _callingfrommake=False):
        if not _callingfrommake:
            raise NotImplementedError(
                "StochasticEnergyAdapter should not be instantiated directly; "
                "use the factory function `make` instead")
super(StochasticEnergyAdapter, self).__init__(position)
for lop in local_ops:
myassert(position.domain == lop.domain)
self._comm = comm
self._local_ops = local_ops
self._n_samples = n_samples
self._nanisinf = nanisinf
        lin = Linearization.make_var(position)
        v, g = [], []
        for lop in self._local_ops:
            tmp = lop(lin)
            v.append(tmp.val.val)
            g.append(tmp.gradient)
        # Sum the local contributions across all MPI tasks and divide by the
        # total number of samples to obtain the stochastic average.
        self._val = allreduce_sum(v, self._comm)[()]/self._n_samples
if np.isnan(self._val) and self._nanisinf:
self._val = np.inf
self._grad = allreduce_sum(g, self._comm)/self._n_samples
        self._noise = noise
        self._op = op
        self._keys = keys
        self._mirror_samples = mirror_samples
@property
def value(self):
return self._val
@property
def gradient(self):
return self._grad
def at(self, position):
        return StochasticEnergyAdapter(position, self._op, self._keys,
                                       self._local_ops, self._n_samples,
                                       self._comm, self._nanisinf,
                                       self._noise, self._mirror_samples,
                                       _callingfrommake=True)
def apply_metric(self, x):
lin = Linearization.make_var(self.position, want_metric=True)
res = []
for op in self._local_ops:
res.append(op(lin).metric(x))
return allreduce_sum(res, self._comm)/self._n_samples
@property
def metric(self):
from .kl_energies import _SelfAdjointOperatorWrapper
return _SelfAdjointOperatorWrapper(self.position.domain,
self.apply_metric)
    def resample_at(self, position):
        # `self._n_samples` already includes the doubling from mirrored
        # samples, so undo it before handing the count back to `make`.
        # (Previously `self._comm` was passed as `mirror_samples`, silently
        # dropping the communicator and the nanisinf setting.)
        n = self._n_samples//2 if self._mirror_samples else self._n_samples
        return StochasticEnergyAdapter.make(position, self._op, self._keys, n,
                                            self._mirror_samples,
                                            comm=self._comm,
                                            nanisinf=self._nanisinf)
@staticmethod
def make(position, op, sampling_keys, n_samples, mirror_samples,
comm=None, nanisinf=False):
"""Factory function for StochasticEnergyAdapter.
Parameters
----------
position : :class:`nifty8.multi_field.MultiField`
Values of the optimization parameters
op : Operator
The objective function of the optimization problem. Must have a
scalar target. The domain must be a `MultiDomain` with its keys
being the union of `sampling_keys` and `position.domain.keys()`.
sampling_keys : iterable of String
The keys of the subdomain over which the stochastic average of `op`
should be performed.
n_samples : int
Number of samples used for the stochastic estimate.
        mirror_samples : bool
            Whether the negatives of the drawn samples are also used, as they
            are equally legitimate samples. If True, the number of used
            samples doubles.
comm : MPI communicator or None
If not None, samples will be distributed as evenly as possible
across this communicator. If `mirror_samples` is set, then a sample
and its mirror image will always reside on the same task.
        nanisinf : bool
            If True, NaN energies, which can occur due to overflows in the
            forward model, are interpreted as inf, which optimizers can
            handle.
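
        Examples
        --------
        A hypothetical toy setup; `GaussianEnergy(domain=...)` is assumed to
        implement the standard-normal energy, and the two `ducktape` calls
        build a scalar-target operator on the keys ``"mean"`` and ``"xi"``::

            import nifty8 as ift

            dom = ift.RGSpace(8)
            op = (ift.GaussianEnergy(domain=dom).ducktape("mean")
                  + ift.GaussianEnergy(domain=dom).ducktape("xi"))
            pos = ift.from_random(ift.MultiDomain.make({"mean": dom}))
            energy = ift.StochasticEnergyAdapter.make(
                pos, op, sampling_keys=["xi"], n_samples=4,
                mirror_samples=True)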
"""
myassert(op.target == DomainTuple.scalar_domain())
        if not isinstance(n_samples, int):
            raise TypeError("n_samples must be an integer")
        samdom = {}
        for k in sampling_keys:
            if k in position.domain.keys() or k not in op.domain.keys():
                raise ValueError(
                    f"sampling key '{k}' must be part of the operator domain "
                    "and must not appear in the position domain")
            samdom[k] = op.domain[k]
samdom = MultiDomain.make(samdom)
        noise = []
        sseq = random.spawn_sseq(n_samples)
        ntask, rank, _ = get_MPI_params_from_comm(comm)
        # Draw only the samples this MPI task is responsible for; a mirrored
        # sample is kept on the same task as its original.
        for i in range(*shareRange(n_samples, ntask, rank)):
            with random.Context(sseq[i]):
                rnd = from_random(samdom)
            noise.append(rnd)
            if mirror_samples:
                noise.append(-rnd)
local_ops = []
for nn in noise:
_, tmp = op.simplify_for_constant_input(nn)
myassert(tmp.domain == position.domain)
local_ops.append(tmp)
n_samples = 2*n_samples if mirror_samples else n_samples
        return StochasticEnergyAdapter(position, op, sampling_keys, local_ops,
                                       n_samples, comm, nanisinf, noise,
                                       mirror_samples, _callingfrommake=True)
def samples(self):
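        """Return the list of samples stored on this MPI task, over which the
        stochastic average is taken."""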
return self._noise