# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2019 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.
import numpy as np
from scipy.sparse import coo_matrix
from scipy.sparse.linalg import aslinearoperator
from scipy.special import erfc
from ..domain_tuple import DomainTuple
from ..domains.rg_space import RGSpace
from ..domains.unstructured_domain import UnstructuredDomain
from ..field import Field
from ..operators.linear_operator import LinearOperator
def _gaussian_sf(x):
return 0.5*erfc(x/np.sqrt(2.))
def _comp_traverse(start, end, shp, dist, lo, mid, hi, sig, erf):
ndim = start.shape[0]
nlos = start.shape[1]
inc = np.full(len(shp), 1, dtype=np.int64)
for i in range(-2, -len(shp)-1, -1):
inc[i] = inc[i+1]*shp[i+1]
pmax = np.array(shp)
out = [None]*nlos
for i in range(nlos):
direction = end[:, i]-start[:, i]
dirx = np.where(direction == 0., 1e-12, direction)
d0 = np.where(direction == 0., ((start[:, i] > 0)-0.5)*1e12,
-start[:, i]/dirx)
d1 = np.where(direction == 0., ((start[:, i] < pmax)-0.5)*-1e12,
(pmax-start[:, i])/dirx)
(dmin, dmax) = (np.minimum(d0, d1), np.maximum(d0, d1))
dmin = dmin.max()
dmax = dmax.min()
dmin = np.maximum(0., dmin)
dmax = np.minimum(1., dmax)
dmax = np.maximum(dmin, dmax)
# hack: move away from potential grid crossings
dmin += 1e-7
dmax -= 1e-7
if dmin >= dmax: # no intersection
out[i] = (np.full(0, 0, dtype=np.int64), np.full(0, 0.))
continue
# determine coordinates of first cell crossing
c_first = np.ceil(start[:, i]+direction*dmin)
c_first = np.where(direction > 0., c_first, c_first-1.)
c_first = (c_first-start[:, i])/dirx
pos1 = np.asarray((start[:, i]+dmin*direction), dtype=np.int64)
pos1 = np.sum(pos1*inc)
cdist = np.empty(0, dtype=np.float64)
add = np.empty(0, dtype=np.int64)
for j in range(ndim):
if direction[j] != 0:
step = inc[j] if direction[j] > 0 else -inc[j]
tmp = np.arange(start=c_first[j], stop=dmax,
step=abs(1./direction[j]))
cdist = np.append(cdist, tmp)
add = np.append(add, np.full(len(tmp), step, dtype=np.int64))
idx = np.argsort(cdist)
cdist = cdist[idx]
add = add[idx]
cdist = np.append(np.full(1, dmin), cdist)
cdist = np.append(cdist, np.full(1, dmax))
corfac = np.linalg.norm(direction*dist)
cdist *= corfac
wgt = np.diff(cdist)
mdist = 0.5*(cdist[:-1]+cdist[1:])
wgt = apply_erf(wgt, mdist, lo[i], mid[i], hi[i], sig[i], erf)
add = np.append(pos1, add)
add = np.cumsum(add)
out[i] = (add, wgt)
return out
[docs]
def apply_erf(wgt, dist, lo, mid, hi, sig, erf):
wgt = wgt.copy()
mask = dist > hi
wgt[mask] = 0.
mask = (dist > lo) & (dist <= hi)
wgt[mask] *= erf((-1/dist[mask]+1/mid)/sig)
return wgt
[docs]
class LOSResponse(LinearOperator):
"""Line-of-sight response operator
This operator transforms from a single RGSpace to an UnstructuredDomain
with as many entries as there were lines of sight passed to the
constructor. Adjoint application is also provided.
Parameters
----------
domain : RGSpace or DomainTuple
The operator's input domain. This must be a single RGSpace.
starts, ends : numpy.ndarray(float) with two dimensions
Arrays containing the start and end points of the individual lines
of sight. The first dimension must have as many entries as `domain`
has dimensions. The second dimensions must be identical for both arrays
and indicated the total number of lines of sight.
sigmas: numpy.ndarray(float) (optional)
If this is not None, the inverse of the lengths of the LOSs are assumed
to be Gaussian distributed with these sigmas. The start point will
remain the same, but the endpoint is assumed to be unknown.
This is a typical statistical model for astrophysical parallaxes.
The LOS response then returns the expected integral
over the input given that the length of the LOS is unknown and
therefore the result is averaged over different endpoints.
Default: None.
truncation: float (optional)
Use only if the sigmas keyword argument is used!
This truncates the probability of the endpoint lying more sigmas away
than the truncation. Used to speed up computation and to avoid negative
distances. It should hold that `1./(1./length-sigma*truncation)>0`
for all lengths of the LOSs and all corresponding sigma of sigmas.
If unsure, leave blank.
Default: 3.
Notes
-----
`starts, `ends`, `sigmas`, and `truncation` have to be identical on
every calling MPI task (i.e. the full LOS information has to be provided on
every task).
"""
[docs]
def __init__(self, domain, starts, ends, sigmas=None, truncation=3.):
self._domain = DomainTuple.make(domain)
self._capability = self.TIMES | self.ADJOINT_TIMES
if ((not isinstance(self.domain[0], RGSpace)) or
(len(self._domain) != 1)):
raise TypeError("The domain must be exactly one RGSpace instance.")
ndim = len(self.domain[0].shape)
starts = np.array(starts)
nlos = starts.shape[1]
ends = np.array(ends)
if sigmas is None:
sigmas = np.zeros(nlos, dtype=np.float32)
sigmas = np.array(sigmas)
if starts.shape[0] != ndim:
raise TypeError("dimension mismatch")
if nlos != sigmas.shape[0]:
raise TypeError("dimension mismatch")
if starts.shape != ends.shape:
raise TypeError("dimension mismatch")
diffs = ends-starts
difflen = np.linalg.norm(diffs, axis=0)
diffs /= difflen
real_distances = 1./(1./difflen - truncation*sigmas)
if np.any(real_distances < 0):
raise ValueError("parallax error truncation to high: "
"getting negative distances")
real_ends = starts + diffs*real_distances
dist = np.array(self.domain[0].distances).reshape((-1, 1))
pixel_starts = starts/dist + 0.5
pixel_ends = real_ends/dist + 0.5
w_i = _comp_traverse(pixel_starts,
pixel_ends,
self.domain[0].shape,
np.array(self.domain[0].distances),
1./(1./difflen+truncation*sigmas),
difflen,
1./(1./difflen-truncation*sigmas),
sigmas,
_gaussian_sf)
boxsz = 16
nlos = len(w_i)
npix = np.prod(self.domain[0].shape)
ntot = 0
for i in w_i:
ntot += len(i[1])
pri = np.empty(ntot, dtype=np.float64)
ilos = np.empty(ntot, dtype=np.int32)
iarr = np.empty(ntot, dtype=np.int32)
xwgt = np.empty(ntot, dtype=np.float32)
ofs = 0
cnt = 0
for i in w_i:
nval = len(i[1])
ilos[ofs:ofs+nval] = cnt
iarr[ofs:ofs+nval] = i[0]
xwgt[ofs:ofs+nval] = i[1]
fullidx = np.unravel_index(i[0], self.domain[0].shape)
tmp = np.zeros(nval, dtype=np.float64)
fct = 1.
for j in range(ndim):
tmp += (fullidx[j]//boxsz)*fct
fct *= self.domain[0].shape[j]
tmp += cnt/float(nlos)
tmp += iarr[ofs:ofs+nval]/(float(nlos)*float(npix))
pri[ofs:ofs+nval] = tmp
ofs += nval
cnt += 1
xtmp = np.argsort(pri)
ilos = ilos[xtmp]
iarr = iarr[xtmp]
xwgt = xwgt[xtmp]
self._smat = aslinearoperator(
coo_matrix((xwgt, (ilos, iarr)),
shape=(nlos, np.prod(self.domain[0].shape))))
self._target = DomainTuple.make(UnstructuredDomain(nlos))
[docs]
def apply(self, x, mode):
self._check_input(x, mode)
if mode == self.TIMES:
result_arr = self._smat.matvec(x.val.reshape(-1))
return Field(self._target, result_arr)
input_data = x.val.reshape(-1)
res = self._smat.rmatvec(input_data).reshape(self.domain[0].shape)
return Field(self._domain, res)