Skip to content

Commit

Permalink
Merge pull request #28 from ecmwf-projects/update-cuon-service-defini…
Browse files Browse the repository at this point in the history
…tion

add uncertainty and bias variables to CUON
  • Loading branch information
aperezpredictia authored Aug 26, 2024
2 parents a23c08c + b5662a3 commit 243a7dd
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 59 deletions.
2 changes: 1 addition & 1 deletion cdsobs/cdm/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ def _extract_variable_units_change(
observed_variable = homogenised_data.loc[:, "observed_variable"]
description_units = source_definition.descriptions[variable].units
observed_variable_mask = observed_variable == variable
if variable not in unit_changes:
if unit_changes is None or variable not in unit_changes:
# Do not change units, set both units columns to be equal.
new_units = description_units
original_units = description_units
Expand Down
6 changes: 5 additions & 1 deletion cdsobs/cdm/denormalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ def denormalize_tables(
dataset_cdm[parent_table_name] = joined_table
# Here we merge the tables that have a one to one relationship with the
# observations table
for table_name in ["era5fb_table", "advanced_homogenisation"]:
for table_name in [
"era5fb_table",
"advanced_homogenisation",
"advanced_uncertainty",
]:
if table_name in tables_to_use:
if len(dataset_cdm["observations_table"]) != len(dataset_cdm[table_name]):
logger.warning(
Expand Down
5 changes: 5 additions & 0 deletions cdsobs/cdm/lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"exposure_of_sensor",
"fg_depar@body",
"an_depar@body",
"fg_depar@offline",
]
auxiliary_variable_names = [
"total_uncertainty",
Expand All @@ -76,6 +77,10 @@
"quality_flag",
"combined_uncertainty",
"processing_level",
"desroziers_30_uncertainy",
"RISE_bias_estimate",
"humidity_bias_estimate",
"wind_bias_estimate",
]
cdm_lite_variables = dict(
mandatory=variable_names,
Expand Down
1 change: 1 addition & 0 deletions cdsobs/data/cdsobs_config_template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ datasets:
- station_configuration
- era5fb_table
- advanced_homogenisation
- advanced_uncertainty
reader: "cdsobs.ingestion.readers.cuon.read_cuon_netcdfs"
reader_extra_args:
input_dir: "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,11 @@ products_hierarchy:
sources:
CUON:
cdm_mapping:
rename:
desroziers_30: desroziers_30_uncertainy
melt_columns: false
unit_changes:
aerosol_absorption_optical_depth:
names:
'1': '1'
offset: 0
scale: 1
data_table: observations_table
descriptions:
aerosol_absorption_optical_depth:
description: Vertical column integral of spectral aerosol absorption coefficient
dtype: float32
long_name: aerosol_absorption_optical_depth
units: '1'
air_pressure:
description: Barometric air pressure
dtype: float32
Expand All @@ -57,42 +48,62 @@ sources:
dtype: float32
long_name: air_temperature
units: K
RISE_bias_estimate: RISE_bias_estimate
desroziers_30_uncertainy: desroziers_30_uncertainy
air_dewpoint:
description: Dewpoint measurement (from profile measurement)
dtype: float32
long_name: air_dewpoint
units: K
humidity_bias_estimate: humidity_bias_estimate
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainy:
description: Desroziers uncertainty v 1.0 - 30 days window
long_name: desroziers_30_uncertainy
dtype: float32
units: same as the variable
dew_point_depression:
description: The difference between air temperature and dew point temperature. The dew point temperature is the temperature to which a given air parcel must be cooled at constant pressure and constant water vapour content in order for saturation to occur
long_name: dew_point_depression
dtype: float32
units: K
desroziers_30_uncertainy: desroziers_30_uncertainy
eastward_wind_speed:
description: Wind towards the east
dtype: float32
long_name: eastward_wind_speed
wind_bias_estimate: wind_bias_estimate
units: m s-1
desroziers_30_uncertainy: desroziers_30_uncertainy
northward_wind_speed:
description: Wind towards the north
dtype: float32
long_name: northward_wind_speed
units: m s-1
desroziers_30_uncertainy: desroziers_30_uncertainy
geopotential_height:
description: Height of a standard or significant pressure level in meters
dtype: float32
long_name: geopotential_height
units: m
location_latitude:
units: J kg-1
desroziers_30_uncertainy: desroziers_30_uncertainy
latitude|header_table:
description: Latitude of the station (deg. North)
dtype: float32
long_name: latitude
units: degree_north
location_longitude:
units: degrees_north
longitude|header_table:
description: Longitude of the station (deg. East)
dtype: float32
long_name: longitude
units: degree_east
units: degrees_east
relative_humidity:
description: Relative humidity (from profile measurement)
dtype: float32
long_name: relative_humidity
units: m
units: '1'
humidity_bias_estimate: humidity_bias_estimate
desroziers_30_uncertainy: desroziers_30_uncertainy
report_timestamp:
description: Observation date time UTC
dtype: datetime64[ns]
Expand All @@ -101,6 +112,9 @@ sources:
description: specific means per unit mass. Specific humidity is the mass fraction of water vapor in (moist) air.
dtype: float32
long_name: specific_humidity
humidity_bias_estimate: humidity_bias_estimate
units: kg kg-1
desroziers_30_uncertainy: desroziers_30_uncertainy
station_name:
description: Station identification code
dtype: object
Expand All @@ -109,10 +123,15 @@ sources:
description: "Wind direction with 0°:north, 90°:east, 180°:south, 270°:west"
dtype: float32
long_name: wind_from_direction
wind_bias_estimate: wind_bias_estimate
units: deg
desroziers_30_uncertainy: desroziers_30_uncertainy
wind_speed:
description: Wind speed
dtype: float32
long_name: wind_speed
units: m s-1
desroziers_30_uncertainy: desroziers_30_uncertainy
secondary_id:
description: Secondary id
dtype: object
Expand All @@ -121,6 +140,21 @@ sources:
description: Model ID where applicable.
dtype: object
long_name: instrument_model
RISE_bias_estimate:
description: RISE bias estimate
long_name: RISE_bias_estimate
dtype: float32
units: same as the variable
humidity_bias_estimate:
description: Humidity bias estimate
long_name: humidity_bias_estimate
dtype: float32
units: same as the variable
wind_bias_estimate:
description: Wind bias estimate
long_name: wind_bias_estimate
dtype: float32
units: same as the variable


header_columns:
Expand All @@ -144,7 +178,6 @@ sources:
- air_pressure
products:
- columns:
- aerosol_absorption_optical_depth
- air_temperature
- geopotential_height
- relative_humidity
Expand All @@ -157,6 +190,14 @@ sources:
- specific_humidity
- wind_from_direction
group_name: variables
- columns:
- RISE_bias_estimate
- humidity_bias_estimate
- wind_bias_estimate
group_name: bias_estimates
- columns:
- desroziers_30_uncertainy
group_name: uncertainty
space_columns:
y: latitude|station_configuration
x: longitude|station_configuration
y: latitude|header_table
x: longitude|header_table
16 changes: 4 additions & 12 deletions cdsobs/ingestion/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from importlib import import_module
from pathlib import Path
from typing import List, Tuple
from typing import List

import numpy
import pandas
Expand Down Expand Up @@ -151,20 +151,12 @@ def cast_to_descriptions(
return data_renamed


def _get_latlon_names(data: pandas.DataFrame) -> Tuple[str, str]:
if "longitude" in data:
latname = "latitude"
lonname = "longitude"
else:
latname = "latitude|station_configuration"
lonname = "longitude|station_configuration"
return latname, lonname


def sort(partition: DatasetPartition) -> DatasetPartition:
"""Sort data of a partition."""
logger.info("Sorting partition data")
latname, lonname = _get_latlon_names(partition.data)
space_columns = partition.dataset_metadata.space_columns
latname = space_columns.y
lonname = space_columns.x
partition.data.sort_values(
by=["report_timestamp", latname, lonname], kind="mergesort", inplace=True
)
Expand Down
71 changes: 63 additions & 8 deletions cdsobs/ingestion/readers/cuon.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,39 @@ def _process_table(
slices: dict[str, slice],
) -> dict[str, dict[str, numpy.ndarray]]:
if table_name in sorted_by_variable:
vals_to_exclude = ["index", "recordtimestamp", "string1"]
vals_to_exclude = [
"index",
"recordtimestamp",
"string1",
"type",
"expver",
"class",
"stream",
"report_status@hdr",
"report_event1@hdr",
"report_rdbflag@hdr",
"lat@hdr",
"lon@hdr",
"lsm@modsurf",
"orography@modsurf",
"windspeed10m@modsurf",
"vertco_reference_2@body",
"ppcode@conv_body",
"datum_anflag@body",
"datum_status@body",
"datum_event1@body",
"datum_rdbflag@body",
"qc_pge@body",
"lsm@surfbody_feedback",
"obs_error@errstat",
"final_obs_error@errstat",
"fg_error@errstat",
"eda_spread@errstat",
"processing_level",
"location_method",
"source_id",
"crs",
]
file_vars = [
fv
for fv in numpy.array(hfile["recordindices"])
Expand All @@ -111,7 +143,11 @@ def _process_table(
logger.debug(f"Reading variable {variable}")
selector = slices[variable]
# dropping string dims - not necessary for dataframes
fields = [f for f in hfile[table_name] if "string" not in f]
fields = [
f
for f in hfile[table_name]
if "string" not in f and f not in vals_to_exclude
]
data: dict[str, numpy.ndarray] = {
field: _get_field_data(field, hfile, selector, table_name)
for field in fields
Expand Down Expand Up @@ -331,8 +367,12 @@ def get_denormalized_table_file(
)
dataset_cdm[table_name] = table_data
    # Filter stations outside of the Batch
lats = dataset_cdm["header_table"]["latitude|header_table"]
lons = dataset_cdm["header_table"]["longitude|header_table"]
lats = dataset_cdm["header_table"]["latitude"]
lons = dataset_cdm["header_table"]["longitude"]
if (lats.dtype.kind == "S") or (lons.dtype.kind == "S"):
raise NoDataInFileException(
f"Skipping file {file_and_slices.path} with malformed latitudes"
)
lon_start, lon_end, lat_start, lat_end = time_space_batch.get_spatial_coverage()
lon_mask = between(lons, lon_start, lon_end)
lat_mask = between(lats, lat_start, lat_end)
Expand Down Expand Up @@ -377,15 +417,25 @@ def _fix_table_data(
file_path: Path,
time_space_batch: TimeSpaceBatch,
):
for coord in ["latitude", "longitude", "source_id"]:
if coord in table_data:
table_data = table_data.rename({coord: coord + "|" + table_name}, axis=1)
# the name in station_configuration
if table_name == "header_table":
vars_to_drop = [
"station_name",
"platform_sub_type",
"platform_type",
"station_type",
"crs",
]
table_data = table_data.drop(vars_to_drop, axis=1, errors="ignore")
# Check that observation id is unique and fix if not
if table_name == "observations_table":
# If there is nothing here it is a waste of time to continue
if len(table_data) == 0:
logger.warning(f"No data found in {file_path} for {time_space_batch}.")
raise NoDataInFileException
        # Remove obstype 0, as it is unassigned data we don't need
table_data = table_data.loc[table_data["observed_variable"] != 0]
# Check if observation ids are unique and replace them if not
if not table_data.observation_id.is_unique:
logger.warning(f"observation_id is not unique in {file_path}, fixing")
table_data["observation_id"] = numpy.arange(
Expand All @@ -398,9 +448,14 @@ def _fix_table_data(
table_data = table_data.drop_duplicates(
subset=["primary_id", "record_number"], ignore_index=True
)
table_data = table_data.drop(["latitude", "longitude"], axis=1)
# Check primary keys can be used to build a unique index
primary_keys = table_definition.primary_keys
if table_name in ["era5fb_table", "advanced_homogenisation"]:
if table_name in [
"era5fb_table",
"advanced_homogenisation",
"advanced_uncertainty",
]:
table_data = table_data.reset_index()
table_data_len = len(table_data)
obs_table_len = len(dataset_cdm["observations_table"])
Expand Down
4 changes: 4 additions & 0 deletions cdsobs/retrieve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def retrieve_observations(
size_limit :
Maximum size allowed for the download
"""
from cads_adaptors.adaptors import Context

logger.info("Starting retrieve pipeline.")
# Query the storage to get the URLS of the files that contain the data requested
with get_database_session(catalogue_url) as session:
Expand All @@ -57,12 +59,14 @@ def retrieve_observations(
retrieve_args.dataset
).global_attributes
cdm_lite_vars = list(itertools.chain.from_iterable(cdm_lite_variables.values()))
context = Context()
output_path = retrieve_data(
retrieve_args.dataset,
retrieve_args.params.model_dump(),
output_dir,
object_urls,
cdm_lite_vars,
global_attributes,
context,
)
return output_path
Loading

0 comments on commit 243a7dd

Please sign in to comment.