Skip to content

Commit

Permalink
Rename Job to ForwardModelStep
Browse files Browse the repository at this point in the history
Plus follow-up in test code and filenames
  • Loading branch information
berland committed Oct 24, 2024
1 parent a867d62 commit a630109
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 101 deletions.
2 changes: 1 addition & 1 deletion src/_ert/forward_model_runner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""_ert.forward_model_runner is called by ert to run jobs in the runpath.
"""_ert.forward_model_runner is called by ert to run forward model steps in the runpath.
It is split into its own package for performance reasons,
simply importing ert can take several seconds, which is not ideal when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def killed_by_oom(pids: Sequence[int]) -> bool:
return False


class Job:
class ForwardModelStep:
MEMORY_POLL_PERIOD = 5 # Seconds between memory polls

def __init__(
Expand Down
16 changes: 8 additions & 8 deletions src/_ert/forward_model_runner/reporting/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import psutil

if TYPE_CHECKING:
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.forward_model_step import ForwardModelStep

class _ChecksumDictBase(TypedDict):
type: Literal["file"]
Expand Down Expand Up @@ -71,7 +71,7 @@ def __repr__(cls):
class Message(metaclass=_MetaMessage):
def __init__(self, job=None):
self.timestamp = dt.now()
self.job: Optional[Job] = job
self.job: Optional[ForwardModelStep] = job
self.error_message: Optional[str] = None

def __repr__(self):
Expand Down Expand Up @@ -116,19 +116,19 @@ def __init__(self):


class Start(Message):
def __init__(self, job: "Job"):
super().__init__(job)
def __init__(self, fm_step: "ForwardModelStep"):
super().__init__(fm_step)


class Running(Message):
def __init__(self, job: "Job", memory_status: ProcessTreeStatus):
super().__init__(job)
def __init__(self, fm_step: "ForwardModelStep", memory_status: ProcessTreeStatus):
super().__init__(fm_step)
self.memory_status = memory_status


class Exited(Message):
def __init__(self, job, exit_code: int):
super().__init__(job)
def __init__(self, fm_step, exit_code: int):
super().__init__(fm_step)
self.exit_code = exit_code


Expand Down
6 changes: 3 additions & 3 deletions src/_ert/forward_model_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from typing import List

from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting.message import Checksum, Finish, Init


Expand All @@ -21,9 +21,9 @@ def __init__(self, jobs_data):
if self.simulation_id is not None:
os.environ["ERT_RUN_ID"] = self.simulation_id

self.jobs: List[Job] = []
self.jobs: List[ForwardModelStep] = []
for index, job_data in enumerate(job_data_list):
self.jobs.append(Job(job_data, index))
self.jobs.append(ForwardModelStep(job_data, index))

self._set_environment()

Expand Down
98 changes: 59 additions & 39 deletions tests/ert/unit_tests/forward_model_runner/test_event_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ClientConnectionClosedOK,
ClientConnectionError,
)
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting import Event
from _ert.forward_model_runner.reporting.message import (
Exited,
Expand All @@ -41,11 +41,13 @@ def test_report_with_successful_start_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(job1))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(fmstep1))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -63,13 +65,15 @@ def test_report_with_failed_start_message_argument(unused_tcp_port):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)

job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))

msg = Start(job1).with_error("massive_failure")
msg = Start(fmstep1).with_error("massive_failure")

reporter.report(msg)
reporter.report(Finish())
Expand All @@ -84,12 +88,14 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 0))
reporter.report(Finish().with_error("failed"))

assert len(lines) == 1
Expand All @@ -101,12 +107,14 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 1).with_error("massive_failure"))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 1).with_error("massive_failure"))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -119,12 +127,14 @@ def test_report_with_running_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -138,12 +148,14 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -153,12 +165,14 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish().with_error("massive_failure"))

assert len(lines) == 1
Expand Down Expand Up @@ -195,16 +209,18 @@ def mock_send(msg):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
reporter._reporter_timeout = 4
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
# set _stop_timestamp
reporter.report(Finish())
if reporter._event_publisher_thread.is_alive():
Expand Down Expand Up @@ -234,16 +250,18 @@ def send_func(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch("_ert.forward_model_runner.client.Client.send") as patched_send:
patched_send.side_effect = send_func

reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))

_wait_until(
condition=lambda: patched_send.call_count == 3,
Expand Down Expand Up @@ -275,12 +293,14 @@ def mock_send(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))

# sleep until both Running events have been received
_wait_until(
Expand All @@ -292,20 +312,20 @@ def mock_send(msg):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))
# Make sure the publisher thread exits because it got
# ClientConnectionClosedOK. If it hangs it could indicate that the
# exception is not caught/handled correctly
if reporter._event_publisher_thread.is_alive():
reporter._event_publisher_thread.join()

reporter.report(Running(job1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Finish())

# _timeout_timestamp was not set to None since the reporter finished on time
assert reporter._timeout_timestamp is not None

# The Running(job1, 300, 10) is popped from the queue, but never sent.
# The Running(fmstep1, 300, 10) is popped from the queue, but never sent.
# The following Running is added to queue along with the sentinel
assert reporter._event_queue.qsize() == 2
# None of the messages reported after ClientConnectionClosedOK was raised have been sent
Expand Down
Loading

0 comments on commit a630109

Please sign in to comment.