From 8dc8da161c3d29cb1a8334d6a3e5d6d5ce046757 Mon Sep 17 00:00:00 2001
From: Tom Aldcroft
Date: Sat, 24 Feb 2024 05:51:32 -0500
Subject: [PATCH 1/2] Initial version of ephem_stk.py

---
 cheta/derived/ephem_stk.py | 276 +++++++++++++++++++++++++++++++++++++
 1 file changed, 276 insertions(+)
 create mode 100644 cheta/derived/ephem_stk.py

diff --git a/cheta/derived/ephem_stk.py b/cheta/derived/ephem_stk.py
new file mode 100644
index 00000000..d8aa5530
--- /dev/null
+++ b/cheta/derived/ephem_stk.py
@@ -0,0 +1,276 @@
+# Licensed under a 3-clause BSD style license - see LICENSE.rst
+"""
+STK orbit ephemeris computed MSIDs
+"""
+
+import calendar
+import functools
+import logging
+import re
+from pathlib import Path
+
+import numpy as np
+from astropy.table import Column, Table
+from cxotime import CxoTime, CxoTimeLike
+from kadi import occweb
+
+from cheta.derived.comps import ComputedMsid
+
+logger = logging.getLogger("cheta.fetch")
+
+
+EPHEM_STK_RECENT_DIR = "FOT/mission_planning/Backstop/Ephemeris"
+EPHEM_STK_ARCHIVE_DIR = "FOT/mission_planning/Backstop/Ephemeris/ArchiveMCC"
+
+MONTH_NAME_TO_NUM = {
+    mon: f"{ii + 1:02d}" for ii, mon in enumerate(calendar.month_abbr[1:])
+}
+
+
+def read_stk_file(path, format="stk", **kwargs):
+    """Read an STK ephemeris file from OCCweb and return an astropy table.
+
+    For format "stk" the table has the same columns as the file::
+
+        name          dtype
+        ----------- -------
+        Time (UTCG)   str23
+        x (km)      float64
+        y (km)      float64
+        z (km)      float64
+        vx (km/sec) float64
+        vy (km/sec) float64
+        vz (km/sec) float64
+
+    For format "cxc" the table has the same columns as ephemeris files in the CXC
+    archive::
+
+        name   dtype  unit
+        ---- ------- -----
+        time float64
+        x    float64     m
+        y    float64     m
+        z    float64     m
+        vx   float64 m / s
+        vy   float64 m / s
+        vz   float64 m / s
+
+    Parameters
+    ----------
+    path : str
+        Path on OCCweb of the STK ephemeris file, for example
+        "FOT/mission_planning/Backstop/Ephemeris/Chandra_23177_24026.stk"
+    format : str
+        Format of the file ("stk" or "cxc")
+    **kwargs :
+        Additional args passed to occweb.get_occweb_page()
+
+    Returns
+    -------
+    astropy.table.Table :
+        Table of ephemeris data.
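+
+    Examples
+    --------
+    A minimal sketch (requires OCCweb access; the file name is the example from
+    the Parameters section)::
+
+        >>> from cheta.derived.ephem_stk import read_stk_file
+        >>> dat = read_stk_file(
+        ...     "FOT/mission_planning/Backstop/Ephemeris/Chandra_23177_24026.stk",
+        ...     format="cxc",
+        ... )
+        >>> dat.colnames
+        ['time', 'x', 'y', 'z', 'vx', 'vy', 'vz']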
+    """
+    logger.info(f"Reading OCCweb STK file {path}")
+    text = occweb.get_occweb_page(path, **kwargs)
+    dat = Table.read(
+        text, format="ascii.fixed_width_two_line", header_start=2, position_line=3
+    )
+    if format == "stk":
+        return dat
+    elif format != "cxc":
+        raise ValueError(f"Unknown format {format!r} (allowed: 'stk', 'cxc')")
+
+    isos = []
+    for date in dat["Time (UTCG)"]:
+        vals = date.split()
+        month = MONTH_NAME_TO_NUM[vals[1]]
+        day = vals[0]
+        year = vals[2]
+        time = vals[3]
+        iso = f"{year}-{month}-{day}T{time}"
+        isos.append(iso)
+    out = {
+        # Round time to nearest millisecond for later float comparison
+        "time": np.round(CxoTime(isos, format="isot").secs, 3),
+        "x": Column(dat["x (km)"] * 1000, unit="m"),
+        "y": Column(dat["y (km)"] * 1000, unit="m"),
+        "z": Column(dat["z (km)"] * 1000, unit="m"),
+        "vx": Column(dat["vx (km/sec)"] * 1000, unit="m/s"),
+        "vy": Column(dat["vy (km/sec)"] * 1000, unit="m/s"),
+        "vz": Column(dat["vz (km/sec)"] * 1000, unit="m/s"),
+    }
+    return Table(out)
+
+
+def _read_stk_file_cxc_cached(path_occweb: str) -> np.ndarray:
+    """Read an STK ephemeris file from OCCweb as a CXC-format structured array,
+    caching the result on disk in ~/.cheta/cache.
+    """
+    name = Path(path_occweb).name
+    cache_file = Path.home() / ".cheta" / "cache" / f"{name}.npz"
+    if cache_file.exists():
+        logger.info(f"Reading cached STK file {path_occweb} from {cache_file}")
+        out = np.load(cache_file)["arr_0"]
+    else:
+        out = read_stk_file(path_occweb, format="cxc").as_array()
+        logger.info(f"Caching STK file {path_occweb} to {cache_file}")
+        cache_file.parent.mkdir(parents=True, exist_ok=True)
+        np.savez(cache_file, out)
+    return out
+
+
+def get_ephem_stk_occweb_paths(
+    start: CxoTimeLike,
+    stop: CxoTimeLike = None,
+    latest_only=True,
+):
+    """Get STK orbit ephemeris file paths from OCCweb.
+
+    This finds the STK ephemeris files on OCCweb that overlap the interval from start
+    to stop.
+
+    Returns a list of dicts like the one below, one per STK file::
+
+        {
+            'path': 'FOT/mission_planning/Backstop/Ephemeris/Chandra_23107_23321.stk',
+            'start': CxoTime('2023:107:00:00:00.000'),
+            'stop': CxoTime('2023:321:23:59:59.999'),
+        }
+
+    Parameters
+    ----------
+    start : CxoTimeLike
+        Start time of interval
+    stop : CxoTimeLike
+        Stop time of interval (default=NOW)
+    latest_only : bool
+        If True (default), return only the latest files that overlap the interval. If
+        False, return all files that overlap the interval.
+
+    Returns
+    -------
+    files_stk : list of dict
+        List of dict with keys path, start, stop
+    """
+    start = CxoTime(start)
+    stop = CxoTime(stop)
+
+    files_stk = []
+
+    for dir_path in [EPHEM_STK_RECENT_DIR, EPHEM_STK_ARCHIVE_DIR]:
+        logger.info(f"Checking OCCweb directory {dir_path}")
+        files = occweb.get_occweb_dir(dir_path)
+        for name in files["Name"]:
+            # Match file names like Chandra_01190_01250.stk, parsing 01 as year1,
+            # 190 as doy1, 01 as year2, 250 as doy2.
+            if match := re.match(r"Chandra_(\d{2})(\d{3})_(\d{2})(\d{3}).stk$", name):
+                yr1, doy1, yr2, doy2 = (int(val) for val in match.groups())
+                yr1 += 1900 if yr1 >= 98 else 2000
+                yr2 += 1900 if yr2 >= 98 else 2000
+
+                # Format day-of-year back to three digits since the CxoTime date
+                # format requires YYYY:DDD.
+                start_stk = CxoTime(f"{yr1}:{doy1:03d}:00:00:00.000")
+                stop_stk = CxoTime(f"{yr2}:{doy2:03d}:23:59:59.999")
+
+                # Intervals overlap?
+                if not (start_stk > stop or stop_stk < start):
+                    file_stk = {
+                        "path": f"{dir_path}/{name}",
+                        "start": start_stk,
+                        "stop": stop_stk,
+                    }
+                    files_stk.append(file_stk)
+
+        # If latest_only then we can skip EPHEM_STK_ARCHIVE_DIR if we found any files
+        # that start before the start time.
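+        # (EPHEM_STK_RECENT_DIR is searched first, so if it already yielded a file
+        # starting at or before `start`, listing the archive directory is unnecessary.)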
+        if latest_only and any(file_stk["start"] <= start for file_stk in files_stk):
+            break
+
+    files_stk = sorted(files_stk, key=lambda x: x["start"].date)
+
+    if latest_only:
+        for ii, file_stk in enumerate(reversed(files_stk)):
+            if file_stk["start"] <= start:
+                files_stk = files_stk[-ii - 1 :]
+                break
+
+    return files_stk
+
+
+# Cache the last call for the common case of fetching all 6 components of the ephemeris
+# over the same time range.
+@functools.lru_cache(maxsize=1)
+def get_ephemeris_stk(start: CxoTimeLike, stop: CxoTimeLike) -> np.ndarray:
+    """Get ephemeris data from OCCweb or locally cached STK files.
+
+    Returns a structured array with columns time, x, y, z, vx, vy, vz. The time is in
+    CXC seconds since 1998.0, and the position and velocity are in meters and
+    meters/second, respectively.
+
+    The last result of this function is cached in memory. This is for the common use
+    case of fetching (via fetch.Msid) multiple components of the ephemeris over the
+    same time range.
+
+    Parameters
+    ----------
+    start : CxoTimeLike
+        Start time of interval
+    stop : CxoTimeLike
+        Stop time of interval
+
+    Returns
+    -------
+    dat : np.ndarray
+        Structured array with columns time, x, y, z, vx, vy, vz
+    """
+    start = CxoTime(start)
+    stop = CxoTime(stop)
+
+    stk_paths = get_ephem_stk_occweb_paths(start, stop)
+    dats = [_read_stk_file_cxc_cached(stk_path["path"]) for stk_path in stk_paths]
+
+    # Each ephemeris file overlaps the next one, so clip each to where the next begins.
+    dats_clip = []
+    for dat0, dat1 in zip(dats[:-1], dats[1:]):
+        # This is equivalent to dat0[dat0["time"] < dat1["time"][0]] but faster.
+        # See: https://occweb.cfa.harvard.edu/twiki/Aspect/SkaPython
+        idx1 = np.searchsorted(dat0["time"], dat1["time"][0], side="left")
+        dats_clip.append(dat0[:idx1])
+    dats_clip.append(dats[-1])
+
+    dat = np.concatenate(dats_clip)
+
+    idx0 = np.searchsorted(dat["time"], start.secs, side="left")
+    idx1 = np.searchsorted(dat["time"], stop.secs, side="left")
+
+    return dat[idx0:idx1]
+
+
+class Comp_EphemSTK(ComputedMsid):
+    """Computed MSID for returning the STK orbit ephemeris.
+
+    This defines the following computed MSIDs:
+
+    - ``orbitephem_stk_x``: STK orbit ephemeris X position (m)
+    - ``orbitephem_stk_y``: STK orbit ephemeris Y position (m)
+    - ``orbitephem_stk_z``: STK orbit ephemeris Z position (m)
+    - ``orbitephem_stk_vx``: STK orbit ephemeris X velocity (m/s)
+    - ``orbitephem_stk_vy``: STK orbit ephemeris Y velocity (m/s)
+    - ``orbitephem_stk_vz``: STK orbit ephemeris Z velocity (m/s)
+
+    Example::
+
+        >>> from cheta import fetch
+        >>> orbit_x = fetch.Msid('orbitephem_stk_x', '2022:001', '2023:001')
+        >>> orbit_x.plot()  # Makes a plot
+
+    """
+
+    msid_match = r"orbitephem_stk_(x|y|z|vx|vy|vz)"
+
+    def get_msid_attrs(self, tstart: float, tstop: float, msid: str, msid_args: tuple):
+        # Component of the STK ephemeris to fetch (x, y, z, vx, vy, vz)
+        comp = msid_args[0]
+        dat = get_ephemeris_stk(tstart, tstop)
+        bads = np.zeros(dat[comp].shape, dtype=bool)
+        out = {"vals": dat[comp], "bads": bads, "times": dat["time"], "unit": None}
+
+        return out

From 2aae726e3a3249c59df3e58b38905886eab00a8f Mon Sep 17 00:00:00 2001
From: Tom Aldcroft
Date: Sat, 24 Feb 2024 06:23:01 -0500
Subject: [PATCH 2/2] Refactor comps into a separate subpackage

---
 cheta/comps/__init__.py               |   1 +
 cheta/{derived => comps}/comps.py     | 349 +-------------------------
 cheta/comps/computed_msid.py          | 332 ++++++++++++++++++++++++
 cheta/{derived => comps}/ephem_stk.py |   2 +-
 cheta/fetch.py                        |   2 +-
 cheta/tests/test_comps.py             |   2 +-
 setup.py                              |   2 +-
 7 files changed, 350 insertions(+), 340 deletions(-)
 create mode 100644 cheta/comps/__init__.py
 rename cheta/{derived => comps}/comps.py (57%)
 create mode 100644 cheta/comps/computed_msid.py
 rename cheta/{derived => comps}/ephem_stk.py (99%)

diff --git a/cheta/comps/__init__.py b/cheta/comps/__init__.py
new file mode 100644
index 00000000..3ccda61f
--- /dev/null
+++ b/cheta/comps/__init__.py
@@ -0,0 +1 @@
+from .comps import *  # noqa
diff --git a/cheta/derived/comps.py b/cheta/comps/comps.py
similarity index 57%
rename from cheta/derived/comps.py
rename to cheta/comps/comps.py
index a1e8579e..aec3d7b8 100644
--- a/cheta/derived/comps.py
+++ b/cheta/comps/comps.py
@@ -2,18 +2,18 @@
 """
 Support computed MSIDs in the cheta archive.
 
-- :class:`~cheta.derived.comps.ComputedMsid`: base class for user-generated comps.
+- :class:`~cheta.comps.ComputedMsid`: base class for user-generated comps.
-- :class:`~cheta.derived.comps.Comp_MUPS_Valve_Temp_Clean`:
+- :class:`~cheta.comps.Comp_MUPS_Valve_Temp_Clean`:
   Cleaned MUPS valve temperatures MSIDs ``(pm2thv1t|pm1thv2t)_clean``.
-- :class:`~cheta.derived.comps.Comp_KadiCommandState`:
+- :class:`~cheta.comps.Comp_KadiCommandState`:
   Commanded states ``cmd_state_<state_key>_<dt>
`` for any kadi commanded state value. -- :class:`~cheta.derived.comps.Comp_Quat`: +- :class:`~cheta.comps.Comp_Quat`: Quaternions - ``quat_aoattqt`` = ``AOATTQT[1-4]`` - ``quat_aoatupq`` = ``AOATUPQ[1-3]`` - ``quat_aocmdqt`` = ``AOCMDQT[1-3]`` - ``quat_aotarqt`` = ``AOTARQT[1-3]`` -- :class:`~cheta.derived.comps.Comp_Pitch_Roll_OBC_Safe`: +- :class:`~cheta.comps.Comp_Pitch_Roll_OBC_Safe`: Sun Pitch ``pitch_comp`` and off-nominal roll ``roll_comp`` which are valid in NPNT, NMAN, NSUN and safe mode. @@ -21,347 +21,24 @@ """ # noqa import functools -import re import astropy.table as tbl import numpy as np from cxotime import CxoTime -from ..units import converters as unit_converter_funcs +from .computed_msid import ComputedMsid +from .ephem_stk import Comp_EphemSTK __all__ = [ "ComputedMsid", - "Comp_MUPS_Valve_Temp_Clean", + "Comp_EphemSTK", "Comp_KadiCommandState", + "Comp_MUPS_Valve_Temp_Clean", "Comp_Pitch_Roll_OBC_Safe", "Comp_Quat", ] -def calc_stats_vals(msid, rows, indexes, interval): - """ - Compute statistics values for ``msid`` over specified intervals. - This is a very slightly modified version of the same function from - update_archive.py. However, cannot directly import that because - it has side effects that break everything, probably related to - enabling caching. - - The mods here are basically to take out handling of state codes - and turn a warning about negative dts into an exception. - - :param msid: Msid object (filter_bad=True) - :param rows: Msid row indices corresponding to stat boundaries - :param indexes: Universal index values for stat (row times // dt) - :param interval: interval name (5min or daily) - - :returns: np.recarray of stats values - """ - import scipy.stats - - quantiles = (1, 5, 16, 50, 84, 95, 99) - n_out = len(rows) - 1 - - # Check if data type is "numeric". Boolean values count as numeric, - # partly for historical reasons, in that they support funcs like - # mean (with implicit conversion to float). - msid_dtype = msid.vals.dtype - msid_is_numeric = issubclass(msid_dtype.type, (np.number, np.bool_)) - - # If MSID data is unicode, then for stats purposes cast back to bytes - # by creating the output array as a like-sized S-type array. - if msid_dtype.kind == "U": - msid_dtype = re.sub(r"U", "S", msid.vals.dtype.str) - - # Predeclare numpy arrays of correct type and sufficient size for accumulating results. 
- out = {} - out["index"] = np.ndarray((n_out,), dtype=np.int32) - out["n"] = np.ndarray((n_out,), dtype=np.int32) - out["val"] = np.ndarray((n_out,), dtype=msid_dtype) - - if msid_is_numeric: - out["min"] = np.ndarray((n_out,), dtype=msid_dtype) - out["max"] = np.ndarray((n_out,), dtype=msid_dtype) - out["mean"] = np.ndarray((n_out,), dtype=np.float32) - - if interval == "daily": - out["std"] = np.ndarray((n_out,), dtype=msid_dtype) - for quantile in quantiles: - out["p{:02d}".format(quantile)] = np.ndarray((n_out,), dtype=msid_dtype) - - i = 0 - for row0, row1, index in zip(rows[:-1], rows[1:], indexes[:-1]): - vals = msid.vals[row0:row1] - times = msid.times[row0:row1] - - n_vals = len(vals) - if n_vals > 0: - out["index"][i] = index - out["n"][i] = n_vals - out["val"][i] = vals[n_vals // 2] - if msid_is_numeric: - if n_vals <= 2: - dts = np.ones(n_vals, dtype=np.float64) - else: - dts = np.empty(n_vals, dtype=np.float64) - dts[0] = times[1] - times[0] - dts[-1] = times[-1] - times[-2] - dts[1:-1] = ( - (times[1:-1] - times[:-2]) + (times[2:] - times[1:-1]) - ) / 2.0 - negs = dts < 0.0 - if np.any(negs): - times_dts = [ - (CxoTime(t).date, dt) - for t, dt in zip(times[negs], dts[negs]) - ] - raise ValueError( - "WARNING - negative dts in {} at {}".format( - msid.MSID, times_dts - ) - ) - - # Clip to range 0.001 to 300.0. The low bound is just there - # for data with identical time stamps. This shouldn't happen - # but in practice might. The 300.0 represents 5 minutes and - # is the largest normal time interval. Data near large gaps - # will get a weight of 5 mins. - dts.clip(0.001, 300.0, out=dts) - sum_dts = np.sum(dts) - - out["min"][i] = np.min(vals) - out["max"][i] = np.max(vals) - out["mean"][i] = np.sum(dts * vals) / sum_dts - if interval == "daily": - # biased weighted estimator of variance (N should be big enough) - # http://en.wikipedia.org/wiki/Mean_square_weighted_deviation - sigma_sq = np.sum(dts * (vals - out["mean"][i]) ** 2) / sum_dts - out["std"][i] = np.sqrt(sigma_sq) - quant_vals = scipy.stats.mstats.mquantiles( - vals, np.array(quantiles) / 100.0 - ) - for quant_val, quantile in zip(quant_vals, quantiles): - out["p%02d" % quantile][i] = quant_val - - i += 1 - - return np.rec.fromarrays([x[:i] for x in out.values()], names=list(out.keys())) - - -class ComputedMsid: - """Base class for cheta computed MSID. - - Sub-classes must define at least the following: - - * ``msid_match`` class attribute as a regex to match for the MSID. - * ``get_msid_attrs()`` method to perform the computation and return - a dict with the result. - - Optionally: - - * ``units`` attribute to specify unit handling. - - See the fetch tutorial Computed MSIDs section for details. 
- """ - - # Global dict of registered computed MSIDs - msid_classes = [] - - # Base units specification (None implies no unit handling) - units = None - - def __init__(self, unit_system="eng"): - self.unit_system = unit_system - - def __init_subclass__(cls, **kwargs): - """Validate and register ComputedMSID subclass.""" - super().__init_subclass__(**kwargs) - - if not hasattr(cls, "msid_match"): - raise ValueError(f"comp {cls.__name__} must define msid_match") - - cls.msid_classes.append(cls) - - @classmethod - def get_matching_comp_cls(cls, msid): - """Get computed classes that match ``msid`` - - :param msid: str, input msid - :returns: first ComputedMsid subclass that matches ``msid`` or None - """ - for comp_cls in ComputedMsid.msid_classes: - match = re.match(comp_cls.msid_match + "$", msid, re.IGNORECASE) - if match: - return comp_cls - - return None - - # These four properties are provided as a convenience because the module - # itself cannot import fetch because this is circular. - @property - def fetch_eng(self): - """Fetch in TDB engineering units like DEGF""" - from .. import fetch_eng - - return fetch_eng - - @property - def fetch_sci(self): - """Fetch in scientific units like DEGC""" - from .. import fetch_sci - - return fetch_sci - - @property - def fetch_cxc(self): - """Fetch in CXC (FITS standard) units like K""" - from .. import fetch - - return fetch - - @property - def fetch_sys(self): - """Fetch in the unit system specified for the class""" - fetch = getattr(self, f"fetch_{self.unit_system}") - return fetch - - def __call__(self, tstart, tstop, msid, interval=None): - """Emulate the fetch.MSID() API, but return a dict of MSID attributes. - - The returned dict turned into a proper MSID object by the upstream caller - `fetch.MSID._get_comp_data()`. - - :param tstart: float, start time (CXC seconds) - :param tstop: float, stop time (CXC seconds) - :param msid: str, MSID name - :param interval: str or None, stats interval (None, '5min', 'daily') - - :returns: dict of MSID attributes including 'times', 'vals', 'bads' - """ - # Parse any arguments from the input `msid` - match = re.match(self.msid_match, msid, re.IGNORECASE) - if not match: - raise RuntimeError(f"unexpected mismatch of {msid} with {self.msid_match}") - match_args = [ - arg.lower() if isinstance(arg, str) else arg for arg in match.groups() - ] - - if interval is None: - # Call the actual user-supplied work method to compute the MSID values - msid_attrs = self.get_msid_attrs(tstart, tstop, msid.lower(), match_args) - - for attr in ("vals", "bads", "times", "unit"): - if attr not in msid_attrs: - raise ValueError( - f"computed MSID {self.__class__.__name__} failed " - f"to set required attribute {attr}" - ) - - # Presence of a non-None `units` class attribute means that the MSID has - # units that should be converted to `self.unit_system` if required, where - # unit_system is 'cxc', 'sci', or 'eng'. - if self.units is not None: - msid_attrs = self.convert_units(msid_attrs) - else: - msid_attrs = self.get_stats_attrs( - tstart, tstop, msid.lower(), match_args, interval - ) - - return msid_attrs - - def convert_units(self, msid_attrs): - """ - Convert required elements of ``msid_attrs`` to ``self.unit_system``. - - Unit_system can be one of 'cxc', 'sci', 'eng'. 
- - :param msid_attrs: dict, input MSID attributes - :param unit_system: str, unit system - - :returns: dict, converted MSID attributes - """ - unit_current = self.units[self.units["internal_system"]] - unit_new = self.units[self.unit_system] - - out = msid_attrs.copy() - out["unit"] = unit_new - - if unit_current != unit_new: - for attr in self.units["convert_attrs"]: - out[attr] = unit_converter_funcs[unit_current, unit_new]( - msid_attrs[attr] - ) - - return out - - def get_msid_attrs(self, tstart, tstop, msid, msid_args): - """Get the attributes required for this MSID. - - Get attributes for computed MSID, which must include at least - ``vals``, ``bads``, ``times``, and may include additional attributes. - - This MUST be supplied by sub-classes. - - :param tstart: start time (CXC secs) - :param tstop: stop time (CXC secs) - :param msid: full MSID name e.g. tephin_plus_5 - :param msid_args: tuple of regex match groups (msid_name,) - :returns: dict of MSID attributes - """ - raise NotImplementedError("sub-class must implement get_msid_attrs()") - - def get_stats_attrs(self, tstart, tstop, msid, match_args, interval): - """Get 5-min or daily stats attributes. - - This is normally not overridden by sub-classes. - - :param tstart: start time (CXC secs) - :param tstop: stop time (CXC secs) - :param msid: full MSID name e.g. tephin_plus_5 - :param msid_args: tuple of regex match groups (msid_name,) - :returns: dict of MSID attributes - """ - from ..fetch import _plural - - # Replicate a stripped-down version of processing in update_archive. - # This produces a recarray with columns that correspond to the raw - # stats HDF5 files. - dt = {"5min": 328, "daily": 86400}[interval] - index0 = int(np.floor(tstart / dt)) - index1 = int(np.ceil(tstop / dt)) - tstart = (index0 - 1) * dt - tstop = (index1 + 1) * dt - - msid_obj = self.fetch_sys.Msid(msid, tstart, tstop) - - indexes = np.arange(index0, index1 + 1) - times = indexes * dt # This is the *start* time of each bin - - if len(times) > 0: - rows = np.searchsorted(msid_obj.times, times) - vals_stats = calc_stats_vals(msid_obj, rows, indexes, interval) - else: - raise ValueError - - # Replicate the name munging that fetch does going from the HDF5 columns - # to what is seen in a stats fetch query. - out = {} - for key in vals_stats.dtype.names: - out_key = _plural(key) if key != "n" else "samples" - out[out_key] = vals_stats[key] - out["times"] = (vals_stats["index"] + 0.5) * dt - out["bads"] = np.zeros(len(vals_stats), dtype=bool) - out["midvals"] = out["vals"] - out["vals"] = out["means"] - out["unit"] = msid_obj.unit - - return out - - -############################################################################ -# Built-in computed MSIDs -############################################################################ - - class Comp_Quat(ComputedMsid): """Computed MSID for returning the quaternion telemetry as a Quat object @@ -475,7 +152,7 @@ def get_msid_attrs(self, tstart, tstop, msid, msid_args): :returns: dict of MSID attributes """ - from .mups_valve import fetch_clean_msid + from ..derived.mups_valve import fetch_clean_msid # Git version of chandra_models to use for MUPS model spec from 2nd match group. # If not supplied it will be None so use default main version. 
@@ -613,7 +290,7 @@ def get_roll_pitch_tlm_safe(start, stop):
 
 
 def calc_css_pitch_safe(start, stop):
-    from .pcad import arccos_clip, calc_sun_vec_body_css
+    from ..derived.pcad import arccos_clip, calc_sun_vec_body_css
 
     # Get the raw telemetry value in user-requested unit system
     dat = get_roll_pitch_tlm_safe(start, stop)
@@ -637,7 +314,7 @@ def calc_css_roll_safe(start, stop):
     based on the solar array angles 6SARES1 and 6SARES2.
 
     """
-    from .pcad import calc_sun_vec_body_css
+    from ..derived.pcad import calc_sun_vec_body_css
 
     # Get the raw telemetry value in user-requested unit system
    data = get_roll_pitch_tlm_safe(start, stop)
@@ -659,7 +336,7 @@ def calc_pitch_roll_obc(tstart: float, tstop: float, pitch_roll: str):
     :param tstop: stop time (CXC seconds)
     :param pitch_roll: 'pitch' or 'roll'
     """
-    from .pcad import DP_PITCH, DP_ROLL
+    from ..derived.pcad import DP_PITCH, DP_ROLL
 
     dp = DP_PITCH() if pitch_roll == "pitch" else DP_ROLL()
     # Pad by 12 minutes on each side to ensure ephemeris data are available.
diff --git a/cheta/comps/computed_msid.py b/cheta/comps/computed_msid.py
new file mode 100644
index 00000000..d2fb3801
--- /dev/null
+++ b/cheta/comps/computed_msid.py
@@ -0,0 +1,332 @@
+# Licensed under a 3-clause BSD style license - see LICENSE.rst
+"""
+Provide base class ``ComputedMsid`` for computed MSIDs in the cheta archive.
+"""
+
+import re
+
+import numpy as np
+from cxotime import CxoTime
+
+from ..units import converters as unit_converter_funcs
+
+__all__ = ["ComputedMsid"]
+
+
+def calc_stats_vals(msid, rows, indexes, interval):
+    """
+    Compute statistics values for ``msid`` over specified intervals.
+    This is a very slightly modified version of the same function from
+    update_archive.py. However, that module cannot be imported directly
+    because it has side effects that break everything, probably related to
+    enabling caching.
+
+    The mods here are basically to take out handling of state codes
+    and turn a warning about negative dts into an exception.
+
+    :param msid: Msid object (filter_bad=True)
+    :param rows: Msid row indices corresponding to stat boundaries
+    :param indexes: Universal index values for stat (row times // dt)
+    :param interval: interval name (5min or daily)
+
+    :returns: np.recarray of stats values
+    """
+    import scipy.stats
+
+    quantiles = (1, 5, 16, 50, 84, 95, 99)
+    n_out = len(rows) - 1
+
+    # Check if data type is "numeric". Boolean values count as numeric,
+    # partly for historical reasons, in that they support funcs like
+    # mean (with implicit conversion to float).
+    msid_dtype = msid.vals.dtype
+    msid_is_numeric = issubclass(msid_dtype.type, (np.number, np.bool_))
+
+    # If MSID data is unicode, then for stats purposes cast back to bytes
+    # by creating the output array as a like-sized S-type array.
+    if msid_dtype.kind == "U":
+        msid_dtype = re.sub(r"U", "S", msid.vals.dtype.str)
+
+    # Predeclare numpy arrays of correct type and sufficient size for
+    # accumulating results.
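+    # (The arrays are over-allocated to n_out rows; only the first i rows filled
+    # in the loop below are kept when the final recarray is assembled.)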
+ out = {} + out["index"] = np.ndarray((n_out,), dtype=np.int32) + out["n"] = np.ndarray((n_out,), dtype=np.int32) + out["val"] = np.ndarray((n_out,), dtype=msid_dtype) + + if msid_is_numeric: + out["min"] = np.ndarray((n_out,), dtype=msid_dtype) + out["max"] = np.ndarray((n_out,), dtype=msid_dtype) + out["mean"] = np.ndarray((n_out,), dtype=np.float32) + + if interval == "daily": + out["std"] = np.ndarray((n_out,), dtype=msid_dtype) + for quantile in quantiles: + out["p{:02d}".format(quantile)] = np.ndarray((n_out,), dtype=msid_dtype) + + i = 0 + for row0, row1, index in zip(rows[:-1], rows[1:], indexes[:-1]): + vals = msid.vals[row0:row1] + times = msid.times[row0:row1] + + n_vals = len(vals) + if n_vals > 0: + out["index"][i] = index + out["n"][i] = n_vals + out["val"][i] = vals[n_vals // 2] + if msid_is_numeric: + if n_vals <= 2: + dts = np.ones(n_vals, dtype=np.float64) + else: + dts = np.empty(n_vals, dtype=np.float64) + dts[0] = times[1] - times[0] + dts[-1] = times[-1] - times[-2] + dts[1:-1] = ( + (times[1:-1] - times[:-2]) + (times[2:] - times[1:-1]) + ) / 2.0 + negs = dts < 0.0 + if np.any(negs): + times_dts = [ + (CxoTime(t).date, dt) + for t, dt in zip(times[negs], dts[negs]) + ] + raise ValueError( + "WARNING - negative dts in {} at {}".format( + msid.MSID, times_dts + ) + ) + + # Clip to range 0.001 to 300.0. The low bound is just there + # for data with identical time stamps. This shouldn't happen + # but in practice might. The 300.0 represents 5 minutes and + # is the largest normal time interval. Data near large gaps + # will get a weight of 5 mins. + dts.clip(0.001, 300.0, out=dts) + sum_dts = np.sum(dts) + + out["min"][i] = np.min(vals) + out["max"][i] = np.max(vals) + out["mean"][i] = np.sum(dts * vals) / sum_dts + if interval == "daily": + # biased weighted estimator of variance (N should be big enough) + # http://en.wikipedia.org/wiki/Mean_square_weighted_deviation + sigma_sq = np.sum(dts * (vals - out["mean"][i]) ** 2) / sum_dts + out["std"][i] = np.sqrt(sigma_sq) + quant_vals = scipy.stats.mstats.mquantiles( + vals, np.array(quantiles) / 100.0 + ) + for quant_val, quantile in zip(quant_vals, quantiles): + out["p%02d" % quantile][i] = quant_val + + i += 1 + + return np.rec.fromarrays([x[:i] for x in out.values()], names=list(out.keys())) + + +class ComputedMsid: + """Base class for cheta computed MSID. + + Sub-classes must define at least the following: + + * ``msid_match`` class attribute as a regex to match for the MSID. + * ``get_msid_attrs()`` method to perform the computation and return + a dict with the result. + + Optionally: + + * ``units`` attribute to specify unit handling. + + See the fetch tutorial Computed MSIDs section for details. 
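+
+    Example: a minimal sketch of a user-defined comp (the MSID name
+    ``tephin_plus_5`` and the +5 offset are illustrative only)::
+
+        class Comp_Tephin_Plus_Five(ComputedMsid):
+            msid_match = r"tephin_plus_5"
+
+            def get_msid_attrs(self, tstart, tstop, msid, msid_args):
+                dat = self.fetch_sys.MSID("tephin", tstart, tstop)
+                return {
+                    "vals": dat.vals + 5,
+                    "bads": dat.bads,
+                    "times": dat.times,
+                    "unit": dat.unit,
+                }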
+    """
+
+    # Global list of registered computed MSID classes
+    msid_classes = []
+
+    # Base units specification (None implies no unit handling)
+    units = None
+
+    def __init__(self, unit_system="eng"):
+        self.unit_system = unit_system
+
+    def __init_subclass__(cls, **kwargs):
+        """Validate and register a ComputedMsid subclass."""
+        super().__init_subclass__(**kwargs)
+
+        if not hasattr(cls, "msid_match"):
+            raise ValueError(f"comp {cls.__name__} must define msid_match")
+
+        cls.msid_classes.append(cls)
+
+    @classmethod
+    def get_matching_comp_cls(cls, msid):
+        """Get the first ComputedMsid subclass that matches ``msid``.
+
+        :param msid: str, input msid
+        :returns: first ComputedMsid subclass that matches ``msid`` or None
+        """
+        for comp_cls in ComputedMsid.msid_classes:
+            match = re.match(comp_cls.msid_match + "$", msid, re.IGNORECASE)
+            if match:
+                return comp_cls
+
+        return None
+
+    # These four properties are provided as a convenience because this module
+    # cannot import fetch directly (the import would be circular).
+    @property
+    def fetch_eng(self):
+        """Fetch in TDB engineering units like DEGF"""
+        from .. import fetch_eng
+
+        return fetch_eng
+
+    @property
+    def fetch_sci(self):
+        """Fetch in scientific units like DEGC"""
+        from .. import fetch_sci
+
+        return fetch_sci
+
+    @property
+    def fetch_cxc(self):
+        """Fetch in CXC (FITS standard) units like K"""
+        from .. import fetch
+
+        return fetch
+
+    @property
+    def fetch_sys(self):
+        """Fetch in the unit system specified for the class"""
+        fetch = getattr(self, f"fetch_{self.unit_system}")
+        return fetch
+
+    def __call__(self, tstart, tstop, msid, interval=None):
+        """Emulate the fetch.MSID() API, but return a dict of MSID attributes.
+
+        The returned dict is turned into a proper MSID object by the upstream
+        caller `fetch.MSID._get_comp_data()`.
+
+        :param tstart: float, start time (CXC seconds)
+        :param tstop: float, stop time (CXC seconds)
+        :param msid: str, MSID name
+        :param interval: str or None, stats interval (None, '5min', 'daily')
+
+        :returns: dict of MSID attributes including 'times', 'vals', 'bads'
+        """
+        # Parse any arguments from the input `msid`
+        match = re.match(self.msid_match, msid, re.IGNORECASE)
+        if not match:
+            raise RuntimeError(f"unexpected mismatch of {msid} with {self.msid_match}")
+        match_args = [
+            arg.lower() if isinstance(arg, str) else arg for arg in match.groups()
+        ]
+
+        if interval is None:
+            # Call the actual user-supplied work method to compute the MSID values
+            msid_attrs = self.get_msid_attrs(tstart, tstop, msid.lower(), match_args)
+
+            for attr in ("vals", "bads", "times", "unit"):
+                if attr not in msid_attrs:
+                    raise ValueError(
+                        f"computed MSID {self.__class__.__name__} failed "
+                        f"to set required attribute {attr}"
+                    )
+
+            # Presence of a non-None `units` class attribute means that the MSID has
+            # units that should be converted to `self.unit_system` if required, where
+            # unit_system is 'cxc', 'sci', or 'eng'.
+            if self.units is not None:
+                msid_attrs = self.convert_units(msid_attrs)
+        else:
+            msid_attrs = self.get_stats_attrs(
+                tstart, tstop, msid.lower(), match_args, interval
+            )
+
+        return msid_attrs
+
+    def convert_units(self, msid_attrs):
+        """
+        Convert required elements of ``msid_attrs`` to ``self.unit_system``.
+
+        Unit_system can be one of 'cxc', 'sci', 'eng'.
+
+        :param msid_attrs: dict, input MSID attributes
+
+        :returns: dict, converted MSID attributes
+        """
+        unit_current = self.units[self.units["internal_system"]]
+        unit_new = self.units[self.unit_system]
+
+        out = msid_attrs.copy()
+        out["unit"] = unit_new
+
+        if unit_current != unit_new:
+            for attr in self.units["convert_attrs"]:
+                out[attr] = unit_converter_funcs[unit_current, unit_new](
+                    msid_attrs[attr]
+                )
+
+        return out
+
+    def get_msid_attrs(self, tstart, tstop, msid, msid_args):
+        """Get the attributes required for this MSID.
+
+        Get attributes for computed MSID, which must include at least
+        ``vals``, ``bads``, ``times``, and may include additional attributes.
+
+        This MUST be supplied by sub-classes.
+
+        :param tstart: start time (CXC secs)
+        :param tstop: stop time (CXC secs)
+        :param msid: full MSID name e.g. tephin_plus_5
+        :param msid_args: tuple of regex match groups (msid_name,)
+        :returns: dict of MSID attributes
+        """
+        raise NotImplementedError("sub-class must implement get_msid_attrs()")
+
+    def get_stats_attrs(self, tstart, tstop, msid, match_args, interval):
+        """Get 5-min or daily stats attributes.
+
+        This is normally not overridden by sub-classes.
+
+        :param tstart: start time (CXC secs)
+        :param tstop: stop time (CXC secs)
+        :param msid: full MSID name e.g. tephin_plus_5
+        :param match_args: tuple of regex match groups (msid_name,)
+        :param interval: str, stats interval ('5min' or 'daily')
+        :returns: dict of MSID attributes
+        """
+        from ..fetch import _plural
+
+        # Replicate a stripped-down version of processing in update_archive.
+        # This produces a recarray with columns that correspond to the raw
+        # stats HDF5 files.
+        dt = {"5min": 328, "daily": 86400}[interval]
+        index0 = int(np.floor(tstart / dt))
+        index1 = int(np.ceil(tstop / dt))
+        tstart = (index0 - 1) * dt
+        tstop = (index1 + 1) * dt
+
+        msid_obj = self.fetch_sys.Msid(msid, tstart, tstop)
+
+        indexes = np.arange(index0, index1 + 1)
+        times = indexes * dt  # This is the *start* time of each bin
+
+        if len(times) > 0:
+            rows = np.searchsorted(msid_obj.times, times)
+            vals_stats = calc_stats_vals(msid_obj, rows, indexes, interval)
+        else:
+            raise ValueError(f"no stats intervals within {tstart} to {tstop}")
+
+        # Replicate the name munging that fetch does going from the HDF5 columns
+        # to what is seen in a stats fetch query.
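+        # (For example the HDF5 column "mean" is exposed as "means" and "n" as
+        # "samples"; "vals" is then set to the weighted means below.)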
+ out = {} + for key in vals_stats.dtype.names: + out_key = _plural(key) if key != "n" else "samples" + out[out_key] = vals_stats[key] + out["times"] = (vals_stats["index"] + 0.5) * dt + out["bads"] = np.zeros(len(vals_stats), dtype=bool) + out["midvals"] = out["vals"] + out["vals"] = out["means"] + out["unit"] = msid_obj.unit + + return out diff --git a/cheta/derived/ephem_stk.py b/cheta/comps/ephem_stk.py similarity index 99% rename from cheta/derived/ephem_stk.py rename to cheta/comps/ephem_stk.py index d8aa5530..b7706137 100644 --- a/cheta/derived/ephem_stk.py +++ b/cheta/comps/ephem_stk.py @@ -14,7 +14,7 @@ from cxotime import CxoTime, CxoTimeLike from kadi import occweb -from cheta.derived.comps import ComputedMsid +from .computed_msid import ComputedMsid logger = logging.getLogger("cheta.fetch") diff --git a/cheta/fetch.py b/cheta/fetch.py index 20aa8d39..331a7c6c 100644 --- a/cheta/fetch.py +++ b/cheta/fetch.py @@ -28,7 +28,7 @@ file_defs, remote_access, ) -from .derived.comps import ComputedMsid +from .comps import ComputedMsid from .lazy import LazyDict from .remote_access import ENG_ARCHIVE from .units import Units diff --git a/cheta/tests/test_comps.py b/cheta/tests/test_comps.py index c9109648..b704b82d 100644 --- a/cheta/tests/test_comps.py +++ b/cheta/tests/test_comps.py @@ -10,8 +10,8 @@ from .. import fetch as fetch_cxc from .. import fetch_eng, fetch_sci +from ..comps import ComputedMsid from ..derived.base import DerivedParameter -from ..derived.comps import ComputedMsid try: import maude diff --git a/setup.py b/setup.py index f0fe44d6..f8bc0c2b 100755 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ namespace = "Ska.engarchive" package_dir = {name: name} -packages = ["cheta", "cheta.derived", "cheta.tests"] +packages = ["cheta", "cheta.derived", "cheta.comps", "cheta.tests"] package_data = { "cheta": ["task_schedule.cfg", "*.dat", "units_*.pkl", "archfiles_def.sql"], "cheta.tests": ["*.dat"],
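
Usage sketch once both patches are applied (a minimal example; assumes OCCweb
access via kadi.occweb, and the time range is illustrative)::

    >>> from cheta import fetch
    >>> orbit_x = fetch.Msid("orbitephem_stk_x", "2024:001", "2024:003")
    >>> orbit_x.vals  # X position in meters from the STK ephemeris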