Skip to content

Commit

Permalink
Merge pull request #34 from ecmwf-projects/cuon-fixes_COPS-2028
Browse files Browse the repository at this point in the history
Cuon fixes cops 2028
  • Loading branch information
aperezpredictia authored Sep 25, 2024
2 parents 8de6302 + 0953402 commit 6985e72
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cdsobs/cdm/lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"quality_flag",
"combined_uncertainty",
"processing_level",
"desroziers_30_uncertainy",
"desroziers_30_uncertainty",
"RISE_bias_estimate",
"humidity_bias_estimate",
"wind_bias_estimate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ sources:
units: seconds since 1900-01-01 00:00:00
air_dewpoint:
description: Dewpoint measurement (from profile measurement)
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainty: desroziers_30_uncertainy
dtype: float32
humidity_bias_estimate: humidity_bias_estimate
long_name: Air Dewpoint
Expand All @@ -62,7 +62,7 @@ sources:
air_temperature:
RISE_bias_estimate: RISE_bias_estimate
description: Air temperature (from profile measurement)
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainty: desroziers_30_uncertainy
dtype: float32
long_name: Air Temperature
units: K
Expand All @@ -84,10 +84,10 @@ sources:
description: WMOessential, WMOadditional, WMOother
dtype: int32
long_name: Data Policy Licence
desroziers_30_uncertainy:
desroziers_30_uncertainty:
description: Desroziers uncertainty v 1.0 - calculated using Desroziers (2005) method applied to moving 30 days windows of ERA5 obs-bg and obs-an data
dtype: float32
long_name: Desroziers 30 Uncertainy
long_name: Desroziers 30 Uncertainty
units: same as the variable
dew_point_depression:
description: The difference between air temperature and dew point temperature. The dew point temperature is the temperature to which a given air parcel must be cooled at constant pressure and constant water vapour content in order for saturation to occur
Expand Down Expand Up @@ -119,7 +119,7 @@ sources:
units: same as the variable
geopotential_height:
description: Height of a standard or significant pressure level in meters
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainy: desroziers_30_uncertainty
dtype: float32
long_name: Geopotential Height
units: J kg-1
Expand Down Expand Up @@ -155,7 +155,7 @@ sources:
units: degrees_east
northward_wind_speed:
description: Wind towards the north
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainy: desroziers_30_uncertainty
dtype: float32
long_name: Northward Wind Speed
units: m s-1
Expand Down Expand Up @@ -195,7 +195,7 @@ sources:
long_name: Quality Flag
relative_humidity:
description: Relative humidity (from profile measurement) (range 0-1)
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainty: desroziers_30_uncertainty
dtype: float32
humidity_bias_estimate: humidity_bias_estimate
long_name: Relative Humidity
Expand Down Expand Up @@ -235,7 +235,7 @@ sources:
long_name: Spatial Representativeness
specific_humidity:
description: specific means per unit mass. Specific humidity is the mass fraction of water vapor in (moist) air.
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainty: desroziers_30_uncertainty
dtype: float32
humidity_bias_estimate: humidity_bias_estimate
long_name: Specific Humidity
Expand Down Expand Up @@ -263,14 +263,14 @@ sources:
units: same as the variable
wind_from_direction:
description: "Wind direction with 0°:north, 90°:east, 180°:south, 270°:west"
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainty: desroziers_30_uncertainty
dtype: float32
long_name: Wind From Direction
units: deg
wind_bias_estimate: wind_bias_estimate
wind_speed:
description: Wind speed. Adjustment for wind is not available or zero
desroziers_30_uncertainy: desroziers_30_uncertainy
desroziers_30_uncertainty: desroziers_30_uncertainty
dtype: float32
long_name: Wind Speed
units: m s-1
Expand Down Expand Up @@ -321,7 +321,7 @@ sources:
- wind_bias_estimate
group_name: bias_estimates
- columns:
- desroziers_30_uncertainy
- desroziers_30_uncertainty
group_name: uncertainty
space_columns:
x: longitude|header_table
Expand Down
35 changes: 22 additions & 13 deletions cdsobs/ingestion/readers/cuon.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,16 +363,22 @@ def get_denormalized_table_file(
table_data = read_table_data(
file_and_slices, table_name_in_file, time_space_batch.time_batch
)
# Make sure that latitude and longiture always carry on their table name.
table_data = _fix_table_data(
dataset_cdm,
table_data,
table_definition,
table_name,
file_and_slices.path,
time_space_batch,
)
dataset_cdm[table_name] = table_data
# Don't try to fix empty tables unless it is one of the main tables
if len(table_data) > 0 or table_name in ["header_table", "observations_table"]:
# Make sure that latitude and longitude always carry on their table name.
table_data = _fix_table_data(
dataset_cdm,
table_data,
table_definition,
table_name,
file_and_slices.path,
time_space_batch,
)
dataset_cdm[table_name] = table_data
else:
# Copy tables to use and remove the empty table
# this is for not losing the stations with no homogenisation_table
tables_to_use = [t for t in tables_to_use if t != table_name]
    # Filter stations outside of the batch
lats = dataset_cdm["header_table"]["latitude"]
lons = dataset_cdm["header_table"]["longitude"]
Expand Down Expand Up @@ -434,6 +440,11 @@ def _fix_table_data(
"crs",
]
table_data = table_data.drop(vars_to_drop, axis=1, errors="ignore")

if table_name == "station_configuration":
vars_to_drop = ["station_automation"]
table_data = table_data.drop(vars_to_drop, axis=1, errors="ignore")

# Check that observation id is unique and fix if not
if table_name == "observations_table":
# If there is nothing here it is a waste of time to continue
Expand All @@ -445,7 +456,7 @@ def _fix_table_data(
# Check if observation ids are unique and replace them if not
if not table_data.observation_id.is_unique:
logger.warning(f"observation_id is not unique in {file_path}, fixing")
table_data["observation_id"] = numpy.arange(
table_data.loc[:, "observation_id"] = numpy.arange(
len(table_data), dtype="int"
).astype("bytes")
# Remove missing values to save memory
Expand Down Expand Up @@ -527,8 +538,6 @@ def read_nc_file_slices(
record_times = record_times[:]
first_timestamp = record_times.min()
last_timestamp = record_times.max()
# if numpy.isnan(last_timestamp):
# last_timestamp = record_times[-2]

if first_timestamp > selected_end or last_timestamp < selected_start:
# Return None if there are no times inside the batch
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,22 @@ def test_repository(test_session, test_s3_client, test_config):
start_year,
end_year,
)

# dataset_name = "insitu-comprehensive-upper-air-observation-network"
# service_definition = get_service_definition(dataset_name)
# for dataset_source in [
# "CUON",
# ]:
# start_year, end_year = get_test_years(dataset_source)
# run_ingestion_pipeline(
# dataset_name,
# service_definition,
# dataset_source,
# test_session,
# test_config,
# start_year,
# end_year,
# )
catalogue_repository = CatalogueRepository(test_session)
return TestRepository(catalogue_repository, test_s3_client)

Expand Down
8 changes: 4 additions & 4 deletions tests/retrieve/test_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ def test_adaptor_cuon(tmp_path):
"variable": [
"air_temperature",
"geopotential_height",
"desroziers_30_uncertainy",
"desroziers_30_uncertainty",
"RISE_bias_estimate",
],
"year": ["1907"],
"month": ["2"],
"day": ["01"],
"year": ["1960"],
"month": ["01"],
"day": [f"{dd:02d}" for dd in range(32)],
"dataset_source": "CUON",
}
test_form = {}
Expand Down
85 changes: 61 additions & 24 deletions tests/scripts/subset_cuon_file.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
# Select a few months only for the tests
from pathlib import Path

import cftime
import netCDF4
import numpy
import xarray

from cdsobs.constants import TIME_UNITS
from cdsobs.ingestion.core import TimeBatch
from cdsobs.ingestion.readers.cuon import read_nc_file_slices


def concat_chars(in_ds: xarray.Dataset) -> xarray.Dataset:
out_ds = in_ds.copy()
Expand All @@ -21,10 +26,9 @@ def concat_chars(in_ds: xarray.Dataset) -> xarray.Dataset:
return out_ds


def main(ifile: Path):
nreports = 1000
index_start = 10000
obs_slice = slice(index_start, index_start + nreports)
def main(ifile: Path, nfile):
time_batch = TimeBatch(1960, 1)
file_and_slices = read_nc_file_slices(ifile, time_batch)
sorted_by_variable = [
"advanced_homogenisation",
"advanced_uncertainty",
Expand All @@ -38,18 +42,23 @@ def main(ifile: Path):
groups = list(inc.groups)

with xarray.open_dataset(ifile, group="observations_table") as obs_ds:
varcodes = sorted(set(obs_ds["observed_variable"].values))
obs_ds_subset_list = []
for vc in varcodes:
obs_ds_var = obs_ds.isel(index=file_and_slices.variable_slices[str(vc)]) # type: ignore
obs_ds_subset_list.append(obs_ds_var)
obs_ds_subset = xarray.concat(obs_ds_subset_list, dim="index")
# Get the report ids of the header
obs_ds = obs_ds.isel(index=obs_slice)
obs_ds = concat_chars(obs_ds)
report_ids = obs_ds["report_id"]
obs_ds.to_netcdf(ofile, mode="a", group="observations_table")
report_ids = numpy.sort(numpy.unique(obs_ds_subset["report_id"].values))
obs_ds_subset = concat_chars(obs_ds_subset)
obs_ds_subset.to_netcdf(ofile, mode="a", group="observations_table")

with xarray.open_dataset(ifile, group="header_table") as header_ds:
# Get the first 100 times
report_id_mask = header_ds["report_id"].isin(report_ids)
report_id_mask = header_ds["report_id"].load().isin(report_ids)
header_ds = header_ds.sel(index=report_id_mask)
header_ds = concat_chars(header_ds)
print("Dates are:", header_ds["report_timestamp"]),
print("Dates are:", header_ds["report_timestamp"])
header_ds.to_netcdf(ofile, mode="a", group="header_table")

tables_remaining = [
Expand All @@ -58,24 +67,52 @@ def main(ifile: Path):
for table_name in tables_remaining:
if table_name in sorted_by_variable:
with xarray.open_dataset(ifile, group=table_name) as table_ds:
table_ds_subset = table_ds.isel(index=obs_slice)
table_ds_subset_list = []
for vc in varcodes:
table_ds_var = table_ds.isel(
index=file_and_slices.variable_slices[str(vc)] # type: ignore
)
table_ds_subset_list.append(table_ds_var)
table_ds_subset = xarray.concat(table_ds_subset_list, dim="index")
else:
with xarray.open_dataset(ifile, group=table_name) as table_ds:
if len(table_ds.index) == len(report_id_mask):
table_ds_subset = table_ds.sel(index=report_id_mask)
elif table_name == "recordindices":
# Recordtimestamp is missing the last value in the stored array+
# We have to load all except the last and define it as nan
table_ds["recordtimestamp"] = xarray.DataArray(
numpy.append(
table_ds["recordtimestamp"].isel(index=slice(0, -1)).values,
numpy.nan,
),
coords=dict(index=table_ds.index),
dims="index",
indices = {}

for varcode in varcodes:
indices[varcode] = numpy.nonzero(
obs_ds_subset["observed_variable"].values == int(varcode)
)[0]

nindices = max([len(ii) for ii in indices.values()])
table_ds_subset = table_ds.isel(index=slice(0, nindices)).copy()

for varcode in varcodes:
varcode_indices = indices[varcode]
if len(varcode_indices) == 0:
del table_ds_subset[str(varcode)]
else:
to_pad = nindices - len(varcode_indices)
table_ds_subset[str(varcode)][:] = numpy.pad(
varcode_indices, (to_pad, 0), mode="edge"
)

longest_varcode = [
vc for vc in varcodes if len(indices[vc]) == nindices
][0]
report_ids = obs_ds_subset.isel(
index=indices[longest_varcode]
).report_id
recordtimestamps = (
header_ds.set_index(index="report_id")
.sel(index=report_ids)
.report_timestamp
)
table_ds_subset["recordtimestamp"][:] = cftime.date2num(
recordtimestamps.to_index().to_pydatetime(), units=TIME_UNITS
)
report_id_mask_recordindices = numpy.append(report_id_mask, True)
table_ds_subset = table_ds.sel(index=report_id_mask_recordindices)
else:
table_ds_subset = table_ds
if table_name == "station_configuration_codes":
Expand All @@ -92,5 +129,5 @@ def main(ifile: Path):
Path("../data/cuon_data/old/0-20001-0-53845_CEUAS_merged_v3.nc"),
Path("../data/cuon_data/old/0-20001-0-53772_CEUAS_merged_v3.nc"),
]
for ifile in ifiles:
main(ifile)
for i, ifile in enumerate(ifiles):
main(ifile, i)
5 changes: 5 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ def test_run_ingestion_pipeline(
.where(Catalogue.dataset == dataset_name)
)
assert counter > 0
if dataset_name == "insitu-comprehensive-upper-air-observation-network":
stations = test_session.scalar(
sa.select(Catalogue.stations).where(Catalogue.dataset == dataset_name)
)
assert stations == ["0-20001-0-53772", "0-20001-0-53845"]


def test_make_cdm(test_config, tmp_path, caplog):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_read_cuon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def test_read_cuon(test_config):
dataset_name = "insitu-comprehensive-upper-air-observation-network"
dataset_config = [d for d in test_config.datasets if d.name == dataset_name][0]
service_definition = get_service_definition(dataset_name)
time_space_batch = TimeSpaceBatch(TimeBatch(1958, 8))
time_space_batch = TimeSpaceBatch(TimeBatch(1960, 1))
os.environ["CADSOBS_AVOID_MULTIPROCESS"] = "True"
cuon_data = read_cuon_netcdfs(
dataset_name,
Expand Down
4 changes: 2 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def get_test_years(source: str) -> Tuple[int, int]:
start_year = 1978
end_year = 1979
case "CUON":
start_year = 1957
end_year = 1958
start_year = 1960
end_year = 1961
case "GRUAN":
start_year = 2010
end_year = 2011
Expand Down

0 comments on commit 6985e72

Please sign in to comment.