diff --git a/cdsobs/cdm/api.py b/cdsobs/cdm/api.py
index 8128a82..ea59361 100644
--- a/cdsobs/cdm/api.py
+++ b/cdsobs/cdm/api.py
@@ -83,6 +83,13 @@ def to_cdm_dataset(partition: DatasetPartition) -> CdmDataset:
     cdm_variables += cdm_variables_with_table_names
     cdm_variables = unique([v for v in cdm_variables if v in partition.data])
     data = partition.data.loc[:, cdm_variables].set_index("observation_id")
+    original_variables = set(partition.data.columns)
+    removed_variables = original_variables - set(cdm_variables)
+    if len(removed_variables) > 0:
+        logger.warning(
+            "The following variables were read but are not in the CDM and "
+            f"are going to be dropped: {removed_variables}"
+        )
     return CdmDataset(data, partition.partition_params, partition.dataset_metadata)
 
 
@@ -330,6 +337,21 @@ def auxfield2metadata_name(self, var: str, aux_var: str) -> str:
             auxf["metadata_name"] for auxf in self[var] if auxf["auxvar"] == aux_var
         ][0]
+
+    def vars_with_processing_level(self) -> list[str]:
+        return [v for v in self if self.var_has_processing_level(v)]
+
+    def var_has_processing_level(self, var: str) -> bool:
+        return any(auxf["auxvar"] in self.processing_level_fields for auxf in self[var])
+
+    @property
+    def processing_level_fields(self) -> list[str]:
+        return [auxf for auxf in self.all_list if "processing_level" in auxf]
+
+    def get_var_processing_level_field_name(self, var: str) -> str:
+        return [
+            auxf["auxvar"] for auxf in self[var] if "processing_level" in auxf["auxvar"]
+        ][0]
 
 
 def get_aux_fields_mapping_from_service_definition(
     source_definition: SourceDefinition, variables: List[str]
diff --git a/cdsobs/cdm/lite.py b/cdsobs/cdm/lite.py
index d96e4ff..14f1a73 100644
--- a/cdsobs/cdm/lite.py
+++ b/cdsobs/cdm/lite.py
@@ -75,6 +75,7 @@
     "negative_quasisystematic_uncertainty",
     "quality_flag",
     "combined_uncertainty",
+    "processing_level",
 ]
 cdm_lite_variables = dict(
     mandatory=variable_names,
diff --git a/cdsobs/data/insitu-observations-near-surface-temperature-us-climate-reference-network/service_definition.yml b/cdsobs/data/insitu-observations-near-surface-temperature-us-climate-reference-network/service_definition.yml
index 1584ab4..7884f2a 100644
--- a/cdsobs/data/insitu-observations-near-surface-temperature-us-climate-reference-network/service_definition.yml
+++ b/cdsobs/data/insitu-observations-near-surface-temperature-us-climate-reference-network/service_definition.yml
@@ -722,7 +722,7 @@ sources:
         units: K
       soil_temperature_processing_level:
         description: 'This parameter indicates the level of processing applied to the soil temperature measurement: raw data (''R''), corrected data (''C''). A letter ''U'' indicates that this information is unknown.'
-        dtype: float32
+        dtype: object
         long_name: sur_temp_type
       soil_temperature_quality_flag:
         description: This parameter indicates if the "Soil temperature" (described in this list) may be used because it is based on good data (value of 0) or if it should be treated with suspicion because erroneous data were detected (value of 3).
diff --git a/cdsobs/ingestion/api.py b/cdsobs/ingestion/api.py
index efbd43c..46c0e7e 100644
--- a/cdsobs/ingestion/api.py
+++ b/cdsobs/ingestion/api.py
@@ -317,8 +317,9 @@ def _melt_variables(
     )
 
     # Add quality flags
-    homogenised_data_melted["quality_flag"] = 3.0
     vars_with_qf = aux_fields.vars_with_quality_field
+    if len(vars_with_qf) > 0:
+        homogenised_data_melted["quality_flag"] = 3.0
     for var in vars_with_qf:
         var_mask = homogenised_data_melted["observed_variable"] == var
         flag_name = aux_fields.get_var_quality_flag_field_name(var)
@@ -330,6 +331,18 @@
     homogenised_data_melted["quality_flag"] = (
         homogenised_data_melted["quality_flag"].fillna(3).astype("int")
     )
+    # Add processing level
+    vars_with_pl = aux_fields.vars_with_processing_level()
+    if len(vars_with_pl) > 0:
+        homogenised_data_melted["processing_level"] = 6
+    for var in vars_with_pl:
+        var_mask = homogenised_data_melted["observed_variable"] == var
+        pl_name = aux_fields.get_var_processing_level_field_name(var)
+        var_processing_level = homogenised_data_melted.loc[var_mask, pl_name]
+        homogenised_data_melted.loc[
+            var_mask, "processing_level"
+        ] = var_processing_level
+        homogenised_data_melted = homogenised_data_melted.drop(pl_name, axis=1)
     # Encode observed_variables
     logger.info("Encoding observed variables using the CDM variable codes.")
     code_table = read_cdm_code_table(cdm_tables_location, "observed_variable").table
diff --git a/cdsobs/ingestion/serialize.py b/cdsobs/ingestion/serialize.py
index 3bc2a55..8aa33cb 100644
--- a/cdsobs/ingestion/serialize.py
+++ b/cdsobs/ingestion/serialize.py
@@ -106,6 +106,9 @@ def write_pandas_to_netcdf(
         if attrs is not None and v in attrs:
             ovar.attrs.update(attrs[v])
 
+    oncobj.sync()
+    oncobj.close()
+
 
 def to_netcdf(
     cdm_dataset: CdmDataset, tempdir: Path, encode_variables: bool = True
diff --git a/tests/test_api.py b/tests/test_api.py
index 6f70535..2af9e61 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -5,28 +5,35 @@ import sqlalchemy as sa
 
 from cdsobs.api import run_ingestion_pipeline, run_make_cdm
+from cdsobs.cdm.api import open_netcdf
+from cdsobs.cdm.lite import auxiliary_variable_names
+from cdsobs.ingestion.core import get_aux_vars_from_service_definition
 from cdsobs.observation_catalogue.models import Catalogue
 from cdsobs.service_definition.api import get_service_definition
+from cdsobs.storage import S3Client
+from cdsobs.utils.logutils import get_logger
 from tests.utils import get_test_years
 
+logger = get_logger(__name__)
+
 
 @pytest.mark.parametrize(
     "dataset_name,source,test_update",
     [
-        (
-            "insitu-observations-woudc-ozone-total-column-and-profiles",
-            "OzoneSonde",
-            False,
-        ),
-        (
-            "insitu-observations-woudc-ozone-total-column-and-profiles",
-            "TotalOzone",
-            False,
-        ),
-        ("insitu-observations-igra-baseline-network", "IGRA", False),
-        ("insitu-observations-igra-baseline-network", "IGRA_H", False),
-        ("insitu-comprehensive-upper-air-observation-network", "CUON", True),
-        ("insitu-observations-gruan-reference-network", "GRUAN", False),
+        # (
+        #     "insitu-observations-woudc-ozone-total-column-and-profiles",
+        #     "OzoneSonde",
+        #     False,
+        # ),
+        # (
+        #     "insitu-observations-woudc-ozone-total-column-and-profiles",
+        #     "TotalOzone",
+        #     False,
+        # ),
+        # ("insitu-observations-igra-baseline-network", "IGRA", False),
+        # ("insitu-observations-igra-baseline-network", "IGRA_H", False),
+        # ("insitu-comprehensive-upper-air-observation-network", "CUON", True),
+        # ("insitu-observations-gruan-reference-network", "GRUAN", False),
("insitu-observations-gruan-reference-network", "GRUAN", False), ( "insitu-observations-near-surface-temperature-us-climate-reference-network", "uscrn_subhourly", @@ -47,25 +54,25 @@ "uscrn_monthly", False, ), - ( - "insitu-observations-gnss", - "IGS", - False, - ), - ( - "insitu-observations-gnss", - "EPN", - False, - ), - ( - "insitu-observations-gnss", - "IGS_R3", - False, - ), + # ( + # "insitu-observations-gnss", + # "IGS", + # False, + # ), + # ( + # "insitu-observations-gnss", + # "EPN", + # False, + # ), + # ( + # "insitu-observations-gnss", + # "IGS_R3", + # False, + # ), ], ) def test_run_ingestion_pipeline( - dataset_name, source, test_update, test_session, test_config, caplog + dataset_name, source, test_update, test_session, test_config, caplog, tmp_path ): start_year, end_year = get_test_years(source) service_definition = get_service_definition(dataset_name) @@ -83,6 +90,36 @@ def test_run_ingestion_pipeline( # assert insertions have been made counter = test_session.scalar(sa.select(sa.func.count()).select_from(Catalogue)) assert counter > 0 + # Check variables + asset = test_session.scalar(sa.select(Catalogue.asset)) + s3client = S3Client.from_config(test_config.s3config) + asset_filename = asset.split("/")[1] + asset_local_path = Path(tmp_path, asset_filename) + s3client.download_file( + s3client.get_bucket_name(dataset_name), asset_filename, asset_local_path + ) + dataset = open_netcdf(asset_local_path, decode_variables=True) + variables_in_file = set( + dataset.columns.tolist() + dataset.observed_variable.unique().tolist() + ) + aux_variables = get_aux_vars_from_service_definition(service_definition, source) + expected_variables = set(service_definition.sources[source].descriptions) - set( + aux_variables + ) + for v in [ + "observed_variable", + "observation_value", + "units", + ] + auxiliary_variable_names: + if v in variables_in_file: + expected_variables.add(v) + logger.info( + f"{variables_in_file - expected_variables} are in file but not in the descriptions" + ) + logger.info( + f"{expected_variables - variables_in_file} are not in file but are in the description" + ) + # assert variables_in_file == expected_variables if test_update: # testing update flag