Skip to content

Commit

Permalink
Merge pull request #14 from ecmwf-projects/aux-variables-fix_COPDS-1668
Browse files Browse the repository at this point in the history
Aux variables fix copds 1668
  • Loading branch information
aperezpredictia authored Jun 19, 2024
2 parents 6f2df54 + 22cd827 commit cae44b0
Show file tree
Hide file tree
Showing 18 changed files with 273 additions and 134 deletions.
37 changes: 2 additions & 35 deletions cdsobs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
from cdsobs.cdm.api import (
apply_unit_changes,
check_cdm_compliance,
read_cdm_code_tables,
)
from cdsobs.cdm.tables import read_cdm_tables
from cdsobs.config import CDSObsConfig, DatasetConfig
from cdsobs.ingestion.api import (
EmptyBatchException,
Expand All @@ -21,15 +19,14 @@
sort,
)
from cdsobs.ingestion.core import (
DatasetMetadata,
DatasetPartition,
SpaceBatch,
TimeBatch,
TimeSpaceBatch,
get_variables_from_service_definition,
)
from cdsobs.ingestion.partition import get_partitions, save_partitions
from cdsobs.ingestion.serialize import serialize_partition
from cdsobs.metadata import get_dataset_metadata
from cdsobs.observation_catalogue.repositories.cads_dataset import CadsDatasetRepository
from cdsobs.retrieve.filter_datasets import between
from cdsobs.service_definition.service_definition_models import ServiceDefinition
Expand Down Expand Up @@ -279,7 +276,7 @@ def _read_homogenise_and_partition(
time_space_batch: TimeSpaceBatch,
) -> Iterator[DatasetPartition]:
dataset_config = config.get_dataset(dataset_name)
dataset_metadata = _get_dataset_metadata(
dataset_metadata = get_dataset_metadata(
config, dataset_config, service_definition, source
)
# Get the data as a single big table with the names remmaped from
Expand Down Expand Up @@ -312,36 +309,6 @@ def _read_homogenise_and_partition(
return sorted_partitions


def _get_dataset_metadata(
config: CDSObsConfig,
dataset_config: DatasetConfig,
service_definition: ServiceDefinition,
source: str,
) -> DatasetMetadata:
# Handle the main variables
variables = get_variables_from_service_definition(service_definition, source)
# Read CDM tables
cdm_tables = read_cdm_tables(
config.cdm_tables_location, dataset_config.available_cdm_tables
)
cdm_code_tables = read_cdm_code_tables(config.cdm_tables_location)
# Get the name of the space columns
if service_definition.space_columns is not None:
space_columns = service_definition.space_columns
else:
space_columns = service_definition.sources[source].space_columns # type: ignore
# Pack dataset metadata into an object to carry on.
dataset_metadata = DatasetMetadata(
dataset_config.name,
source,
variables,
cdm_tables,
cdm_code_tables,
space_columns,
)
return dataset_metadata


def _validate_time_interval(homogenised_data: pandas.DataFrame, time_batch: TimeBatch):
start_date, end_date = time_batch.get_time_coverage()
times_check = between(homogenised_data.report_timestamp, start_date, end_date).all()
Expand Down
2 changes: 1 addition & 1 deletion cdsobs/api_rest/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ def get_dataset_service_definition(dataset: str) -> ServiceDefinition:


@router.get("/cdm/lite_variables")
def get_cdm_lite_variables() -> list[str]:
def get_cdm_lite_variables() -> dict[str, list[str]]:
return cdm_lite_variables
21 changes: 4 additions & 17 deletions cdsobs/cdm/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
check_table_cdm_compliance,
)
from cdsobs.cdm.code_tables import CDMCodeTable, CDMCodeTables
from cdsobs.cdm.lite import auxiliary_variable_names
from cdsobs.cdm.tables import CDMTables
from cdsobs.ingestion.core import (
DatasetMetadata,
Expand Down Expand Up @@ -326,29 +327,15 @@ def get_aux_fields_mapping_from_service_definition(
source_definition: SourceDefinition, variables: List[str]
) -> AuxFields:
"""Return the auxiliary (uncertainty) fields for each variable."""
possible_aux_fields = [
"total_uncertainty",
"positive_total_uncertainty",
"negative_total_uncertainty",
"max_positive_total_uncertainty",
"max_negative_total_uncertainty",
"min_positive_total_uncertainty",
"min_negative_total_uncertainty",
"random_uncertainty",
"positive_systematic_uncertainty",
"negative_systematic_uncertainty",
"quasisystematic_uncertainty",
"positive_quasisystematic_uncertainty",
"negative_quasisystematic_uncertainty",
"flag",
]
descriptions = source_definition.descriptions

def _get_aux_fields(description: Description) -> List:
description_dict = description.model_dump()
rename_dict = source_definition.cdm_mapping.rename
aux_fields = [
description_dict[af] for af in possible_aux_fields if af in description_dict
description_dict[af]
for af in auxiliary_variable_names
if af in description_dict
]
if rename_dict is not None:
aux_fields = [
Expand Down
23 changes: 21 additions & 2 deletions cdsobs/cdm/lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"actual_time",
"agency",
"observation_value",
"observation_value_total_uncertainty",
"city",
"country",
"height_of_station_above_sea_level",
Expand Down Expand Up @@ -61,4 +60,24 @@
"fg_depar@body",
"an_depar@body",
]
cdm_lite_variables = variable_names + optional_variable_names
auxiliary_variable_names = [
"total_uncertainty",
"positive_total_uncertainty",
"negative_total_uncertainty",
"max_positive_total_uncertainty",
"max_negative_total_uncertainty",
"min_positive_total_uncertainty",
"min_negative_total_uncertainty",
"random_uncertainty",
"positive_systematic_uncertainty",
"negative_systematic_uncertainty",
"quasisystematic_uncertainty",
"positive_quasisystematic_uncertainty",
"negative_quasisystematic_uncertainty",
"flag",
]
cdm_lite_variables = dict(
mandatory=variable_names,
optional=optional_variable_names,
auxiliary=auxiliary_variable_names,
)
16 changes: 16 additions & 0 deletions cdsobs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,19 @@
"EPN": ["precipitable_water_column"],
},
}
AUX_FIELDS = [
"total_uncertainty",
"positive_total_uncertainty",
"negative_total_uncertainty",
"max_positive_total_uncertainty",
"max_negative_total_uncertainty",
"min_positive_total_uncertainty",
"min_negative_total_uncertainty",
"random_uncertainty",
"positive_systematic_uncertainty",
"negative_systematic_uncertainty",
"quasisystematic_uncertainty",
"positive_quasisystematic_uncertainty",
"negative_quasisystematic_uncertainty",
"flag",
]
37 changes: 5 additions & 32 deletions cdsobs/data/insitu-observations-gnss/service_definition.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ out_columns_order:
- report_timestamp
- zenith_total_delay
- zenith_total_delay_random_uncertainty
- total_column_water_vapour
- precipitable_water_column
- precipitable_water_column_combined_uncertainty
- total_column_water_vapour_era5
- precipitable_water_column_era5
products_hierarchy:
- variables
- combined_uncertainty
Expand Down Expand Up @@ -120,7 +120,7 @@ sources:
precipitable_water_column_era5:
description: This parameter is the total amount of water vapour in a column extending vertically from the GNSS receiver position (near the surface) to the top of the atmosphere, retrieved from ERA5 at the station coordinates, altitude, date, and time (csv-lev only).
dtype: float32
long_name: total_column_water_vapour_era5
long_name: precipitable_water_column_era5
units: kg m-2
zenith_total_delay:
description: This parameter characterizes the delay of the GNSS signal on the path from a satellite to the receiver due to atmospheric refraction and bending, mapped into the zenith direction. It is expressed as an equivalent distance travelled additionally by the radio waves, due to the atmosphere. The numerical value of zenith total delay correlates with the amount of total column water vapour (i.e., not including effects of liquid water and/or ice) above the GNSS receiver antenna. It is hence used to estimate the total column water vapour.
Expand Down Expand Up @@ -166,9 +166,6 @@ sources:
- precipitable_water_column_era5
- zenith_total_delay
group_name: variables
- columns:
- era_ipw
group_name: era5
- columns:
- uncert_epn_ipw
group_name: total_uncertainty
Expand All @@ -178,8 +175,6 @@ sources:
space_columns:
y: latitude|station_configuration
x: longitude|station_configuration


