diff --git a/daskms/chunking.py b/daskms/chunking.py
new file mode 100644
index 00000000..7088b007
--- /dev/null
+++ b/daskms/chunking.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+
+import os
+from os.path import join as pjoin
+
+import numpy as np
+import pyrap.tables as pt
+
+from daskms.reads import DatasetFactory
+
+
+class TableChunking(object):
+    def __init__(self, ms, group_cols=None, index_cols=None):
+        self._ms = ms
+        self._group_cols = group_cols
+        self._index_cols = index_cols
+        self._dataset_factory = None
+
+    def __call__(self, *args, **kwargs):
+        if self._dataset_factory is None:
+            factory = DatasetFactory(self._ms, [],
+                                     self._group_cols,
+                                     self._index_cols)
+            self._dataset_factory = factory
+        else:
+            factory = self._dataset_factory
+
+        table_proxy = factory.table_proxy_factory()
+
+        return self.chunk(table_proxy)
+
+    def chunk(self, table_proxy):
+        from pprint import pprint
+        pprint(inspect_ms(self._ms))
+
+        print(table_proxy)
+
+
+class MSChunking(TableChunking):
+    pass
+
+
+# {(subtable, required): number_column}
+SUBTABLES = {
+    ("FEED", False): "NUM_RECEPTORS",
+    ("FIELD", False): "NUM_POLY",
+    ("POINTING", False): "NUM_POLY",
+    ("POLARIZATION", True): "NUM_CORR",
+    ("SOURCE", False): "NUM_LINES",
+    ("SPECTRAL_WINDOW", True): "NUM_CHAN"
+}
+
+
+def _inspect_subtables(ms):
+    for (subtable, required), num_column in SUBTABLES.items():
+        subtable_path = pjoin(ms, subtable)
+        subtable_name = "::".join((ms, subtable))
+
+        if not os.path.isdir(subtable_path):
+            if required:
+                raise ValueError("%s required but not present" % subtable)
+
+            continue
+
+        with pt.table(subtable_name, ack=False, readonly=True) as T:
+            yield ((subtable, num_column), T.getcol(num_column))
+
+
+def inspect_ms(ms):
+    subtables = dict(_inspect_subtables(ms))
+
+    ddid_name = "::".join((ms, "DATA_DESCRIPTION"))
+
+    spw = subtables[("SPECTRAL_WINDOW", "NUM_CHAN")]
+    pol = subtables[("POLARIZATION", "NUM_CORR")]
+
+    with pt.table(ddid_name, ack=False, readonly=True) as T:
+        spw_id = T.getcol("SPECTRAL_WINDOW_ID")
+        pol_id = T.getcol("POLARIZATION_ID")
+
+    nchan = spw[spw_id]
+    ncorr = pol[pol_id]
+
+    ddid_shapes = np.stack([nchan, ncorr], axis=1)
+    return subtables, ddid_shapes
+
+
+
+
+
+
diff --git a/daskms/ms_structure.py b/daskms/ms_structure.py
new file mode 100644
index 00000000..216d57c4
--- /dev/null
+++ b/daskms/ms_structure.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+
+import os
+from os.path import join as pjoin
+
+import numpy as np
+import pyrap.tables as pt
+
+
+SUBTABLES = {
+    ("FEED", False): "NUM_RECEPTORS",
+    ("FIELD", False): "NUM_POLY",
+    ("POINTING", False): "NUM_POLY",
+    ("POLARIZATION", True): "NUM_CORR",
+    ("SOURCE", False): "NUM_LINES",
+    ("SPECTRAL_WINDOW", True): "NUM_CHAN"
+}
+
+
+def _inspect_subtables(ms):
+    for (subtable, required), num_column in SUBTABLES.items():
+        subtable_path = pjoin(ms, subtable)
+        subtable_name = "::".join((ms, subtable))
+
+        if not os.path.isdir(subtable_path):
+            if required:
+                raise ValueError("%s required but not present" % subtable)
+
+            continue
+
+        with pt.table(subtable_name, ack=False, readonly=True) as T:
+            yield ((subtable, num_column), T.getcol(num_column))
+
+
+def inspect_ms(ms):
+    subtables = dict(_inspect_subtables(ms))
+
+    ddid_name = "::".join((ms, "DATA_DESCRIPTION"))
+
+    spw = subtables[("SPECTRAL_WINDOW", "NUM_CHAN")]
+    pol = subtables[("POLARIZATION", "NUM_CORR")]
+
+    with pt.table(ddid_name, ack=False, readonly=True) as T:
+        spw_id = T.getcol("SPECTRAL_WINDOW_ID")
+        pol_id = T.getcol("POLARIZATION_ID")
+
+    nchan = spw[spw_id]
+    ncorr = pol[pol_id]
+
+    ddid_shapes = np.stack([nchan, ncorr], axis=1)
+    return ddid_shapes
+
+
+
diff --git a/daskms/reads.py b/daskms/reads.py
index 6894eca0..d99d97db 100644
--- a/daskms/reads.py
+++ b/daskms/reads.py
@@ -303,7 +303,7 @@ def __init__(self, table, select_cols, group_cols, index_cols, **kwargs):
         if len(kwargs) > 0:
             raise ValueError("Unhandled kwargs: %s" % kwargs)
 
-    def _table_proxy_factory(self):
+    def table_proxy_factory(self):
         return TableProxy(pt.table, self.table_path,
                           ack=False, readonly=True, lockoptions='user',
                           __executor_key__=executor_key(self.canonical_name))
@@ -385,7 +385,7 @@ def _group_datasets(self, table_proxy, groups, exemplar_rows, orders):
         return datasets
 
     def datasets(self):
-        table_proxy = self._table_proxy_factory()
+        table_proxy = self.table_proxy_factory()
 
         # No grouping case
         if len(self.group_cols) == 0:
diff --git a/daskms/tests/test_chunking.py b/daskms/tests/test_chunking.py
new file mode 100644
index 00000000..390c8d02
--- /dev/null
+++ b/daskms/tests/test_chunking.py
@@ -0,0 +1,11 @@
+# -*- coding: utf-8 -*-
+
+import pytest
+
+from daskms.chunking import MSChunking
+from daskms.table_proxy import TableProxy
+
+def test_chunking():
+    chunks = MSChunking("/home/sperkins/data/AF0236_spw01.ms/")
+
+    chunks(1, 2)
\ No newline at end of file
diff --git a/daskms/tests/test_ms_structure.py b/daskms/tests/test_ms_structure.py
new file mode 100644
index 00000000..488ef14b
--- /dev/null
+++ b/daskms/tests/test_ms_structure.py
@@ -0,0 +1,7 @@
+# -*- coding: utf-8 -*-
+
+from daskms.ms_structure import inspect_ms
+
+
+def test_ms_structure():
+    print(inspect_ms("/home/sperkins/data/WSRT_multiple.MS_p0"))