Skip to content

Commit

Permalink
Merge pull request #10 from ecmwf-projects/http_api_COPDS-1549
Browse files Browse the repository at this point in the history
Do not merge yet. Http api copds 1549
  • Loading branch information
gbiavati authored Jun 6, 2024
2 parents 95e2a31 + 0ea3436 commit 6f2df54
Show file tree
Hide file tree
Showing 18 changed files with 429 additions and 508 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,15 @@ truncating the data tables at 10k rows, and dump it to a .sql file.
cd tests/scripts
bash make_test_ingestiondb.sh
```

### HTTP API

To deploy the HTTP API, uvicorn needs to be installed. Please note that the API should
not be exposed to the internet without a proxy. The deploy command is:

```
python cdsobs/api_rest/main.py
```

The /docs endpoint can be opened in a browser to check the automatically generated
documentation.
Empty file added cdsobs/api_rest/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions cdsobs/api_rest/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from fastapi import FastAPI

from cdsobs.api_rest.endpoints import router

# FastAPI application object of the observations HTTP API.
# NOTE(review): debug=True is hard-coded; FastAPI documents this flag as returning
# debug tracebacks on server errors, so confirm it is wanted outside development.
app = FastAPI(title="cads-obs-app", version="0.1", debug=True)
# Register the endpoints declared in cdsobs.api_rest.endpoints.
app.include_router(router)
20 changes: 20 additions & 0 deletions cdsobs/api_rest/config_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from cdsobs.service_definition.api import get_service_definition


def datasets_installed() -> list[str]:
    """Return the names of the installed datasets.

    Currently a placeholder: a new empty list is returned on every call.
    """
    installed: list[str] = []
    return installed


def sources_installed() -> dict[str, list[str]]:
    """Map every installed dataset name to the list of its sources."""
    return {name: get_dataset_sources(name) for name in datasets_installed()}


def get_dataset_sources(dataset: str) -> list[str]:
    """Return the names of the sources declared in a dataset's service definition.

    Args:
        dataset: Name of the dataset whose service definition is looked up.

    Returns:
        The keys of the ``sources`` mapping of the service definition.

    Raises:
        RuntimeError: If loading the service definition or reading its sources
            raises ``KeyError`` or ``FileNotFoundError``.
    """
    try:
        # The lookup has to live inside the try block: it is the call that fails
        # when the service definition is missing.
        service_def = get_service_definition(dataset)
        return list(service_def.sources.keys())
    except (KeyError, FileNotFoundError) as err:
        # Chain the original error so the root cause stays visible in tracebacks.
        raise RuntimeError(f"Invalid service definition for {dataset=}") from err
94 changes: 94 additions & 0 deletions cdsobs/api_rest/endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, Iterator

import sqlalchemy.orm
from fastapi import APIRouter, Depends, HTTPException

from cdsobs.api_rest.models import RetrievePayload
from cdsobs.cdm.lite import cdm_lite_variables
from cdsobs.cli._utils import ConfigNotFound
from cdsobs.config import CDSObsConfig, validate_config
from cdsobs.observation_catalogue.repositories.cads_dataset import CadsDatasetRepository
from cdsobs.observation_catalogue.repositories.catalogue import CatalogueRepository
from cdsobs.retrieve.retrieve_services import (
_get_catalogue_entries,
get_urls_and_check_size,
)
from cdsobs.service_definition.api import get_service_definition
from cdsobs.service_definition.service_definition_models import ServiceDefinition
from cdsobs.storage import S3Client
from cdsobs.utils.utils import get_database_session

router = APIRouter()


@dataclass
class HttpAPISession:
    """Objects shared with the endpoints through the ``session_gen`` dependency."""

    # Validated configuration loaded from the cdsobs YAML file.
    cdsobs_config: CDSObsConfig
    # SQLAlchemy session opened on the catalogue database URL.
    catalogue_session: sqlalchemy.orm.Session


def session_gen() -> Iterator[HttpAPISession]:
    """Yield an ``HttpAPISession`` and close its catalogue session afterwards.

    The configuration file is taken from the ``CDSOBS_CONFIG`` environment variable
    when it is set, otherwise from ``~/.cdsobs/cdsobs_config.yml``.

    Yields:
        The validated configuration bundled with an open catalogue session.

    Raises:
        ConfigNotFound: If the configuration file does not exist.
    """
    config_from_env = os.environ.get("CDSOBS_CONFIG")
    if config_from_env is not None:
        cdsobs_config_yml = Path(config_from_env)
    else:
        cdsobs_config_yml = Path.home().joinpath(".cdsobs/cdsobs_config.yml")
    if not cdsobs_config_yml.exists():
        raise ConfigNotFound()
    cdsobs_config = validate_config(cdsobs_config_yml)
    # Open the session before entering the try block: if opening fails there is
    # nothing to close, and the original error must not be masked by an
    # UnboundLocalError raised from the cleanup code.
    catalogue_session = get_database_session(cdsobs_config.catalogue_db.get_url())
    try:
        yield HttpAPISession(cdsobs_config, catalogue_session)
    finally:
        catalogue_session.close()


@router.post("/get_object_urls_and_check_size")
def get_object_urls_and_check_size(
    retrieve_payload: RetrievePayload,
    session: Annotated[HttpAPISession, Depends(session_gen)],
) -> list[str]:
    """Get the URLs of the stored files that contain the requested data.

    The catalogue is queried with ``retrieve_payload.retrieve_args`` and the
    matching entries are handed to ``get_urls_and_check_size`` together with
    ``retrieve_payload.config.size_limit``.
    """
    args = retrieve_payload.retrieve_args
    catalogue = CatalogueRepository(session.catalogue_session)
    matching_entries = _get_catalogue_entries(catalogue, args)
    storage_client = S3Client.from_config(session.cdsobs_config.s3config)
    return get_urls_and_check_size(
        matching_entries,
        args,
        retrieve_payload.config.size_limit,
        storage_client.base,
    )


@router.get("/capabilities/datasets")
def get_capabilities(
    session: Annotated[HttpAPISession, Depends(session_gen)]
) -> list[str]:
    """Get available datasets."""
    dataset_repository = CadsDatasetRepository(session.catalogue_session)
    # NOTE(review): get_all() is called with its default paging arguments;
    # verify that the default limit cannot truncate the list of datasets.
    return [dataset.name for dataset in dataset_repository.get_all()]


@router.get("/capabilities/{dataset}/sources")
def get_sources(dataset: str) -> list[str]:
    """Get available sources for a given dataset.

    Raises:
        HTTPException: 404 when no service definition exists for ``dataset``.
    """
    try:
        service_definition = get_service_definition(dataset)
    except FileNotFoundError as err:
        # Answer with the same 404 as the service_definition endpoint instead of
        # letting the error surface as an internal server error.
        raise HTTPException(
            status_code=404, detail=f"Service definition not found for {dataset=}"
        ) from err
    return list(service_definition.sources)


@router.get("/{dataset}/service_definition")
def get_dataset_service_definition(dataset: str) -> ServiceDefinition:
    """Get the service definition for a dataset."""
    try:
        definition = get_service_definition(dataset)
    except FileNotFoundError:
        # A missing definition file is reported to the client as "not found".
        not_found = HTTPException(
            status_code=404, detail=f"Service definition not found for {dataset=}"
        )
        raise not_found
    else:
        return definition


@router.get("/cdm/lite_variables")
def get_cdm_lite_variables() -> list[str]:
    """Get the CDM lite variables."""
    # The object imported from cdsobs.cdm.lite is returned unchanged.
    return cdm_lite_variables
20 changes: 20 additions & 0 deletions cdsobs/api_rest/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os

import uvicorn
from dotenv import load_dotenv

from cdsobs.utils.logutils import get_logger

# Load the .env file (if any) before the environment is read below.
load_dotenv()
logger = get_logger(__name__)


if __name__ == "__main__":
    logger.info("Running CADS observation catalogue manager app")
    # The listening port can be overridden with CADS_OBS_APP_PORT (default 8000).
    server_options = dict(
        host="0.0.0.0",
        port=int(os.environ.get("CADS_OBS_APP_PORT", 8000)),
        reload=False,
        workers=4,
    )
    uvicorn.run("cdsobs.api_rest.app:app", **server_options)
12 changes: 12 additions & 0 deletions cdsobs/api_rest/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pydantic import BaseModel

from cdsobs.retrieve.models import RetrieveArgs


class RetrieveConfig(BaseModel):
    """Options that accompany the retrieve arguments in a request payload."""

    # Limit handed to get_urls_and_check_size by the endpoint.
    # NOTE(review): the unit of this limit is not visible here; confirm and document it.
    size_limit: int = 10000


class RetrievePayload(BaseModel):
    """Request body of the /get_object_urls_and_check_size endpoint."""

    # Arguments used to look up the catalogue entries.
    retrieve_args: RetrieveArgs
    # Options of the retrieve operation (currently only the size limit).
    config: RetrieveConfig
14 changes: 12 additions & 2 deletions cdsobs/forms_jsons.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
from typing import Iterable, Tuple

import fsspec
import h5netcdf
import numpy
import pandas
import sqlalchemy as sa
from fsspec.implementations.http import HTTPFileSystem

from cdsobs.cli._catalogue_explorer import stats_summary
from cdsobs.constraints import iterative_ordering
from cdsobs.ingestion.core import get_variables_from_service_definition
from cdsobs.observation_catalogue.models import Catalogue
from cdsobs.observation_catalogue.repositories.catalogue import CatalogueRepository
from cdsobs.retrieve.api import get_url_ncobj
from cdsobs.retrieve.retrieve_services import merged_constraints_table
from cdsobs.service_definition.api import get_service_definition
from cdsobs.storage import S3Client
Expand Down Expand Up @@ -169,7 +170,7 @@ def get_station_summary(
]
for url in object_urls:
logger.info(f"Reading station data from {url}")
with get_url_ncobj(fs, url) as incobj:
with _get_url_ncobj(fs, url) as incobj:
stationvar = incobj.variables["primary_station_id"]
field_len, strlen = stationvar.shape
stations_in_partition = (
Expand Down Expand Up @@ -219,3 +220,12 @@ def _to_str_list(iterable: Iterable, min_chars: int | None = None) -> list[str]:
else:
result = (str(i).rjust(min_chars, "0") for i in iterable)
return list(result)


def _get_url_ncobj(fs: HTTPFileSystem, url: str) -> h5netcdf.File:
    """Open a URL as a read-only netCDF file object using h5netcdf."""
    remote_file = fs.open(url)
    logger.debug(f"Reading data from {url}.")
    # h5netcdf is given the file-like object returned by fsspec, since
    # xarray won't read a bytes object directly with netCDF4.
    return h5netcdf.File(remote_file, "r")
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get(self, record_id: Any):
sa.select(self.model).filter(self.model.id == record_id).limit(1)
).first()

def get_all(self, skip: int = 0, limit: int = 100) -> Sequence[Base]:
def get_all(self, skip: int = 0, limit: int = 100) -> Sequence[Any]:
    """Return at most ``limit`` records of the model, skipping the first ``skip``."""
    page_query = sa.select(self.model).offset(skip).limit(limit)
    return self.session.scalars(page_query).all()
Expand Down
Loading

0 comments on commit 6f2df54

Please sign in to comment.