Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add post/pre experiment simulation hooks #8993

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions docs/ert/reference/workflows/complete_workflows.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,35 @@ With the keyword :code:`HOOK_WORKFLOW` you can configure workflow
points during ERTs execution. Currently there are five points in ERTs
flow of execution where you can hook in a workflow:

- Before the simulations (all forward models for a realization) start using :code:`PRE_SIMULATION`,
- Before the experiment starts using :code:`PRE_EXPERIMENT`
- before the simulations (all forward models for a realization) start using :code:`PRE_SIMULATION`,
- after all the simulations have completed using :code:`POST_SIMULATION`,
- before the update step using :code:`PRE_UPDATE`
- after the update step using :code:`POST_UPDATE` and
- only before the first update using :code:`PRE_FIRST_UPDATE`.
- after the experiment has completed using :code:`POST_EXPERIMENT`

For non-iterative algorithms, :code:`PRE_FIRST_UPDATE` is equal to :code:`PRE_UPDATE`.
The :code:`POST_SIMULATION` hook is typically used to trigger QC workflows.

::

HOOK_WORKFLOW initWFLOW PRE_SIMULATION
HOOK_WORKFLOW preUpdateWFLOW PRE_UPDATE
HOOK_WORKFLOW postUpdateWFLOW POST_UPDATE
HOOK_WORKFLOW QC_WFLOW1 POST_SIMULATION
HOOK_WORKFLOW QC_WFLOW2 POST_SIMULATION

In this example the workflow :code:`initWFLOW` will run after all the
HOOK_WORKFLOW preExperimentWFLOW PRE_EXPERIMENT
HOOK_WORKFLOW initWFLOW PRE_SIMULATION
HOOK_WORKFLOW preUpdateWFLOW PRE_UPDATE
HOOK_WORKFLOW postUpdateWFLOW POST_UPDATE
HOOK_WORKFLOW QC_WFLOW1 POST_SIMULATION
HOOK_WORKFLOW QC_WFLOW2 POST_SIMULATION
HOOK_WORKFLOW postExperimentWFLOW POST_EXPERIMENT

In this example the workflow, :code:`preExperimentWFLOW` will run,
then :code:`initWFLOW` will run at the start of every iteration, when
simulation directories have been created, just before the forward
model is submitted to the queue. The workflow :code:`preUpdateWFLOW`
will be run before the update step and :code:`postUpdateWFLOW` will be
run after the update step. When all the simulations have completed the
run after the update step. At the end of each forward model run, the
two workflows :code:`QC_WFLOW1` and :code:`QC_WFLOW2` will be run.
After all iterations are complete, the :code:`postExperimentWFLOW` will
run.

Observe that the workflows being 'hooked in' with the
:code:`HOOK_WORKFLOW` must be loaded with the :code:`LOAD_WORKFLOW`
Expand Down
2 changes: 2 additions & 0 deletions src/ert/config/parsing/hook_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ class HookRuntime(StrEnum):
PRE_UPDATE = "PRE_UPDATE"
POST_UPDATE = "POST_UPDATE"
PRE_FIRST_UPDATE = "PRE_FIRST_UPDATE"
PRE_EXPERIMENT = "PRE_EXPERIMENT"
POST_EXPERIMENT = "POST_EXPERIMENT"
5 changes: 4 additions & 1 deletion src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,10 @@ def validate(self) -> None:

@tracer.start_as_current_span(f"{__name__}.run_workflows")
def run_workflows(
self, runtime: HookRuntime, storage: Storage, ensemble: Ensemble
self,
runtime: HookRuntime,
storage: Storage | None = None,
ensemble: Ensemble | None = None,
) -> None:
for workflow in self.ert_config.hooked_workflows[runtime]:
WorkflowRunner(workflow, storage, ensemble).run_blocking()
Expand Down
5 changes: 4 additions & 1 deletion src/ert/run_models/ensemble_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import numpy as np