IGS:
cdm_mapping:
melt_columns: true
Expand Down Expand Up @@ -269,14 +264,6 @@ sources:
dtype: object
long_name: idstation
name_for_output: station_name
total_column_water_vapour:
total_uncertainty: uncert_gnss_ipw
description: This parameter is the total amount of water vapour in a column extending vertically from the GNSS receiver position (near the surface) to the top of the atmosphere.
dtype: float32
era5: era_ipw
long_name: gnss_ipw
name_for_output: total_column_water_vapour
units: kg m-2
precipitable_water_column_total_uncertainty:
description: This parameter is the combined sum of all uncertainties in the total column water vapour derived from zenith total delay and ancillary meteorological data. The uncertainties that are included in the calculation include uncertainties of the observed zenith total delay, uncertainties of the ancillary data, and uncertainties of the coefficients used in the retrieval (csv-lev only).
dtype: float32
Expand All @@ -285,7 +272,7 @@ sources:
precipitable_water_column_era5:
description: This parameter is the total amount of water vapour in a column extending vertically from the GNSS receiver position (near the surface) to the top of the atmosphere, retrieved from ERA5 at the station coordinates, altitude, date, and time (csv-lev only).
dtype: float32
long_name: total_column_water_vapour_era5
long_name: precipitable_water_column_era5
units: kg m-2
zenith_total_delay:
description: This parameter characterizes the delay of the GNSS signal on the path from a satellite to the receiver due to atmospheric refraction and bending, mapped into the zenith direction. It is expressed as an equivalent distance travelled additionally by the radio waves, due to the atmosphere. The numerical value of zenith total delay correlates with the amount of total column water vapour (i.e., not including effects of liquid water and/or ice) above the GNSS receiver antenna. It is hence used to estimate the total column water vapour.
Expand Down Expand Up @@ -331,9 +318,6 @@ sources:
- precipitable_water_column_era5
- zenith_total_delay
group_name: variables
- columns:
- era_ipw
group_name: era5
- columns:
- uncert_gnss_ipw
group_name: total_uncertainty
Expand Down Expand Up @@ -432,14 +416,6 @@ sources:
dtype: object
long_name: idstation
name_for_output: station_name
total_column_water_vapour:
total_uncertainty: uncert_gnss_ipw
description: This parameter is the total amount of water vapour in a column extending vertically from the GNSS receiver position (near the surface) to the top of the atmosphere.
dtype: float32
era5: era_ipw
long_name: gnss_ipw
name_for_output: total_column_water_vapour
units: kg m-2
precipitable_water_column_total_uncertainty:
description: This parameter is the combined sum of all uncertainties in the total column water vapour derived from zenith total delay and ancillary meteorological data. The uncertainties that are included in the calculation include uncertainties of the observed zenith total delay, uncertainties of the ancillary data, and uncertainties of the coefficients used in the retrieval (csv-lev only).
dtype: float32
Expand All @@ -448,7 +424,7 @@ sources:
precipitable_water_column_era5:
description: This parameter is the total amount of water vapour in a column extending vertically from the GNSS receiver position (near the surface) to the top of the atmosphere, retrieved from ERA5 at the station coordinates, altitude, date, and time (csv-lev only).
dtype: float32
long_name: total_column_water_vapour_era5
long_name: precipitable_water_column_era5
units: kg m-2
zenith_total_delay:
description: This parameter characterizes the delay of the GNSS signal on the path from a satellite to the receiver due to atmospheric refraction and bending, mapped into the zenith direction. It is expressed as an equivalent distance travelled additionally by the radio waves, due to the atmosphere. The numerical value of zenith total delay correlates with the amount of total column water vapour (i.e., not including effects of liquid water and/or ice) above the GNSS receiver antenna. It is hence used to estimate the total column water vapour.
Expand Down Expand Up @@ -494,9 +470,6 @@ sources:
- precipitable_water_column_era5
- zenith_total_delay
group_name: variables
- columns:
- era_ipw
group_name: era5
- columns:
- uncert_gnss_ipw
group_name: total_uncertainty
Expand Down
64 changes: 53 additions & 11 deletions cdsobs/forms_jsons.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@

