From cc731466c34ef0a9e762ebe738752557c6417ac8 Mon Sep 17 00:00:00 2001 From: xjules Date: Thu, 15 Aug 2024 13:14:39 +0200 Subject: [PATCH] Refactor --- src/ert/simulator/batch_simulator_context.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/ert/simulator/batch_simulator_context.py b/src/ert/simulator/batch_simulator_context.py index 4c256e4c551..ace8478be63 100644 --- a/src/ert/simulator/batch_simulator_context.py +++ b/src/ert/simulator/batch_simulator_context.py @@ -12,7 +12,7 @@ import numpy as np -from _ert.async_utils import get_running_loop +from _ert.async_utils import new_event_loop from ert.config import HookRuntime from ert.enkf_main import create_run_path from ert.ensemble_evaluator import Realization @@ -114,7 +114,8 @@ def __post_init__(self) -> None: Handle which can be used to query status and results for batch simulation. """ ert_config = self.ert_config - self._loop = get_running_loop() + self._loop = new_event_loop() + asyncio.set_event_loop(self._loop) driver = create_driver(ert_config.queue_config) self._scheduler = Scheduler( driver, max_running=self.ert_config.queue_config.max_running @@ -153,14 +154,15 @@ def __post_init__(self) -> None: for workflow in ert_config.hooked_workflows[HookRuntime.PRE_SIMULATION]: WorkflowRunner(workflow, None, self.ensemble).run_blocking() + self._loop.run_until_complete(self.run_forward_model()) + + async def run_forward_model(self): self._sim_task = self._loop.create_task( _submit_and_run_jobqueue(self.ert_config, self._scheduler, self.run_args) ) - - # Wait until the queue is active before we finish the creation - # to ensure sane job status while running while self.running() and not self._scheduler.is_active(): - time.sleep(0.1) + await asyncio.sleep(0.1) + await self._sim_task def __len__(self) -> int: return len(self.mask) @@ -307,6 +309,7 @@ def job_progress(self, iens: int) -> Optional[ForwardModelStatus]: def stop(self) -> None: self._scheduler.kill_all_jobs() self._loop.run_until_complete(self._sim_task) + self._loop.close() def run_path(self, iens: int) -> str: return self.run_args[iens].runpath