# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2021 Max-Planck-Society
# Copyright(C) 2022 Philipp Arras
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.
import cProfile
import io
import pstats
import sys
from time import time
import numpy as np
from . import pointwise, utilities
from .domain_tuple import DomainTuple
from .domains.power_space import PowerSpace
from .field import Field
from .logger import logger
from .multi_domain import MultiDomain
from .multi_field import MultiField
from .operators.block_diagonal_operator import BlockDiagonalOperator
from .operators.diagonal_operator import DiagonalOperator
from .operators.distributors import PowerDistributor
from .operators.operator import Operator
from .operators.scaling_operator import ScalingOperator
from .operators.selection_operators import SliceOperator
from .plot import Plot, plottable2D
__all__ = ['PS_field', 'power_analyze', 'create_power_operator',
'density_estimator', 'create_harmonic_smoothing_operator',
'from_random', 'full', 'makeField', 'is_fieldlike',
'is_linearization', 'is_operator', 'makeDomain', 'is_likelihood_energy',
'get_signal_variance', 'makeOp', 'domain_union',
'get_default_codomain', 'single_plot', 'exec_time',
'calculate_position', 'plot_priorsamples'] + list(pointwise.ptw_dict.keys())
[docs]
def PS_field(pspace, function):
"""Convenience function sampling a power spectrum
Parameters
----------
pspace : PowerSpace
space at whose `k_lengths` the power spectrum function is evaluated
function : function taking and returning a numpy.ndarray(float)
the power spectrum function
Returns
-------
:class:`nifty8.field.Field`
A field defined on (pspace,) containing the computed function values
"""
if not isinstance(pspace, PowerSpace):
raise TypeError
data = function(pspace.k_lengths)
return Field(DomainTuple.make(pspace), data)
[docs]
def get_signal_variance(spec, space):
"""
Computes how much a field with a given power spectrum will vary in space
This is a small helper function that computes the expected variance
of a harmonically transformed sample of this power spectrum.
Parameters
---------
spec: method
a method that takes one k-value and returns the power spectrum at that
location
space: PowerSpace or any harmonic Domain
If this function is given a harmonic domain, it creates the naturally
binned PowerSpace to that domain.
The field, for which the signal variance is then computed, is assumed
to have this PowerSpace as naturally binned PowerSpace
"""
if space.harmonic:
space = PowerSpace(space)
if not isinstance(space, PowerSpace):
raise ValueError(
"space must be either a harmonic space or Power space.")
field = PS_field(space, spec)
dist = PowerDistributor(space.harmonic_partner, space)
k_field = dist(field)
return k_field.weight(2).s_sum()
def _single_power_analyze(field, idx, binbounds):
power_domain = PowerSpace(field.domain[idx], binbounds)
pd = PowerDistributor(field.domain, power_domain, idx)
return pd.adjoint_times(field.weight(1)).weight(-1) # divides by bin size
# MR FIXME: this function is not well suited for analyzing more than one
# subdomain at once, because it allows only one set of binbounds.
[docs]
def power_analyze(field, spaces=None, binbounds=None,
keep_phase_information=False):
"""Computes the power spectrum for a subspace of `field`.
Creates a PowerSpace for the space addressed by `spaces` with the
given binning and computes the power spectrum as a
:class:`~nifty8.field.Field` over this PowerSpace. This can only
be done if the subspace to be analyzed is a harmonic space. The
resulting field has the same units as the square of the initial
field.
Parameters
----------
field : :class:`nifty8.field.Field`
The field to be analyzed
spaces : None or int or tuple of int, optional
The indices of subdomains for which the power spectrum shall be
computed.
If None, all subdomains will be converted.
(default : None).
binbounds : None or array-like, optional
Inner bounds of the bins (default : None).
if binbounds is None : bins are inferred.
keep_phase_information : bool, optional
If False, return a real-valued result containing the power spectrum
of `field`.
If True, return a complex-valued result whose real component
contains the power spectrum computed from the real part of `field`,
and whose imaginary component contains the power
spectrum computed from the imaginary part of `field`.
The absolute value of this result should be identical to the output
of power_analyze with keep_phase_information=False.
(default : False).
Returns
-------
:class:`nifty8.field.Field`
The output object. Its domain is a PowerSpace and it contains
the power spectrum of `field`.
"""
for sp in field.domain:
if not sp.harmonic and not isinstance(sp, PowerSpace):
logger.warning("WARNING: Field has a space in `domain` which is "
"neither harmonic nor a PowerSpace.")
spaces = utilities.parse_spaces(spaces, len(field.domain))
if len(spaces) == 0:
raise ValueError("No space for analysis specified.")
field_real = not utilities.iscomplextype(field.dtype)
if (not field_real) and keep_phase_information:
raise ValueError("cannot keep phase from real-valued input Field")
if keep_phase_information:
parts = [field.real*field.real, field.imag*field.imag]
else:
if field_real:
parts = [field**2]
else:
parts = [field.real*field.real + field.imag*field.imag]
for space_index in spaces:
parts = [_single_power_analyze(part, space_index, binbounds)
for part in parts]
return parts[0] + 1j*parts[1] if keep_phase_information else parts[0]
def _create_power_field(domain, power_spectrum):
if not callable(power_spectrum): # we have a Field defined on a PowerSpace
if not isinstance(power_spectrum, Field):
raise TypeError("Field object expected")
if len(power_spectrum.domain) != 1:
raise ValueError("exactly one domain required")
if not isinstance(power_spectrum.domain[0], PowerSpace):
raise TypeError("PowerSpace required")
power_domain = power_spectrum.domain[0]
fp = power_spectrum
else:
power_domain = PowerSpace(domain)
fp = PS_field(power_domain, power_spectrum)
return PowerDistributor(domain, power_domain)(fp)
[docs]
def create_power_operator(domain, power_spectrum, space=None,
sampling_dtype=None):
"""Creates a diagonal operator with the given power spectrum.
Constructs a diagonal operator that is defined on the specified domain.
Parameters
----------
domain : Domain, tuple of Domain or DomainTuple
Domain on which the power operator shall be defined.
power_spectrum : callable or :class:`nifty8.field.Field`
An object that contains the power spectrum as a function of k.
space : int
the domain index on which the power operator will work
sampling_dtype : dtype or dict of dtype
Specifies the dtype of the underlying Gaussian distribution. Gaussian.
If `sampling_dtype` is `None`, the operator cannot be used as a
covariance, i.e. no samples can be drawn. Default: None.
Returns
-------
DiagonalOperator
An operator that implements the given power spectrum.
"""
domain = DomainTuple.make(domain)
space = utilities.infer_space(domain, space)
field = _create_power_field(domain[space], power_spectrum)
return DiagonalOperator(field, domain, space, sampling_dtype)
[docs]
def density_estimator(domain, pad=1.0, cf_fluctuations=None,
cf_azm_uniform=None, prefix=""):
from .domains.rg_space import RGSpace
from .library.correlated_fields import CorrelatedFieldMaker
from .library.special_distributions import UniformOperator
cf_azm_uniform_sane_default = (1e-4, 1.0)
cf_fluctuations_sane_default = {
"scale": (0.5, 0.3),
"cutoff": (4.0, 3.0),
"loglogslope": (-6.0, 3.0)
}
domain = DomainTuple.make(domain)
dom_scaling = 1. + np.broadcast_to(pad, (len(domain.axes), ))
if cf_fluctuations is None:
cf_fluctuations = cf_fluctuations_sane_default
if cf_azm_uniform is None:
cf_azm_uniform = cf_azm_uniform_sane_default
domain_padded = []
for d_scl, d in zip(dom_scaling, domain):
if not isinstance(d, RGSpace) or d.harmonic:
te = [f"unexpected domain encountered in `domain`: {domain}"]
te += "expected a non-harmonic `RGSpace`"
raise TypeError("\n".join(te))
shape_padded = tuple((d_scl * np.array(d.shape)).astype(int))
domain_padded.append(RGSpace(shape_padded, distances=d.distances))
domain_padded = DomainTuple.make(domain_padded)
# Set up the signal model
azm_offset_mean = 0.0 # The zero-mode should be inferred only from the data
cfmaker = CorrelatedFieldMaker(prefix)
for i, d in enumerate(domain_padded):
if isinstance(cf_fluctuations, (list, tuple)):
cf_fl = cf_fluctuations[i]
else:
cf_fl = cf_fluctuations
cfmaker.add_fluctuations_matern(d, **cf_fl, prefix=f"ax{i}")
scalar_domain = DomainTuple.scalar_domain()
uniform = UniformOperator(scalar_domain, *cf_azm_uniform)
azm = uniform.ducktape("zeromode")
cfmaker.set_amplitude_total_offset(azm_offset_mean, azm)
correlated_field = cfmaker.finalize(0).clip(-10., 10.)
normalized_amplitudes = cfmaker.get_normalized_amplitudes()
domain_shape = tuple(d.shape for d in domain)
slc = SliceOperator(correlated_field.target, domain_shape)
signal = (slc @ correlated_field).exp()
model_operators = {
"correlated_field": correlated_field,
"select_subset": slc,
"amplitude_total_offset": azm,
"normalized_amplitudes": normalized_amplitudes
}
return signal, model_operators
[docs]
def create_harmonic_smoothing_operator(domain, space, sigma):
"""Creates an operator which smoothes a subspace of a harmonic domain.
Parameters
----------
domain: DomainTuple
The total domain and target of the operator
space : int
the index of the subspace on which the operator acts.
This must be a harmonic space
sigma : float
The sigma of the Gaussian smoothing kernel
Returns
-------
DiagonalOperator
The requested smoothing operator
"""
kfunc = domain[space].get_fft_smoothing_kernel_function(sigma)
return DiagonalOperator(kfunc(domain[space].get_k_length_array()), domain,
space)
[docs]
def full(domain, val):
"""Convenience function creating Fields/MultiFields with uniform values.
Parameters
----------
domain : Domainoid
the intended domain of the output field
val : scalar value
the uniform value to be placed into all entries of the result
Returns
-------
:class:`nifty8.field.Field` or:class:`nifty8.mulit_field.MultiField`
The newly created uniform field
"""
if isinstance(domain, (dict, MultiDomain)):
return MultiField.full(domain, val)
return Field.full(domain, val)
[docs]
def from_random(domain, random_type='normal', dtype=np.float64, **kwargs):
"""Convenience function creating Fields/MultiFields with random values.
Parameters
----------
domain : Domainoid
the intended domain of the output field
random_type : 'pm1', 'normal', or 'uniform'
The random distribution to use.
dtype : type
data type of the output field (e.g. numpy.float64)
If the datatype is complex, each real an imaginary part have
variance 1.
**kwargs : additional parameters for the random distribution
('mean' and 'std' for 'normal', 'low' and 'high' for 'uniform')
Returns
-------
:class:`nifty8.field.Field` or:class:`nifty8.mulit_field.MultiField`
The newly created random field
Notes
-----
When called with a multi-domain, the individual fields will be drawn in
alphabetical order of the multi-domain's domain keys. As a consequence,
renaming these keys may cause the multi-field to be filled with different
random numbers, even for the same initial RNG state.
"""
if isinstance(domain, (dict, MultiDomain)):
return MultiField.from_random(domain, random_type, dtype, **kwargs)
return Field.from_random(domain, random_type, dtype, **kwargs)
[docs]
def makeField(domain, arr):
"""Convenience function creating Fields/MultiFields from Numpy arrays or
dicts of Numpy arrays.
Parameters
----------
domain : Domainoid
the intended domain of the output field
arr : Numpy array if `domain` corresponds to a `DomainTuple`,
dictionary of Numpy arrays if `domain` corresponds to a `MultiDomain`
Returns
-------
:class:`nifty8.field.Field` or:class:`nifty8.mulit_field.MultiField`
The newly created random field
"""
if isinstance(domain, (dict, MultiDomain)):
if not isinstance(arr, dict):
raise TypeError("If `domain` is an instance of `MultiDomain`, `arr` must be a dict of Numpy arrays.")
return MultiField.from_raw(domain, arr)
return Field.from_raw(domain, arr)
[docs]
def makeDomain(domain):
"""Convenience function creating DomainTuples/MultiDomains Domainoids.
Parameters
----------
domain : Domainoid (can be DomainTuple, MultiDomain, dict, Domain or list of Domains)
the description of the requested (multi-)domain
Returns
-------
DomainTuple or MultiDomain
The newly created domain object
"""
if isinstance(domain, (MultiDomain, dict)):
return MultiDomain.make(domain)
return DomainTuple.make(domain)
[docs]
def makeOp(inp, dom=None, sampling_dtype=None):
"""Converts a Field or MultiField to a diagonal operator.
Parameters
----------
inp : None, :class:`nifty8.field.Field` or :class:`nifty8.multi_field.MultiField`
- if None, None is returned.
- if Field on scalar-domain, a ScalingOperator with the coefficient
given by the Field is returned.
- if Field, a DiagonalOperator with the coefficients given by this
Field is returned.
- if MultiField, a BlockDiagonalOperator with entries given by this
MultiField is returned.
dom : DomainTuple or MultiDomain
if `inp` is a scalar, this is used as the operator's domain
sampling_dtype : dtype or dict of dtypes
If `inp` shall represent the diagonal covariance of a Gaussian
probabilty distribution, `sampling_dtype` specifies if it is real or
complex Gaussian. If `sampling_dtype` is `None`, the operator cannot be
used as a covariance, i.e. no samples can be drawn. Default: None.
Notes
-----
No volume factors are applied.
"""
if inp is None:
return None
if np.isscalar(inp):
if not isinstance(dom, (DomainTuple, MultiDomain)):
raise TypeError("need proper `dom` argument")
return ScalingOperator(dom, inp, sampling_dtype=sampling_dtype)
if dom is not None:
utilities.check_object_identity(dom, inp.domain)
if inp.domain is DomainTuple.scalar_domain():
return ScalingOperator(inp.domain, inp.val[()], sampling_dtype=sampling_dtype)
if isinstance(inp, Field):
return DiagonalOperator(inp, sampling_dtype=sampling_dtype)
if isinstance(inp, MultiField):
dct = {}
for key, val in inp.items():
if isinstance(sampling_dtype, dict):
sdt = sampling_dtype[key]
else:
sdt = sampling_dtype
dct[key] = makeOp(val, sampling_dtype=sdt)
return BlockDiagonalOperator(inp.domain, dct)
raise NotImplementedError
[docs]
def domain_union(domains):
"""Computes the union of multiple DomainTuples/MultiDomains.
Parameters
----------
domains : list of DomainTuple or MultiDomain
- if DomainTuple, all entries must be equal
- if MultiDomain, there must not be any conflicting components
"""
if isinstance(domains[0], DomainTuple):
for dom in domains[1:]:
utilities.check_object_identity(dom, domains[0])
return domains[0]
return MultiDomain.union(domains)
# Pointwise functions
_current_module = sys.modules[__name__]
for f in pointwise.ptw_dict.keys():
def func(f):
def func2(x, *args, **kwargs):
return x.ptw(f, *args, **kwargs)
return func2
setattr(_current_module, f, func(f))
[docs]
def get_default_codomain(domainoid, space=None):
"""For `RGSpace`, returns the harmonic partner domain.
For `DomainTuple`, returns a copy of the object in which the domain
indexed by `space` is substituted by its harmonic partner domain.
In this case, if `space` is None, it is set to 0 if the `DomainTuple`
contains exactly one domain.
Parameters
----------
domain: `RGSpace` or `DomainTuple`
Domain for which to constuct the default harmonic partner
space: int
Optional index of the subdomain to be replaced by its default
codomain. `domain[space]` must be of class `RGSpace`.
"""
from .domains.gl_space import GLSpace
from .domains.hp_space import HPSpace
from .domains.lm_space import LMSpace
from .domains.rg_space import RGSpace
if isinstance(domainoid, RGSpace):
return domainoid.get_default_codomain()
if not isinstance(domainoid, DomainTuple):
raise TypeError(
'Works only on RGSpaces and DomainTuples containing those')
space = utilities.infer_space(domainoid, space)
if not isinstance(domainoid[space], (RGSpace, HPSpace, GLSpace, LMSpace)):
raise TypeError("can only codomain structrued spaces")
ret = [dom for dom in domainoid]
ret[space] = domainoid[space].get_default_codomain()
return DomainTuple.make(ret)
[docs]
def single_plot(field, **kwargs):
"""Creates a single plot using `Plot`.
Keyword arguments are passed to both `Plot.add` and `Plot.output`.
"""
p = Plot()
p.add(field, **kwargs)
if 'title' in kwargs:
del(kwargs['title'])
p.output(**kwargs)
[docs]
def plot_priorsamples(op, n_samples=5, common_colorbar=True, **kwargs):
"""Create a number of prior sample plots using `Plot`
Parameters
----------
op:
Operator that mapping from standard Gaussian with covariance 1 to the prior distribution
n_samples: int
Number of prior samples for plotting
Note
----
Keyword arguments are passed to both `Plot.add` and `Plot.output`.
"""
p = Plot()
samples = list(op(from_random(op.domain)) for _ in range(n_samples))
if common_colorbar:
vmin = min(np.min(samples[i].val) for i in range(n_samples))
vmax = max(np.max(samples[i].val) for i in range(n_samples))
else:
vmin = vmax = None
if plottable2D(samples[0]):
for i in range(n_samples):
p.add(samples[i], vmin=vmin, vmax=vmax, **kwargs)
if 'title' in kwargs:
del(kwargs['title'])
else:
p.add(samples, **kwargs)
p.output(**kwargs)
[docs]
def exec_time(obj, want_metric=True, verbose=False, domain_dtype=np.float64, ntries=1):
"""Times the execution time of an operator or an energy.
Parameters
----------
obj : Operator or Energy
Operator or Energy that shall be profiled.
want_metric : bool, optional
Determine if Operator shall be called with `want_metric=True`. Only
applicable for EnergyOperators. Default: True.
verbose : bool, optional
If True, more profiling information is printed. Default: False.
domain_dtype : dtype or dict of dtype
ntries : int
Number of times the operator shall be called. Default: 1.
"""
from .linearization import Linearization
from .minimization.energy import Energy
def _profile_func(func, inp, what):
t0 = time()
with cProfile.Profile() as pr:
for _ in range(ntries):
res = func(inp)
logger.info(f'{what}: {(time() - t0)*1000/ntries:>8.3f} ms')
if verbose:
s = io.StringIO()
pstats.Stats(pr, stream=s).sort_stats(pstats.SortKey.TIME).print_stats(5)
logger.info(s.getvalue())
return res
def _profile_get_attr(obj, attr, what):
return _profile_func(lambda x: getattr(obj, x), attr, what)
if isinstance(obj, Energy):
newpos = 0.99*obj.position
_profile_func(lambda x: x.at(newpos), obj, "Energy.at()\t\t\t\t")
_profile_get_attr(obj, "value", "Energy.value\t\t\t\t")
_profile_get_attr(obj, "gradient", "Energy.gradient\t\t\t\t")
_profile_get_attr(obj, "metric", "Energy.metric\t\t\t\t")
if obj.metric is not None:
_profile_func(lambda x: x.apply_metric(x.position), obj, "Energy.apply_metric\t\t\t")
_profile_func(lambda x: x.metric(x.position), obj, "Energy.metric(position)\t\t\t")
elif isinstance(obj, Operator):
want_metric = bool(want_metric)
pos = from_random(obj.domain, 'normal', dtype=domain_dtype)
lin = Linearization.make_var(pos, want_metric=want_metric)
_profile_func(lambda x: x(pos), obj, "Operator call with field\t\t")
res = _profile_func(lambda x: x(lin), obj, "Operator call with linearization\t")
_profile_func(lambda x: res.jac(x), pos, "Apply linearization\t\t\t")
_profile_func(lambda x: res.jac.adjoint(x), res.val, "Apply linearization (adjoint)\t\t")
if obj.target is DomainTuple.scalar_domain():
_profile_get_attr(res, "gradient", "Gradient evaluation\t\t\t")
if want_metric:
_profile_func(lambda x: res.metric(x), pos, "Metric apply\t\t\t\t")
else:
raise TypeError
[docs]
def calculate_position(operator, output):
"""Finds approximate preimage of an operator for a given output."""
from .minimization.descent_minimizers import NewtonCG
from .minimization.iteration_controllers import GradientNormController
from .minimization.kl_energies import SampledKLEnergy
from .operators.energy_operators import GaussianEnergy, StandardHamiltonian
if not isinstance(operator, Operator):
raise TypeError
if output.domain != operator.target:
raise TypeError
if isinstance(output, MultiField):
cov = 1e-3*max([np.max(np.abs(vv)) for vv in output.val.values()])**2
invcov = ScalingOperator(output.domain, cov).inverse
dtype = list(set([ff.dtype for ff in output.values()]))
if len(dtype) != 1:
raise ValueError('Only MultiFields with one dtype supported.')
dtype = dtype[0]
else:
cov = 1e-3*np.max(np.abs(output.val))**2
dtype = output.dtype
invcov = ScalingOperator(output.domain, cov, output.dtype).inverse
d = output + invcov.draw_sample(from_inverse=True)
lh = GaussianEnergy(d, invcov) @ operator
H = StandardHamiltonian(
lh, ic_samp=GradientNormController(iteration_limit=200))
pos = 0.1*from_random(operator.domain)
minimizer = NewtonCG(GradientNormController(iteration_limit=10, name='findpos'))
for ii in range(3):
logger.info(f'Start iteration {ii+1}/3')
kl = SampledKLEnergy(pos, H, 3, None)
kl, _ = minimizer(kl)
pos = kl.position
return pos
[docs]
def is_likelihood_energy(obj):
"""Checks if object behaves like a likelihood energy.
"""
return isinstance(obj, Operator) and obj.get_transformation() is not None
[docs]
def is_operator(obj):
"""Checks if object is operator-like.
Note
----
A simple `isinstance(obj, ift.Operator)` does not give the expected result
because, e.g., :class:`~nifty8.field.Field` inherits from
:class:`~nifty8.operators.operator.Operator`.
"""
return isinstance(obj, Operator) and obj.val is None
[docs]
def is_linearization(obj):
"""Checks if object is linearization-like."""
return isinstance(obj, Operator) and obj.jac is not None
[docs]
def is_fieldlike(obj):
"""Checks if object is field-like.
Note
----
A simple `isinstance(obj, ift.Field)` does not give the expected result
because users might have implemented another class which behaves field-like
but is not an instance of :class:`~nifty8.field.Field`. Also note that
instances of :class:`~nifty8.linearization.Linearization` behave
field-like.
"""
return isinstance(obj, Operator) and obj.val is not None