Skip to content

Commit

Permalink
Merge pull request #62 from iguinn/main
Browse files Browse the repository at this point in the history
Updates to timepoint thresh
  • Loading branch information
iguinn authored Mar 21, 2024
2 parents 1144522 + 758c7c9 commit ada9ff1
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 9 deletions.
7 changes: 6 additions & 1 deletion src/dspeed/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@
from .soft_pileup_corr import soft_pileup_corr, soft_pileup_corr_bl
from .svm import svm_predict
from .time_over_threshold import time_over_threshold
from .time_point_thresh import interpolated_time_point_thresh, time_point_thresh
from .time_point_thresh import (
interpolated_time_point_thresh,
multi_time_point_thresh,
time_point_thresh,
)
from .transfer_function_convolver import transfer_function_convolver
from .trap_filters import asym_trap_filter, trap_filter, trap_norm, trap_pickoff
from .upsampler import interpolating_upsampler, upsampler
Expand Down Expand Up @@ -143,6 +147,7 @@
"svm_predict",
"time_point_thresh",
"interpolated_time_point_thresh",
"multi_time_point_thresh",
"asym_trap_filter",
"trap_filter",
"trap_norm",
Expand Down
202 changes: 194 additions & 8 deletions src/dspeed/processors/time_point_thresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def time_point_thresh(
t_out[0] = i
return
else:
for i in range(int(t_start), 1, -1):
for i in range(int(t_start), 0, -1):
if w_in[i - 1] < a_threshold <= w_in[i]:
t_out[0] = i
return
Expand Down Expand Up @@ -115,13 +115,20 @@ def interpolated_time_point_thresh(
* ``i`` -- integer `t_in`; equivalent to
:func:`~.dsp.processors.fixed_sample_pickoff`
* ``f`` -- floor; interpolated values are at previous neighbor, so
threshold crossing is at next neighbor
* ``c`` -- ceiling, interpolated values are at next neighbor, so
threshold crossing is at previous neighbor
* ``b`` -- before; closest integer sample before threshold crossing
* ``a`` -- after; closest integer sample after threshold crossing
* ``r`` -- round; round to nearest integer sample to threshold crossing
* ``l`` -- linear interpolation
The following modes are meant to mirror the options
dspeed.upsampler
* ``f`` -- floor; interpolated values are at previous neighbor.
Equivalent to ``a``
* ``c`` -- ceiling, interpolated values are at next neighbor.
Equivalent to ``b``
* ``n`` -- nearest-neighbor interpolation; threshold crossing is
half-way between samples
* ``l`` -- linear interpolation
* ``h`` -- Hermite cubic spline interpolation (*not implemented*)
* ``s`` -- natural cubic spline interpolation (*not implemented*)
t_out
Expand Down Expand Up @@ -169,10 +176,15 @@ def interpolated_time_point_thresh(

if mode_in == ord("i"): # return index before crossing
t_out[0] = i_cross
elif mode_in == ord("f"): # return index after crossing
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[0] = i_cross + 1
elif mode_in == ord("c"): # return index before crossing
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[0] = i_cross
elif mode_in == ord("r"): # return closest index to crossing
if abs(a_threshold - w_in[i_cross]) < abs(a_threshold - w_in[i_cross + 1]):
t_out[0] = i_cross
else:
t_out[0] = i_cross + 1
elif mode_in == ord("n"): # nearest-neighbor; return half-way between samps
t_out[0] = i_cross + 0.5
elif mode_in == ord("l"): # linear
Expand All @@ -181,3 +193,177 @@ def interpolated_time_point_thresh(
)
else:
raise DSPFatal("Unrecognized interpolation mode")


@guvectorize(
[
"void(float32[:], float32[:], float32, float32, char, float32[:])",
"void(float64[:], float64[:], float64, float64, char, float64[:])",
],
"(n),(m),(),(),()->(m)",
**nb_kwargs,
)
def multi_time_point_thresh(
w_in: np.ndarray,
a_threshold: np.ndarray,
t_start: int,
polarity: int,
mode_in: np.int8,
t_out: np.ndarray,
) -> None:
"""Find the time where the waveform value crosses the threshold
Search performed walking either forward or backward from the starting
index. Use interpolation to estimate a time between samples. Interpolation
mode selected with `mode_in`.
Parameters
----------
w_in
the input waveform.
a_threshold
list of threshold values.
t_start
the starting index.
polarity
is the average slope of the waveform up (>0) or down (<0) in the
search region; only sign matters, not value. Raise Exception if 0.
mode_in
Character selecting which interpolation method to use. Note this
must be passed as a ``int8``, e.g. ``ord('i')``. Options:
* ``i`` -- integer `t_in`; equivalent to
:func:`~.dsp.processors.fixed_sample_pickoff`
* ``b`` -- before; closest integer sample before threshold crossing
* ``a`` -- after; closest integer sample after threshold crossing
* ``r`` -- round; round to nearest integer sample to threshold crossing
* ``l`` -- linear interpolation
The following modes are meant to mirror the options
dspeed.upsampler
* ``f`` -- floor; interpolated values are at previous neighbor.
Equivalent to ``a``
* ``c`` -- ceiling, interpolated values are at next neighbor.
Equivalent to ``b``
* ``n`` -- nearest-neighbor interpolation; threshold crossing is
half-way between samples
* ``h`` -- Hermite cubic spline interpolation (*not implemented*)
* ``s`` -- natural cubic spline interpolation (*not implemented*)
t_out
the index where the waveform value crosses the threshold.
JSON Configuration Example
--------------------------
.. code-block :: json
"tp_0": {
"function": "time_point_thresh",
"module": "dspeed.processors",
"args": ["wf_atrap", "bl_std", "tp_start", 0, "'l'", "tp_0"],
"unit": "ns"
}
"""
t_out[:] = np.nan

if np.isnan(w_in).any() or np.isnan(a_threshold).any() or np.isnan(t_start):
return

if t_start < 0 or t_start >= len(w_in):
return

# make polarity +/- 1
if polarity > 0:
polarity = 1
elif polarity < 0:
polarity = -1
else:
raise DSPFatal("polarity cannot be 0")

sorted_idx = np.argsort(a_threshold)

# Get initial values for search
t_start = int(t_start)
a_start = w_in[t_start]
i_start = len(sorted_idx)
for i in range(len(sorted_idx)):
if a_threshold[sorted_idx[i]] >= a_start:
i_start = i
break

# Search for timepoints at larger values
i_tp = i_start
if i_tp < len(sorted_idx):
idx = sorted_idx[i_tp]
for i_wf in range(t_start, len(w_in) - 1 if polarity > 0 else -1, polarity):
if i_tp >= len(sorted_idx):
break
while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]:
if mode_in == ord("i"): # return index closest to start of search
t_out[idx] = i_wf
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[idx] = i_wf if polarity < 0 else i_wf + 1
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[idx] = i_wf if polarity > 0 else i_wf - 1
elif mode_in == ord("r"): # round; return closest index
if (
a_threshold[idx] - w_in[i_wf]
< w_in[i_wf + polarity] - a_threshold[sorted_idx[i_tp]]
):
t_out[idx] = i_wf
else:
t_out[idx] = i_wf + polarity
elif mode_in == ord(
"n"
): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5 * polarity
elif mode_in == ord("l"): # linear
t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / (
w_in[i_wf + polarity] - w_in[i_wf]
)
else:
raise DSPFatal("Unrecognized interpolation mode")
i_tp += 1
if i_tp >= len(sorted_idx):
break
idx = sorted_idx[i_tp]

# Search for timepoints at smaller values
i_tp = i_start - 1
if i_tp >= 0:
idx = sorted_idx[i_tp]
for i_wf in range(
t_start - 1, len(w_in) - 1 if polarity < 0 else -1, -polarity
):
if i_tp < 0:
break
while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]:
if mode_in == ord("i"): # return index closest to start of search
t_out[idx] = i_wf
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[idx] = i_wf if polarity < 0 else i_wf + 1
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[idx] = i_wf if polarity > 0 else i_wf - 1
elif mode_in == ord("r"): # round; return closest index
if (
a_threshold[idx] - w_in[i_wf]
< w_in[i_wf + polarity] - a_threshold[idx]
):
t_out[idx] = i_wf
else:
t_out[idx] = i_wf + polarity
elif mode_in == ord(
"n"
): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5 * polarity
elif mode_in == ord("l"): # linear
t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / (
w_in[i_wf + polarity] - w_in[i_wf]
)
else:
raise DSPFatal("Unrecognized interpolation mode")
i_tp -= 1
if i_tp < 0:
break
idx = sorted_idx[i_tp]

0 comments on commit ada9ff1

Please sign in to comment.