Do not merge yet: Obs adaptor with http api (copds 1550) (#143)
* first complete version of the adaptor, but still depends on cadsobs

* refactor in smaller blocks

* move all the stuff we need

* add lite variables

* removed all obs manager dependencies. Now we need to refactor

* refactor cadsobs adaptor as a package

* refactor to use a CadsobsApiClient

* Fixed mistake: Picked an out of date version of the cadsobs functions.

* updated environment. tests and mypy pass. refactor csv functions

* use h5netcdf for the assert, not xarray

* pass info from http as args to retrieve

* set threads here too

* add h5netcdf to pyproject.toml

* add aiohttp to pyproject.toml
garciampred authored Jun 6, 2024
1 parent 3df0515 commit 7b589ce
Showing 11 changed files with 753 additions and 194 deletions.
4 changes: 2 additions & 2 deletions cads_adaptors/__init__.py
@@ -23,7 +23,7 @@
__version__ = "999"

from cads_adaptors.adaptors import AbstractAdaptor, Context, DummyAdaptor
from cads_adaptors.adaptors.cadsobs import ObservationsAdaptor
from cads_adaptors.adaptors.cadsobs.adaptor import ObservationsAdaptor
from cads_adaptors.adaptors.cds import AbstractCdsAdaptor, DummyCdsAdaptor
from cads_adaptors.adaptors.insitu import (
    InsituDatabaseCdsAdaptor,
@@ -53,6 +53,6 @@
"UrlCdsAdaptor",
"MultiAdaptor",
"MultiMarsCdsAdaptor",
"ObservationsAdaptor",
"RoocsCdsAdaptor",
"ObservationsAdaptor",
]
Empty file.
cads_adaptors/adaptors/cadsobs/adaptor.py
@@ -1,38 +1,52 @@
import logging
import tempfile
from pathlib import Path

from cads_adaptors.adaptors.cadsobs.api_client import CadsobsApiClient
from cads_adaptors.adaptors.cadsobs.retrieve import retrieve_data
from cads_adaptors.adaptors.cds import AbstractCdsAdaptor

logger = logging.getLogger(__name__)


class ObservationsAdaptor(AbstractCdsAdaptor):
    def retrieve(self, request):
        from cdsobs.retrieve.api import retrieve_observations
        from cdsobs.retrieve.models import RetrieveArgs

        # Maps observation_type to source. This sets self.mapped_request
        self._pre_retrieve(request)
        # Assignment to avoid repeating self too many times
        mapped_request = self.mapped_request
        # Catalogue credentials are in config, which is parsed from adaptor.json
        catalogue_url = self.config["catalogue_url"]
        storage_url = self.config["storage_url"]
        obs_api_url = self.config["obs_api_url"]
        # Dataset name is in this config too
        dataset_name = self.config["collection_id"]
        # dataset_source must be a string; asking for two sources is unsupported
        dataset_source = mapped_request["dataset_source"]
        if isinstance(dataset_source, list):
            if len(dataset_source) > 1:
                self.context.add_user_visible_error(
                    "Asking for more than one observation_type in the same "
                    "request is currently unsupported."
                )
                raise RuntimeError(
                    "Asking for more than one observation_type in the same "
                    "request is currently unsupported."
                )
            else:
                # Get the string if there is only one item in the list.
                dataset_source = dataset_source[0]
        dataset_source = self.handle_sources_list(dataset_source)
        mapped_request["dataset_source"] = dataset_source
        mapped_request = self.adapt_parameters(mapped_request)
        # Request parameter validation happens here; it is not yet clear how to
        # move this to the validate method.
        cadsobs_client = CadsobsApiClient(obs_api_url)
        object_urls = cadsobs_client.get_objects_to_retrieve(
            dataset_name, mapped_request
        )
        cdm_lite_variables = cadsobs_client.get_cdm_lite_variables()
        global_attributes = cadsobs_client.get_service_definition(dataset_name)[
            "global_attributes"
        ]
        logger.debug(f"The following objects are going to be filtered: {object_urls}")
        output_dir = Path(tempfile.mkdtemp())
        output_path = retrieve_data(
            dataset_name,
            mapped_request,
            output_dir,
            object_urls,
            cdm_lite_variables,
            global_attributes,
        )
        return open(output_path, "rb")

    def adapt_parameters(self, mapped_request: dict) -> dict:
        # These changes are needed for now to adapt the parameters to what the
        # retrieve call expects.
        # Turn single values into length one lists
        for key_to_listify in ["variables", "stations", "year", "month", "day"]:
@@ -48,21 +62,23 @@ def retrieve(self, request):
        area = mapped_request.pop("area")
        mapped_request["latitude_coverage"] = [area[2], area[0]]
        mapped_request["longitude_coverage"] = [area[1], area[3]]
        # Request parameter validation happens here; it is not yet clear how to
        # move this to the validate method.
        try:
            retrieve_args = RetrieveArgs(dataset=dataset_name, params=mapped_request)
            tempdir = tempfile.mkdtemp()
            output_file = retrieve_observations(
                catalogue_url,
                storage_url,
                retrieve_args,
                tempdir,
                size_limit=1000000000000,
            )
        except Exception as e:
            message = f"The adaptor failed with the following error: {e}"
            self.context.add_user_visible_error(message=message)
            self.context.add_stderr(message=message)
            raise
        return open(output_file, "rb")
        return mapped_request

    def handle_sources_list(self, dataset_source: list | str) -> str:
        """Raise an error if there are many sources; extract the string if it is a list."""
        if isinstance(dataset_source, list):
            if len(dataset_source) > 1:
                self.context.add_user_visible_error(
                    "Asking for more than one observation_type in the same "
                    "request is currently unsupported."
                )
                raise RuntimeError(
                    "Asking for more than one observation_type in the same "
                    "request is currently unsupported."
                )
            else:
                # Get the string if there is only one item in the list.
                dataset_source_str = dataset_source[0]
        else:
            dataset_source_str = dataset_source
        return dataset_source_str
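
For orientation, a minimal sketch of the adaptor.json configuration the new code path reads. The key names come from the retrieve method above; both values are hypothetical placeholders.

# Sketch of an adaptor configuration as consumed by ObservationsAdaptor above.
# Key names are taken from the code; the values are hypothetical placeholders.
config = {
    "collection_id": "insitu-observations-example",    # dataset name (placeholder)
    "obs_api_url": "http://cadsobs-api.example:8000",  # cadsobs HTTP API (placeholder)
}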
38 changes: 38 additions & 0 deletions cads_adaptors/adaptors/cadsobs/api_client.py
@@ -0,0 +1,38 @@
from typing import Literal

import requests


class CadsobsApiClient:
    """TODO: Implement auth."""

    def __init__(self, baseurl: str):
        self.baseurl = baseurl

    def _send_request(
        self, method: Literal["GET", "POST"], endpoint: str, payload: dict | None = None
    ):
        with requests.Session() as session:
            response = session.request(
                method=method, url=f"{self.baseurl}/{endpoint}", json=payload
            )
            response.raise_for_status()
            return response.json()

    def get_service_definition(self, dataset: str) -> dict:
        return self._send_request("GET", f"{dataset}/service_definition")

    def get_cdm_lite_variables(self):
        return self._send_request("GET", "cdm/lite_variables")

    def get_objects_to_retrieve(
        self, dataset_name: str, mapped_request: dict
    ) -> list[str]:
        payload = dict(
            retrieve_args=dict(dataset=dataset_name, params=mapped_request),
            config=dict(size_limit=100000),
        )
        objects_to_retrieve = self._send_request(
            "POST", "get_object_urls_and_check_size", payload=payload
        )
        return objects_to_retrieve
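
A minimal usage sketch of the client above, assuming a reachable cadsobs HTTP API; the base URL, dataset name, and request values are hypothetical placeholders.

from cads_adaptors.adaptors.cadsobs.api_client import CadsobsApiClient

client = CadsobsApiClient("http://cadsobs-api.example:8000")  # placeholder URL
mapped_request = {"dataset_source": "OzoneSonde", "variables": ["air_temperature"]}  # placeholders
# POSTs to get_object_urls_and_check_size and returns the object storage URLs.
urls = client.get_objects_to_retrieve("insitu-observations-example", mapped_request)
print(urls)

Because _send_request calls raise_for_status, any non-2xx response surfaces as a requests.HTTPError rather than being silently decoded as JSON.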
78 changes: 78 additions & 0 deletions cads_adaptors/adaptors/cadsobs/csv.py
@@ -0,0 +1,78 @@
import logging
from pathlib import Path

import numpy
import xarray

from cads_adaptors.adaptors.cadsobs.models import RetrieveArgs
from cads_adaptors.adaptors.cadsobs.utils import _get_output_path

logger = logging.getLogger(__name__)


def to_csv(
    output_dir: Path, output_path_netcdf: Path, retrieve_args: RetrieveArgs
) -> Path:
    """Transform the output netCDF to CSV format."""
    output_path = _get_output_path(output_dir, retrieve_args.dataset, "csv")
    cdm_lite_dataset = xarray.open_dataset(
        output_path_netcdf, chunks=dict(observation_id=100000), decode_times=True
    )
    logger.info("Transforming netCDF to CSV")
    with output_path.open("w") as ofileobj:
        header = get_csv_header(retrieve_args, cdm_lite_dataset)
        ofileobj.write(header)
    # Beware this will not work with old dask versions because of a bug
    # https://github.com/dask/dask/issues/10414
    cdm_lite_dataset.to_dask_dataframe().astype("str").to_csv(
        output_path,
        index=False,
        single_file=True,
        mode="a",
        compute_kwargs={"scheduler": "single-threaded"},
    )
    return output_path


def get_csv_header(
    retrieve_args: RetrieveArgs, cdm_lite_dataset: xarray.Dataset
) -> str:
    """Return the header of the CSV file."""
    template = """
########################################################################################
# This file contains data retrieved from the CDS https://cds.climate.copernicus.eu/cdsapp#!/dataset/{dataset}
# This is a C3S product under the following licences:
# - licence-to-use-copernicus-products
# - woudc-data-policy
# This is a CSV file following the CDS convention cdm-obs
# Data source: {dataset_source}
# Version:
# Time extent: {time_start} - {time_end}
# Geographic area (minlat/maxlat/minlon/maxlon): {area}
# Variables selected and units
{varstr}
########################################################################################
"""
    area = "{}/{}/{}/{}".format(
        cdm_lite_dataset.latitude.min().compute().item(),
        cdm_lite_dataset.latitude.max().compute().item(),
        cdm_lite_dataset.longitude.min().compute().item(),
        cdm_lite_dataset.longitude.max().compute().item(),
    )
    time_start = "{:%Y%m%d}".format(cdm_lite_dataset.report_timestamp.to_index()[0])
    time_end = "{:%Y%m%d}".format(cdm_lite_dataset.report_timestamp.to_index()[-1])
    vars_and_units = zip(
        numpy.unique(cdm_lite_dataset.observed_variable.to_index().str.decode("utf-8")),
        numpy.unique(cdm_lite_dataset.units.to_index().str.decode("utf-8")),
    )
    varstr = "\n".join([f"# {v} [{u}]" for v, u in vars_and_units])
    header_params = dict(
        dataset=retrieve_args.dataset,
        dataset_source=retrieve_args.params.dataset_source,
        area=area,
        time_start=time_start,
        time_end=time_end,
        varstr=varstr,
    )
    header = template.format(**header_params)
    return header
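
A sketch of how to_csv slots in once retrieve_data has produced the netCDF; the paths, dataset name, and source below are illustrative only.

from pathlib import Path

from cads_adaptors.adaptors.cadsobs.csv import to_csv
from cads_adaptors.adaptors.cadsobs.models import RetrieveArgs, RetrieveParams

# Hypothetical inputs: an output directory and a netCDF written by retrieve_data.
retrieve_args = RetrieveArgs(
    dataset="insitu-observations-example",  # placeholder dataset name
    params=RetrieveParams(dataset_source="OzoneSonde", format="csv"),  # placeholder source
)
csv_path = to_csv(Path("/tmp/out"), Path("/tmp/out/example.nc"), retrieve_args)

Writing the header through a plain file handle and then appending the dask dataframe with mode="a" and single_file=True keeps the conversion streaming instead of materialising the whole table in memory.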
24 changes: 24 additions & 0 deletions cads_adaptors/adaptors/cadsobs/models.py
@@ -0,0 +1,24 @@
from datetime import datetime
from typing import List, Literal

from pydantic import BaseModel

RetrieveFormat = Literal["netCDF", "csv"]


class RetrieveParams(BaseModel, extra="forbid"):
    dataset_source: str
    stations: None | List[str] = None
    variables: List[str] | None = None
    latitude_coverage: None | tuple[float, float] = None
    longitude_coverage: None | tuple[float, float] = None
    time_coverage: None | tuple[datetime, datetime] = None
    year: None | List[int] = None
    month: None | List[int] = None
    day: None | List[int] = None
    format: RetrieveFormat = "netCDF"


class RetrieveArgs(BaseModel):
    dataset: str
    params: RetrieveParams
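
Because RetrieveParams is declared with extra="forbid", a misspelled request key fails fast at validation time instead of being silently dropped; a small sketch with placeholder values:

from pydantic import ValidationError

from cads_adaptors.adaptors.cadsobs.models import RetrieveArgs, RetrieveParams

params = RetrieveParams(
    dataset_source="OzoneSonde",    # placeholder source
    variables=["air_temperature"],  # placeholder variable
    year=[2020],
    month=[1],
    day=[1, 2],
    format="csv",
)
args = RetrieveArgs(dataset="insitu-observations-example", params=params)

try:
    RetrieveParams(dataset_source="OzoneSonde", variabels=["air_temperature"])  # typo on purpose
except ValidationError as err:
    print(err)  # extra="forbid" rejects the unknown "variabels" field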
68 changes: 68 additions & 0 deletions cads_adaptors/adaptors/cadsobs/retrieve.py
@@ -0,0 +1,68 @@
import logging
from pathlib import Path

import dask
import fsspec
import h5netcdf

from cads_adaptors.adaptors.cadsobs.csv import to_csv
from cads_adaptors.adaptors.cadsobs.models import RetrieveArgs, RetrieveParams
from cads_adaptors.adaptors.cadsobs.utils import (
    _add_attributes,
    _filter_asset_and_save,
    _get_char_sizes,
    _get_output_path,
)

logger = logging.getLogger(__name__)


def retrieve_data(
    dataset_name: str,
    mapped_request: dict,
    output_dir: Path,
    object_urls: list[str],
    cdm_lite_variables: list[str],
    global_attributes: dict,
) -> Path:
    output_path_netcdf = _get_output_path(output_dir, dataset_name, "netCDF")
    logger.info(f"Streaming data to {output_path_netcdf}")
    # First loop over the files to get the maximum size of the string fields,
    # so that the size of the output is known in advance.
    # The "background" cache type downloads blocks ahead of time using a thread.
    fs = fsspec.filesystem("https", cache_type="background", block_size=10 * (1024**2))
    # Silence the fsspec logger, as the background cache prints unformatted log lines.
    logging.getLogger("fsspec").setLevel(logging.WARNING)
    # Get the maximum size of the character arrays
    char_sizes = _get_char_sizes(fs, object_urls)
    variables = mapped_request["variables"]
    char_sizes["observed_variable"] = max([len(v) for v in variables])
    # Open the output file and dump the data from each input file.
    retrieve_args = RetrieveArgs(
        dataset=dataset_name, params=RetrieveParams(**mapped_request)
    )
    with h5netcdf.File(output_path_netcdf, "w") as oncobj:
        oncobj.dimensions["index"] = None
        for url in object_urls:
            _filter_asset_and_save(
                fs, oncobj, retrieve_args, url, char_sizes, cdm_lite_variables
            )
        # Check if the resulting file is empty
        if len(oncobj.variables) == 0 or len(oncobj.variables["report_timestamp"]) == 0:
            raise RuntimeError(
                "No data was found, try a different parameter combination."
            )
        # Add attributes
        _add_attributes(oncobj, global_attributes)
    # If the user asked for a CSV, we transform the file to CSV
    if retrieve_args.params.format == "netCDF":
        output_path = output_path_netcdf
    else:
        try:
            with dask.config.set(scheduler="threads"):
                output_path = to_csv(output_dir, output_path_netcdf, retrieve_args)
        finally:
            # Ensure that the netCDF is not left behind taking disk space.
            output_path_netcdf.unlink()
    return output_path
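
End to end, the pieces above compose as follows. This sketch mirrors what ObservationsAdaptor.retrieve does, with placeholder values for the API URL, dataset, and request.

import tempfile
from pathlib import Path

from cads_adaptors.adaptors.cadsobs.api_client import CadsobsApiClient
from cads_adaptors.adaptors.cadsobs.retrieve import retrieve_data

dataset = "insitu-observations-example"  # placeholder dataset name
mapped_request = {  # placeholder request, already mapped and listified
    "dataset_source": "OzoneSonde",
    "variables": ["air_temperature"],
    "format": "netCDF",
}

client = CadsobsApiClient("http://cadsobs-api.example:8000")  # placeholder URL
object_urls = client.get_objects_to_retrieve(dataset, mapped_request)
cdm_lite_variables = client.get_cdm_lite_variables()
global_attributes = client.get_service_definition(dataset)["global_attributes"]

output_path = retrieve_data(
    dataset,
    mapped_request,
    Path(tempfile.mkdtemp()),
    object_urls,
    cdm_lite_variables,
    global_attributes,
)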