From 24b9fb59b815ad2d86fca7203118f107586e6e0a Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Sun, 15 Sep 2024 15:33:50 +0200 Subject: [PATCH 01/19] first version of write_levels() function (uses xcube) --- tests/test_levels.py | 73 ++++++++++++++++++ zappend/levels.py | 176 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 249 insertions(+) create mode 100644 tests/test_levels.py create mode 100644 zappend/levels.py diff --git a/tests/test_levels.py b/tests/test_levels.py new file mode 100644 index 0000000..81c371c --- /dev/null +++ b/tests/test_levels.py @@ -0,0 +1,73 @@ +# Copyright © 2024 Norman Fomferra and contributors +# Permissions are hereby granted under the terms of the MIT License: +# https://opensource.org/licenses/MIT. +import json +import unittest + + +from zappend.fsutil import FileObj +from zappend.levels import write_levels +from zappend.levels import get_variables_config +from .helpers import clear_memory_fs +from .helpers import make_test_dataset + +try: + import xcube +except: + xcube = None + + +class GetVariablesConfigTest(unittest.TestCase): + def test_it(self): + dataset = make_test_dataset() + variables = get_variables_config(dataset, dict(x=512, y=256, time=1)) + self.assertEqual( + { + "x": {"encoding": {"chunks": None}}, + "y": {"encoding": {"chunks": None}}, + "time": {"encoding": {"chunks": None}}, + "chl": {"encoding": {"chunks": [1, 256, 512]}}, + "tsm": {"encoding": {"chunks": [1, 256, 512]}}, + }, + variables, + ) + + +@unittest.skipIf(xcube is None, reason="xcube is not installed") +class WriteLevelsTest(unittest.TestCase): + def setUp(self): + clear_memory_fs() + + def test_it(self): + source_path = "memory://source.zarr" + make_test_dataset( + uri=source_path, + dims=("time", "y", "x"), + shape=(3, 1024, 2048), + chunks=(1, 128, 256), + crs="EPSG:4326", + ) + + target_dir = FileObj("memory://target.levels") + self.assertFalse(target_dir.exists()) + + write_levels(source_path=source_path, 
target_path=target_dir.uri) + + self.assertTrue(target_dir.exists()) + + levels_file = target_dir.for_path(".zlevels") + self.assertTrue(levels_file.exists()) + levels_info = json.loads(levels_file.read()) + self.assertEqual( + { + "version": "1.0", + "num_levels": 4, + "agg_methods": {"chl": "mean", "tsm": "mean"}, + "use_saved_levels": False, + }, + levels_info, + ) + + # ds = xr.open_zarr(target_dir.uri + "/0.zarr") + # self.assertEqual({"time": 1, "y": 10, "x": 20}, ds.sizes) + # self.assertEqual({"x", "y", "time", "chl", "tsm"}, set(ds.variables)) diff --git a/zappend/levels.py b/zappend/levels.py new file mode 100644 index 0000000..a4841ed --- /dev/null +++ b/zappend/levels.py @@ -0,0 +1,176 @@ +# Copyright © 2024 Norman Fomferra and contributors +# Permissions are hereby granted under the terms of the MIT License: +# https://opensource.org/licenses/MIT. + +import json +import logging +from typing import Any, Hashable + +import xarray as xr +import fsspec + +from zappend.api import zappend + + +def write_levels( + source_path: str, + source_storage_options: dict[str, Any] | None = None, + target_path: str | None = None, + num_levels: int | None = None, + agg_methods: dict[str, Any] | None = None, + use_saved_levels: bool = False, + link_level_zero: bool = False, + xy_dim_names: tuple[str, str] | None = None, + tile_size: tuple[int, int] | None = None, + **zappend_config, +): + from xcube.core.tilingscheme import get_num_levels + from xcube.core.gridmapping import GridMapping + from xcube.core.subsampling import get_dataset_agg_methods + from xcube.core.subsampling import subsample_dataset + + target_dir = zappend_config.pop("target_dir", None) + if not target_dir and not target_path: + raise ValueError("either 'target_dir' or 'target_path' can be given, not both") + if target_dir and target_path and target_dir != target_path: + raise ValueError("either 'target_dir' or 'target_path' can be given, not both") + target_path = target_path or target_dir + 
target_storage_options = zappend_config.pop( + "target_storage_options", source_storage_options or {} + ) + target_fs, target_root = fsspec.core.url_to_fs( + target_path, **target_storage_options + ) + + source_fs, source_root = fsspec.core.url_to_fs( + source_path, + **( + source_storage_options + if source_storage_options is not None + else target_storage_options + ), + ) + source_store = source_fs.get_mapper(root=source_root) + source_ds = xr.open_zarr(source_store) + + logger = logging.getLogger("zappend") + + grid_mapping: GridMapping | None = None + + if xy_dim_names is None: + grid_mapping = grid_mapping or GridMapping.from_dataset(source_ds) + xy_dim_names = grid_mapping.xy_dim_names + + if tile_size is None: + grid_mapping = grid_mapping or GridMapping.from_dataset(source_ds) + tile_size = grid_mapping.tile_size + + if num_levels is None: + grid_mapping = grid_mapping or GridMapping.from_dataset(source_ds) + num_levels = get_num_levels(grid_mapping.size, tile_size) + + agg_methods = get_dataset_agg_methods( + source_ds, + xy_dim_names=xy_dim_names, + agg_methods=agg_methods, + ) + + force_new = zappend_config.pop("force_new", None) + + append_dim = zappend_config.pop("append_dim", "time") + append_coord = source_ds.coords[append_dim] + + variables = get_variables_config( + source_ds, + { + xy_dim_names[0]: tile_size[0], + xy_dim_names[0]: tile_size[1], + append_dim: 1, + }, + variables=zappend_config.pop("variables", None), + ) + + with target_fs.open(f"{target_root}/.zlevels", "wt") as fp: + levels_data: dict[str, Any] = dict( + version="1.0", + num_levels=num_levels, + agg_methods=dict(agg_methods), + use_saved_levels=use_saved_levels, + ) + json.dump(levels_data, fp, indent=2) + + if link_level_zero: + with target_fs.open(f"{target_root}/0.link", "wt") as fp: + fp.write(source_root) + + subsample_dataset_kwargs = dict(xy_dim_names=xy_dim_names, agg_methods=agg_methods) + + for slice_index in range(append_coord.size): + slice_ds_indexer = {append_dim: 
slice(slice_index, slice_index + 1)} + slice_ds = source_ds.isel(slice_ds_indexer) + + for level_index in range(num_levels): + if level_index == 0: + level_slice_ds = slice_ds + elif use_saved_levels: + prev_level_path = f"{target_root}/{level_index - 1}.zarr" + prev_level_store = target_fs.get_mapper(root=prev_level_path) + prev_level_ds = xr.open_zarr(prev_level_store) + level_slice_ds = subsample_dataset( + prev_level_ds.isel(slice_ds_indexer), + step=2, + **subsample_dataset_kwargs, + ) + else: + level_slice_ds = subsample_dataset( + slice_ds, + step=2**level_index, + **subsample_dataset_kwargs, + ) + + if not link_level_zero or level_index > 0: + level_slice_path = f"{target_path}/{level_index}.zarr" + zappend( + [level_slice_ds], + target_dir=level_slice_path, + target_storage_options=target_storage_options, + append_dim=append_dim, + force_new=force_new if slice_index == 0 else False, + variables=variables, + **zappend_config, + ) + + logger.info(f"done writing {target_path}") + + +def get_variables_config( + dataset: xr.Dataset, + chunk_sizes: dict[Hashable, int], + variables: dict[str, dict[str, Any]] | None = None, +): + """Define the chunk sizes for the variables in *dataset*. + + Args: + dataset: The dataset + chunk_sizes: The chunk sizes + variables: Value of the zappend ``variables`` + configuration parameter + Return: + A zappend compatible with the zappend ``variables`` + configuration parameter. 
+ """ + var_configs = dict(variables or {}) + for var_name, var in dataset.variables.items(): + var_name = str(var_name) + var_config = dict(var_configs.get(var_name, {})) + var_encoding = dict(var_config.get("encoding", {})) + var_chunks = var_encoding.get("chunks") + if not var_chunks and var.dims: + if var_name in dataset.coords: + var_chunks = None + else: + var_chunks = [chunk_sizes.get(dim) for dim in var.dims] + var_encoding["chunks"] = var_chunks + var_config["encoding"] = var_encoding + var_configs[var_name] = var_config + return var_configs From 223bb6bc544459b2f12cf162dc56c515fd7882aa Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Sun, 15 Sep 2024 15:39:45 +0200 Subject: [PATCH 02/19] more asserts --- tests/test_levels.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/test_levels.py b/tests/test_levels.py index 81c371c..63dc4d2 100644 --- a/tests/test_levels.py +++ b/tests/test_levels.py @@ -1,9 +1,11 @@ # Copyright © 2024 Norman Fomferra and contributors # Permissions are hereby granted under the terms of the MIT License: # https://opensource.org/licenses/MIT. 
+ import json import unittest +import xarray as xr from zappend.fsutil import FileObj from zappend.levels import write_levels @@ -68,6 +70,14 @@ def test_it(self): levels_info, ) - # ds = xr.open_zarr(target_dir.uri + "/0.zarr") - # self.assertEqual({"time": 1, "y": 10, "x": 20}, ds.sizes) - # self.assertEqual({"x", "y", "time", "chl", "tsm"}, set(ds.variables)) + ds0 = xr.open_zarr(target_dir.uri + f"/0.zarr") + self.assertEqual({"time": 3, "y": 1024, "x": 2048}, ds0.sizes) + + ds1 = xr.open_zarr(target_dir.uri + f"/1.zarr") + self.assertEqual({"time": 3, "y": 512, "x": 1024}, ds1.sizes) + + ds2 = xr.open_zarr(target_dir.uri + f"/2.zarr") + self.assertEqual({"time": 3, "y": 256, "x": 512}, ds2.sizes) + + ds3 = xr.open_zarr(target_dir.uri + f"/3.zarr") + self.assertEqual({"time": 3, "y": 128, "x": 256}, ds3.sizes) From 2e04e9eab91946a3ecb4641ed2722a5ea10a92c5 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Mon, 16 Sep 2024 08:11:06 +0200 Subject: [PATCH 03/19] fixes --- tests/test_levels.py | 168 ++++++++++++++++++++++++++++++++++++++++--- zappend/levels.py | 31 +++++--- 2 files changed, 181 insertions(+), 18 deletions(-) diff --git a/tests/test_levels.py b/tests/test_levels.py index 63dc4d2..bd68b40 100644 --- a/tests/test_levels.py +++ b/tests/test_levels.py @@ -20,7 +20,7 @@ class GetVariablesConfigTest(unittest.TestCase): - def test_it(self): + def test_no_variables_given(self): dataset = make_test_dataset() variables = get_variables_config(dataset, dict(x=512, y=256, time=1)) self.assertEqual( @@ -34,13 +34,35 @@ def test_it(self): variables, ) + def test_variables_given(self): + dataset = make_test_dataset() + variables = get_variables_config( + dataset, + dict(x=512, y=256, time=1), + variables={ + "time": {"encoding": {"chunks": [3]}}, + "chl": {"encoding": {"chunks": [3, 100, 100]}}, + "tsm": {"encoding": {"dtype": "uint16"}}, + }, + ) + self.assertEqual( + { + "x": {"encoding": {"chunks": None}}, + "y": {"encoding": {"chunks": None}}, + "time": 
{"encoding": {"chunks": [3]}}, + "chl": {"encoding": {"chunks": [3, 100, 100]}}, + "tsm": {"encoding": {"chunks": [1, 256, 512], "dtype": "uint16"}}, + }, + variables, + ) + @unittest.skipIf(xcube is None, reason="xcube is not installed") class WriteLevelsTest(unittest.TestCase): def setUp(self): clear_memory_fs() - def test_it(self): + def test_default_x_y_with_crs(self): source_path = "memory://source.zarr" make_test_dataset( uri=source_path, @@ -70,14 +92,140 @@ def test_it(self): levels_info, ) - ds0 = xr.open_zarr(target_dir.uri + f"/0.zarr") - self.assertEqual({"time": 3, "y": 1024, "x": 2048}, ds0.sizes) + self.assert_level(target_dir.uri + "/0.zarr", 0, has_crs=True) + self.assert_level(target_dir.uri + "/1.zarr", 1, has_crs=True) + self.assert_level(target_dir.uri + "/2.zarr", 2, has_crs=True) + self.assert_level(target_dir.uri + "/3.zarr", 3, has_crs=True) + + def test_default_lon_lat_no_crs(self): + source_path = "memory://source.zarr" + make_test_dataset( + uri=source_path, + dims=("time", "lat", "lon"), + shape=(3, 1024, 2048), + chunks=(1, 128, 256), + ) + + target_dir = FileObj("memory://target.levels") + self.assertFalse(target_dir.exists()) + + write_levels(source_path=source_path, target_path=target_dir.uri) + + self.assertTrue(target_dir.exists()) - ds1 = xr.open_zarr(target_dir.uri + f"/1.zarr") - self.assertEqual({"time": 3, "y": 512, "x": 1024}, ds1.sizes) + levels_file = target_dir.for_path(".zlevels") + self.assertTrue(levels_file.exists()) + levels_info = json.loads(levels_file.read()) + self.assertEqual( + { + "version": "1.0", + "num_levels": 4, + "agg_methods": {"chl": "mean", "tsm": "mean"}, + "use_saved_levels": False, + }, + levels_info, + ) - ds2 = xr.open_zarr(target_dir.uri + f"/2.zarr") - self.assertEqual({"time": 3, "y": 256, "x": 512}, ds2.sizes) + xy_dims = "lon", "lat" + self.assert_level(target_dir.uri + "/0.zarr", 0, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/1.zarr", 1, xy_dims=xy_dims) + 
self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims) - ds3 = xr.open_zarr(target_dir.uri + f"/3.zarr") - self.assertEqual({"time": 3, "y": 128, "x": 256}, ds3.sizes) + def test_link_level_zero(self): + source_dir = FileObj("memory://source.zarr") + make_test_dataset( + uri=source_dir.uri, + dims=("time", "y", "x"), + shape=(3, 1024, 2048), + chunks=(1, 128, 256), + crs="EPSG:4326", + ) + + target_dir = FileObj("memory://target.levels") + self.assertFalse(target_dir.exists()) + + write_levels( + source_path=source_dir.uri, + target_path=target_dir.uri, + link_level_zero=True, + ) + + self.assertTrue(target_dir.exists()) + + levels_file = target_dir.for_path(".zlevels") + self.assertTrue(levels_file.exists()) + levels_info = json.loads(levels_file.read()) + self.assertEqual( + { + "version": "1.0", + "num_levels": 4, + "agg_methods": {"chl": "mean", "tsm": "mean"}, + "use_saved_levels": False, + }, + levels_info, + ) + + level_zero_file = target_dir.for_path("0.link") + self.assertTrue(level_zero_file.exists()) + self.assertEqual(b"../source.zarr", level_zero_file.read()) + self.assert_level(target_dir.uri + "/1.zarr", 1, has_crs=True) + self.assert_level(target_dir.uri + "/2.zarr", 2, has_crs=True) + self.assert_level(target_dir.uri + "/3.zarr", 3, has_crs=True) + + def test_link_level_zero_use_saved_levels(self): + source_dir = FileObj("memory://source.zarr") + make_test_dataset( + uri=source_dir.uri, + dims=("time", "lat", "lon"), + shape=(3, 1024, 2048), + chunks=(1, 128, 256), + ) + + target_dir = FileObj("memory://target.levels") + self.assertFalse(target_dir.exists()) + + write_levels( + source_path=source_dir.uri, + target_path=target_dir.uri, + link_level_zero=True, + use_saved_levels=True, + ) + + self.assertTrue(target_dir.exists()) + + levels_file = target_dir.for_path(".zlevels") + self.assertTrue(levels_file.exists()) + levels_info = json.loads(levels_file.read()) + 
self.assertEqual( + { + "version": "1.0", + "num_levels": 4, + "agg_methods": {"chl": "mean", "tsm": "mean"}, + "use_saved_levels": True, + }, + levels_info, + ) + + xy_dims = "lon", "lat" + level_zero_file = target_dir.for_path("0.link") + self.assertTrue(level_zero_file.exists()) + self.assertEqual(b"../source.zarr", level_zero_file.read()) + self.assert_level(target_dir.uri + "/1.zarr", 1, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims) + + def assert_level(self, uri: str, level: int, xy_dims=("x", "y"), has_crs=False): + x_dim, y_dim = xy_dims + dataset = xr.open_zarr(uri) + z = 2**level + f = 2 ** (3 - level) + self.assertEqual({"time": 3, y_dim: 1024 // z, x_dim: 2048 // z}, dataset.sizes) + self.assertEqual( + {"time": 3 * (1,), y_dim: f * (128,), x_dim: f * (256,)}, dataset.chunksizes + ) + self.assertEqual({x_dim, y_dim, "time"}, set(dataset.coords)) + if has_crs: + self.assertEqual({"chl", "tsm", "crs"}, set(dataset.data_vars)) + else: + self.assertEqual({"chl", "tsm"}, set(dataset.data_vars)) diff --git a/zappend/levels.py b/zappend/levels.py index a4841ed..52a58b4 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -11,6 +11,9 @@ from zappend.api import zappend +# Note, the function may be easily adapted to zappend +# to existing multi-level datasets. 
+ def write_levels( source_path: str, @@ -24,10 +27,12 @@ def write_levels( tile_size: tuple[int, int] | None = None, **zappend_config, ): + """TODO - document me""" from xcube.core.tilingscheme import get_num_levels from xcube.core.gridmapping import GridMapping from xcube.core.subsampling import get_dataset_agg_methods from xcube.core.subsampling import subsample_dataset + from xcube.util.fspath import get_fs_path_class target_dir = zappend_config.pop("target_dir", None) if not target_dir and not target_path: @@ -84,7 +89,7 @@ def write_levels( source_ds, { xy_dim_names[0]: tile_size[0], - xy_dim_names[0]: tile_size[1], + xy_dim_names[1]: tile_size[1], append_dim: 1, }, variables=zappend_config.pop("variables", None), @@ -94,14 +99,21 @@ def write_levels( levels_data: dict[str, Any] = dict( version="1.0", num_levels=num_levels, - agg_methods=dict(agg_methods), + agg_methods=agg_methods, use_saved_levels=use_saved_levels, ) json.dump(levels_data, fp, indent=2) if link_level_zero: + path_class = get_fs_path_class(target_fs) + rel_source_path = ( + "../" + + path_class(source_root) + .relative_to(path_class(target_root).parent) + .as_posix() + ) with target_fs.open(f"{target_root}/0.link", "wt") as fp: - fp.write(source_root) + fp.write(rel_source_path) subsample_dataset_kwargs = dict(xy_dim_names=xy_dim_names, agg_methods=agg_methods) @@ -113,9 +125,12 @@ def write_levels( if level_index == 0: level_slice_ds = slice_ds elif use_saved_levels: - prev_level_path = f"{target_root}/{level_index - 1}.zarr" - prev_level_store = target_fs.get_mapper(root=prev_level_path) - prev_level_ds = xr.open_zarr(prev_level_store) + if level_index == 1: + prev_level_ds = source_ds + else: + prev_level_path = f"{target_root}/{level_index - 1}.zarr" + prev_level_store = target_fs.get_mapper(root=prev_level_path) + prev_level_ds = xr.open_zarr(prev_level_store) level_slice_ds = subsample_dataset( prev_level_ds.isel(slice_ds_indexer), step=2, @@ -165,8 +180,8 @@ def get_variables_config( 
var_config = dict(var_configs.get(var_name, {})) var_encoding = dict(var_config.get("encoding", {})) var_chunks = var_encoding.get("chunks") - if not var_chunks and var.dims: - if var_name in dataset.coords: + if "chunks" not in var_encoding and var.dims: + if var_name in dataset.coords or set(var.dims).isdisjoint(chunk_sizes): var_chunks = None else: var_chunks = [chunk_sizes.get(dim) for dim in var.dims] From 35c0af6f5e9a16b90b21dd4b02bad36608966a7c Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Mon, 16 Sep 2024 10:22:42 +0200 Subject: [PATCH 04/19] started docstr --- zappend/levels.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/zappend/levels.py b/zappend/levels.py index 52a58b4..1b4dd03 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -27,7 +27,20 @@ def write_levels( tile_size: tuple[int, int] | None = None, **zappend_config, ): - """TODO - document me""" + """Writes a dataset at `source_path` to a xcube multi-level + dataset at `target_path`. + + The source dataset is opened and subdivided into dataset slices + along the append dimension given by `append_dim`, which defaults to `"time"`. + The slice size in the append dimension is one and also the target dataset's + chunk size in the append dimension will be one. + + Args: + source_path: The source dataset path. + source_storage_options: Storage options for the source + dataset's filesystem. 
+ target_path: + """ from xcube.core.tilingscheme import get_num_levels from xcube.core.gridmapping import GridMapping from xcube.core.subsampling import get_dataset_agg_methods From 8744f31182ed3009f71eeb3678e67447fedf8ac6 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Mon, 16 Sep 2024 11:42:11 +0200 Subject: [PATCH 05/19] Added docstring --- docs/api.md | 6 +++++ zappend/levels.py | 61 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/docs/api.md b/docs/api.md index 53cbe62..2335a01 100644 --- a/docs/api.md +++ b/docs/api.md @@ -45,3 +45,9 @@ All described objects can be imported from the `zappend.api` module. ::: zappend.api.ConfigLike options: show_root_heading: true + +## Function `write_levels()` for `xcube` + +::: zappend.levels.write_levels + options: + show_root_heading: true diff --git a/zappend/levels.py b/zappend/levels.py index 1b4dd03..0ff845c 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -20,26 +20,71 @@ def write_levels( source_storage_options: dict[str, Any] | None = None, target_path: str | None = None, num_levels: int | None = None, - agg_methods: dict[str, Any] | None = None, + tile_size: tuple[int, int] | None = None, + agg_methods: str | dict[str, Any] | None = None, use_saved_levels: bool = False, link_level_zero: bool = False, xy_dim_names: tuple[str, str] | None = None, - tile_size: tuple[int, int] | None = None, **zappend_config, ): - """Writes a dataset at `source_path` to a xcube multi-level - dataset at `target_path`. + """Writes a dataset at `source_path` to `target_path` using the + [multi-level dataset format](https://xcube.readthedocs.io/en/latest/mldatasets.html) + as specified by + [xcube](https://github.com/xcube-dev/xcube). The source dataset is opened and subdivided into dataset slices - along the append dimension given by `append_dim`, which defaults to `"time"`. 
- The slice size in the append dimension is one and also the target dataset's - chunk size in the append dimension will be one. + along the append dimension given by `append_dim`, which defaults + to `"time"`. The slice size in the append dimension is one. + Each slice is downsampled to the number of levels and each slice level + dataset is created/appended the target dataset's individual level + datasets. + + The target dataset's chunk size in the spatial x- and y-dimensions will + be the same as the specified (or derived) tile size. + The append dimension will be one. The chunking will be reflected as the + `variables` configuration parameter passed to each `zappend()` call. + If configuration parameter `variables` is also given as part of + `zappend_config`, it will be merged with the chunk definitions. + + Important: This function requires the `xcube` package to be installed. Args: source_path: The source dataset path. source_storage_options: Storage options for the source dataset's filesystem. - target_path: + target_path: The source multi-level dataset path. + Filename extension should be `.levels`, by convention. + If not given, `target_dir` should be passed as part of the + `zappend_config`. (The name `target_path` is used here for + consistency with `source_path`.) + num_levels: Optional number of levels. + If not given, a reasonable number of levels is computed + from `tile_size`. + tile_size: Optional tile size in the x- and y-dimension in pixels. + If not given, the tile size is computed from the source + dataset's chunk sizes in the x- and y-dimensions. + xy_dim_names: + Optional dimension names that identify the x- and y-dimensions. + If not given, derived from the source dataset's grid mapping, + if any. + agg_methods: An aggregation method for all data variables or a + mapping that provides the aggregation method for a variable + name. Possible aggregation methods are + `"first"`, `"min"`, `"max"`, `"mean"`, `"median"`. 
+ use_saved_levels: Whether a given, already written resolution level + serves as input to aggregation for the next level. If `False`, + the default, each resolution level other than zero is computed + from the source dataset. If `True`, the function may perform + significantly faster, but be aware that the aggregation + methods `"first"` and `"median"` will produce inaccurate results. + link_level_zero: Whether to _not_ write the level zero of the target + multi-level dataset and link it instead. In this case, a link + file `{target_path}/0.link` will be written. + If `False`, the default, a level dataset `{target_path}/0.zarr` + will be written instead. + zappend_config: + Configuration passed to the `zappend()` call for each + slice in the append dimension. """ from xcube.core.tilingscheme import get_num_levels from xcube.core.gridmapping import GridMapping From 06748a950bd067c2c8ce8a36274ce640d7a12ec7 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Mon, 16 Sep 2024 12:53:10 +0200 Subject: [PATCH 06/19] Fixed problem with dims and added more logging --- tests/test_levels.py | 35 ++++++++++++++++++++++++----------- zappend/levels.py | 21 +++++++++++++++++++-- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/tests/test_levels.py b/tests/test_levels.py index bd68b40..48fb67f 100644 --- a/tests/test_levels.py +++ b/tests/test_levels.py @@ -14,8 +14,9 @@ from .helpers import make_test_dataset try: + # noinspection PyUnresolvedReferences import xcube -except: +except ImportError: xcube = None @@ -25,11 +26,17 @@ def test_no_variables_given(self): variables = get_variables_config(dataset, dict(x=512, y=256, time=1)) self.assertEqual( { - "x": {"encoding": {"chunks": None}}, - "y": {"encoding": {"chunks": None}}, - "time": {"encoding": {"chunks": None}}, - "chl": {"encoding": {"chunks": [1, 256, 512]}}, - "tsm": {"encoding": {"chunks": [1, 256, 512]}}, + "x": {"dims": ["x"], "encoding": {"chunks": None}}, + "y": {"dims": ["y"], "encoding": {"chunks": 
None}}, + "time": {"dims": ["time"], "encoding": {"chunks": None}}, + "chl": { + "dims": ["time", "y", "x"], + "encoding": {"chunks": [1, 256, 512]}, + }, + "tsm": { + "dims": ["time", "y", "x"], + "encoding": {"chunks": [1, 256, 512]}, + }, }, variables, ) @@ -47,11 +54,17 @@ def test_variables_given(self): ) self.assertEqual( { - "x": {"encoding": {"chunks": None}}, - "y": {"encoding": {"chunks": None}}, - "time": {"encoding": {"chunks": [3]}}, - "chl": {"encoding": {"chunks": [3, 100, 100]}}, - "tsm": {"encoding": {"chunks": [1, 256, 512], "dtype": "uint16"}}, + "x": {"dims": ["x"], "encoding": {"chunks": None}}, + "y": {"dims": ["y"], "encoding": {"chunks": None}}, + "time": {"dims": ["time"], "encoding": {"chunks": [3]}}, + "chl": { + "dims": ["time", "y", "x"], + "encoding": {"chunks": [3, 100, 100]}, + }, + "tsm": { + "dims": ["time", "y", "x"], + "encoding": {"chunks": [1, 256, 512], "dtype": "uint16"}, + }, }, variables, ) diff --git a/zappend/levels.py b/zappend/levels.py index 0ff845c..01af7a7 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -153,6 +153,7 @@ def write_levels( variables=zappend_config.pop("variables", None), ) + target_fs.mkdirs(target_root, exist_ok=True) with target_fs.open(f"{target_root}/.zlevels", "wt") as fp: levels_data: dict[str, Any] = dict( version="1.0", @@ -175,7 +176,8 @@ def write_levels( subsample_dataset_kwargs = dict(xy_dim_names=xy_dim_names, agg_methods=agg_methods) - for slice_index in range(append_coord.size): + num_slices = append_coord.size + for slice_index in range(num_slices): slice_ds_indexer = {append_dim: slice(slice_index, slice_index + 1)} slice_ds = source_ds.isel(slice_ds_indexer) @@ -212,8 +214,18 @@ def write_levels( variables=variables, **zappend_config, ) + steps_total = num_slices * num_levels + percent_total = ( + 100 * ((slice_index * num_levels) + level_index + 1) / steps_total + ) + logger.info( + f"Slice {level_slice_path} written," + f" {slice_index + 1}/{num_slices} slices," + f" 
{level_index + 1}/{num_levels} levels," + f" {percent_total:.2f}% total" + ) - logger.info(f"done writing {target_path}") + logger.info(f"Done appending {num_slices} slices to {target_path}") def get_variables_config( @@ -236,6 +248,10 @@ def get_variables_config( for var_name, var in dataset.variables.items(): var_name = str(var_name) var_config = dict(var_configs.get(var_name, {})) + + if "dims" not in var_config and var.dims: + var_config["dims"] = [str(dim) for dim in var.dims] + var_encoding = dict(var_config.get("encoding", {})) var_chunks = var_encoding.get("chunks") if "chunks" not in var_encoding and var.dims: @@ -245,5 +261,6 @@ def get_variables_config( var_chunks = [chunk_sizes.get(dim) for dim in var.dims] var_encoding["chunks"] = var_chunks var_config["encoding"] = var_encoding + var_configs[var_name] = var_config return var_configs From b2cf4f33611b52d297ede88445383f4e17ef2c79 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Mon, 16 Sep 2024 13:00:22 +0200 Subject: [PATCH 07/19] Exclude zappend.levels from coverage --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2949de3..34cea02 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -36,7 +36,7 @@ jobs: flake8 . 
--count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest --cov=zappend + pytest --cov=zappend --cov-exclude=zappend.levels - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v3 env: From 91f65157b2d062e7385205dbdb0d9793b6cf31e4 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Mon, 16 Sep 2024 13:05:06 +0200 Subject: [PATCH 08/19] Exclude zappend.levels from coverage (2) --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 34cea02..5423ff1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -36,7 +36,7 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest --cov=zappend --cov-exclude=zappend.levels + pytest --cov=zappend --cov-omit=zappend/levels.py - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v3 env: From e61af006239c796f9236ef6fd6a0e7ca9f1ca145 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Mon, 16 Sep 2024 14:14:52 +0200 Subject: [PATCH 09/19] Exclude zappend.levels from coverage (3) --- .coveragerc | 3 +++ .github/workflows/tests.yml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 .coveragerc diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..8b2c475 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,3 @@ +[run] +omit = + zappend/levels.py \ No newline at end of file diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5423ff1..2949de3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -36,7 +36,7 @@ jobs: flake8 . 
--count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest --cov=zappend --cov-omit=zappend/levels.py + pytest --cov=zappend - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v3 env: From 259ac6dc24d876b3ff64c28450cc4d383ed0619a Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Tue, 17 Sep 2024 08:34:35 +0200 Subject: [PATCH 10/19] Exclude zappend.levels from coverage (3) --- CHANGES.md | 16 ++++++++++++++++ zappend/__init__.py | 2 +- zappend/levels.py | 10 +++++++++- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7bbfbed..7905648 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,19 @@ +## Version 0.8.0 (in development) + +* Added experimental function `zappend.levels.write_levels()` that generates + datasets using the + [multi-level dataset format](https://xcube.readthedocs.io/en/latest/mldatasets.html) + as specified by + [xcube](https://github.com/xcube-dev/xcube). + It resembles the `store.write_data(cube, ".levels", ...)` method + provided by the xcube filesystem data stores ("file", "s3", "memory", etc.). + The zappend version may be used for potentially very large datasets in terms + of dimension sizes or for datasets with very large number of chunks. + It is considerably slower than the xcube version (which basically uses + `xarray.to_zarr()` for each resolution level), but should run robustly with + stable memory consumption. + The function requires `xcube` package to be installed. (#19) + ## Version 0.7.1 (from 2024-05-30) * The function `zappend.api.zappend()` now returns the number of slices diff --git a/zappend/__init__.py b/zappend/__init__.py index b65edce..3182780 100644 --- a/zappend/__init__.py +++ b/zappend/__init__.py @@ -2,4 +2,4 @@ # Permissions are hereby granted under the terms of the MIT License: # https://opensource.org/licenses/MIT. 
-__version__ = "0.7.1" +__version__ = "0.8.0.dev0" diff --git a/zappend/levels.py b/zappend/levels.py index 01af7a7..efac177 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -32,7 +32,15 @@ def write_levels( as specified by [xcube](https://github.com/xcube-dev/xcube). - The source dataset is opened and subdivided into dataset slices + It resembles the `store.write_data(cube, ".levels", ...)` method + provided by the xcube filesystem data stores ("file", "s3", "memory", etc.). + The zappend version may be used for potentially very large datasets in terms + of dimension sizes or for datasets with very large number of chunks. + It is considerably slower than the xcube version (which basically uses + `xarray.to_zarr()` for each resolution level), but should run robustly with + stable memory consumption. + + The function opens the source dataset and subdivides it into dataset slices along the append dimension given by `append_dim`, which defaults to `"time"`. The slice size in the append dimension is one. Each slice is downsampled to the number of levels and each slice level From 5d3b073a7208fbe3691fac518549e6ffcf349743 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Tue, 17 Sep 2024 11:48:18 +0200 Subject: [PATCH 11/19] Started 0.8.0.dev1 --- zappend/__init__.py | 2 +- zappend/levels.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/zappend/__init__.py b/zappend/__init__.py index 3182780..a64ae45 100644 --- a/zappend/__init__.py +++ b/zappend/__init__.py @@ -2,4 +2,4 @@ # Permissions are hereby granted under the terms of the MIT License: # https://opensource.org/licenses/MIT. -__version__ = "0.8.0.dev0" +__version__ = "0.8.0.dev1" diff --git a/zappend/levels.py b/zappend/levels.py index efac177..0501f02 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -60,7 +60,7 @@ def write_levels( source_path: The source dataset path. source_storage_options: Storage options for the source dataset's filesystem. 
- target_path: The source multi-level dataset path. + target_path: The target multi-level dataset path. Filename extension should be `.levels`, by convention. If not given, `target_dir` should be passed as part of the `zappend_config`. (The name `target_path` is used here for From 4889848ab1e438d6290b58b3aba946be58929934 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Wed, 18 Sep 2024 11:10:58 +0200 Subject: [PATCH 12/19] Added args validation and removal of existing target --- tests/test_levels.py | 54 ++++++++++++++++++++++++++++++++++++++++ zappend/levels.py | 59 +++++++++++++++++++++++++++++--------------- 2 files changed, 93 insertions(+), 20 deletions(-) diff --git a/tests/test_levels.py b/tests/test_levels.py index 48fb67f..b378582 100644 --- a/tests/test_levels.py +++ b/tests/test_levels.py @@ -5,6 +5,7 @@ import json import unittest +import pytest import xarray as xr from zappend.fsutil import FileObj @@ -75,6 +76,59 @@ class WriteLevelsTest(unittest.TestCase): def setUp(self): clear_memory_fs() + # noinspection PyMethodMayBeStatic + def test_args_validation(self): + source_path = "memory://source.zarr" + target_path = "memory://target.levels" + + make_test_dataset( + uri=source_path, + dims=("time", "lat", "lon"), + shape=(3, 1024, 2048), + chunks=(1, 128, 256), + ) + + with pytest.raises( + ValueError, + match="missing 'target_path' argument", + ): + write_levels(source_path=source_path) + + with pytest.raises( + ValueError, + match="either 'target_dir' or 'target_path' can be given, not both", + ): + write_levels( + source_path=source_path, + target_path=target_path, + target_dir=target_path, + ) + + with pytest.raises( + TypeError, + match="write_levels\\(\\) got an unexpected keyword argument 'config'", + ): + write_levels( + source_path=source_path, + target_path=target_path, + config={"dry_run": True}, + ) + + with pytest.raises( + FileNotFoundError, + match="Target parent directory does not exist: /target.levels", + ): + with pytest.warns( + 
UserWarning, + match="'use_saved_levels' argument is not applicable if dry_run=True", + ): + write_levels( + source_path=source_path, + target_path=target_path, + dry_run=True, + use_saved_levels=True, + ) + def test_default_x_y_with_crs(self): source_path = "memory://source.zarr" make_test_dataset( diff --git a/zappend/levels.py b/zappend/levels.py index 0501f02..a0bdaee 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -4,6 +4,7 @@ import json import logging +import warnings from typing import Any, Hashable import xarray as xr @@ -91,19 +92,29 @@ def write_levels( If `False`, the default, a level dataset `{target_path}/0.zarr` will be written instead. zappend_config: - Configuration passed to the `zappend()` call for each - slice in the append dimension. + Configuration passed to zappend as `zappend(slice, **zappend_config)` + for each slice in the append dimension. The zappend `config` + parameter is not supported. """ - from xcube.core.tilingscheme import get_num_levels from xcube.core.gridmapping import GridMapping from xcube.core.subsampling import get_dataset_agg_methods from xcube.core.subsampling import subsample_dataset + from xcube.core.tilingscheme import get_num_levels from xcube.util.fspath import get_fs_path_class + dry_run = zappend_config.pop("dry_run", False) + + if dry_run and use_saved_levels: + warnings.warn(f"'use_saved_levels' argument is not applicable if dry_run=True") + use_saved_levels = False + config = zappend_config.pop("config", None) + if config is not None: + raise TypeError("write_levels() got an unexpected keyword argument 'config'") + target_dir = zappend_config.pop("target_dir", None) - if not target_dir and not target_path: - raise ValueError("either 'target_dir' or 'target_path' can be given, not both") - if target_dir and target_path and target_dir != target_path: + if not target_path and not target_dir: + raise ValueError("missing 'target_path' argument") + if target_dir and target_path: raise ValueError("either 
'target_dir' or 'target_path' can be given, not both") target_path = target_path or target_dir target_storage_options = zappend_config.pop( @@ -161,17 +172,24 @@ def write_levels( variables=zappend_config.pop("variables", None), ) - target_fs.mkdirs(target_root, exist_ok=True) - with target_fs.open(f"{target_root}/.zlevels", "wt") as fp: - levels_data: dict[str, Any] = dict( - version="1.0", - num_levels=num_levels, - agg_methods=agg_methods, - use_saved_levels=use_saved_levels, - ) - json.dump(levels_data, fp, indent=2) - - if link_level_zero: + if target_fs.exists(target_root): + if target_fs.exists(target_root): + logger.warning(f"Permanently deleting {target_dir}") + if not dry_run: + target_fs.rm(target_root, recursive=True) + + if not dry_run: + target_fs.mkdirs(target_root, exist_ok=True) + with target_fs.open(f"{target_root}/.zlevels", "wt") as fp: + levels_data: dict[str, Any] = dict( + version="1.0", + num_levels=num_levels, + agg_methods=agg_methods, + use_saved_levels=use_saved_levels, + ) + json.dump(levels_data, fp, indent=2) + + if link_level_zero and not dry_run: path_class = get_fs_path_class(target_fs) rel_source_path = ( "../" @@ -218,6 +236,7 @@ def write_levels( target_dir=level_slice_path, target_storage_options=target_storage_options, append_dim=append_dim, + dry_run=dry_run, force_new=force_new if slice_index == 0 else False, variables=variables, **zappend_config, @@ -227,13 +246,13 @@ def write_levels( 100 * ((slice_index * num_levels) + level_index + 1) / steps_total ) logger.info( - f"Slice {level_slice_path} written," + f"Level slice written to {level_slice_path}," f" {slice_index + 1}/{num_slices} slices," f" {level_index + 1}/{num_levels} levels," f" {percent_total:.2f}% total" ) - - logger.info(f"Done appending {num_slices} slices to {target_path}") + logger.info(f"Done appending {num_levels} level slices to {target_path}") + logger.info(f"Done appending {num_slices} slices to {target_path}") def get_variables_config( From 
732f3555572c88cd4645068ffde33e33f5aff981 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Wed, 18 Sep 2024 11:24:59 +0200 Subject: [PATCH 13/19] Testing force_new --- tests/test_levels.py | 20 ++++++++++++++++++++ zappend/levels.py | 15 ++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/tests/test_levels.py b/tests/test_levels.py index b378582..166276e 100644 --- a/tests/test_levels.py +++ b/tests/test_levels.py @@ -129,6 +129,26 @@ def test_args_validation(self): use_saved_levels=True, ) + def test_force_new(self): + source_path = "memory://source.zarr" + make_test_dataset( + uri=source_path, + dims=("time", "lat", "lon"), + shape=(3, 1024, 2048), + chunks=(1, 128, 256), + ) + + target_dir = FileObj("memory://target.levels") + self.assertFalse(target_dir.exists()) + target_dir.mkdir() + (target_dir / "0.zarr").mkdir() + (target_dir / "0.zarr" / ".zgroup").write("{}") + self.assertTrue(target_dir.exists()) + + write_levels(source_path=source_path, target_path=target_dir.uri, force_new=True) + + self.assertTrue(target_dir.exists()) + def test_default_x_y_with_crs(self): source_path = "memory://source.zarr" make_test_dataset( diff --git a/zappend/levels.py b/zappend/levels.py index a0bdaee..19e37ec 100644 --- a/zappend/levels.py +++ b/zappend/levels.py @@ -172,14 +172,19 @@ def write_levels( variables=zappend_config.pop("variables", None), ) - if target_fs.exists(target_root): - if target_fs.exists(target_root): - logger.warning(f"Permanently deleting {target_dir}") + target_exists = target_fs.exists(target_root) + if target_exists: + logger.info(f"Target directory {target_path} exists") + if force_new: + logger.warning(f"Permanently deleting {target_path} (no rollback)") if not dry_run: target_fs.rm(target_root, recursive=True) + else: + logger.info(f"Creating target directory {target_path}") + if not dry_run: + target_fs.mkdirs(target_root, exist_ok=True) if not dry_run: - target_fs.mkdirs(target_root, exist_ok=True) with 
target_fs.open(f"{target_root}/.zlevels", "wt") as fp: levels_data: dict[str, Any] = dict( version="1.0", @@ -189,7 +194,7 @@ def write_levels( ) json.dump(levels_data, fp, indent=2) - if link_level_zero and not dry_run: + if (not dry_run) and link_level_zero: path_class = get_fs_path_class(target_fs) rel_source_path = ( "../" From 056f2f7606f81ba71a5432b816f733ba1fe270b1 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Wed, 2 Oct 2024 13:20:02 +0200 Subject: [PATCH 14/19] Introduced keyword arguments `source_ds` and `source_append_offset` --- tests/contrib/__init__.py | 3 + tests/{ => contrib}/test_levels.py | 157 ++++++++++++++++++++++++++--- zappend/contrib/__init__.py | 5 + zappend/{ => contrib}/levels.py | 88 +++++++++++----- 4 files changed, 215 insertions(+), 38 deletions(-) create mode 100644 tests/contrib/__init__.py rename tests/{ => contrib}/test_levels.py (68%) create mode 100644 zappend/contrib/__init__.py rename zappend/{ => contrib}/levels.py (83%) diff --git a/tests/contrib/__init__.py b/tests/contrib/__init__.py new file mode 100644 index 0000000..cd88eca --- /dev/null +++ b/tests/contrib/__init__.py @@ -0,0 +1,3 @@ +# Copyright © 2024 Norman Fomferra and contributors +# Permissions are hereby granted under the terms of the MIT License: +# https://opensource.org/licenses/MIT. 
diff --git a/tests/test_levels.py b/tests/contrib/test_levels.py similarity index 68% rename from tests/test_levels.py rename to tests/contrib/test_levels.py index 166276e..93eb9a4 100644 --- a/tests/test_levels.py +++ b/tests/contrib/test_levels.py @@ -9,10 +9,10 @@ import xarray as xr from zappend.fsutil import FileObj -from zappend.levels import write_levels -from zappend.levels import get_variables_config -from .helpers import clear_memory_fs -from .helpers import make_test_dataset +from zappend.contrib import write_levels +from zappend.contrib.levels import get_variables_config +from tests.helpers import clear_memory_fs +from tests.helpers import make_test_dataset try: # noinspection PyUnresolvedReferences @@ -71,16 +71,15 @@ def test_variables_given(self): ) +source_path = "memory://source.zarr" +target_path = "memory://target.levels" + + +# noinspection PyMethodMayBeStatic @unittest.skipIf(xcube is None, reason="xcube is not installed") -class WriteLevelsTest(unittest.TestCase): +class WriteLevelsArgsTest(unittest.TestCase): def setUp(self): clear_memory_fs() - - # noinspection PyMethodMayBeStatic - def test_args_validation(self): - source_path = "memory://source.zarr" - target_path = "memory://target.levels" - make_test_dataset( uri=source_path, dims=("time", "lat", "lon"), @@ -88,12 +87,14 @@ def test_args_validation(self): chunks=(1, 128, 256), ) + def test_target_path_not_given(self): with pytest.raises( ValueError, match="missing 'target_path' argument", ): write_levels(source_path=source_path) + def test_target_dir_and_target_path_given(self): with pytest.raises( ValueError, match="either 'target_dir' or 'target_path' can be given, not both", @@ -104,6 +105,7 @@ def test_args_validation(self): target_dir=target_path, ) + def test_config_given(self): with pytest.raises( TypeError, match="write_levels\\(\\) got an unexpected keyword argument 'config'", @@ -114,6 +116,7 @@ def test_args_validation(self): config={"dry_run": True}, ) + def 
test_dry_run_and_use_saved_levels_given(self): with pytest.raises( FileNotFoundError, match="Target parent directory does not exist: /target.levels", @@ -129,8 +132,60 @@ def test_args_validation(self): use_saved_levels=True, ) + def test_source_path_and_source_ds_not_given(self): + with pytest.raises( + TypeError, + match="'source_ds' argument must be of type 'xarray.Dataset'," + " but was 'NoneType'", + ): + write_levels( + target_path=target_path, + ) + + def test_source_path_not_given_and_link_level_zero_is_true(self): + with pytest.raises( + ValueError, + match="'source_path' argument must be provided" + " if 'link_level_zero' is used", + ): + write_levels( + source_ds=xr.Dataset(), target_path=target_path, link_level_zero=True + ) + + def test_source_append_offset_not_int(self): + with pytest.raises( + TypeError, + match="'source_append_offset' argument must be of type 'int'," + " but was 'str'", + ): + # noinspection PyTypeChecker + write_levels( + source_ds=xr.open_zarr("memory://source.zarr"), + source_append_offset="2", + target_path=target_path, + ) + + def test_source_append_offset_out_of_range(self): + with pytest.raises( + ValueError, + match="'source_append_offset' argument must be >=0 and <3, but was 13", + ): + # noinspection PyTypeChecker + write_levels( + source_ds=xr.open_zarr("memory://source.zarr"), + source_append_offset=13, + target_path=target_path, + ) + + +@unittest.skipIf(xcube is None, reason="xcube is not installed") +class WriteLevelsTest(unittest.TestCase): + def setUp(self): + clear_memory_fs() + + # noinspection PyMethodMayBeStatic + def test_force_new(self): - source_path = "memory://source.zarr" make_test_dataset( uri=source_path, dims=("time", "lat", "lon"), @@ -145,12 +200,13 @@ def test_force_new(self): (target_dir / "0.zarr" / ".zgroup").write("{}") self.assertTrue(target_dir.exists()) - write_levels(source_path=source_path, target_path=target_dir.uri, force_new=True) + write_levels( + source_path=source_path, 
target_path=target_dir.uri, force_new=True + ) self.assertTrue(target_dir.exists()) def test_default_x_y_with_crs(self): - source_path = "memory://source.zarr" make_test_dataset( uri=source_path, dims=("time", "y", "x"), @@ -185,7 +241,6 @@ def test_default_x_y_with_crs(self): self.assert_level(target_dir.uri + "/3.zarr", 3, has_crs=True) def test_default_lon_lat_no_crs(self): - source_path = "memory://source.zarr" make_test_dataset( uri=source_path, dims=("time", "lat", "lon"), @@ -219,6 +274,78 @@ def test_default_lon_lat_no_crs(self): self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims) self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims) + def test_default_lon_lat_no_crs_from_source_ds(self): + source_ds = make_test_dataset( + uri=source_path, + dims=("time", "lat", "lon"), + shape=(3, 1024, 2048), + chunks=(1, 128, 256), + ) + + target_dir = FileObj("memory://target.levels") + self.assertFalse(target_dir.exists()) + + write_levels(source_ds=source_ds, target_path=target_dir.uri) + + self.assertTrue(target_dir.exists()) + + levels_file = target_dir.for_path(".zlevels") + self.assertTrue(levels_file.exists()) + levels_info = json.loads(levels_file.read()) + self.assertEqual( + { + "version": "1.0", + "num_levels": 4, + "agg_methods": {"chl": "mean", "tsm": "mean"}, + "use_saved_levels": False, + }, + levels_info, + ) + + xy_dims = "lon", "lat" + self.assert_level(target_dir.uri + "/0.zarr", 0, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/1.zarr", 1, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims) + + def test_default_lon_lat_no_crs_from_source_ds_with_offset(self): + source_ds = make_test_dataset( + uri=source_path, + dims=("time", "lat", "lon"), + shape=(10, 1024, 2048), + chunks=(1, 128, 256), + ) + + target_dir = FileObj("memory://target.levels") + self.assertFalse(target_dir.exists()) + + write_levels( + 
source_ds=source_ds, + source_append_offset=7, + target_path=target_dir.uri, + ) + + self.assertTrue(target_dir.exists()) + + levels_file = target_dir.for_path(".zlevels") + self.assertTrue(levels_file.exists()) + levels_info = json.loads(levels_file.read()) + self.assertEqual( + { + "version": "1.0", + "num_levels": 4, + "agg_methods": {"chl": "mean", "tsm": "mean"}, + "use_saved_levels": False, + }, + levels_info, + ) + + xy_dims = "lon", "lat" + self.assert_level(target_dir.uri + "/0.zarr", 0, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/1.zarr", 1, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/2.zarr", 2, xy_dims=xy_dims) + self.assert_level(target_dir.uri + "/3.zarr", 3, xy_dims=xy_dims) + def test_link_level_zero(self): source_dir = FileObj("memory://source.zarr") make_test_dataset( diff --git a/zappend/contrib/__init__.py b/zappend/contrib/__init__.py new file mode 100644 index 0000000..5a121ba --- /dev/null +++ b/zappend/contrib/__init__.py @@ -0,0 +1,5 @@ +# Copyright © 2024 Norman Fomferra and contributors +# Permissions are hereby granted under the terms of the MIT License: +# https://opensource.org/licenses/MIT. 
+ +from .levels import write_levels diff --git a/zappend/levels.py b/zappend/contrib/levels.py similarity index 83% rename from zappend/levels.py rename to zappend/contrib/levels.py index 19e37ec..69b368f 100644 --- a/zappend/levels.py +++ b/zappend/contrib/levels.py @@ -17,8 +17,11 @@ def write_levels( - source_path: str, + *, + source_ds: xr.Dataset | None = None, + source_path: str | None = None, source_storage_options: dict[str, Any] | None = None, + source_append_offset: int | None = None, target_path: str | None = None, num_levels: int | None = None, tile_size: tuple[int, int] | None = None, @@ -28,12 +31,13 @@ def write_levels( xy_dim_names: tuple[str, str] | None = None, **zappend_config, ): - """Writes a dataset at `source_path` to `target_path` using the + """Writes a dataset given by `source_ds` or `source_path` to `target_path` + using the [multi-level dataset format](https://xcube.readthedocs.io/en/latest/mldatasets.html) as specified by [xcube](https://github.com/xcube-dev/xcube). - It resembles the `store.write_data(cube, ".levels", ...)` method + It resembles the `store.write_data(dataset, ".levels", ...)` method provided by the xcube filesystem data stores ("file", "s3", "memory", etc.). The zappend version may be used for potentially very large datasets in terms of dimension sizes or for datasets with very large number of chunks. @@ -58,9 +62,17 @@ def write_levels( Important: This function requires the `xcube` package to be installed. Args: + source_ds: The source dataset. + Must be given in case `source_path` is not given. source_path: The source dataset path. + If `source_ds` is provided and `link_level_zero` is true, + then `source_path` must also be provided in order + to determine the path of the level zero source. source_storage_options: Storage options for the source dataset's filesystem. + source_append_offset: Optional offset in the append dimension. + Only slices with indexes greater than or equal to the offset are + appended.
target_path: The target multi-level dataset path. Filename extension should be `.levels`, by convention. If not given, `target_dir` should be passed as part of the @@ -102,14 +114,15 @@ def write_levels( from xcube.core.tilingscheme import get_num_levels from xcube.util.fspath import get_fs_path_class + config = zappend_config.pop("config", None) + if config is not None: + raise TypeError("write_levels() got an unexpected keyword argument 'config'") + dry_run = zappend_config.pop("dry_run", False) if dry_run and use_saved_levels: warnings.warn(f"'use_saved_levels' argument is not applicable if dry_run=True") use_saved_levels = False - config = zappend_config.pop("config", None) - if config is not None: - raise TypeError("write_levels() got an unexpected keyword argument 'config'") target_dir = zappend_config.pop("target_dir", None) if not target_path and not target_dir: @@ -124,16 +137,49 @@ def write_levels( target_path, **target_storage_options ) - source_fs, source_root = fsspec.core.url_to_fs( - source_path, - **( - source_storage_options - if source_storage_options is not None - else target_storage_options - ), - ) - source_store = source_fs.get_mapper(root=source_root) - source_ds = xr.open_zarr(source_store) + force_new = zappend_config.pop("force_new", None) + + if source_path is not None: + source_fs, source_root = fsspec.core.url_to_fs( + source_path, + **( + source_storage_options + if source_storage_options is not None + else target_storage_options + ), + ) + if source_ds is None: + source_store = source_fs.get_mapper(root=source_root) + source_ds = xr.open_zarr(source_store) + else: + source_root = None + if not isinstance(source_ds, xr.Dataset): + raise TypeError( + f"'source_ds' argument must be of type 'xarray.Dataset'," + f" but was {type(source_ds).__name__!r}" + ) + if link_level_zero: + raise ValueError( + f"'source_path' argument must be provided" + f" if 'link_level_zero' is used" + ) + + append_dim = zappend_config.pop("append_dim", "time") + 
append_coord = source_ds.coords[append_dim] + + if source_append_offset is None: + source_append_offset = 0 + elif not isinstance(source_append_offset, int): + raise TypeError( + f"'source_append_offset' argument must be of type 'int'," + f" but was {type(source_append_offset).__name__!r}" + ) + if not (0 <= source_append_offset < append_coord.size): + raise ValueError( + f"'source_append_offset' argument" + f" must be >=0 and <{append_coord.size}," + f" but was {source_append_offset}" + ) logger = logging.getLogger("zappend") @@ -157,11 +203,6 @@ def write_levels( agg_methods=agg_methods, ) - force_new = zappend_config.pop("force_new", None) - - append_dim = zappend_config.pop("append_dim", "time") - append_coord = source_ds.coords[append_dim] - variables = get_variables_config( source_ds, { @@ -207,9 +248,10 @@ def write_levels( subsample_dataset_kwargs = dict(xy_dim_names=xy_dim_names, agg_methods=agg_methods) - num_slices = append_coord.size + num_slices = append_coord.size - source_append_offset for slice_index in range(num_slices): - slice_ds_indexer = {append_dim: slice(slice_index, slice_index + 1)} + append_index = source_append_offset + slice_index + slice_ds_indexer = {append_dim: slice(append_index, append_index + 1)} slice_ds = source_ds.isel(slice_ds_indexer) for level_index in range(num_levels): From a8fd5b9663685353c5f15265e83fcf74d423330a Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Fri, 4 Oct 2024 10:09:03 +0200 Subject: [PATCH 15/19] Fix --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index 8b2c475..f42f396 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,3 +1,3 @@ [run] omit = - zappend/levels.py \ No newline at end of file + zappend/contrib \ No newline at end of file From 1ab65c41cd3ec8cc2af15c5facbbf2c85bfacf88 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Fri, 4 Oct 2024 10:41:42 +0200 Subject: [PATCH 16/19] Enhanced docs --- docs/api.md | 10 ++++++---- 
zappend/contrib/__init__.py | 10 ++++++++++ zappend/contrib/levels.py | 14 ++++++++++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/docs/api.md b/docs/api.md index 2335a01..353f937 100644 --- a/docs/api.md +++ b/docs/api.md @@ -46,8 +46,10 @@ All described objects can be imported from the `zappend.api` module. options: show_root_heading: true -## Function `write_levels()` for `xcube` +## Contributions -::: zappend.levels.write_levels - options: - show_root_heading: true +::: zappend.contrib + +### Function `write_levels()` + +::: zappend.contrib.write_levels diff --git a/zappend/contrib/__init__.py b/zappend/contrib/__init__.py index 5a121ba..03ab527 100644 --- a/zappend/contrib/__init__.py +++ b/zappend/contrib/__init__.py @@ -2,4 +2,14 @@ # Permissions are hereby granted under the terms of the MIT License: # https://opensource.org/licenses/MIT. +""" +This module contributes to zappend's core functionality. + +The function signatures in this module are less stable, and their implementations +are considered experimental. They may also rely on external packages. For more +information, please refer to the individual function documentation. +Due to these reasons, this module is excluded from the project's automatic +coverage analysis. 
+""" + from .levels import write_levels diff --git a/zappend/contrib/levels.py b/zappend/contrib/levels.py index 69b368f..622b444 100644 --- a/zappend/contrib/levels.py +++ b/zappend/contrib/levels.py @@ -31,7 +31,7 @@ def write_levels( xy_dim_names: tuple[str, str] | None = None, **zappend_config, ): - """Writes a dataset given by `source_ds` or `source_path` to `target_path` + """Write a dataset given by `source_ds` or `source_path` to `target_path` using the [multi-level dataset format](https://xcube.readthedocs.io/en/latest/mldatasets.html) as specified by @@ -59,7 +59,17 @@ def write_levels( If configuration parameter `variables` is also given as part of `zappend_config`, it will be merged with the chunk definitions. - Important: This function requires the `xcube` package to be installed. + **Important notes:** + + - This function requires the `xcube` package. + - `write_levels()` is not as robust as zappend itself. For example, + there may be inconsistent dataset levels if the processing + is interrupted while a level is appended. + - There is a remaining issue with (coordinate) variables that + have a dimension that is not a dimension of any variable that has + one of the spatial dimensions, e.g., `time_bnds` with dimensions + `time` and `bnds`. Please exclude such variables using the parameter + `excluded_variables`. Args: source_ds: The source dataset. From 6930a41525ca242012be7313833624c038c0c935 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Fri, 4 Oct 2024 10:47:11 +0200 Subject: [PATCH 17/19] Enhanced docs --- zappend/contrib/levels.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zappend/contrib/levels.py b/zappend/contrib/levels.py index 622b444..2231fdd 100644 --- a/zappend/contrib/levels.py +++ b/zappend/contrib/levels.py @@ -61,7 +61,8 @@ def write_levels( **Important notes:** - - This function requires the `xcube` package.
+ - This function depends on `xcube.core.gridmapping.GridMapping` and + `xcube.core.subsampling.subsample_dataset()` of the `xcube` package. - `write_levels()` is not as robust as zappend itself. For example, there may be inconsistent dataset levels if the processing is interrupted while a level is appended. From 1c681bb17bf31b5fe6c98982de25befeb5177938 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Fri, 4 Oct 2024 11:16:58 +0200 Subject: [PATCH 18/19] Fix --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index f42f396..44ba55b 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,3 +1,3 @@ [run] omit = - zappend/contrib \ No newline at end of file + zappend/contrib/*.py From b6af3ec07fc21af59e086a71eb73819737f641c3 Mon Sep 17 00:00:00 2001 From: Norman Fomferra Date: Fri, 4 Oct 2024 11:19:36 +0200 Subject: [PATCH 19/19] Fix --- CHANGES.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 7905648..4c1a1df 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,9 @@ ## Version 0.8.0 (in development) -* Added experimental function `zappend.levels.write_levels()` that generates +* Added module `zappend.contrib` that contributes functions to + zappend's core functionality. + +* Added experimental function `zappend.contrib.write_levels()` that generates datasets using the [multi-level dataset format](https://xcube.readthedocs.io/en/latest/mldatasets.html) as specified by