From 62784222f40d5c43adeec73d5f3424512f27d5fc Mon Sep 17 00:00:00 2001 From: Marco Cucchi Date: Wed, 20 Nov 2024 13:06:47 +0100 Subject: [PATCH] use compute write instance for requests coming from ui --- cads_processing_api_service/clients.py | 80 ++++++++++++++++---------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/cads_processing_api_service/clients.py b/cads_processing_api_service/clients.py index 4a6e3a4..2ba0e3e 100644 --- a/cads_processing_api_service/clients.py +++ b/cads_processing_api_service/clients.py @@ -397,7 +397,7 @@ def get_jobs( ) statement = utils.apply_limit(statement, limit) compute_sessionmaker = db_utils.get_compute_sessionmaker( - mode=db_utils.ConnectionMode.read + mode=db_utils.ConnectionMode.write ) with compute_sessionmaker() as compute_session: jobs_count = cads_broker.database.count_requests( @@ -493,9 +493,14 @@ def get_job( if auth_info.portal_header else None ) + compute_connection_mode = ( + db_utils.ConnectionMode.write + if auth_info.request_origin == "ui" + else db_utils.ConnectionMode.read + ) try: compute_sessionmaker = db_utils.get_compute_sessionmaker( - mode=db_utils.ConnectionMode.read + mode=compute_connection_mode ) with compute_sessionmaker() as compute_session: job = utils.get_job_from_broker_db( @@ -516,27 +521,32 @@ def get_job( log_start_time, ) except ogc_api_processes_fastapi.exceptions.NoSuchJob: - compute_sessionmaker = db_utils.get_compute_sessionmaker( - mode=db_utils.ConnectionMode.write - ) - with compute_sessionmaker() as compute_session: - job = utils.get_job_from_broker_db( - job_id=job_id, session=compute_session + if compute_connection_mode == db_utils.ConnectionMode.write: + raise + else: + compute_sessionmaker = db_utils.get_compute_sessionmaker( + mode=db_utils.ConnectionMode.write ) - if qos: - job_qos_info = { - **utils.get_job_qos_info(job, compute_session), - "status": cads_broker.database.get_qos_status_from_request(job), - } - # These lines are inside the session context because the related fields - # are lazy loaded - if log: - job_log = utils.get_job_events( - job, - compute_session, - "user_visible_log", - log_start_time, + with compute_sessionmaker() as compute_session: + job = utils.get_job_from_broker_db( + job_id=job_id, session=compute_session ) + if qos: + job_qos_info = { + **utils.get_job_qos_info(job, compute_session), + "status": cads_broker.database.get_qos_status_from_request( + job + ), + } + # These lines are inside the session context because the related fields + # are lazy loaded + if log: + job_log = utils.get_job_events( + job, + compute_session, + "user_visible_log", + log_start_time, + ) if job.portal not in portals: raise ogc_api_processes_fastapi.exceptions.NoSuchJob( detail=f"job {job_id} not found" @@ -600,9 +610,14 @@ def get_job_results( structlog.contextvars.bind_contextvars( job_id=job_id, user_uid=auth_info.user_uid ) + compute_connection_mode = ( + db_utils.ConnectionMode.write + if auth_info.request_origin == "ui" + else db_utils.ConnectionMode.read + ) try: compute_sessionmaker = db_utils.get_compute_sessionmaker( - mode=db_utils.ConnectionMode.read + mode=compute_connection_mode ) with compute_sessionmaker() as compute_session: job = utils.get_job_from_broker_db( @@ -614,15 +629,20 @@ def get_job_results( ogc_api_processes_fastapi.exceptions.NoSuchJob, ogc_api_processes_fastapi.exceptions.ResultsNotReady, ): - compute_sessionmaker = db_utils.get_compute_sessionmaker( - mode=db_utils.ConnectionMode.write - ) - with compute_sessionmaker() as compute_session: - job = utils.get_job_from_broker_db( - job_id=job_id, session=compute_session + if compute_connection_mode == db_utils.ConnectionMode.write: + raise + else: + compute_sessionmaker = db_utils.get_compute_sessionmaker( + mode=db_utils.ConnectionMode.write ) - results = utils.get_results_from_job(job=job, session=compute_session) - auth.verify_permission(auth_info.user_uid, job) + with compute_sessionmaker() as compute_session: + job = utils.get_job_from_broker_db( + job_id=job_id, session=compute_session + ) + results = utils.get_results_from_job( + job=job, session=compute_session + ) + auth.verify_permission(auth_info.user_uid, job) handle_download_metrics(job.process_id, results) return results