from ert.config import HookRuntime
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Experiment, Storage
Expand Down Expand Up @@ -64,6 +65,7 @@ def run_experiment(
) -> None:
self.log_at_startup()
if not restart:
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
self.experiment = self._storage.create_experiment(
name=self.experiment_name,
parameters=self.ert_config.ensemble_config.parameter_configuration,
Expand All @@ -89,17 +91,18 @@ def run_experiment(
np.array(self.active_realizations, dtype=bool),
ensemble=self.ensemble,
)

sample_prior(
self.ensemble,
np.where(self.active_realizations)[0],
random_seed=self.random_seed,
)

self._evaluate_and_postprocess(
run_args,
self.ensemble,
evaluator_server_config,
)
self.run_workflows(HookRuntime.POST_EXPERIMENT, self._storage, self.ensemble)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not give storage and ensemble here (they are optional further down in the callstack, so just need to modify the typing)


@classmethod
def name(cls) -> str:
Expand Down
4 changes: 3 additions & 1 deletion src/ert/run_models/ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import numpy as np

from ert.config import ErtConfig
from ert.config import ErtConfig, HookRuntime
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Storage
Expand Down Expand Up @@ -62,6 +62,7 @@ def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
self.log_at_startup()
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
ensemble_format = self.target_ensemble_format
experiment = self._storage.create_experiment(
parameters=self.ert_config.ensemble_config.parameter_configuration,
Expand Down Expand Up @@ -107,6 +108,7 @@ def run_experiment(
posterior,
evaluator_server_config,
)
self.run_workflows(HookRuntime.POST_EXPERIMENT, self._storage, prior)

@classmethod
def name(cls) -> str:
Expand Down
16 changes: 15 additions & 1 deletion src/ert/run_models/iterated_ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def run_experiment(
self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False
) -> None:
self.log_at_startup()

self.run_workflows(HookRuntime.PRE_EXPERIMENT)
target_ensemble_format = self.target_ensemble_format
experiment = self._storage.create_experiment(
parameters=self.ert_config.ensemble_config.parameter_configuration,
Expand All @@ -146,11 +146,18 @@ def run_experiment(
ensemble=prior,
)

self.set_env_key("_ERT_ITERATION", "0")
self.set_env_key(
"_IS_FINAL_ITERATION",
"False",
)

sample_prior(
prior,
np.where(self.active_realizations)[0],
random_seed=self.random_seed,
)

self._evaluate_and_postprocess(
prior_args,
prior,
Expand All @@ -159,6 +166,11 @@ def run_experiment(

self.run_workflows(HookRuntime.PRE_FIRST_UPDATE, self._storage, prior)
for prior_iter in range(self._total_iterations):
self.set_env_key("_ERT_ITERATION", str(prior_iter + 1))
self.set_env_key(
"_IS_FINAL_ITERATION",
"True" if (prior_iter == self._total_iterations - 1) else "False",
)
self.send_event(
RunModelUpdateBeginEvent(iteration=prior_iter, run_id=prior.id)
)
Expand Down Expand Up @@ -221,6 +233,8 @@ def run_experiment(
)
prior = posterior

self.run_workflows(HookRuntime.POST_EXPERIMENT, self._storage, prior)

@classmethod
def name(cls) -> str:
return "Iterated ensemble smoother"
Expand Down
14 changes: 13 additions & 1 deletion src/ert/run_models/multiple_data_assimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import numpy as np

from ert.config import ErtConfig
from ert.config import ErtConfig, HookRuntime
from ert.enkf_main import sample_prior
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.storage import Ensemble, Storage
Expand Down Expand Up @@ -99,11 +99,20 @@ def run_experiment(
f"Experiment misconfigured, got starting iteration: {self.start_iteration},"
f"restart iteration = {prior.iteration + 1}"
)

self.set_env_key("_ERT_ITERATION", str(self.start_iteration))
self.set_env_key(
"_IS_FINAL_ITERATION",
"True"
if (self.start_iteration == self._total_iterations - 1)
else "False",
)
except (KeyError, ValueError) as err:
raise ErtRunError(
f"Prior ensemble with ID: {id} does not exists"
) from err
else:
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
sim_args = {"weights": self._relative_weights}
experiment = self._storage.create_experiment(
parameters=self.ert_config.ensemble_config.parameter_configuration,
Expand All @@ -126,6 +135,7 @@ def run_experiment(
np.array(self.active_realizations, dtype=bool),
ensemble=prior,
)

sample_prior(
prior,
np.where(self.active_realizations)[0],
Expand Down Expand Up @@ -157,6 +167,8 @@ def run_experiment(
)
prior = posterior

self.run_workflows(HookRuntime.POST_EXPERIMENT, self._storage, prior)

@staticmethod
def parse_weights(weights: str) -> List[float]:
"""Parse weights string and scale weights such that their reciprocals sum
Expand Down
73 changes: 73 additions & 0 deletions tests/ert/ui_tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,79 @@ def test_that_stop_on_fail_workflow_jobs_stop_ert(
run_cli(TEST_RUN_MODE, "--disable-monitor", "poly.ert")


@pytest.mark.usefixtures("copy_poly_case")
def test_that_pre_post_experiment_hook_works(
monkeypatch,
):
monkeypatch.setattr(_ert.threading, "_can_raise", False)

# The executable
with open("hello_post_exp.sh", "w", encoding="utf-8") as f:
f.write(
dedent("""#!/bin/bash
echo "just sending regards" > from_post_experiment.txt
""")
)
os.chmod("hello_post_exp.sh", 0o755)

# The workflow job
with open("SAY_HELLO_POST_EXP", "w", encoding="utf-8") as s:
s.write("""
INTERNAL False
EXECUTABLE hello_post_exp.sh
""")

# The workflow
with open("SAY_HELLO_POST_EXP.wf", "w", encoding="utf-8") as s:
s.write("""dump_final_ensemble_id""")

# The executable
with open("hello_pre_exp.sh", "w", encoding="utf-8") as f:
f.write(
dedent("""#!/bin/bash
echo "first" > from_pre_experiment.txt
""")
)
os.chmod("hello_pre_exp.sh", 0o755)

# The workflow job
with open("SAY_HELLO_PRE_EXP", "w", encoding="utf-8") as s:
s.write("""
INTERNAL False
EXECUTABLE hello_pre_exp.sh
""")

# The workflow
with open("SAY_HELLO_PRE_EXP.wf", "w", encoding="utf-8") as s:
s.write("""dump_first_ensemble_id""")

with open("poly.ert", mode="a", encoding="utf-8") as fh:
fh.write(
dedent(
"""
NUM_REALIZATIONS 2

LOAD_WORKFLOW_JOB SAY_HELLO_POST_EXP dump_final_ensemble_id
LOAD_WORKFLOW SAY_HELLO_POST_EXP.wf POST_EXPERIMENT_DUMP
HOOK_WORKFLOW POST_EXPERIMENT_DUMP POST_EXPERIMENT

LOAD_WORKFLOW_JOB SAY_HELLO_PRE_EXP dump_first_ensemble_id
LOAD_WORKFLOW SAY_HELLO_PRE_EXP.wf PRE_EXPERIMENT_DUMP
HOOK_WORKFLOW PRE_EXPERIMENT_DUMP PRE_EXPERIMENT
"""
)
)

run_cli(ITERATIVE_ENSEMBLE_SMOOTHER_MODE, "--disable-monitor", "poly.ert")

assert (Path(os.getcwd()) / "from_pre_experiment.txt").read_text(
"utf-8"
) == "first\n"
assert (Path(os.getcwd()) / "from_post_experiment.txt").read_text(
"utf-8"
) == "just sending regards\n"


@pytest.fixture(name="mock_cli_run")
def fixture_mock_cli_run(monkeypatch):
end_event = Mock()
Expand Down
11 changes: 8 additions & 3 deletions tests/ert/unit_tests/cli/test_model_hook_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
)

EXPECTED_CALL_ORDER = [
HookRuntime.PRE_EXPERIMENT,
HookRuntime.PRE_SIMULATION,
HookRuntime.POST_SIMULATION,
HookRuntime.PRE_FIRST_UPDATE,
HookRuntime.PRE_UPDATE,
HookRuntime.POST_UPDATE,
HookRuntime.PRE_SIMULATION,
HookRuntime.POST_SIMULATION,
HookRuntime.POST_EXPERIMENT,
]


Expand Down Expand Up @@ -57,7 +59,8 @@ def test_hook_call_order_ensemble_smoother(monkeypatch):
test_class.run_experiment(MagicMock())

expected_calls = [
call(expected_call, ANY, ANY) for expected_call in EXPECTED_CALL_ORDER
call(HookRuntime.PRE_EXPERIMENT),
*[call(expected_call, ANY, ANY) for expected_call in EXPECTED_CALL_ORDER[1:]],
]
assert run_wfs_mock.mock_calls == expected_calls

Expand Down Expand Up @@ -93,7 +96,8 @@ def test_hook_call_order_es_mda(monkeypatch):
test_class.run_experiment(MagicMock())

expected_calls = [
call(expected_call, ANY, ANY) for expected_call in EXPECTED_CALL_ORDER
call(HookRuntime.PRE_EXPERIMENT),
*[call(expected_call, ANY, ANY) for expected_call in EXPECTED_CALL_ORDER[1:]],
]
assert run_wfs_mock.mock_calls == expected_calls

Expand Down Expand Up @@ -128,6 +132,7 @@ def test_hook_call_order_iterative_ensemble_smoother(monkeypatch):
test_class.run_experiment(MagicMock())

expected_calls = [
call(expected_call, ANY, ANY) for expected_call in EXPECTED_CALL_ORDER
call(HookRuntime.PRE_EXPERIMENT),
*[call(expected_call, ANY, ANY) for expected_call in EXPECTED_CALL_ORDER[1:]],
]
assert run_wfs_mock.mock_calls == expected_calls
Loading