Merge pull request #172 from ecmwf-projects/COPDS-1504-live-logs
Live logs
francesconazzaro authored Feb 9, 2024
2 parents b83f2b8 + 7beb791 commit 759de92
Showing 4 changed files with 114 additions and 56 deletions.
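
The change can be exercised directly against the `GET /jobs/{job_id}` endpoint: `log=true` asks for the job's user-visible log and `logStartTime` restricts the response to messages from a given timestamp onwards, so a client can poll for live logs incrementally. The sketch below is illustrative only: the base URL, job id, use of the `requests` library, and the exact nesting of the response body are assumptions, not part of this change; the `log` and `logStartTime` query parameters and the `(timestamp, message)` entry format come from the diff below.

    import time

    import requests  # any HTTP client works; requests is only an example

    BASE_URL = "https://example.invalid/api/processing"  # hypothetical deployment URL
    JOB_ID = "00000000-0000-0000-0000-000000000000"  # hypothetical job id


    def follow_job_log(job_id: str) -> None:
        """Poll a job and print new log entries until it reaches a final state."""
        last_seen: str | None = None
        while True:
            params = {"log": "true"}
            if last_seen is not None:
                # Only ask for messages from the last timestamp already printed.
                # logStartTime appears to be inclusive, so that entry may repeat.
                params["logStartTime"] = last_seen
            status_info = requests.get(f"{BASE_URL}/jobs/{job_id}", params=params).json()
            # Each log entry is serialised as an (ISO timestamp, message) pair.
            for timestamp, message in status_info.get("log", []):
                print(f"[{timestamp}] {message}")
                last_seen = timestamp
            if status_info.get("status") in ("successful", "failed", "dismissed"):
                break
            time.sleep(5)


    follow_job_log(JOB_ID)
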
99 changes: 65 additions & 34 deletions cads_processing_api_service/clients.py
@@ -16,6 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License

import datetime
import uuid

import attrs
@@ -242,8 +243,9 @@ def post_process_execution(
def get_jobs(
self,
processID: list[str] | None = fastapi.Query(None),
status: list[ogc_api_processes_fastapi.models.StatusCode]
| None = fastapi.Query(None),
status: (
list[ogc_api_processes_fastapi.models.StatusCode] | None
) = fastapi.Query(None),
limit: int | None = fastapi.Query(10, ge=1, le=10000),
sortby: utils.JobSortCriterion | None = fastapi.Query(
utils.JobSortCriterion.created_at_desc
@@ -317,34 +319,38 @@ def get_jobs(
**job_filters,
)
job_entries = compute_session.scalars(statement).all()
if back:
job_entries = reversed(job_entries)
jobs = []
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
db_utils.ConnectionMode.read
)
for job in job_entries:
with catalogue_sessionmaker() as catalogue_session:
try:
(dataset_title,) = utils.get_resource_properties(
resource_id=job.process_id,
properties="title",
table=self.process_table,
session=catalogue_session,
if back:
job_entries = reversed(job_entries)
jobs = []
catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
db_utils.ConnectionMode.read
)
for job in job_entries:
with catalogue_sessionmaker() as catalogue_session:
try:
(dataset_title,) = utils.get_resource_properties(
resource_id=job.process_id,
properties="title",
table=self.process_table,
session=catalogue_session,
)
except ogc_api_processes_fastapi.exceptions.NoSuchProcess:
dataset_title = config.ensure_settings().missing_dataset_title
results = utils.parse_results_from_broker_db(
job, session=compute_session
)
jobs.append(
utils.make_status_info(
job=job,
results=results,
dataset_metadata={"title": dataset_title},
qos={
"status": cads_broker.database.get_qos_status_from_request(
job
)
},
)
except ogc_api_processes_fastapi.exceptions.NoSuchProcess:
dataset_title = config.ensure_settings().missing_dataset_title
results = utils.parse_results_from_broker_db(job)
jobs.append(
utils.make_status_info(
job=job,
results=results,
dataset_metadata={"title": dataset_title},
qos={
"status": cads_broker.database.get_qos_status_from_request(job)
},
)
)
job_list = models.JobList(
jobs=jobs,
links=[ogc_api_processes_fastapi.models.Link(href="")],
@@ -367,6 +373,9 @@ def get_job(
qos: bool = fastapi.Query(False),
request: bool = fastapi.Query(False),
log: bool = fastapi.Query(False),
log_start_time: datetime.datetime | None = fastapi.Query(
None, alias="logStartTime"
),
) -> models.StatusInfo:
"""Implement OGC API - Processes `GET /jobs/{job_id}` endpoint.
@@ -386,6 +395,8 @@
Whether to include the request in the response
log : bool, optional
Whether to include the job's log in the response
log_start_time: datetime.datetime, optional
Datetime of the first log message to be returned
Returns
-------
@@ -403,7 +414,16 @@
job_id=job_id, session=compute_session
)
if qos:
job_qos_info = utils.collect_job_qos_info(job, compute_session)
job_qos_info = utils.get_job_qos_info(job, compute_session)
# These lines are inside the session context because the related fields
# are lazy loaded
if log:
job_log = utils.get_job_events(
job,
compute_session,
"user_visible_log",
log_start_time,
)
except ogc_api_processes_fastapi.exceptions.NoSuchJob:
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.write
@@ -413,7 +433,16 @@
job_id=job_id, session=compute_session
)
if qos:
job_qos_info = utils.collect_job_qos_info(job, compute_session)
job_qos_info = utils.get_job_qos_info(job, compute_session)
# These lines are inside the session context because the related fields
# are lazy loaded
if log:
job_log = utils.get_job_events(
job,
compute_session,
"user_visible_log",
log_start_time,
)
if job.portal not in portals:
raise ogc_api_processes_fastapi.exceptions.NoSuchJob(
detail=f"job {job_id} not found"
@@ -438,13 +467,15 @@ def get_job(
request_ids, form_data
),
}
if log:
kwargs["log"] = [
(message[0].isoformat(), message[1]) for message in job_log
]
if qos:
kwargs["qos"] = {
**job_qos_info,
"status": cads_broker.database.get_qos_status_from_request(job),
}
if log:
kwargs["log"] = utils.extract_job_log(job)
status_info = utils.make_status_info(job=job, **kwargs)
return status_info

@@ -483,8 +514,8 @@ def get_job_results(
job = utils.get_job_from_broker_db(
job_id=job_id, session=compute_session
)
results = utils.get_results_from_job(job=job, session=compute_session)
auth.verify_permission(user_uid, job)
results = utils.get_results_from_job(job=job)
except (
ogc_api_processes_fastapi.exceptions.NoSuchJob,
ogc_api_processes_fastapi.exceptions.ResultsNotReady,
@@ -496,8 +527,8 @@
job = utils.get_job_from_broker_db(
job_id=job_id, session=compute_session
)
results = utils.get_results_from_job(job=job, session=compute_session)
auth.verify_permission(user_uid, job)
results = utils.get_results_from_job(job=job)
handle_download_metrics(job.process_id, results)
return results

3 changes: 2 additions & 1 deletion cads_processing_api_service/models.py
@@ -26,7 +26,7 @@ class StatusInfoMetadata(pydantic.BaseModel):
results: dict[str, Any] | None = None
datasetMetadata: dict[str, Any] | None = None
qos: dict[str, Any] | None = None
log: list[str] | None = None
log: list[tuple[str, str]] | None = None


class StatusInfo(ogc_api_processes_fastapi.models.StatusInfo):
@@ -56,3 +56,4 @@ class JobList(ogc_api_processes_fastapi.models.JobList):
class Exception(ogc_api_processes_fastapi.models.Exception):
trace_id: str | None = None
traceback: str | None = None
messages: list[tuple[str, str]] | None = None
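
With the model changes above, the `log` entry of a job's status metadata and the new `messages` field of an exception payload both carry `(timestamp, message)` pairs. The snippet below is purely illustrative; all values are invented and only the field names and shapes follow the models above.

    # Hypothetical payloads, shown only to illustrate the new field shapes.
    status_info_metadata = {
        "datasetMetadata": {"title": "Some dataset title"},
        "qos": {"status": {}},  # populated from the broker's QoS status
        "log": [
            ("2024-02-09T10:00:00.000000", "Request accepted"),
            ("2024-02-09T10:00:05.000000", "Processing started"),
        ],
    }

    job_failed_exception = {
        "trace_id": "00000000-0000-0000-0000-000000000000",
        "traceback": "Request failed: input not available",
        "messages": [
            ("2024-02-09T10:00:07.000000", "Request failed: input not available"),
        ],
    }
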
48 changes: 33 additions & 15 deletions cads_processing_api_service/utils.py
@@ -15,8 +15,8 @@
# limitations under the License.

import base64
import datetime
import enum
import json
from typing import Any, Callable, Mapping

import cachetools
@@ -429,7 +429,8 @@ def dictify_job(request: cads_broker.database.SystemRequest) -> dict[str, Any]:


def get_job_from_broker_db(
job_id: str, session: sqlalchemy.orm.Session
job_id: str,
session: sqlalchemy.orm.Session,
) -> cads_broker.SystemRequest:
"""Get job description from the Broker database.
@@ -467,7 +468,9 @@ def get_job_from_broker_db(
return job


def get_results_from_job(job: cads_broker.SystemRequest) -> dict[str, Any]:
def get_results_from_job(
job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
) -> dict[str, Any]:
"""Get job results description from SystemRequest instance.
Parameters
@@ -498,9 +501,13 @@ def get_results_from_job(job: cads_broker.SystemRequest) -> dict[str, Any]:
detail=f"results of job {job_id} expired"
)
elif job_status == "failed":
error_messages = get_job_events(
job=job, session=session, event_type="user_visible_error"
)
traceback = "\n".join([message[1] for message in error_messages])
raise ogc_api_processes_fastapi.exceptions.JobResultsFailed(
status_code=fastapi.status.HTTP_400_BAD_REQUEST,
traceback=str(job.response_error["message"]), # type: ignore
traceback=traceback,
)
elif job_status in ("accepted", "running"):
raise ogc_api_processes_fastapi.exceptions.ResultsNotReady(
@@ -509,15 +516,17 @@
return results


def parse_results_from_broker_db(job: cads_broker.SystemRequest) -> dict[str, Any]:
def parse_results_from_broker_db(
job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
) -> dict[str, Any]:
try:
results = get_results_from_job(job=job)
results = get_results_from_job(job=job, session=session)
except ogc_api_processes_fastapi.exceptions.OGCAPIException as exc:
results = exceptions.format_exception_content(exc=exc)
return results


def collect_job_qos_info(
def get_job_qos_info(
job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
) -> dict[str, Any]:
entry_point = str(job.entry_point)
@@ -560,13 +569,22 @@ def collect_job_qos_info(
return qos


def extract_job_log(job: cads_broker.SystemRequest) -> list[str]:
log = []
if job.response_user_visible_log:
job_log = json.loads(str(job.response_user_visible_log))
for log_timestamp, log_message in job_log:
log.append(log_message)
return log
def get_job_events(
job: cads_broker.SystemRequest,
session: sqlalchemy.orm.Session,
event_type: str | None = None,
start_time: datetime.datetime | None = None,
) -> list[tuple[datetime.datetime, str]]:
events = []
request_uid = str(job.request_uid)
request_events: list[
cads_broker.database.Events
] = cads_broker.database.get_events_from_request(
request_uid, session, event_type, start_time
)
for request_event in request_events:
events.append((request_event.timestamp, request_event.message))
return events # type: ignore
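
`get_job_events` replaces `extract_job_log`: instead of parsing the JSON blob stored in `response_user_visible_log`, it reads the broker's events through `cads_broker.database.get_events_from_request`, optionally filtered by event type and start time. A minimal usage sketch follows; it assumes this module's existing imports (`db_utils`, `cads_broker`), a hypothetical job id, and that a read-mode compute sessionmaker can be obtained the same way the write-mode one is in clients.py.

    import datetime

    # Read-mode session on the compute (broker) database; mirrors clients.py.
    compute_sessionmaker = db_utils.get_compute_sessionmaker(
        mode=db_utils.ConnectionMode.read
    )
    with compute_sessionmaker() as compute_session:
        job = get_job_from_broker_db(job_id="some-job-uid", session=compute_session)
        log = get_job_events(
            job=job,
            session=compute_session,
            event_type="user_visible_log",
            start_time=datetime.datetime(2024, 2, 9, 10, 0, 0),
        )
        for timestamp, message in log:
            # Timestamps are datetime objects here; clients.py serialises them
            # with .isoformat() before returning them to the API user.
            print(timestamp.isoformat(), message)
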


def make_status_info(
@@ -575,7 +593,7 @@
results: dict[str, Any] | None = None,
dataset_metadata: dict[str, Any] | None = None,
qos: dict[str, Any] | None = None,
log: list[str] | None = None,
log: list[tuple[str, str]] | None = None,
) -> models.StatusInfo:
"""Compose job's status information.
20 changes: 14 additions & 6 deletions tests/test_30_utils.py
@@ -241,6 +241,7 @@ def test_get_job_from_broker_db() -> None:


def test_get_results_from_job() -> None:
mock_session = unittest.mock.Mock(spec=sqlalchemy.orm.Session)
job = cads_broker.SystemRequest(
**{
"status": "successful",
@@ -250,27 +251,34 @@
),
}
)
results = utils.get_results_from_job(job)
results = utils.get_results_from_job(job, session=mock_session)
exp_results = {"asset": {"value": {"key": "value"}}}
assert results == exp_results

job = cads_broker.SystemRequest(
**{
"status": "failed",
"request_uid": "1234",
"response_error": {"message": "traceback"},
}
)
with pytest.raises(ogc_api_processes_fastapi.exceptions.JobResultsFailed):
results = utils.get_results_from_job(job)
with pytest.raises(ogc_api_processes_fastapi.exceptions.JobResultsFailed) as exc:
with unittest.mock.patch(
"cads_processing_api_service.utils.get_job_events"
) as mock_get_job_events:
mock_get_job_events.return_value = [("2024-01-01T16:20:12.175021", "error message")]
results = utils.get_results_from_job(job, session=mock_session)
assert exc.value.traceback == "error message"

job = cads_broker.SystemRequest(**{"status": "accepted", "request_uid": "1234"})
with pytest.raises(ogc_api_processes_fastapi.exceptions.ResultsNotReady):
results = utils.get_results_from_job(job)
results = utils.get_results_from_job(job, session=mock_session)

job = cads_broker.SystemRequest(**{"status": "running", "request_uid": "1234"})
with pytest.raises(ogc_api_processes_fastapi.exceptions.ResultsNotReady):
results = utils.get_results_from_job(job)
results = utils.get_results_from_job(job, session=mock_session)


def test_make_status_info() -> None:
