# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2021 Max-Planck-Society
# Authors: Philipp Frank, Philipp Arras, Philipp Haim;
# Matern Kernel by Matteo Guardiani, Jakob Roth and
# Gordian Edenhofer
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.
from functools import reduce
from operator import mul
from warnings import warn
import numpy as np
from .. import utilities
from ..domain_tuple import DomainTuple
from ..domains.power_space import PowerSpace
from ..domains.rg_space import RGSpace
from ..domains.unstructured_domain import UnstructuredDomain
from ..field import Field
from ..logger import logger
from ..multi_field import MultiField
from ..operators.adder import Adder
from ..operators.contraction_operator import ContractionOperator
from ..operators.diagonal_operator import DiagonalOperator
from ..operators.distributors import PowerDistributor
from ..operators.endomorphic_operator import EndomorphicOperator
from ..operators.harmonic_operators import HarmonicTransformOperator
from ..operators.linear_operator import LinearOperator
from ..operators.normal_operators import LognormalTransform, NormalTransform
from ..operators.operator import Operator
from ..operators.simple_linear_operators import VdotOperator, ducktape
from ..probing import StatCalculator
from ..sugar import full, makeDomain, makeField, makeOp
from ..utilities import myassert
def _log_k_lengths(pspace):
"""Log(k_lengths) without zeromode"""
return np.log(pspace.k_lengths[1:])
def _relative_log_k_lengths(power_space):
"""Log-distance to first bin
logkl.shape==power_space.shape, logkl[0]=logkl[1]=0"""
power_space = DomainTuple.make(power_space)
myassert(isinstance(power_space[0], PowerSpace))
myassert(len(power_space) == 1)
logkl = _log_k_lengths(power_space[0])
myassert(logkl.shape[0] == power_space[0].shape[0] - 1)
logkl -= logkl[0]
return np.insert(logkl, 0, 0)
def _log_vol(power_space):
power_space = makeDomain(power_space)
myassert(isinstance(power_space[0], PowerSpace))
logk_lengths = _log_k_lengths(power_space[0])
return logk_lengths[1:] - logk_lengths[:-1]
def _structured_spaces(domain):
if isinstance(domain[0], UnstructuredDomain):
return np.arange(1, len(domain))
return np.arange(len(domain))
def _total_fluctuation_realized(samples):
spaces = _structured_spaces(samples[0].domain)
co = ContractionOperator(samples[0].domain, spaces)
size = co.domain.size/co.target.size
res = 0.
for s in samples:
res = res + (s - co.adjoint(co(s)/size))**2
res = res.mean(spaces)/len(samples)
return np.sqrt(res if np.isscalar(res) else res.val)
class _SlopeRemover(EndomorphicOperator):
def __init__(self, domain, space=0):
self._domain = makeDomain(domain)
myassert(isinstance(self._domain[space], PowerSpace))
logkl = _relative_log_k_lengths(self._domain[space])
sc = logkl/float(logkl[-1])
self._space = space
axis = self._domain.axes[space][0]
self._last = (slice(None),)*axis + (-1,) + (None,)
extender = (None,)*(axis) + (slice(None),) + (None,)*(self._domain.axes[-1][-1]-axis)
self._sc = sc[extender]
self._capability = self.TIMES | self.ADJOINT_TIMES
def apply(self, x, mode):
self._check_input(x, mode)
if mode == self.TIMES:
x = x.val
res = x - x[self._last]*self._sc
else:
res = x.val_rw()
res[self._last] -= (res*self._sc).sum(axis=self._space, keepdims=True)
return makeField(self._tgt(mode), res)
class _TwoLogIntegrations(LinearOperator):
def __init__(self, target, space=0):
self._target = makeDomain(target)
myassert(isinstance(self.target[space], PowerSpace))
dom = list(self._target)
dom[space] = UnstructuredDomain((2, self.target[space].shape[0]-2))
self._domain = makeDomain(dom)
self._space = space
self._log_vol = _log_vol(self._target[space])
self._capability = self.TIMES | self.ADJOINT_TIMES
def apply(self, x, mode):
self._check_input(x, mode)
# Maybe make class properties
axis = self._target.axes[self._space][0]
sl = (slice(None),)*axis
extender_sl = (None,)*axis + (slice(None),) + (None,)*(self._target.axes[-1][-1] - axis)
first = sl + (0,)
second = sl + (1,)
from_third = sl + (slice(2, None),)
no_border = sl + (slice(1, -1),)
reverse = sl + (slice(None, None, -1),)
if mode == self.TIMES:
x = x.val
res = np.empty(self._target.shape)
res[first] = res[second] = 0
res[from_third] = np.cumsum(x[second], axis=axis)
res[from_third] = (res[from_third] + res[no_border])/2*self._log_vol[extender_sl] + x[first]
res[from_third] = np.cumsum(res[from_third], axis=axis)
else:
x = x.val_rw()
res = np.zeros(self._domain.shape)
x[from_third] = np.cumsum(x[from_third][reverse], axis=axis)[reverse]
res[first] += x[from_third]
x[from_third] *= (self._log_vol/2.)[extender_sl]
x[no_border] += x[from_third]
res[second] += np.cumsum(x[from_third][reverse], axis=axis)[reverse]
return makeField(self._tgt(mode), res)
class _Normalization(Operator):
"""Exponentiate the logarithmic power spectrum, normalize by the sum over
all modes and return the square root of the "normalized" power spectrum.
Notes
-----
The operator does not perform a proper normalization as it does not account
for changes in position space volume. This leads to an additional factor of
`1 / \\sqrt{totvol}` being introduced into the result with `totvol`
referring to the total volume in position space. The exact value of the
additional factor stems from the fact that the volume in harmonic space is
solely dependent on the distances in position space. Thus, if the position
spaces is enlarged without changing its distances, the volume in harmonic
space is kept constant. Doubling the number of pixels though also doubles
the number of harmonic modes with each then occupy a smaller volume. This
linear decrease in per pixel volume in harmonic space is not captured by
just summing up the modes.
"""
def __init__(self, domain, space=0):
self._domain = self._target = DomainTuple.make(domain)
myassert(isinstance(self._domain[space], PowerSpace))
hspace = list(self._domain)
hspace[space] = hspace[space].harmonic_partner
hspace = makeDomain(hspace)
pd = PowerDistributor(hspace,
power_space=self._domain[space],
space=space)
mode_multiplicity = pd.adjoint(full(pd.target, 1.)).val_rw()
zero_mode = (slice(None),)*self._domain.axes[space][0] + (0,)
mode_multiplicity[zero_mode] = 0
multipl = makeOp(makeField(self._domain, mode_multiplicity))
self._specsum = _SpecialSum(self._domain, space) @ multipl
def apply(self, x):
self._check_input(x)
spec = x.ptw("exp")
# NOTE, see the note in the doc-string on why this is not a proper
# normalization!
# NOTE, this "normalizes" also the zero-mode which is supposed to be
# left untouched by this operator. Since the multiplicity of the
# zero-mode is set to 0, the norm does not contain traces of it.
# However, it wrongly sets the zeroth entry of the result. Luckily,
# in subsequent calls, the zeroth entry is not used in the CF model.
return (self._specsum(spec).reciprocal()*spec).sqrt()
class _SpecialSum(EndomorphicOperator):
def __init__(self, domain, space=0):
self._domain = makeDomain(domain)
self._capability = self.TIMES | self.ADJOINT_TIMES
self._contractor = ContractionOperator(domain, space)
def apply(self, x, mode):
self._check_input(x, mode)
return self._contractor.adjoint(self._contractor(x))
class _Distributor(LinearOperator):
def __init__(self, dofdex, domain, target):
self._dofdex = np.array(dofdex)
self._target = DomainTuple.make(target)
self._domain = DomainTuple.make(domain)
self._capability = self.TIMES | self.ADJOINT_TIMES
def apply(self, x, mode):
self._check_input(x, mode)
x = x.val
if mode == self.TIMES:
res = x[self._dofdex]
else:
res = np.zeros(self._tgt(mode).shape, dtype=x.dtype)
np.add.at(res, self._dofdex, x)
return makeField(self._tgt(mode), res)
class _AmplitudeMatern(Operator):
def __init__(self, pow_spc, scale, cutoff, loglogslope, totvol):
expander = ContractionOperator(pow_spc, spaces=None).adjoint
k_squared = makeField(pow_spc, pow_spc.k_lengths**2)
scale = expander @ scale.log()
cutoff = VdotOperator(k_squared).adjoint @ cutoff.power(-2.)
spectral_idx = expander.scale(0.25) @ loglogslope
ker = Adder(full(pow_spc, 1.)) @ cutoff
ker = spectral_idx * ker.log() + scale
op = ker.exp()
# Account for the volume of the position space (dvol in harmonic space
# is 1/volume-of-position-space) in the definition of the amplitude as
# to make the parametric model agnostic to changes in the volume of the
# position space
vol0, vol1 = [np.zeros(pow_spc.shape) for _ in range(2)]
# The zero-mode scales linearly with the volume in position space
vol0[0] = totvol
# Variances decrease linearly with the volume in position space after a
# harmonic transformation (var{HT(randn)} \propto 1/\sqrt{totvol} for
# randn standard normally distributed variables). This needs to be
# accounted for in the amplitude model.
vol1[1:] = totvol**0.5
vol0 = Adder(makeField(pow_spc, vol0))
vol1 = DiagonalOperator(makeField(pow_spc, vol1))
op = vol0 @ vol1 @ op
# std = sqrt of integral of power spectrum
self._fluc = op.power(2).integrate().sqrt()
self.apply = op.apply
self._domain, self._target = op.domain, op.target
self._repr_str = "_AmplitudeMatern: " + op.__repr__()
@property
def fluctuation_amplitude(self):
return self._fluc
def __repr__(self):
return self._repr_str
class _Amplitude(Operator):
def __init__(self, target, fluctuations, flexibility, asperity,
loglogavgslope, totvol, key, dofdex):
"""
fluctuations > 0
flexibility > 0 or None
asperity > 0 or None
loglogavgslope probably negative
"""
myassert(isinstance(fluctuations, Operator))
myassert(isinstance(flexibility, Operator) or flexibility is None)
myassert(isinstance(asperity, Operator) or asperity is None)
myassert(isinstance(loglogavgslope, Operator))
if len(dofdex) > 0:
N_copies = max(dofdex) + 1
space = 1
target = makeDomain((UnstructuredDomain(N_copies), target))
if N_copies != len(dofdex):
distributed_tgt = makeDomain((UnstructuredDomain(len(dofdex)),
target[1]))
Distributor = _Distributor(dofdex, target, distributed_tgt)
else:
distributed_tgt = target
else:
N_copies = 0
space = 0
distributed_tgt = target = makeDomain(target)
myassert(isinstance(target[space], PowerSpace))
twolog = _TwoLogIntegrations(target, space)
dom = twolog.domain
shp = dom[space].shape
expander = ContractionOperator(dom, spaces=space).adjoint
ps_expander = ContractionOperator(twolog.target, spaces=space).adjoint
# Prepare constant fields
vflex = np.zeros(shp)
vflex[0] = vflex[1] = np.sqrt(_log_vol(target[space]))
vflex = DiagonalOperator(makeField(dom[space], vflex), dom, space)
vasp = np.zeros(shp, dtype=np.float64)
vasp[0] += 1
vasp = DiagonalOperator(makeField(dom[space], vasp), dom, space)
shift = np.ones(shp)
shift[0] = _log_vol(target[space])**2 / 12.
shift = DiagonalOperator(makeField(dom[space], shift), dom, space)
shift = shift(full(shift.domain, 1))
vslope = DiagonalOperator(
makeField(target[space], _relative_log_k_lengths(target[space])),
target, space)
vol0, vol1 = [np.zeros(target[space].shape) for _ in range(2)]
# In the harmonic transform convention used here, the zero-mode scales
# linearly with the volume while all other modes scale with the square
# root of the volume. However, as the `_Normalization` operator
# introduces an additional factor of `1 / \sqrt{totvol}` for all modes
# but the zero-mode, the modes here all apparently have the same
# scaling factor. See the respective note in `_Normalization` for
# details.
vol1[1:] = vol0[0] = totvol
vol0 = DiagonalOperator(makeField(target[space], vol0), target, space)
vol0 = vol0(full(vol0.domain, 1))
vol1 = DiagonalOperator(makeField(target[space], vol1), target, space)
# End prepare constant fields
slope = vslope @ ps_expander @ loglogavgslope
sig_flex = vflex @ expander @ flexibility if flexibility is not None else None
sig_asp = vasp @ expander @ asperity if asperity is not None else None
sig_fluc = vol1 @ ps_expander @ fluctuations
if sig_asp is None and sig_flex is None:
op = _Normalization(target, space) @ slope
elif sig_asp is None:
xi = ducktape(dom, None, key)
sigma = DiagonalOperator(shift.ptw("sqrt"), dom) @ sig_flex
smooth = _SlopeRemover(target, space) @ twolog @ (sigma * xi)
op = _Normalization(target, space) @ (slope + smooth)
elif sig_flex is None:
raise ValueError("flexibility may not be disabled on its own")
else:
xi = ducktape(dom, None, key)
sigma = sig_flex * (Adder(shift) @ sig_asp).ptw("sqrt")
smooth = _SlopeRemover(target, space) @ twolog @ (sigma * xi)
op = _Normalization(target, space) @ (slope + smooth)
if N_copies != len(dofdex):
op = Distributor @ op
sig_fluc = Distributor @ sig_fluc
op = Adder(Distributor(vol0)) @ (sig_fluc * op)
self._fluc = (_Distributor(dofdex, fluctuations.target,
distributed_tgt[0]) @ fluctuations)
else:
op = Adder(vol0) @ (sig_fluc * op)
self._fluc = fluctuations
self.apply = op.apply
self._domain, self._target = op.domain, op.target
self._space = space
self._repr_str = "_Amplitude: " + op.__repr__()
@property
def fluctuation_amplitude(self):
return self._fluc
def __repr__(self):
return self._repr_str
def _check_dofdex(dofdex, total_N):
if not (list(dofdex) == list(range(total_N)) or list(dofdex) == total_N*[0]):
warn("In the upcoming release only dofdex==range(total_N) or dofdex==total_N*[0] "
f"will be supported. You can use dofdex={dofdex}.\n"
"Please report at `c@philipp-arras.de` if you use this "
"feature and would like to see it continued.", DeprecationWarning)