diff --git a/cads_processing_api_service/clients.py b/cads_processing_api_service/clients.py
index 47e4713..be943d3 100644
--- a/cads_processing_api_service/clients.py
+++ b/cads_processing_api_service/clients.py
@@ -16,6 +16,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License
 
+import datetime
 import uuid
 
 import attrs
@@ -242,8 +243,9 @@ def post_process_execution(
     def get_jobs(
         self,
         processID: list[str] | None = fastapi.Query(None),
-        status: list[ogc_api_processes_fastapi.models.StatusCode]
-        | None = fastapi.Query(None),
+        status: (
+            list[ogc_api_processes_fastapi.models.StatusCode] | None
+        ) = fastapi.Query(None),
         limit: int | None = fastapi.Query(10, ge=1, le=10000),
         sortby: utils.JobSortCriterion | None = fastapi.Query(
             utils.JobSortCriterion.created_at_desc
@@ -317,34 +319,38 @@ def get_jobs(
                 **job_filters,
             )
             job_entries = compute_session.scalars(statement).all()
-        if back:
-            job_entries = reversed(job_entries)
-        jobs = []
-        catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
-            db_utils.ConnectionMode.read
-        )
-        for job in job_entries:
-            with catalogue_sessionmaker() as catalogue_session:
-                try:
-                    (dataset_title,) = utils.get_resource_properties(
-                        resource_id=job.process_id,
-                        properties="title",
-                        table=self.process_table,
-                        session=catalogue_session,
+            if back:
+                job_entries = reversed(job_entries)
+            jobs = []
+            catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker(
+                db_utils.ConnectionMode.read
+            )
+            for job in job_entries:
+                with catalogue_sessionmaker() as catalogue_session:
+                    try:
+                        (dataset_title,) = utils.get_resource_properties(
+                            resource_id=job.process_id,
+                            properties="title",
+                            table=self.process_table,
+                            session=catalogue_session,
+                        )
+                    except ogc_api_processes_fastapi.exceptions.NoSuchProcess:
+                        dataset_title = config.ensure_settings().missing_dataset_title
+                results = utils.parse_results_from_broker_db(
+                    job, session=compute_session
+                )
+                jobs.append(
+                    utils.make_status_info(
+                        job=job,
+                        results=results,
+                        dataset_metadata={"title": dataset_title},
+                        qos={
+                            "status": cads_broker.database.get_qos_status_from_request(
+                                job
+                            )
+                        },
                     )
-                except ogc_api_processes_fastapi.exceptions.NoSuchProcess:
-                    dataset_title = config.ensure_settings().missing_dataset_title
-            results = utils.parse_results_from_broker_db(job)
-            jobs.append(
-                utils.make_status_info(
-                    job=job,
-                    results=results,
-                    dataset_metadata={"title": dataset_title},
-                    qos={
-                        "status": cads_broker.database.get_qos_status_from_request(job)
-                    },
                 )
-            )
         job_list = models.JobList(
             jobs=jobs,
             links=[ogc_api_processes_fastapi.models.Link(href="")],
@@ -367,6 +373,9 @@ def get_job(
         qos: bool = fastapi.Query(False),
         request: bool = fastapi.Query(False),
         log: bool = fastapi.Query(False),
+        log_start_time: datetime.datetime | None = fastapi.Query(
+            None, alias="logStartTime"
+        ),
     ) -> models.StatusInfo:
         """Implement OGC API - Processes `GET /jobs/{job_id}` endpoint.
 
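
Reviewer note, not part of the patch: the hunk above adds a `log_start_time` parameter (exposed to clients under the `logStartTime` alias) to `GET /jobs/{job_id}`. A minimal sketch of how a client could use it to poll a job's log incrementally is shown below; the base URL, the `requests` dependency and the exact location of `log` in the response body are illustrative assumptions, while the query parameter names come from the diff.

    import requests

    def poll_job_log(base_url: str, job_id: str, last_seen: str | None = None) -> list:
        # Ask the API to include the job's log; restrict it to messages newer
        # than the last timestamp already seen, if any.
        params: dict = {"log": True}
        if last_seen is not None:
            params["logStartTime"] = last_seen  # public alias of log_start_time
        response = requests.get(f"{base_url}/jobs/{job_id}", params=params)
        response.raise_for_status()
        # Each log entry is a [timestamp, message] pair, per the log handling
        # added to get_job further down in this diff.
        return response.json().get("metadata", {}).get("log", [])
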
@@ -386,6 +395,8 @@ def get_job(
             Whether to include the request in the response
         log : bool, optional
             Whether to include the job's log in the response
+        log_start_time : datetime.datetime, optional
+            Datetime of the first log message to be returned
 
         Returns
         -------
@@ -403,7 +414,16 @@ def get_job(
                     job_id=job_id, session=compute_session
                 )
                 if qos:
-                    job_qos_info = utils.collect_job_qos_info(job, compute_session)
+                    job_qos_info = utils.get_job_qos_info(job, compute_session)
+                # These lines are inside the session context because the related fields
+                # are lazy loaded
+                if log:
+                    job_log = utils.get_job_events(
+                        job,
+                        compute_session,
+                        "user_visible_log",
+                        log_start_time,
+                    )
         except ogc_api_processes_fastapi.exceptions.NoSuchJob:
             compute_sessionmaker = db_utils.get_compute_sessionmaker(
                 mode=db_utils.ConnectionMode.write
@@ -413,7 +433,16 @@ def get_job(
                     job_id=job_id, session=compute_session
                 )
                 if qos:
-                    job_qos_info = utils.collect_job_qos_info(job, compute_session)
+                    job_qos_info = utils.get_job_qos_info(job, compute_session)
+                # These lines are inside the session context because the related fields
+                # are lazy loaded
+                if log:
+                    job_log = utils.get_job_events(
+                        job,
+                        compute_session,
+                        "user_visible_log",
+                        log_start_time,
+                    )
         if job.portal not in portals:
             raise ogc_api_processes_fastapi.exceptions.NoSuchJob(
                 detail=f"job {job_id} not found"
@@ -438,13 +467,15 @@ def get_job(
                     request_ids, form_data
                 ),
             }
+        if log:
+            kwargs["log"] = [
+                (message[0].isoformat(), message[1]) for message in job_log
+            ]
         if qos:
             kwargs["qos"] = {
                 **job_qos_info,
                 "status": cads_broker.database.get_qos_status_from_request(job),
             }
-        if log:
-            kwargs["log"] = utils.extract_job_log(job)
         status_info = utils.make_status_info(job=job, **kwargs)
         return status_info
 
@@ -483,8 +514,8 @@ def get_job_results(
                 job = utils.get_job_from_broker_db(
                     job_id=job_id, session=compute_session
                 )
+                results = utils.get_results_from_job(job=job, session=compute_session)
                 auth.verify_permission(user_uid, job)
-                results = utils.get_results_from_job(job=job)
         except (
             ogc_api_processes_fastapi.exceptions.NoSuchJob,
             ogc_api_processes_fastapi.exceptions.ResultsNotReady,
@@ -496,8 +527,8 @@ def get_job_results(
                 job = utils.get_job_from_broker_db(
                     job_id=job_id, session=compute_session
                 )
+                results = utils.get_results_from_job(job=job, session=compute_session)
                 auth.verify_permission(user_uid, job)
-                results = utils.get_results_from_job(job=job)
         handle_download_metrics(job.process_id, results)
         return results
 
diff --git a/cads_processing_api_service/models.py b/cads_processing_api_service/models.py
index 7c72f76..6cdfc0b 100644
--- a/cads_processing_api_service/models.py
+++ b/cads_processing_api_service/models.py
@@ -26,7 +26,7 @@ class StatusInfoMetadata(pydantic.BaseModel):
     results: dict[str, Any] | None = None
     datasetMetadata: dict[str, Any] | None = None
     qos: dict[str, Any] | None = None
-    log: list[str] | None = None
+    log: list[tuple[str, str]] | None = None
 
 
 class StatusInfo(ogc_api_processes_fastapi.models.StatusInfo):
@@ -56,3 +56,4 @@ class JobList(ogc_api_processes_fastapi.models.JobList):
 class Exception(ogc_api_processes_fastapi.models.Exception):
     trace_id: str | None = None
     traceback: str | None = None
+    messages: list[tuple[str, str]] | None = None
diff --git a/cads_processing_api_service/utils.py b/cads_processing_api_service/utils.py
index 33647c1..314d34e 100644
--- a/cads_processing_api_service/utils.py
+++ b/cads_processing_api_service/utils.py
@@ -15,8 +15,8 @@
 # limitations under the License.
 
 import base64
+import datetime
 import enum
-import json
 from typing import Any, Callable, Mapping
 
 import cachetools
@@ -429,7 +429,8 @@ def dictify_job(request: cads_broker.database.SystemRequest) -> dict[str, Any]:
 
 
 def get_job_from_broker_db(
-    job_id: str, session: sqlalchemy.orm.Session
+    job_id: str,
+    session: sqlalchemy.orm.Session,
 ) -> cads_broker.SystemRequest:
     """Get job description from the Broker database.
 
@@ -467,7 +468,9 @@
     return job
 
 
-def get_results_from_job(job: cads_broker.SystemRequest) -> dict[str, Any]:
+def get_results_from_job(
+    job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
+) -> dict[str, Any]:
     """Get job results description from SystemRequest instance.
 
     Parameters
@@ -498,9 +501,13 @@
            detail=f"results of job {job_id} expired"
        )
    elif job_status == "failed":
+        error_messages = get_job_events(
+            job=job, session=session, event_type="user_visible_error"
+        )
+        traceback = "\n".join([message[1] for message in error_messages])
         raise ogc_api_processes_fastapi.exceptions.JobResultsFailed(
             status_code=fastapi.status.HTTP_400_BAD_REQUEST,
-            traceback=str(job.response_error["message"]),  # type: ignore
+            traceback=traceback,
         )
     elif job_status in ("accepted", "running"):
         raise ogc_api_processes_fastapi.exceptions.ResultsNotReady(
@@ -509,15 +516,17 @@
     return results
 
 
-def parse_results_from_broker_db(job: cads_broker.SystemRequest) -> dict[str, Any]:
+def parse_results_from_broker_db(
+    job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
+) -> dict[str, Any]:
     try:
-        results = get_results_from_job(job=job)
+        results = get_results_from_job(job=job, session=session)
     except ogc_api_processes_fastapi.exceptions.OGCAPIException as exc:
         results = exceptions.format_exception_content(exc=exc)
     return results
 
 
-def collect_job_qos_info(
+def get_job_qos_info(
     job: cads_broker.SystemRequest, session: sqlalchemy.orm.Session
 ) -> dict[str, Any]:
     entry_point = str(job.entry_point)
@@ -560,13 +569,22 @@
     return qos
 
 
-def extract_job_log(job: cads_broker.SystemRequest) -> list[str]:
-    log = []
-    if job.response_user_visible_log:
-        job_log = json.loads(str(job.response_user_visible_log))
-        for log_timestamp, log_message in job_log:
-            log.append(log_message)
-    return log
+def get_job_events(
+    job: cads_broker.SystemRequest,
+    session: sqlalchemy.orm.Session,
+    event_type: str | None = None,
+    start_time: datetime.datetime | None = None,
+) -> list[tuple[datetime.datetime, str]]:
+    events = []
+    request_uid = str(job.request_uid)
+    request_events: list[
+        cads_broker.database.Events
+    ] = cads_broker.database.get_events_from_request(
+        request_uid, session, event_type, start_time
+    )
+    for request_event in request_events:
+        events.append((request_event.timestamp, request_event.message))
+    return events  # type: ignore
 
 
 def make_status_info(
@@ -575,7 +593,7 @@
     results: dict[str, Any] | None = None,
     dataset_metadata: dict[str, Any] | None = None,
     qos: dict[str, Any] | None = None,
-    log: list[str] | None = None,
+    log: list[tuple[str, str]] | None = None,
 ) -> models.StatusInfo:
     """Compose job's status information.
 
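
Annotation, not part of the patch: with the changes above, both the job log and the failure traceback are derived from broker event rows via `get_job_events`, which returns `(timestamp, message)` tuples. A minimal sketch of reusing the helper outside the endpoints, mirroring the session handling in clients.py, follows; the `db_utils` import path, the read connection mode and the function name are assumptions, while the `get_job_events` signature comes from the diff.

    import datetime

    import cads_broker

    from cads_processing_api_service import db_utils, utils

    def collect_new_log_lines(
        job: cads_broker.SystemRequest,
        since: datetime.datetime | None = None,
    ) -> list[tuple[str, str]]:
        # get_job keeps this call inside an open compute session because the
        # related fields are lazy loaded, so do the same here.
        compute_sessionmaker = db_utils.get_compute_sessionmaker(
            mode=db_utils.ConnectionMode.read
        )
        with compute_sessionmaker() as compute_session:
            events = utils.get_job_events(
                job, compute_session, "user_visible_log", since
            )
        # Same shaping as the endpoint: ISO-format timestamps for a JSON payload.
        return [(timestamp.isoformat(), message) for timestamp, message in events]
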
diff --git a/tests/test_30_utils.py b/tests/test_30_utils.py
index e0c4f90..3581c28 100644
--- a/tests/test_30_utils.py
+++ b/tests/test_30_utils.py
@@ -241,6 +241,7 @@ def test_get_job_from_broker_db() -> None:
 
 
 def test_get_results_from_job() -> None:
+    mock_session = unittest.mock.Mock(spec=sqlalchemy.orm.Session)
     job = cads_broker.SystemRequest(
         **{
             "status": "successful",
@@ -250,7 +251,7 @@
             ),
         }
     )
-    results = utils.get_results_from_job(job)
+    results = utils.get_results_from_job(job, session=mock_session)
     exp_results = {"asset": {"value": {"key": "value"}}}
     assert results == exp_results
 
@@ -258,19 +259,25 @@
         **{
             "status": "failed",
             "request_uid": "1234",
-            "response_error": {"message": "traceback"},
         }
     )
-    with pytest.raises(ogc_api_processes_fastapi.exceptions.JobResultsFailed):
-        results = utils.get_results_from_job(job)
+    with pytest.raises(ogc_api_processes_fastapi.exceptions.JobResultsFailed) as exc:
+        with unittest.mock.patch(
+            "cads_processing_api_service.utils.get_job_events"
+        ) as mock_get_job_events:
+            mock_get_job_events.return_value = [
+                ("2024-01-01T16:20:12.175021", "error message")
+            ]
+            results = utils.get_results_from_job(job, session=mock_session)
+    assert exc.value.traceback == "error message"
 
     job = cads_broker.SystemRequest(**{"status": "accepted", "request_uid": "1234"})
     with pytest.raises(ogc_api_processes_fastapi.exceptions.ResultsNotReady):
-        results = utils.get_results_from_job(job)
+        results = utils.get_results_from_job(job, session=mock_session)
 
     job = cads_broker.SystemRequest(**{"status": "running", "request_uid": "1234"})
     with pytest.raises(ogc_api_processes_fastapi.exceptions.ResultsNotReady):
-        results = utils.get_results_from_job(job)
+        results = utils.get_results_from_job(job, session=mock_session)
 
 
 def test_make_status_info() -> None:
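
Annotation, not part of the patch: the tests above cover the new `session` argument and the failure traceback, but `get_job_events` itself is only exercised through a patch. A companion test in the same mocking style could look like the sketch below; the test name and the fake event object are illustrative, while the patched target and the `(timestamp, message)` shaping come from the diff.

    import datetime
    import unittest.mock

    import sqlalchemy.orm

    import cads_broker
    from cads_processing_api_service import utils

    def test_get_job_events() -> None:
        mock_session = unittest.mock.Mock(spec=sqlalchemy.orm.Session)
        job = cads_broker.SystemRequest(**{"status": "successful", "request_uid": "1234"})
        timestamp = datetime.datetime(2024, 1, 1, 16, 20, 12)
        fake_event = unittest.mock.Mock(timestamp=timestamp, message="log message")
        with unittest.mock.patch(
            "cads_broker.database.get_events_from_request"
        ) as mock_get_events:
            mock_get_events.return_value = [fake_event]
            events = utils.get_job_events(job, mock_session, "user_visible_log")
        # get_job_events shapes each event row into a (timestamp, message) tuple.
        assert events == [(timestamp, "log message")]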