Skip to content

Commit

Permalink
Merge pull request #227 from ecmwf-projects/COPDS-2242-master-broker-db
Browse files Browse the repository at this point in the history
Use compute db master instance for requests coming from ui
  • Loading branch information
mcucchi9 authored Nov 20, 2024
2 parents c53e849 + 6278422 commit fe3f0bf
Showing 1 changed file with 50 additions and 30 deletions.
80 changes: 50 additions & 30 deletions cads_processing_api_service/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def get_jobs(
)
statement = utils.apply_limit(statement, limit)
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.read
mode=db_utils.ConnectionMode.write
)
with compute_sessionmaker() as compute_session:
jobs_count = cads_broker.database.count_requests(
Expand Down Expand Up @@ -493,9 +493,14 @@ def get_job(
if auth_info.portal_header
else None
)
compute_connection_mode = (
db_utils.ConnectionMode.write
if auth_info.request_origin == "ui"
else db_utils.ConnectionMode.read
)
try:
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.read
mode=compute_connection_mode
)
with compute_sessionmaker() as compute_session:
job = utils.get_job_from_broker_db(
Expand All @@ -516,27 +521,32 @@ def get_job(
log_start_time,
)
except ogc_api_processes_fastapi.exceptions.NoSuchJob:
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.write
)
with compute_sessionmaker() as compute_session:
job = utils.get_job_from_broker_db(
job_id=job_id, session=compute_session
if compute_connection_mode == db_utils.ConnectionMode.write:
raise
else:
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.write
)
if qos:
job_qos_info = {
**utils.get_job_qos_info(job, compute_session),
"status": cads_broker.database.get_qos_status_from_request(job),
}
# These lines are inside the session context because the related fields
# are lazy loaded
if log:
job_log = utils.get_job_events(
job,
compute_session,
"user_visible_log",
log_start_time,
with compute_sessionmaker() as compute_session:
job = utils.get_job_from_broker_db(
job_id=job_id, session=compute_session
)
if qos:
job_qos_info = {
**utils.get_job_qos_info(job, compute_session),
"status": cads_broker.database.get_qos_status_from_request(
job
),
}
# These lines are inside the session context because the related fields
# are lazy loaded
if log:
job_log = utils.get_job_events(
job,
compute_session,
"user_visible_log",
log_start_time,
)
if job.portal not in portals:
raise ogc_api_processes_fastapi.exceptions.NoSuchJob(
detail=f"job {job_id} not found"
Expand Down Expand Up @@ -600,9 +610,14 @@ def get_job_results(
structlog.contextvars.bind_contextvars(
job_id=job_id, user_uid=auth_info.user_uid
)
compute_connection_mode = (
db_utils.ConnectionMode.write
if auth_info.request_origin == "ui"
else db_utils.ConnectionMode.read
)
try:
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.read
mode=compute_connection_mode
)
with compute_sessionmaker() as compute_session:
job = utils.get_job_from_broker_db(
Expand All @@ -614,15 +629,20 @@ def get_job_results(
ogc_api_processes_fastapi.exceptions.NoSuchJob,
ogc_api_processes_fastapi.exceptions.ResultsNotReady,
):
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.write
)
with compute_sessionmaker() as compute_session:
job = utils.get_job_from_broker_db(
job_id=job_id, session=compute_session
if compute_connection_mode == db_utils.ConnectionMode.write:
raise
else:
compute_sessionmaker = db_utils.get_compute_sessionmaker(
mode=db_utils.ConnectionMode.write
)
results = utils.get_results_from_job(job=job, session=compute_session)
auth.verify_permission(auth_info.user_uid, job)
with compute_sessionmaker() as compute_session:
job = utils.get_job_from_broker_db(
job_id=job_id, session=compute_session
)
results = utils.get_results_from_job(
job=job, session=compute_session
)
auth.verify_permission(auth_info.user_uid, job)
handle_download_metrics(job.process_id, results)
return results

Expand Down

0 comments on commit fe3f0bf

Please sign in to comment.