Skip to content

Commit

Permalink
Merge pull request #129 from tilde-lab/ordering
Browse files Browse the repository at this point in the history
feat: stable tasks ordering + minor polishing
  • Loading branch information
blokhin authored Mar 13, 2024
2 parents 8c54f3f + fbda244 commit fb3854b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion examples/submit_topas_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@


yac = Yascheduler()
result = yac.queue_submit_task(LABEL, {"calc.inp": PATTERN_REQUEST, "structure.inc": ""}, "topas")
result = yac.queue_submit_task(LABEL, {"calc.inp": PATTERN_REQUEST, "structure.inc": "", "input.xy: ""}, "topas")
print(LABEL)
print(result)
10 changes: 5 additions & 5 deletions yascheduler/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

@unique
class TaskStatus(int, Enum):
"""Tast status enum"""
"""Task possible states enum"""

TO_DO = 0
RUNNING = 1
Expand Down Expand Up @@ -243,7 +243,7 @@ async def get_task_ids_by_ip_and_status(
) -> Sequence[int]:
"""Get task ids by ip and status"""
rows = await self.run(
"SELECT task_id FROM yascheduler_tasks WHERE ip=:ip AND status=:status;",
"SELECT task_id FROM yascheduler_tasks WHERE ip=:ip AND status=:status ORDER BY task_id;",
ip=ip_addr,
status=status.value,
)
Expand All @@ -254,7 +254,7 @@ async def get_tasks_by_jobs(self, jobs: Sequence[int]) -> Sequence[TaskModel]:
rows = await self.run(
"""SELECT task_id, label, ip, status, metadata
FROM yascheduler_tasks
WHERE task_id IN (SELECT unnest(CAST (:task_ids AS int[])));""",
WHERE task_id IN (SELECT unnest(CAST (:task_ids AS int[]))) ORDER BY task_id;""",
task_ids=jobs,
)
return [TaskModel(*x) for x in (rows or [])]
Expand All @@ -266,7 +266,7 @@ async def get_tasks_by_status(
rows = await self.run(
"""SELECT task_id, label, ip, status, metadata
FROM yascheduler_tasks
WHERE status IN (SELECT unnest(CAST (:statuses AS int[])))
WHERE status IN (SELECT unnest(CAST (:statuses AS int[]))) ORDER BY task_id
LIMIT :lim;""",
statuses=[x.value for x in statuses],
lim=limit,
Expand All @@ -282,7 +282,7 @@ async def get_tasks_with_cloud_by_id_status(
FROM yascheduler_tasks AS t
JOIN yascheduler_nodes AS n ON n.ip=t.ip
WHERE status=:status AND
task_id IN (SELECT unnest(CAST (:ids AS int[])));""",
task_id IN (SELECT unnest(CAST (:ids AS int[]))) ORDER BY task_id;""",
ids=ids,
status=status.value,
)
Expand Down
2 changes: 1 addition & 1 deletion yascheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def do_task_webhook(
if not url:
return
async with self.webhook_sem:
self.log.info(f"Executing webhook to {url}")
self.log.info(f"Executing webhook of type {status.value} to {url}")
payload = WebhookPayload(
task_id, status.value, metadata.get("webhook_custom_params", {})
)
Expand Down

0 comments on commit fb3854b

Please sign in to comment.