Skip to content

Commit

Permalink
Merge pull request #78 from mhvk/remap-time
Browse files Browse the repository at this point in the history
ENH: add a function to remap a dynamic spectrum in time
  • Loading branch information
mhvk authored Sep 1, 2024
2 parents 2545efb + 3df29af commit 32c9556
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ equation appearing in these tutorials.
Reference/API
=============

.. automodapi:: screens.screen
.. automodapi:: screens.fields
.. automodapi:: screens.dynspec
.. automodapi:: screens.conjspec
.. automodapi:: screens.screen
.. automodapi:: screens.remap
.. automodapi:: screens.visualization
102 changes: 102 additions & 0 deletions screens/remap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Licensed under the GPLv3 - see LICENSE
import numpy as np
import astropy.units as u


def lincover(a, n):
"""Cover the range spanned by a in n points.
Create an array that exactly covers the range spanned by ``a``, i.e.,
``a.min()`` is at the lower border of the first pixel and ``a.max()``
is at the upper border of the last pixel.
a : array
Holding the values the range of which should be convered.
n : int
Number of points to use.
Returns
-------
out : array
Linearly increasing values.
"""
start, stop = a.min(), a.max()
step = (stop - start) / n
return np.linspace(start + step/2, stop - step/2, n)


def remap_time(ds, t_map, new_t):
"""Remap DS(t, f) to new(t_map[t], f).
Parameters
----------
ds : array
Dynamic spectrum that is to be remapped in time.
t_map : array
Holding the new times each old time (or each pixel) should map to.
new_t : array or int
Time array for the output. The frequency array is assumed to be
unchanged. Should be monotonously increasing. Input that covers
the range in ``tmap`` can be created with ``lincover(t_map, n)``.
Returns
-------
out, weight : array
Summed fractional values and fractions
See Also
--------
lincover : to create a linspace that exactly covers the range of an array
"""
# For the whole map, find the pixel just before where it should go.
ipix = np.clip(np.searchsorted(new_t, t_map), 1, len(new_t) - 1) - 1
# Calculate the fractional pixel target positions.
pix = u.Quantity((t_map - new_t[ipix]) / (new_t[ipix+1] - new_t[ipix]),
u.one, copy=False).value + ipix
# Use these to calculate where the pixel boundaries would land.
dpix = np.diff(pix, axis=0)
bounds_l = pix - 0.5 * np.concatenate([dpix[:1], dpix])
bounds_u = pix + 0.5 * np.concatenate([dpix, dpix[-1:]])
# Ensure the lower bound is always below the upper bound.
bounds_l, bounds_u = np.minimum(bounds_l, bounds_u), np.maximum(bounds_l, bounds_u)
# Create output and weight arrays.
out = np.zeros((len(new_t),)+ds.shape[1:])
weight = np.zeros((len(new_t),)+ds.shape[1:])
# As well as a fake frequency pixel axis (needed for the case that t_map
# depends on the frequency axis as well).
if t_map.ndim == 2:
f = np.broadcast_to(np.arange(ds.shape[1]), ds.shape)
else:
f = None

# Find the range in pixels each input pixel will fall into.
pix_start = np.maximum(np.round(bounds_l).astype(int), 0)
pix_stop = np.minimum(np.round(bounds_u).astype(int) + 1, len(out))
# Loop over the series of pixels inputs should go in to.
for ipix in range((pix_stop - pix_start).max()):
# Nominal output pixel index for each input pixel (for some this
# will be beyond the actual needed range, but those will get weight 0).
indx = pix_start + ipix
# Calculate fraction of the output pixel covered by the input pixel.
# Example: bounds_l, u = 0.9, 1.7, indx = 0: 0.5 - (-0.5) = 0.
# 0.9, 1.7, indx = 1: 0.5 - (-0.1) = 0.6
# 0.9, 1.7, indx = 2: -0.3 - (-0.5) = 0.2
# 0.9, 1.7, indx = 3: -0.5 - (-0.5) = 0.
w = np.clip(bounds_u-indx, -0.5, 0.5) - np.clip(bounds_l-indx, -0.5, 0.5)
# Only care about pixels with a fraction > 0 and which fall inside output.
ok = (w > 0.) & (indx < len(out))
# If locations vary with frequency, we need to pass in arrays for both
# time and frequency.
wok = w[ok]
if ok.ndim == 2:
indices = indx[ok], f[ok]
else:
indices = indx[ok]
if ds.ndim == 2:
wok = wok[:, np.newaxis]
# Add fraction of each input pixel to the output and track fractions added.
np.add.at(out, indices, wok * ds[ok])
np.add.at(weight, indices, wok)

return out, weight
68 changes: 68 additions & 0 deletions screens/tests/test_remap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Licensed under the GPLv3 - see LICENSE
import numpy as np
from numpy.testing import assert_allclose
from astropy import units as u

from screens.remap import remap_time, lincover
from screens import DynamicSpectrum


def test_lincover():
a = np.array([1.1, 0., 1.05, -0.1])
out = lincover(a, 10)
expected = np.linspace(-0.1+1.2/20, 1.1-1.2/20, 10)
assert_allclose(out, expected)


class TestRemapTime:
pb = 0.1022515592973 * u.day # Double pulsar

@classmethod
def position(cls, t, *, a=0.5, delta=0., p=None):
"""Model position along screen."""
if p is None:
p = cls.pb
phase = (t-t.mean())/p
return np.cos(phase*u.cy) + a*phase

@staticmethod
def scint(pos, scale=3.):
"""Super simple interference pattern."""
return (np.cos(pos*scale*u.cy)**2).value

@classmethod
def setup_class(self):
dt = 10*u.s
n = int(round(((2 * self.pb) / dt).to_value(u.one)))
nf = 51
nover = 5
ta = np.linspace(0*self.pb, 2*self.pb, n*nover,
endpoint=False).reshape(-1, nover)
# Cyclical position + slope
pos = self.position(ta)
f = np.linspace(1, 1.3, nf, endpoint=False) << u.GHz
self.scale = 3 * f.to_value(u.GHz)
ds = self.scint(pos[..., np.newaxis], scale=self.scale)
self.ds = DynamicSpectrum(ds.mean(1), t=ta.mean(1).to(u.min), f=f)
self.pos = pos.mean(1)
self.new_pos = (np.arange(0, 100) + 0.5) / 100.

def test_single_frequency(self):
remapped, weight = remap_time(self.ds.dynspec[:, 0], self.pos, self.new_pos)
out = remapped / weight
expected = self.scint(self.new_pos, self.scale[0])
assert_allclose(out, expected, atol=0.015)

def test_all_frequencies(self):
remapped, weight = remap_time(self.ds.dynspec, self.pos, self.new_pos)
out = remapped / weight
expected = self.scint(self.new_pos[:, np.newaxis], self.scale)
assert_allclose(out, expected, atol=0.015)

def test_remap_nut(self):
map_pos = self.pos[:, np.newaxis] * (self.ds.f / self.ds.f[0])
remapped, weight = remap_time(self.ds.dynspec, map_pos, self.new_pos)
out = remapped / weight
expected = self.scint(self.new_pos, self.scale[0])
expected = np.broadcast_to(expected[:, np.newaxis], out.shape)
assert_allclose(out, expected, atol=0.015)

0 comments on commit 32c9556

Please sign in to comment.