Merge pull request #29 from ecmwf-projects/check-for-missing-variables
Systematic checks for missing variables
garciampred authored Aug 28, 2024
2 parents 243a7dd + 0126c75 commit 7cf8d5f
Showing 5 changed files with 219 additions and 117 deletions.
9 changes: 8 additions & 1 deletion cdsobs/ingestion/readers/cuon.py
@@ -322,7 +322,14 @@ def read_cuon_netcdfs(
# Check for emptiness
if all([dt is None for dt in denormalized_tables]):
raise EmptyBatchException
return pandas.concat(denormalized_tables)
result = pandas.concat(denormalized_tables)
allnan_cols = []
for col in result:
if result[col].isnull().all():
allnan_cols.append(col)
logger.info(f"Removing columns {allnan_cols} as they don't have any data")
result = result.drop(allnan_cols, axis=1)
return result


def get_scheduler():
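A minimal sketch of the all-NaN column dropping added above, using a toy DataFrame (the column names here are illustrative, not taken from CUON data):

import numpy as np
import pandas

# "ta" has data; "hur" is entirely NaN and should be dropped.
df = pandas.DataFrame({"ta": [280.1, 281.3], "hur": [np.nan, np.nan]})
allnan_cols = [col for col in df if df[col].isnull().all()]
df = df.drop(allnan_cols, axis=1)
print(allnan_cols)  # ['hur']
print(list(df))     # ['ta']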
73 changes: 72 additions & 1 deletion cdsobs/sanity_checks.py
@@ -2,7 +2,13 @@
from datetime import datetime
from pathlib import Path

try:
import cdsapi
except ImportError:
pass
import pandas
import xarray
from cads_adaptors.adaptors.cadsobs.csv import to_csv

from cdsobs.cli._object_storage import check_if_missing_in_object_storage
from cdsobs.config import CDSObsConfig
@@ -58,7 +64,8 @@ def _sanity_check_dataset(
dataset_source=dataset_source,
latitude_coverage=latitude_coverage,
longitude_coverage=longitude_coverage,
time_coverage=(start_date, end_date),
year=[year],
month=[1],
format="netCDF",
variables=variables_from_service_definition,
)
@@ -91,6 +98,70 @@ def _sanity_check_dataset(
longitude_coverage,
variables_from_service_definition,
)
# Transform to CSV and read the file
csv_path = to_csv(Path(tmpdir), output_path, retrieve_args)
df = pandas.read_csv(csv_path, comment="#")
# Get the equivalent file from the legacy CDS
csv_legacy_path = f"{tmpdir}/{dataset_name}_{dataset_source}.csv-obs.zip"
source_name_mapping = {
"insitu-observations-woudc-ozone-total-column-and-profiles": "observation_type",
"insitu-observations-gnss": "vertical_profile",
"insitu-observations-near-surface-temperature-us-climate-reference-network": "time_aggregation",
"insitu-observations-igra-baseline-network": "archive",
}

sources_mapping = {
"OzoneSonde": "vertical_profile",
"IGS": "IGS daily",
"EPN": "EPN-repro2",
"IGS_R3": "IGS-repro3",
"uscrn_daily": "Daily",
"uscrn_hourly": "Hourly",
"uscrn_monthly": "Monthly",
"uscrn_subhourly": "Sub - hourly",
"IGRA": "Global radiosonde archive",
"IGRA_H": "Harmonised global radiosonde archive",
}
c = cdsapi.Client()
legacy_params = {
"variable": retrieve_args.params.variables,
"year": [str(yy) for yy in retrieve_args.params.year], # type: ignore
"month": "01",
"day": [
"01",
"02",
"03",
"08",
"09",
"10",
"13",
"15",
"17",
"20",
"21",
"22",
"23",
"24",
"27",
"28",
"29",
"30",
],
"format": "csv-obs.zip",
"area": [
latitude_coverage[1],
longitude_coverage[0],
latitude_coverage[0],
longitude_coverage[1],
],
}
if dataset_name in source_name_mapping:
legacy_params[source_name_mapping[dataset_name]] = sources_mapping[
dataset_source
]
c.retrieve(dataset_name, legacy_params, csv_legacy_path)
df_legacy = pandas.read_csv(csv_legacy_path, comment="#")
pandas.testing.assert_frame_equal(df, df_legacy)


def check_retrieved_dataset(
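The cdsapi import at the top of this file is wrapped in try/except ImportError so the module still imports where the client is not installed; the new check then retrieves the same slice from the legacy CDS and asserts that both CSV outputs are identical. A minimal sketch of that comparison pattern, with hypothetical file paths standing in for the to_csv() and cdsapi.Client().retrieve() outputs:

import pandas

# Hypothetical paths; the real check builds these inside the temporary directory.
df = pandas.read_csv("new_service.csv", comment="#")
df_legacy = pandas.read_csv("legacy_cds.csv", comment="#")

# Raises AssertionError with a detailed diff if the two frames differ.
pandas.testing.assert_frame_equal(df, df_legacy)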
85 changes: 85 additions & 0 deletions tests/system/check_missing_variables.py
@@ -0,0 +1,85 @@
import os
from pathlib import Path

import pandas
import pytest
import sqlalchemy as sa

from cdsobs.api import run_ingestion_pipeline
from cdsobs.cdm.api import open_netcdf
from cdsobs.cdm.lite import auxiliary_variable_names
from cdsobs.ingestion.core import get_aux_vars_from_service_definition
from cdsobs.observation_catalogue.models import Catalogue
from cdsobs.service_definition.api import get_service_definition
from cdsobs.storage import S3Client
from tests.test_api import TEST_API_PARAMETERS
from tests.utils import get_test_years


@pytest.mark.parametrize("dataset_name,source", TEST_API_PARAMETERS)
def test_run_ingestion_pipeline(
dataset_name, source, test_session, test_config, caplog, tmp_path
):
start_year, end_year = get_test_years(source)
service_definition = get_service_definition(dataset_name)
os.environ["CADSOBS_AVOID_MULTIPROCESS"] = "0"
run_ingestion_pipeline(
dataset_name,
service_definition,
source,
test_session,
test_config,
start_year=start_year,
end_year=end_year,
update=False,
)
# Check variables
variable_check_results_file = Path("variable_check_results.csv")
index_cols = ["dataset_name", "dataset_source"]
if variable_check_results_file.exists():
results = pandas.read_csv(variable_check_results_file, index_col=index_cols)
else:
results = pandas.DataFrame(
columns=[
"dataset_name",
"dataset_source",
"in_file_not_in_descriptions",
"in_descriptions_not_in_file",
]
).set_index(index_cols)
# Get the file
asset = test_session.scalar(
sa.select(Catalogue.asset).where(Catalogue.dataset == dataset_name)
)
s3client = S3Client.from_config(test_config.s3config)
asset_filename = asset.split("/")[1]
asset_local_path = Path(tmp_path, asset_filename)
s3client.download_file(
s3client.get_bucket_name(dataset_name), asset_filename, asset_local_path
)
dataset = open_netcdf(asset_local_path, decode_variables=True)
# Get variables in file
variables_in_file = set(
dataset.columns.tolist() + dataset.observed_variable.unique().tolist()
)
# Get expected variables according to service definition file
aux_variables = get_aux_vars_from_service_definition(service_definition, source)
expected_variables = set(service_definition.sources[source].descriptions) - set(
aux_variables
)
# Here we add some more variables to expected variables
for v in [
"observed_variable",
"observation_value",
"units",
] + auxiliary_variable_names:
if v in variables_in_file:
expected_variables.add(v)
in_file_not_in_descriptions = tuple(variables_in_file - expected_variables)
in_descriptions_not_in_file = tuple(expected_variables - variables_in_file)

results.loc[(dataset_name, source), :] = pandas.Series(
index=("in_file_not_in_descriptions", "in_descriptions_not_in_file"),
data=[in_file_not_in_descriptions, in_descriptions_not_in_file],
)
results.to_csv(variable_check_results_file)
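The script accumulates one row per (dataset_name, dataset_source) pair in variable_check_results.csv. A sketch of inspecting that file afterwards, assuming the empty tuples round-trip through the CSV as the string "()":

import pandas

index_cols = ["dataset_name", "dataset_source"]
results = pandas.read_csv("variable_check_results.csv", index_col=index_cols)

# Keep only the rows where the asset and the service definition disagree.
mismatches = results[
    (results["in_file_not_in_descriptions"] != "()")
    | (results["in_descriptions_not_in_file"] != "()")
]
print(mismatches)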
166 changes: 51 additions & 115 deletions tests/test_api.py
@@ -5,74 +5,66 @@
import sqlalchemy as sa

from cdsobs.api import run_ingestion_pipeline, run_make_cdm
from cdsobs.cdm.api import open_netcdf
from cdsobs.cdm.lite import auxiliary_variable_names
from cdsobs.ingestion.core import get_aux_vars_from_service_definition
from cdsobs.observation_catalogue.models import Catalogue
from cdsobs.service_definition.api import get_service_definition
from cdsobs.storage import S3Client
from cdsobs.utils.logutils import get_logger
from tests.utils import get_test_years

logger = get_logger(__name__)

TEST_API_PARAMETERS = [
("insitu-observations-woudc-ozone-total-column-and-profiles", "OzoneSonde"),
("insitu-observations-woudc-ozone-total-column-and-profiles", "TotalOzone"),
(
"insitu-observations-igra-baseline-network",
"IGRA",
),
(
"insitu-observations-igra-baseline-network",
"IGRA_H",
),
(
"insitu-comprehensive-upper-air-observation-network",
"CUON",
),
(
"insitu-observations-gruan-reference-network",
"GRUAN",
),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_subhourly",
),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_hourly",
),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_daily",
),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_monthly",
),
(
"insitu-observations-gnss",
"IGS",
),
(
"insitu-observations-gnss",
"EPN",
),
(
"insitu-observations-gnss",
"IGS_R3",
),
]

@pytest.mark.parametrize(
"dataset_name,source,test_update",
[
(
"insitu-observations-woudc-ozone-total-column-and-profiles",
"OzoneSonde",
False,
),
(
"insitu-observations-woudc-ozone-total-column-and-profiles",
"TotalOzone",
False,
),
("insitu-observations-igra-baseline-network", "IGRA", False),
("insitu-observations-igra-baseline-network", "IGRA_H", False),
("insitu-comprehensive-upper-air-observation-network", "CUON", False),
("insitu-observations-gruan-reference-network", "GRUAN", False),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_subhourly",
False,
),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_hourly",
False,
),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_daily",
False,
),
(
"insitu-observations-near-surface-temperature-us-climate-reference-network",
"uscrn_monthly",
False,
),
(
"insitu-observations-gnss",
"IGS",
False,
),
(
"insitu-observations-gnss",
"EPN",
False,
),
(
"insitu-observations-gnss",
"IGS_R3",
False,
),
],
)

@pytest.mark.parametrize("dataset_name,source", TEST_API_PARAMETERS)
def test_run_ingestion_pipeline(
dataset_name, source, test_update, test_session, test_config, caplog, tmp_path
dataset_name, source, test_session, test_config, caplog, tmp_path
):
start_year, end_year = get_test_years(source)
service_definition = get_service_definition(dataset_name)
@@ -94,62 +86,6 @@ def test_run_ingestion_pipeline(
.where(Catalogue.dataset == dataset_name)
)
assert counter > 0
# Check variables
asset = test_session.scalar(
sa.select(Catalogue.asset).where(Catalogue.dataset == dataset_name)
)
s3client = S3Client.from_config(test_config.s3config)
asset_filename = asset.split("/")[1]
asset_local_path = Path(tmp_path, asset_filename)
s3client.download_file(
s3client.get_bucket_name(dataset_name), asset_filename, asset_local_path
)
dataset = open_netcdf(asset_local_path, decode_variables=True)
variables_in_file = set(
dataset.columns.tolist() + dataset.observed_variable.unique().tolist()
)
aux_variables = get_aux_vars_from_service_definition(service_definition, source)
expected_variables = set(service_definition.sources[source].descriptions) - set(
aux_variables
)
for v in [
"observed_variable",
"observation_value",
"units",
] + auxiliary_variable_names:
if v in variables_in_file:
expected_variables.add(v)
logger.info(
f"{variables_in_file - expected_variables} are in file but not in the descriptions"
)
logger.info(
f"{expected_variables - variables_in_file} are not in file but are in the description"
)
# assert variables_in_file == expected_variables

if test_update:
# testing update flag
run_ingestion_pipeline(
dataset_name,
service_definition,
source,
test_session,
test_config,
start_year=start_year,
end_year=end_year,
update=False,
)

found_log = [
"A partition with the chosen parameters already exists" in r.msg
for r in caplog.records
]
assert any(found_log)
# no insertions have been made
assert (
test_session.scalar(sa.select(sa.func.count()).select_from(Catalogue))
== counter
)


def test_make_cdm(test_config, tmp_path, caplog):
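The parametrize list is now defined once as TEST_API_PARAMETERS in tests/test_api.py and imported by tests/system/check_missing_variables.py, removing the duplicated inline list. A minimal sketch of that sharing pattern (the test name and body here are illustrative):

import pytest

from tests.test_api import TEST_API_PARAMETERS


@pytest.mark.parametrize("dataset_name,source", TEST_API_PARAMETERS)
def test_example(dataset_name, source):
    assert dataset_name and source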
3 changes: 3 additions & 0 deletions tests/test_run_sanity_checks.py
@@ -1,7 +1,10 @@
import pytest

from cdsobs.constants import TEST_YEARS
from cdsobs.sanity_checks import run_sanity_checks


@pytest.mark.skip("Depends on cdsapi")
def test_run_sanity_checks(test_config, test_repository):
run_sanity_checks(
test_config,
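The test is now skipped unconditionally because it depends on cdsapi and live CDS access. An alternative, not part of this commit, would be to skip only when cdsapi is genuinely unavailable:

import pytest

# Skips at collection time if cdsapi cannot be imported.
cdsapi = pytest.importorskip("cdsapi")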
