From 0f34b3b0fa028b4dcb6fae6a130228817c31dbc2 Mon Sep 17 00:00:00 2001
From: garciam
Date: Wed, 28 Aug 2024 16:47:16 +0200
Subject: [PATCH 1/3] fixes for woudc according to the new check

this adds the missing metadata fields
---
 cdsobs/cdm/api.py                             |  4 +-
 .../service_definition.yml                    | 57 ++++++++-----------
 cdsobs/ingestion/readers/cuon.py              | 13 +----
 cdsobs/ingestion/readers/sql.py               | 20 +------
 .../service_definition_models.py              |  9 ++-
 cdsobs/utils/utils.py                         | 19 ++++++-
 tests/system/check_missing_variables.py       |  4 +-
 7 files changed, 60 insertions(+), 66 deletions(-)

diff --git a/cdsobs/cdm/api.py b/cdsobs/cdm/api.py
index 5453c89..0e3feba 100644
--- a/cdsobs/cdm/api.py
+++ b/cdsobs/cdm/api.py
@@ -412,7 +412,9 @@ def apply_unit_changes(
         _extract_variable_units_change(
             homogenised_data, source_definition, unit_changes, variable, varname2units
         )
-
+    # Check if no units have changed and then remove original units.
+    if homogenised_data["original_units"].equals(homogenised_data["units"]):
+        homogenised_data = homogenised_data.drop("original_units", axis=1)
     return homogenised_data
 
 
diff --git a/cdsobs/data/insitu-observations-woudc-ozone-total-column-and-profiles/service_definition.yml b/cdsobs/data/insitu-observations-woudc-ozone-total-column-and-profiles/service_definition.yml
index d7b3a37..067dc25 100644
--- a/cdsobs/data/insitu-observations-woudc-ozone-total-column-and-profiles/service_definition.yml
+++ b/cdsobs/data/insitu-observations-woudc-ozone-total-column-and-profiles/service_definition.yml
@@ -5,7 +5,7 @@ global_attributes:
 out_columns_order:
 - platform_type
 - station_name
-- other_ids
+- secondary_id
 - sensor_id
 - report_timestamp
 - daily_timestamp
@@ -65,7 +65,7 @@ sources:
         ozone_reference_model: reference_model
         ozone_reference_total_o3: ozone_reference_total_ozone
         ozone_reference_utc_mean: ozone_reference_time_mean
-        platform_gaw_id: other_ids
+        platform_gaw_id: secondary_id
         platform_id: primary_station_id
         platform_type: platform_type
         pressure: z_coordinate
@@ -82,19 +82,17 @@ sources:
         wind_speed: wind_speed
       add:
         z_coordinate_type: pressure (Pa)
-      unit_changes:
-        geopotential_height:
-          names:
-            Pa: m
-          offset: 0
-          scale: 1
     data_table: woudc_ozonesonde_value
     descriptions:
-      air_pressure:
-        description: Atmospheric pressure of each level in Pascals.
+      z_coordinate:
+        description: z coordinate of observation
         dtype: float32
         long_name: pressure
         units: Pa
+      z_coordinate_type:
+        description: Type of z coordinate
+        dtype: object
+        long_name: z_coordinate_type
       air_temperature:
         description: Level temperature Kelvin.
         dtype: float32
@@ -117,7 +115,7 @@ sources:
         units: decimal degrees
       level_code:
         description: Code for the level type.
-        dtype: float32
+        dtype: int
         long_name: level_code
       latitude|station_configuration:
         description: Latitude of the instrument.
@@ -139,7 +137,7 @@ sources:
         dtype: float32
         long_name: height
         units: m
-      other_ids:
+      secondary_id:
         description: Three-letter GAW ID as issued by GAWSIS, if available (recommended).
         dtype: object
         long_name: platform_gaw_id
@@ -231,10 +229,14 @@ sources:
         dtype: float32
         long_name: wind_speed
         units: m s^-1
+      report_id:
+        description: Unique ID for report
+        dtype: object
+        long_name: report_id
     header_columns:
     - platform_type
    - primary_station_id
-    - other_ids
+    - secondary_id
     - sensor_id
     - longitude|station_configuration
     - latitude|station_configuration
@@ -251,7 +253,7 @@ sources:
     mandatory_columns:
     - platform_type
     - primary_station_id
-    - other_ids
+    - secondary_id
     - sensor_id
     - sensor_model
     - reference_model
@@ -303,7 +305,7 @@ sources:
         monthly_npts: monthly_npts
         n_obs: number_of_observations
         obs_code: obs_code
-        platform_gaw_id: other_ids
+        platform_gaw_id: secondary_id
         platform_id: primary_station_id
         std_dev_o3: total_ozone_column_standard_deviation
         timestamp_datetime_first_day: report_timestamp
@@ -311,17 +313,6 @@ sources:
         utc_end: time_end
         utc_mean: time_mean
         wl_code: wl_code
-      unit_changes:
-        column_sulphur_dioxide:
-          names:
-            Dobson-units: Dobson-units
-          offset: 0
-          scale: 1
-        total_ozone_column:
-          names:
-            Dobson-units: Dobson-units
-          offset: 0
-          scale: 1
     data_table: woudc_totalozone_value
     descriptions:
       column_sulphur_dioxide:
@@ -364,7 +355,7 @@ sources:
         description: Code to designate the type of total ozone measurement.
         dtype: object
         long_name: obs_code
-      other_ids:
+      secondary_id:
         description: Three-letter GAW ID as issued by GAWSIS, if available (recommended).
         dtype: object
         long_name: platform_gaw_id
@@ -415,10 +406,14 @@ sources:
         dtype: float32
         long_name: wl_code
         name_for_output: wl_code
+      report_id:
+        description: Unique ID for report
+        dtype: object
+        long_name: report_id
     header_columns:
     - platform_type
     - primary_station_id
-    - other_ids
+    - secondary_id
     - sensor_id
     - longitude|station_configuration
     - latitude|station_configuration
@@ -432,7 +427,7 @@ sources:
     mandatory_columns:
     - platform_type
     - primary_station_id
-    - other_ids
+    - secondary_id
     - sensor_id
     - monthly_npts
     - longitude
@@ -453,10 +448,8 @@ sources:
     - columns:
       - total_ozone_column
      - column_sulphur_dioxide
+      - total_ozone_column_standard_deviation
       group_name: variables
-    - columns:
-      - std_dev_O3
-      group_name: standard_deviation
     space_columns:
       y: latitude|station_configuration
       x: longitude|station_configuration
diff --git a/cdsobs/ingestion/readers/cuon.py b/cdsobs/ingestion/readers/cuon.py
index f0f15cb..66cd278 100644
--- a/cdsobs/ingestion/readers/cuon.py
+++ b/cdsobs/ingestion/readers/cuon.py
@@ -322,14 +322,7 @@ def read_cuon_netcdfs(
     # Check for emptiness
     if all([dt is None for dt in denormalized_tables]):
         raise EmptyBatchException
-    result = pandas.concat(denormalized_tables)
-    allnan_cols = []
-    for col in result:
-        if result[col].isnull().all():
-            allnan_cols.append(col)
-    logger.info(f"Removing columns {allnan_cols} as they don't have any data")
-    result = result.drop(allnan_cols, axis=1)
-    return result
+    return pandas.concat(denormalized_tables)
 
 
 def get_scheduler():
@@ -377,9 +370,7 @@ def get_denormalized_table_file(
     lats = dataset_cdm["header_table"]["latitude"]
     lons = dataset_cdm["header_table"]["longitude"]
     if (lats.dtype.kind == "S") or (lons.dtype.kind == "S"):
-        raise NoDataInFileException(
-            f"Skipping file {file_and_slices.path} with malformed latitudes"
-        )
+        raise NoDataInFileException("Skipping file with malformed latitudes")
     lon_start, lon_end, lat_start, lat_end = time_space_batch.get_spatial_coverage()
     lon_mask = between(lons, lon_start, lon_end)
     lat_mask = between(lats, lat_start, lat_end)
diff --git a/cdsobs/ingestion/readers/sql.py b/cdsobs/ingestion/readers/sql.py
index 9388b54..3b33b0a 100644
--- a/cdsobs/ingestion/readers/sql.py
+++ b/cdsobs/ingestion/readers/sql.py
@@ -1,5 +1,5 @@
 import datetime
-from typing import Any, Protocol, Tuple
+from typing import Protocol, Tuple
 
 import connectorx as cx
 import pandas
@@ -13,6 +13,7 @@
     SourceDefinition,
 )
 from cdsobs.utils.logutils import get_logger
+from cdsobs.utils.utils import invert_dict
 
 logger = get_logger(__name__)
 
@@ -303,23 +304,6 @@ def get_sql_data_types(
     }
 
 
-def is_unique(x: Any) -> bool:
-    return len(x) == len(set(x))
-
-
-def invert_dict(idict: dict) -> dict:
-    """Return the inverse of a dictionary.
-
-    It must be bijective, this is, keys and values need to be unique
-    """
-    try:
-        assert is_unique(idict)
-        assert is_unique(idict.values())
-    except AssertionError:
-        raise AssertionError("Dict must be bijective (keys and values must be unique.)")
-    return {v: k for k, v in idict.items()}
-
-
 def get_time_field(source_definition: SourceDefinition) -> Tuple[str, bool]:
     rename_dict = source_definition.cdm_mapping.rename
     descriptions = source_definition.descriptions
diff --git a/cdsobs/service_definition/service_definition_models.py b/cdsobs/service_definition/service_definition_models.py
index 51d6135..5ac559b 100644
--- a/cdsobs/service_definition/service_definition_models.py
+++ b/cdsobs/service_definition/service_definition_models.py
@@ -12,6 +12,7 @@
 
 from cdsobs.service_definition.utils import custom_assert
 from cdsobs.utils.types import NotEmptyUniqueStrList, StrNotBlank
+from cdsobs.utils.utils import invert_dict
 
 """ models """
 
@@ -147,13 +148,17 @@ def valid_mandatory_column_defs(
         return mandatory_columns
 
     def get_raw_header_columns(self):
+        inv_rename_dict = invert_dict(self.cdm_mapping.rename)
         return [
-            k for k, v in self.cdm_mapping.rename.items() if v in self.header_columns
+            inv_rename_dict[v] if v in inv_rename_dict else v
+            for v in self.header_columns
         ]
 
     def get_raw_mandatory_columns(self):
+        inv_rename_dict = invert_dict(self.cdm_mapping.rename)
         return [
-            k for k, v in self.cdm_mapping.rename.items() if v in self.mandatory_columns
+            inv_rename_dict[v] if v in inv_rename_dict else v
+            for v in self.mandatory_columns
         ]
 
 
diff --git a/cdsobs/utils/utils.py b/cdsobs/utils/utils.py
index b201c2e..2432783 100644
--- a/cdsobs/utils/utils.py
+++ b/cdsobs/utils/utils.py
@@ -1,7 +1,7 @@
 import hashlib
 import json
 from pathlib import Path
-from typing import Sequence, cast
+from typing import Any, Sequence, cast
 
 import h5netcdf
 import numpy
@@ -89,3 +89,20 @@ def datetime_to_seconds(dates: pandas.Series) -> numpy.ndarray:
 def get_database_session(url: str) -> Session:
     engine = create_engine(url)  # echo=True for more descriptive logs
     return sessionmaker(autocommit=False, autoflush=False, bind=engine)()
+
+
+def invert_dict(idict: dict) -> dict:
+    """Return the inverse of a dictionary.
+
+    It must be bijective, this is, keys and values need to be unique
+    """
+    try:
+        assert is_unique(idict)
+        assert is_unique(idict.values())
+    except AssertionError:
+        raise AssertionError("Dict must be bijective (keys and values must be unique.)")
+    return {v: k for k, v in idict.items()}
+
+
+def is_unique(x: Any) -> bool:
+    return len(x) == len(set(x))
diff --git a/tests/system/check_missing_variables.py b/tests/system/check_missing_variables.py
index 8223f6d..1ba35df 100644
--- a/tests/system/check_missing_variables.py
+++ b/tests/system/check_missing_variables.py
@@ -49,7 +49,9 @@ def test_run_ingestion_pipeline(
     ).set_index(index_cols)
     # Get the file
     asset = test_session.scalar(
-        sa.select(Catalogue.asset).where(Catalogue.dataset == dataset_name)
+        sa.select(Catalogue.asset).where(
+            Catalogue.dataset == dataset_name, Catalogue.dataset_source == source
+        )
     )
     s3client = S3Client.from_config(test_config.s3config)
     asset_filename = asset.split("/")[1]

From fb91f2a62c653fab37af9a9f547a8e792e42e539 Mon Sep 17 00:00:00 2001
From: garciam
Date: Wed, 28 Aug 2024 17:21:30 +0200
Subject: [PATCH 2/3] fixed test

---
 .../service_definition.yml | 2 +-
 tests/test_cdm_api.py      | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/cdsobs/data/insitu-observations-gruan-reference-network/service_definition.yml b/cdsobs/data/insitu-observations-gruan-reference-network/service_definition.yml
index 12b0b72..1ec5231 100644
--- a/cdsobs/data/insitu-observations-gruan-reference-network/service_definition.yml
+++ b/cdsobs/data/insitu-observations-gruan-reference-network/service_definition.yml
@@ -87,7 +87,7 @@ sources:
           names:
             s: s
           offset: 0
-          scale: 11
+          scale: 1
     data_table: gruan_data_value
     descriptions:
       air_pressure:
diff --git a/tests/test_cdm_api.py b/tests/test_cdm_api.py
index 4e316bd..bbd73df 100644
--- a/tests/test_cdm_api.py
+++ b/tests/test_cdm_api.py
@@ -16,6 +16,7 @@
 )
 from cdsobs.metadata import get_dataset_metadata
 from cdsobs.service_definition.api import get_service_definition
+from cdsobs.service_definition.service_definition_models import UnitChange
 
 
 def test_check_cdm_compliance(test_config, caplog):
@@ -210,6 +211,11 @@ def test_apply_variable_unit_change(test_config):
         dataset_name, service_definition, source, test_config
     )
     source_definition = service_definition.sources[source]
+    source_definition.descriptions["geopotential_height"].units = "J kg-1"
+    source_definition.cdm_mapping.unit_changes = dict()
+    source_definition.cdm_mapping.unit_changes["geopotential_height"] = UnitChange(
+        names={"m": "J kg-1"}, offset=0, scale=0.01020408163265306
+    )
     cdm_code_tables = read_cdm_code_tables(test_config.cdm_tables_location)
     actual = apply_unit_changes(
         homogenised_data, source_definition, cdm_code_tables["observed_variable"]

From 8ad57584c5e12ed2957a096cafb7f6ece0f4ba8c Mon Sep 17 00:00:00 2001
From: garciam
Date: Wed, 28 Aug 2024 17:41:42 +0200
Subject: [PATCH 3/3] add woudc_metadata CDM table to WOUDC

---
 cdsobs/data/cdsobs_config_template.yml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/cdsobs/data/cdsobs_config_template.yml b/cdsobs/data/cdsobs_config_template.yml
index 0763a69..be9f06a 100644
--- a/cdsobs/data/cdsobs_config_template.yml
+++ b/cdsobs/data/cdsobs_config_template.yml
@@ -23,6 +23,11 @@ datasets:
     lon_tile_size: 180
     lat_tile_size: 90
     time_tile_size: year
+    available_cdm_tables:
+    - observations_table
+    - header_table
+    - station_configuration
+    - woudc_metadata
   - name: insitu-observations-igra-baseline-network
     lon_tile_size: 45
     lat_tile_size: 45
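
Note on the get_raw_header_columns / get_raw_mandatory_columns change in PATCH 1/3: the new lookup is easier to read outside the diff. Below is a minimal Python sketch using the relocated invert_dict helper; the rename and header_columns values are a small subset taken from the WOUDC mapping above, and everything else is illustrative only, not the actual cdsobs code path.

# Illustrative sketch only: invert_dict mirrors the helper moved to
# cdsobs/utils/utils.py; the data is a subset of the WOUDC OzoneSonde mapping.
def invert_dict(idict: dict) -> dict:
    """Return the inverse of a dictionary (keys and values must be unique)."""
    return {v: k for k, v in idict.items()}


rename = {
    "platform_gaw_id": "secondary_id",
    "platform_id": "primary_station_id",
}
header_columns = ["platform_type", "primary_station_id", "secondary_id", "sensor_id"]

inv_rename_dict = invert_dict(rename)
# CDM names that came from a rename map back to their raw source names; columns
# that were never renamed pass through unchanged and keep their position, which
# the old "k for k, v in rename.items()" comprehension did not preserve.
raw_header_columns = [
    inv_rename_dict[v] if v in inv_rename_dict else v for v in header_columns
]
print(raw_header_columns)  # ['platform_type', 'platform_id', 'platform_gaw_id', 'sensor_id']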
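Similarly, the original_units clean-up added to apply_unit_changes in cdsobs/cdm/api.py reduces to a single pandas check. A toy sketch with made-up data follows; the real function runs this on the homogenised CDM table after any per-variable unit conversions.

# Toy data only; illustrates the equals/drop check added in PATCH 1/3.
import pandas as pd

homogenised_data = pd.DataFrame(
    {
        "observed_variable": ["air_temperature", "air_temperature"],
        "units": ["K", "K"],
        "original_units": ["K", "K"],
    }
)

# If no variable actually changed units, original_units just duplicates units
# and is dropped so it does not end up as an extra output column.
if homogenised_data["original_units"].equals(homogenised_data["units"]):
    homogenised_data = homogenised_data.drop("original_units", axis=1)

print(list(homogenised_data.columns))  # ['observed_variable', 'units']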