From fbda2446a1601cefe6a1c2e5014e6fb478261195 Mon Sep 17 00:00:00 2001 From: Evgeny Blokhin Date: Fri, 8 Mar 2024 14:35:29 +0100 Subject: [PATCH] feat: stable tasks ordering + minor polishing --- examples/submit_topas_input.py | 2 +- yascheduler/db.py | 10 +++++----- yascheduler/scheduler.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/submit_topas_input.py b/examples/submit_topas_input.py index 514e1b1..d6a5789 100644 --- a/examples/submit_topas_input.py +++ b/examples/submit_topas_input.py @@ -28,6 +28,6 @@ yac = Yascheduler() -result = yac.queue_submit_task(LABEL, {"calc.inp": PATTERN_REQUEST, "structure.inc": ""}, "topas") +result = yac.queue_submit_task(LABEL, {"calc.inp": PATTERN_REQUEST, "structure.inc": "", "input.xy: ""}, "topas") print(LABEL) print(result) diff --git a/yascheduler/db.py b/yascheduler/db.py index ba8458f..11cdbae 100644 --- a/yascheduler/db.py +++ b/yascheduler/db.py @@ -17,7 +17,7 @@ @unique class TaskStatus(int, Enum): - """Tast status enum""" + """Task possible states enum""" TO_DO = 0 RUNNING = 1 @@ -243,7 +243,7 @@ async def get_task_ids_by_ip_and_status( ) -> Sequence[int]: """Get task ids by ip and status""" rows = await self.run( - "SELECT task_id FROM yascheduler_tasks WHERE ip=:ip AND status=:status;", + "SELECT task_id FROM yascheduler_tasks WHERE ip=:ip AND status=:status ORDER BY task_id;", ip=ip_addr, status=status.value, ) @@ -254,7 +254,7 @@ async def get_tasks_by_jobs(self, jobs: Sequence[int]) -> Sequence[TaskModel]: rows = await self.run( """SELECT task_id, label, ip, status, metadata FROM yascheduler_tasks - WHERE task_id IN (SELECT unnest(CAST (:task_ids AS int[])));""", + WHERE task_id IN (SELECT unnest(CAST (:task_ids AS int[]))) ORDER BY task_id;""", task_ids=jobs, ) return [TaskModel(*x) for x in (rows or [])] @@ -266,7 +266,7 @@ async def get_tasks_by_status( rows = await self.run( """SELECT task_id, label, ip, status, metadata FROM yascheduler_tasks - WHERE status IN (SELECT unnest(CAST (:statuses AS int[]))) + WHERE status IN (SELECT unnest(CAST (:statuses AS int[]))) ORDER BY task_id LIMIT :lim;""", statuses=[x.value for x in statuses], lim=limit, @@ -282,7 +282,7 @@ async def get_tasks_with_cloud_by_id_status( FROM yascheduler_tasks AS t JOIN yascheduler_nodes AS n ON n.ip=t.ip WHERE status=:status AND - task_id IN (SELECT unnest(CAST (:ids AS int[])));""", + task_id IN (SELECT unnest(CAST (:ids AS int[]))) ORDER BY task_id;""", ids=ids, status=status.value, ) diff --git a/yascheduler/scheduler.py b/yascheduler/scheduler.py index 7d437aa..92ba6cb 100755 --- a/yascheduler/scheduler.py +++ b/yascheduler/scheduler.py @@ -148,7 +148,7 @@ async def do_task_webhook( if not url: return async with self.webhook_sem: - self.log.info(f"Executing webhook to {url}") + self.log.info(f"Executing webhook of type {status.value} to {url}") payload = WebhookPayload( task_id, status.value, metadata.get("webhook_custom_params", {}) )