added support for processing level
garciampred committed Aug 5, 2024
1 parent ab3d6c3 commit ad66565
Showing 6 changed files with 110 additions and 32 deletions.
22 changes: 22 additions & 0 deletions cdsobs/cdm/api.py
@@ -83,6 +83,13 @@ def to_cdm_dataset(partition: DatasetPartition) -> CdmDataset:
cdm_variables += cdm_variables_with_table_names
cdm_variables = unique([v for v in cdm_variables if v in partition.data])
data = partition.data.loc[:, cdm_variables].set_index("observation_id")
original_variables = set(partition.data.columns)
removed_variables = original_variables - set(cdm_variables)
if len(removed_variables) > 0:
logger.warning(
"The following variables where read but are not in the CDM and "
f"are going to be dropped: {removed_variables}"
)
return CdmDataset(data, partition.partition_params, partition.dataset_metadata)


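In the hunk above, columns that were read from the source but are not CDM variables get dropped with a warning. A compact sketch of that select-and-warn pattern on a toy frame (all names below are illustrative):

```python
import pandas as pd

cdm_variables = ["observation_id", "observed_variable", "observation_value"]
data = pd.DataFrame(
    {
        "observation_id": [1, 2],
        "observed_variable": ["ta", "ta"],
        "observation_value": [280.1, 275.3],
        "internal_station_code": ["A", "B"],  # not a CDM variable
    }
)

removed_variables = set(data.columns) - set(cdm_variables)
if removed_variables:
    print(f"Dropping variables not in the CDM: {removed_variables}")
data = data.loc[:, cdm_variables].set_index("observation_id")
```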
@@ -330,6 +337,21 @@ def auxfield2metadata_name(self, var: str, aux_var: str) -> str:
auxf["metadata_name"] for auxf in self[var] if auxf["auxvar"] == aux_var
][0]

    def vars_with_processing_level(self) -> list[str]:
        # Observed variables that carry a processing level auxiliary field.
        return [v for v in self if self.var_has_processing_level(v)]

    def var_has_processing_level(self, var: str) -> bool:
        # True if any of this variable's auxiliary fields is a processing level field.
        return any(auxf["auxvar"] in self.processing_level_fields for auxf in self[var])

    @property
    def processing_level_fields(self) -> list[str]:
        # Names of all auxiliary fields that encode a processing level.
        return [auxf for auxf in self.all_list if "processing_level" in auxf]

    def get_var_processing_level_field_name(self, var: str) -> str:
        # First (and expectedly only) processing level field of this variable.
        return [
            auxf["auxvar"] for auxf in self[var] if "processing_level" in auxf["auxvar"]
        ][0]


def get_aux_fields_mapping_from_service_definition(
source_definition: SourceDefinition, variables: List[str]
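For orientation, a minimal sketch of what the new aux-field helpers above do, assuming the mapping stores, per observed variable, a list of auxiliary-field dicts with an `auxvar` key (the container internals are not shown in this hunk; the data below is made up):

```python
# Hypothetical stand-in for the aux fields mapping used above.
aux_fields = {
    "soil_temperature": [
        {"auxvar": "soil_temperature_quality_flag"},
        {"auxvar": "soil_temperature_processing_level"},
    ],
    "relative_humidity": [
        {"auxvar": "relative_humidity_quality_flag"},
    ],
}

def var_has_processing_level(var: str) -> bool:
    # True if any auxiliary field of this variable is a processing level field.
    return any("processing_level" in f["auxvar"] for f in aux_fields[var])

def vars_with_processing_level() -> list[str]:
    return [v for v in aux_fields if var_has_processing_level(v)]

print(vars_with_processing_level())  # ['soil_temperature']
```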
1 change: 1 addition & 0 deletions cdsobs/cdm/lite.py
@@ -75,6 +75,7 @@
"negative_quasisystematic_uncertainty",
"quality_flag",
"combined_uncertainty",
"processing_level",
]
cdm_lite_variables = dict(
mandatory=variable_names,
@@ -722,7 +722,7 @@ sources:
units: K
soil_temperature_processing_level:
description: 'This parameter indicates the level of processing applied to the soil temperature measurement: raw data (''R''), corrected data (''C''). A letter ''U'' indicates that this information is unknown.'
dtype: float32
dtype: object
long_name: sur_temp_type
soil_temperature_quality_flag:
description: This parameter indicates if the "Soil temperature" (described in this list) may be used because it is based on good data (value of 0) or if it should be treated with suspicion because erroneous data were detected (value of 3).
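The dtype change from float32 to object above reflects that this field holds the single-letter codes from the description ('R', 'C', 'U') rather than numbers. A toy pandas sketch; the letter-to-integer mapping below is hypothetical and not taken from the CDM code tables:

```python
import pandas as pd

# Hypothetical mapping from source letter codes to integer codes.
LETTER_TO_CODE = {"R": 0, "C": 1, "U": 6}

# Stored as object dtype, since float32 cannot hold 'R'/'C'/'U'.
levels = pd.Series(["R", "C", "U", "C"], dtype="object")
print(levels.map(LETTER_TO_CODE).tolist())  # [0, 1, 6, 1]
```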
17 changes: 16 additions & 1 deletion cdsobs/ingestion/api.py
@@ -317,8 +317,9 @@ def _melt_variables(
)

# Add quality flags
homogenised_data_melted["quality_flag"] = 3.0
vars_with_qf = aux_fields.vars_with_quality_field
if len(vars_with_qf) > 0:
homogenised_data_melted["quality_flag"] = 3.0
for var in vars_with_qf:
var_mask = homogenised_data_melted["observed_variable"] == var
flag_name = aux_fields.get_var_quality_flag_field_name(var)
@@ -330,6 +331,20 @@
homogenised_data_melted["quality_flag"] = (
homogenised_data_melted["quality_flag"].fillna(3).astype("int")
)
# Add processing level
vars_with_pl = aux_fields.vars_with_processing_level()
if len(vars_with_pl) > 0:
homogenised_data_melted["processing_level"] = 6
for var in vars_with_pl:
var_mask = homogenised_data_melted["observed_variable"] == var
pl_name = aux_fields.get_var_processing_level_field_name(var)
var_processing_level = homogenised_data_melted.loc[var_mask, pl_name]
homogenised_data_melted.loc[
var_mask, "processing_level"
] = var_processing_level
homogenised_data_melted = homogenised_data_melted.drop(pl_name, axis=1)

# Encode observed_variables
logger.info("Encoding observed variables using the CDM variable codes.")
code_table = read_cdm_code_table(cdm_tables_location, "observed_variable").table
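A toy version of the pattern in the hunk above: set a default processing level, copy each variable's own level from its auxiliary column, then drop the auxiliary column. Not the real `_melt_variables`; only the column names follow the code above:

```python
import pandas as pd

# After melting, each row is one (observation, variable) pair; the
# per-variable processing level still lives in its auxiliary column.
df = pd.DataFrame(
    {
        "observed_variable": ["soil_temperature", "air_temperature"],
        "observation_value": [280.1, 275.3],
        "soil_temperature_processing_level": ["C", None],
    }
)

df["processing_level"] = 6  # default used in the hunk above
mask = df["observed_variable"] == "soil_temperature"
df.loc[mask, "processing_level"] = df.loc[mask, "soil_temperature_processing_level"]
df = df.drop("soil_temperature_processing_level", axis=1)
print(df)
```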
3 changes: 3 additions & 0 deletions cdsobs/ingestion/serialize.py
@@ -106,6 +106,9 @@ def write_pandas_to_netcdf(
if attrs is not None and v in attrs:
ovar.attrs.update(attrs[v])

oncobj.sync()
oncobj.close()


def to_netcdf(
cdm_dataset: CdmDataset, tempdir: Path, encode_variables: bool = True
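The added `sync()` and `close()` flush buffered writes and release the file handle before the dataset is read back. A minimal sketch of the same pattern, assuming `oncobj` is a `netCDF4.Dataset` (its construction is not shown in this hunk):

```python
import netCDF4

oncobj = netCDF4.Dataset("example_output.nc", mode="w")
oncobj.createDimension("index", 3)
ovar = oncobj.createVariable("observation_value", "f4", ("index",))
ovar[:] = [280.1, 275.3, 281.0]

oncobj.sync()   # flush in-memory buffers to disk
oncobj.close()  # release the handle so readers see a complete file
```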
97 changes: 67 additions & 30 deletions tests/test_api.py
@@ -5,28 +5,35 @@
import sqlalchemy as sa

from cdsobs.api import run_ingestion_pipeline, run_make_cdm
from cdsobs.cdm.api import open_netcdf
from cdsobs.cdm.lite import auxiliary_variable_names
from cdsobs.ingestion.core import get_aux_vars_from_service_definition
from cdsobs.observation_catalogue.models import Catalogue
from cdsobs.service_definition.api import get_service_definition
from cdsobs.storage import S3Client
from cdsobs.utils.logutils import get_logger
from tests.utils import get_test_years

logger = get_logger(__name__)


@pytest.mark.parametrize(
"dataset_name,source,test_update",
[
(
"insitu-observations-woudc-ozone-total-column-and-profiles",
"OzoneSonde",
False,
),
(
"insitu-observations-woudc-ozone-total-column-and-profiles",
"TotalOzone",
False,
),
("insitu-observations-igra-baseline-network", "IGRA", False),
("insitu-observations-igra-baseline-network", "IGRA_H", False),
("insitu-comprehensive-upper-air-observation-network", "CUON", True),
("insitu-observations-gruan-reference-network", "GRUAN", False),
# (
# "insitu-observations-woudc-ozone-total-column-and-profiles",
# "OzoneSonde",
# False,
# ),
# (
# "insitu-observations-woudc-ozone-total-column-and-profiles",
# "TotalOzone",
# False,
# ),
# ("insitu-observations-igra-baseline-network", "IGRA", False),
# ("insitu-observations-igra-baseline-network", "IGRA_H", False),
# ("insitu-comprehensive-upper-air-observation-network", "CUON", True),
# ("insitu-observations-gruan-reference-network", "GRUAN", False),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_subhourly",
@@ -47,25 +54,25 @@
"uscrn_monthly",
False,
),
(
"insitu-observations-gnss",
"IGS",
False,
),
(
"insitu-observations-gnss",
"EPN",
False,
),
(
"insitu-observations-gnss",
"IGS_R3",
False,
),
# (
# "insitu-observations-gnss",
# "IGS",
# False,
# ),
# (
# "insitu-observations-gnss",
# "EPN",
# False,
# ),
# (
# "insitu-observations-gnss",
# "IGS_R3",
# False,
# ),
],
)
def test_run_ingestion_pipeline(
dataset_name, source, test_update, test_session, test_config, caplog
dataset_name, source, test_update, test_session, test_config, caplog, tmp_path
):
start_year, end_year = get_test_years(source)
service_definition = get_service_definition(dataset_name)
@@ -83,6 +90,36 @@ def test_run_ingestion_pipeline(
# assert insertions have been made
counter = test_session.scalar(sa.select(sa.func.count()).select_from(Catalogue))
assert counter > 0
# Check variables
asset = test_session.scalar(sa.select(Catalogue.asset))
s3client = S3Client.from_config(test_config.s3config)
asset_filename = asset.split("/")[1]
asset_local_path = Path(tmp_path, asset_filename)
s3client.download_file(
s3client.get_bucket_name(dataset_name), asset_filename, asset_local_path
)
dataset = open_netcdf(asset_local_path, decode_variables=True)
variables_in_file = set(
dataset.columns.tolist() + dataset.observed_variable.unique().tolist()
)
aux_variables = get_aux_vars_from_service_definition(service_definition, source)
expected_variables = set(service_definition.sources[source].descriptions) - set(
aux_variables
)
for v in [
"observed_variable",
"observation_value",
"units",
] + auxiliary_variable_names:
if v in variables_in_file:
expected_variables.add(v)
logger.info(
f"{variables_in_file - expected_variables} are in file but not in the descriptions"
)
logger.info(
f"{expected_variables - variables_in_file} are not in file but are in the description"
)
# assert variables_in_file == expected_variables

if test_update:
# testing update flag