from cdsobs.cli._catalogue_explorer import stats_summary
from cdsobs.constraints import iterative_ordering
from cdsobs.ingestion.core import get_variables_from_service_definition
from cdsobs.ingestion.core import (
get_aux_vars_from_service_definition,
get_variables_from_sc_group,
)
from cdsobs.observation_catalogue.models import Catalogue
from cdsobs.observation_catalogue.repositories.catalogue import CatalogueRepository
from cdsobs.retrieve.retrieve_services import merged_constraints_table
Expand Down Expand Up @@ -61,15 +64,8 @@ def get_variables_json(dataset: str, output_path: Path) -> Path:
service_definition = get_service_definition(dataset)
variables_json_content = {}
for source_name, source in service_definition.sources.items():
source_variables = get_variables_from_service_definition(
service_definition, source_name
)
# We do not want to pick up auxiliary variables here.
descriptions = {
k: v.model_dump()
for k, v in source.descriptions.items()
if k in source_variables
}
# We also pick up auxiliary variables here.
descriptions = {k: v.model_dump() for k, v in source.descriptions.items()}
variables_json_content[source_name] = descriptions
output_file_path = Path(output_path, "variables.json")
logger.info(f"Writing {output_file_path}")
Expand Down Expand Up @@ -105,6 +101,22 @@ def get_constraints_json(session, output_path: Path, dataset) -> Path:
flat_constraints["month"] = times.dt.month.astype("str").str.rjust(2, "0")
flat_constraints["day"] = times.dt.day.astype("str").str.rjust(2, "0")
flat_constraints = flat_constraints.drop("time", axis=1)
# Auxiliary variables are metadata, but they are still added as variables here for
# backwards compatibility
service_definition = get_service_definition(dataset)
aux_variables = get_all_aux_variables(service_definition)
aux_flat_constraints_dicts = []
# When we find an aux variable related to the main variable we add it
# Would it be better to use the descriptions? Probably
# TODO: Use the descriptions
for row in flat_constraints.itertuples(index=False):
for auxvar in aux_variables:
if row.variables in auxvar:
newrow = row._replace(variables=auxvar)
aux_flat_constraints_dicts.append(newrow)
aux_flat_constraints = pandas.DataFrame(aux_flat_constraints_dicts)
flat_constraints = pandas.concat([flat_constraints, aux_flat_constraints])
# Compress constraints
logger.info("Computing compressed constraints")
compressed_constraints = iterative_ordering(
flat_constraints, flat_constraints.columns
Expand All @@ -119,9 +131,11 @@ def get_constraints_json(session, output_path: Path, dataset) -> Path:
def get_widgets_json(session, output_path: Path, dataset: str) -> Path:
"""JSON file with the variables and their metadata."""
catalogue_entries = get_catalogue_entries_stream(session, dataset)
service_definition = get_service_definition(dataset)
variables = get_all_variables_in_products(service_definition)
summary = stats_summary(catalogue_entries)
widgets_json_content = dict()
widgets_json_content["variables"] = summary["available variables"]
widgets_json_content["variables"] = variables
time_coverage_start, time_coverage_end = summary["total time coverage"]
start_year = int(time_coverage_start[0:4])
end_year = int(time_coverage_end[0:4])
Expand All @@ -143,6 +157,34 @@ def get_widgets_json(session, output_path: Path, dataset: str) -> Path:
return widgets_output_path


def get_all_variables(service_definition):
variables = []
# Includes also auxiliary variables
for source_name, source in service_definition.sources.items():
variables.extend([variable for variable in source.descriptions])
return variables


def get_all_variables_in_products(service_definition):
variables = []
# Includes also auxiliary variables
for source_name, source in service_definition.sources.items():
for product in source.products:
variables.extend(get_variables_from_sc_group(source, product.group_name))
return list(set(variables))


def get_all_aux_variables(service_definition):
variables = []
# Includes also auxiliary variables
for source_name, source in service_definition.sources.items():
aux_variables = get_aux_vars_from_service_definition(
service_definition, source_name
)
variables.extend(aux_variables)
return list(set(variables))


def get_station_summary(
dataset: str, session, storage_url: str, output_path: Path
) -> Path:
Expand Down
Loading

0 comments on commit cae44b0

Please sign in to comment.