diff --git a/src/everest/bin/everest_script.py b/src/everest/bin/everest_script.py index 535d62da809..0633c3e92ca 100755 --- a/src/everest/bin/everest_script.py +++ b/src/everest/bin/everest_script.py @@ -1,25 +1,21 @@ #!/usr/bin/env python import argparse +import asyncio import json import logging import signal import threading from functools import partial -from ert.config import ErtConfig -from ert.storage import open_storage from everest.config import EverestConfig, ServerConfig from everest.detached import ( ServerStatus, everserver_status, - generate_everserver_ert_config, server_is_running, start_server, - wait_for_context, wait_for_server, ) -from everest.plugins.site_config_env import PluginSiteConfigEnv from everest.util import makedirs_if_needed, version_info from .utils import ( @@ -48,7 +44,7 @@ def everest_entry(args=None): partial(handle_keyboard_interrupt, options=options), ) - run_everest(options) + asyncio.run(run_everest(options)) def _build_args_parser(): @@ -80,7 +76,7 @@ def _build_args_parser(): return arg_parser -def run_everest(options): +async def run_everest(options): logger = logging.getLogger("everest_main") server_state = everserver_status(options.config) @@ -100,22 +96,12 @@ def run_everest(options): job_name = fm_job.split()[0] logger.info("Everest forward model contains job {}".format(job_name)) - with PluginSiteConfigEnv(): - ert_config = ErtConfig.with_plugins().from_dict( - config_dict=generate_everserver_ert_config( - options.config, options.debug - ) - ) - makedirs_if_needed(options.config.output_dir, roll_if_exists=True) - - with open_storage(ert_config.ens_path, "w") as storage, PluginSiteConfigEnv(): - context = start_server(options.config, ert_config, storage) - print("Waiting for server ...") - wait_for_server(options.config, timeout=600, context=context) - print("Everest server found!") - run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs) - wait_for_context() + await start_server(options.config, options.debug) + print("Waiting for server ...") + wait_for_server(options.config, timeout=600) + print("Everest server found!") + run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs) server_state = everserver_status(options.config) server_state_info = server_state["message"] diff --git a/src/everest/config/everest_config.py b/src/everest/config/everest_config.py index 57c99db4cc0..a23f1faa749 100644 --- a/src/everest/config/everest_config.py +++ b/src/everest/config/everest_config.py @@ -201,7 +201,7 @@ class EverestConfig(BaseModelWithPropertySupport): # type: ignore """, ) simulator: Optional[SimulatorConfig] = Field( - default=None, description="Simulation settings" + default_factory=SimulatorConfig, description="Simulation settings" ) forward_model: Optional[List[str]] = Field( default=None, description="List of jobs to run" diff --git a/src/everest/config/server_config.py b/src/everest/config/server_config.py index 3fadbc6fb05..19f219aa4b4 100644 --- a/src/everest/config/server_config.py +++ b/src/everest/config/server_config.py @@ -28,12 +28,12 @@ class ServerConfig(BaseModel, HasErtQueueOptions): # type: ignore """, ) # Corresponds to queue name exclude_host: Optional[str] = Field( - None, + "", description="""Comma separated list of nodes that should be excluded from the slurm run""", ) include_host: Optional[str] = Field( - None, + "", description="""Comma separated list of nodes that should be included in the slurm run""", ) diff --git a/src/everest/config/simulator_config.py 
b/src/everest/config/simulator_config.py index 3064a5a3c49..15f0b7d00f6 100644 --- a/src/everest/config/simulator_config.py +++ b/src/everest/config/simulator_config.py @@ -36,12 +36,12 @@ class SimulatorConfig(BaseModel, HasErtQueueOptions, extra="forbid"): # type: i "needs to be deleted.", ) exclude_host: Optional[str] = Field( - None, + "", description="""Comma separated list of nodes that should be excluded from the slurm run.""", ) include_host: Optional[str] = Field( - None, + "", description="""Comma separated list of nodes that should be included in the slurm run""", ) diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index 1f64e655eb9..b4e7473459a 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -5,17 +5,24 @@ import re import time import traceback -from datetime import datetime from enum import Enum from pathlib import Path -from typing import List, Literal, Mapping, Optional, Tuple +from typing import Literal, Mapping, Optional, Tuple import requests from seba_sqlite.exceptions import ObjectNotFoundError from seba_sqlite.snapshot import SebaSnapshot -from ert import BatchContext, BatchSimulator, JobState -from ert.config import ErtConfig, QueueSystem +from ert.config import QueueSystem +from ert.config.queue_config import ( + LocalQueueOptions, + LsfQueueOptions, + SlurmQueueOptions, + TorqueQueueOptions, +) +from ert.scheduler import create_driver +from ert.scheduler.driver import Driver, FailedSubmit +from ert.scheduler.event import StartedEvent from everest.config import EverestConfig, ServerConfig from everest.config_keys import ConfigKeys as CK from everest.strings import ( @@ -25,7 +32,6 @@ OPT_PROGRESS_ID, SIM_PROGRESS_ENDPOINT, SIM_PROGRESS_ID, - SIMULATION_DIR, STOP_ENDPOINT, ) from everest.util import configure_logger @@ -43,19 +49,7 @@ # everest.log file instead -# The Everest server is launched through ert. When running on LSF everything -# works fine. But when running on the local queue (mainly for testing and -# debugging) the ert causes all sorts of problems if the server or the -# context go out of scope. So we keep them alive for now. -# Note that, after the server is stopped (eg by a call to stop_server), the -# context does not immediately terminate. The method _context_stop_and_wait -# stops the context (if available) and waits until the context is terminated -# (to be used typically in tests) -_server = None -_context = None - - -def start_server(config: EverestConfig, ert_config: ErtConfig, storage): +async def start_server(config: EverestConfig, debug: bool = False) -> Driver: """ Start an Everest server running the optimization defined in the config """ @@ -79,14 +73,6 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage): log_level=logging.INFO, ) - global _server # noqa: PLW0603 - global _context # noqa: PLW0603 - if _context and _context.running(): - raise RuntimeError( - "Starting two instances of everest server " - "in the same process is not allowed!" 
- ) - try: save_config_path = os.path.join(config.output_dir, config.config_file) config.dump(save_config_path) @@ -95,48 +81,18 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage): "Failed to save optimization config: {}".format(e) ) - experiment = storage.create_experiment( - name=f"DetachedEverest@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}", - parameters=[], - responses=[], - ) - - _server = BatchSimulator( - experiment=experiment, - perferred_num_cpu=ert_config.preferred_num_cpu, - runpath_file=str(ert_config.runpath_file), - user_config_file=ert_config.user_config_file, - env_vars=ert_config.env_vars, - forward_model_steps=ert_config.forward_model_steps, - parameter_configurations=ert_config.ensemble_config.parameter_configs, - queue_config=ert_config.queue_config, - model_config=ert_config.model_config, - analysis_config=ert_config.analysis_config, - hooked_workflows=ert_config.hooked_workflows, - substitutions=ert_config.substitutions, - templates=ert_config.ert_templates, - controls={}, - results=[], - ) - - _context = _server.start("dispatch_server", [(0, {})]) - - return _context - - -def context_stop_and_wait(): - global _context # noqa: PLW0602 - if _context: - _context.stop() - while _context.running(): - time.sleep(1) - - -def wait_for_context(): - global _context # noqa: PLW0602 - if _context and _context.running(): - while _context.running(): - time.sleep(1) + driver = create_driver(get_server_queue_options(config)) + try: + args = ["--config-file", config.config_file] + if debug: + args.append("--debug") + await driver.submit(0, "everserver", *args) + except FailedSubmit as err: + raise ValueError(f"Failed to submit Everserver with error: {err}") from err + status = await driver.event_queue.get() + if not isinstance(status, StartedEvent): + raise ValueError(f"Everserver not started as expected, got status: {status}") + return driver def stop_server(config: EverestConfig, retries: int = 5): @@ -167,9 +123,7 @@ def extract_errors_from_file(path: str): return re.findall(r"(Error \w+.*)", content) -def wait_for_server( - config: EverestConfig, timeout: int, context: Optional[BatchContext] = None -) -> None: +def wait_for_server(config: EverestConfig, timeout: int) -> None: """ Checks everest server has started _HTTP_REQUEST_RETRY times. Waits progressively longer between each check. 
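Reviewer note: to make the new detached-server flow easier to follow, here is a minimal caller-side sketch assembled from the call sites changed in this patch (everest_script.py and test_logging.py). The function name run_detached and the config_path argument are illustrative only; every API call mirrors code added elsewhere in the diff.

    import asyncio

    from ert.scheduler.event import FinishedEvent
    from everest.config import EverestConfig
    from everest.detached import start_server, wait_for_server
    from everest.util import makedirs_if_needed


    async def run_detached(config_path: str) -> None:
        # Illustrative driver of the new API; mirrors everest_script.py after this change.
        config = EverestConfig.load_file(config_path)
        makedirs_if_needed(config.output_dir, roll_if_exists=True)

        # start_server() now submits the "everserver" job through an ert scheduler
        # driver and returns that Driver once a StartedEvent has been received.
        driver = await start_server(config, debug=False)

        # Poll the server's HTTP endpoint until it responds; no context object anymore.
        wait_for_server(config, timeout=600)

        # Optionally block until the server job itself finishes, as test_logging.py does.
        while True:
            event = await driver.event_queue.get()
            if isinstance(event, FinishedEvent) and event.iens == 0:
                break


    # asyncio.run(run_detached("config_minimal.yml"))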
@@ -190,33 +144,6 @@ def wait_for_server( raise SystemExit( "Failed to start Everest with error:\n{}".format(status["message"]) ) - # Job queueing may fail: - if context is not None and context.has_job_failed(0): - job_progress = context.job_progress(0) - - if job_progress is not None: - path = context.job_progress(0).steps[0].std_err_file - for err in extract_errors_from_file(path): - update_everserver_status( - config, ServerStatus.failed, message=err - ) - logging.error(err) - raise SystemExit("Failed to start Everest server.") - else: - try: - state = context.get_job_state(0) - - if state == JobState.WAITING: - # Job did fail, but is now in WAITING - logging.error( - "Race condition in wait_for_server, job did fail but is now in WAITING" - ) - except IndexError as e: - # Job is no longer registered in scheduler - logging.error( - f"Race condition in wait_for_server, failed job removed from scheduler\n{e}" - ) - raise SystemExit("Failed to start Everest server.") from e sleep_time = sleep_time_increment * (2**retry_count) time.sleep(sleep_time) @@ -364,68 +291,6 @@ def start_monitor(config: EverestConfig, callback, polling_interval=5): } -def _add_simulator_defaults( - options, - config: EverestConfig, - queue_options: List[Tuple[str, str]], - queue_system: Literal["LSF", "SLURM"], -): - simulator_options = ( - config.simulator.extract_ert_queue_options( - queue_system=queue_system, everest_to_ert_key_tuples=queue_options - ) - if config.simulator is not None - else [] - ) - - option_names = [option[1] for option in options] - simulator_option_names = [option[1] for option in simulator_options] - options.extend( - simulator_options[simulator_option_names.index(res_key)] - for _, res_key in queue_options - if res_key not in option_names and res_key in simulator_option_names - ) - return options - - -def _generate_queue_options( - config: EverestConfig, - queue_options: List[Tuple[str, str]], - res_queue_name: str, # Literal["LSF_QUEUE", "PARTITION"]? - queue_system: Literal["LSF", "SLURM", "TORQUE"], -): - queue_name_simulator = ( - config.simulator.name if config.simulator is not None else None - ) - - queue_name = config.server.name if config.server is not None else None - - if queue_name is None: - queue_name = queue_name_simulator - - options = ( - config.server.extract_ert_queue_options( - queue_system=queue_system, everest_to_ert_key_tuples=queue_options - ) - if config.server is not None - else [(queue_system, "MAX_RUNNING", 1)] - ) - - if queue_name: - options.append( - ( - queue_system, - res_queue_name, - queue_name, - ), - ) - # Inherit the include/exclude_host from the simulator config entry, if necessary. - # Currently this is only used by the slurm driver. 
- if queue_system == "SLURM": - options = _add_simulator_defaults(options, config, queue_options, queue_system) - return options - - def _find_res_queue_system(config: EverestConfig): queue_system_simulator: Literal["lsf", "local", "slurm", "torque"] = "local" if config.simulator is not None: @@ -446,52 +311,30 @@ def _find_res_queue_system(config: EverestConfig): return QueueSystem(queue_system.upper()) -def generate_everserver_ert_config(config: EverestConfig, debug_mode: bool = False): - assert config.config_directory is not None - assert config.config_file is not None - - site_config = ErtConfig.read_site_config() - abs_everest_config = os.path.join(config.config_directory, config.config_file) - detached_node_dir = ServerConfig.get_detached_node_dir(config.output_dir) - simulation_path = os.path.join(detached_node_dir, SIMULATION_DIR) +def get_server_queue_options(config: EverestConfig): queue_system = _find_res_queue_system(config) - arg_list = ["--config-file", abs_everest_config] - if debug_mode: - arg_list.append("--debug") - - everserver_config = {} if site_config is None else site_config - everserver_config.update( - { - "RUNPATH": simulation_path, - "JOBNAME": EVEREST_SERVER_CONFIG, - "NUM_REALIZATIONS": 1, - "MAX_SUBMIT": 1, - "ENSPATH": os.path.join(detached_node_dir, EVEREST_SERVER_CONFIG), - "RUNPATH_FILE": os.path.join(detached_node_dir, ".res_runpath_list"), - } - ) - install_job = everserver_config.get("INSTALL_JOB", []) - install_job.append((EVEREST_SERVER_CONFIG, _EVERSERVER_JOB_PATH)) - everserver_config["INSTALL_JOB"] = install_job - - simulation_job = everserver_config.get("SIMULATION_JOB", []) - simulation_job.append([EVEREST_SERVER_CONFIG, *arg_list]) - everserver_config["SIMULATION_JOB"] = simulation_job - - if queue_system in _QUEUE_SYSTEMS: - everserver_config["QUEUE_SYSTEM"] = queue_system - queue_options = _generate_queue_options( - config, - _QUEUE_SYSTEMS[queue_system]["options"], - _QUEUE_SYSTEMS[queue_system]["name"], - queue_system, + + ever_queue_config = config.server if config.server is not None else config.simulator + + if queue_system == QueueSystem.LSF: + queue = LsfQueueOptions( + lsf_queue=ever_queue_config.name, + lsf_resource=ever_queue_config.options, ) - if queue_options: - everserver_config.setdefault("QUEUE_OPTION", []).extend(queue_options) + elif queue_system == QueueSystem.SLURM: + queue = SlurmQueueOptions( + exclude_host=ever_queue_config.exclude_host, + include_host=ever_queue_config.include_host, + partition=ever_queue_config.name, + ) + elif queue_system == QueueSystem.TORQUE: + queue = TorqueQueueOptions() + elif queue_system == QueueSystem.LOCAL: + queue = LocalQueueOptions() else: - everserver_config["QUEUE_SYSTEM"] = queue_system - - return everserver_config + raise ValueError(f"Unknown queue system: {queue_system}") + queue.max_running = 1 + return queue def _query_server(cert, auth, endpoint): diff --git a/tests/everest/functional/test_main_everest_entry.py b/tests/everest/functional/test_main_everest_entry.py index 279e63bc3b0..437d739b5c8 100644 --- a/tests/everest/functional/test_main_everest_entry.py +++ b/tests/everest/functional/test_main_everest_entry.py @@ -14,9 +14,7 @@ from everest.config import EverestConfig from everest.detached import ( ServerStatus, - context_stop_and_wait, everserver_status, - wait_for_context, ) CONFIG_FILE_MINIMAL = "config_minimal.yml" @@ -81,7 +79,6 @@ def test_everest_main_entry_bad_command(): @pytest.mark.flaky(reruns=5) @pytest.mark.fails_on_macos_github_workflow def 
test_everest_entry_run(copy_math_func_test_data_to_tmp): - wait_for_context() # Setup command line arguments with capture_streams(): start_everest(["everest", "run", CONFIG_FILE_MINIMAL]) @@ -100,8 +97,6 @@ def test_everest_entry_run(copy_math_func_test_data_to_tmp): assert best_settings.objective_value == pytest.approx(0.0, abs=0.0005) - context_stop_and_wait() - with capture_streams(): start_everest(["everest", "monitor", CONFIG_FILE_MINIMAL]) @@ -110,8 +105,6 @@ def test_everest_entry_run(copy_math_func_test_data_to_tmp): assert status["status"] == ServerStatus.completed - context_stop_and_wait() - def test_everest_entry_monitor_no_run(copy_math_func_test_data_to_tmp): with capture_streams(): @@ -122,8 +115,6 @@ def test_everest_entry_monitor_no_run(copy_math_func_test_data_to_tmp): assert status["status"] == ServerStatus.never_run - context_stop_and_wait() - def test_everest_main_export_entry(copy_math_func_test_data_to_tmp): # Setup command line arguments diff --git a/tests/everest/test_detached.py b/tests/everest/test_detached.py index cdb4df6d2fb..a3ca50f8d0a 100644 --- a/tests/everest/test_detached.py +++ b/tests/everest/test_detached.py @@ -1,16 +1,17 @@ -import logging import os -import shutil -from collections import namedtuple from unittest.mock import patch import pytest import requests -from ert import JobState -from ert.config import ErtConfig, QueueSystem -from ert.storage import open_storage -from everest.config import EverestConfig, ServerConfig +from ert.config import ErtConfig +from ert.config.queue_config import ( + LocalQueueOptions, + LsfQueueOptions, + SlurmQueueOptions, +) +from everest.config import EverestConfig +from everest.config.server_config import ServerConfig from everest.config.simulator_config import SimulatorConfig from everest.config_keys import ConfigKeys as CK from everest.detached import ( @@ -18,15 +19,12 @@ PROXY, ServerStatus, _find_res_queue_system, - _generate_queue_options, - context_stop_and_wait, everserver_status, - generate_everserver_ert_config, + get_server_queue_options, server_is_running, start_server, stop_server, update_everserver_status, - wait_for_context, wait_for_server, wait_for_server_to_stop, ) @@ -38,91 +36,65 @@ SIMULATION_DIR, ) from everest.util import makedirs_if_needed -from tests.everest.utils import relpath - - -class MockContext: - def __init__(self): - pass - - @staticmethod - def has_job_failed(*args): - return True - - @staticmethod - def job_progress(*args): - job = namedtuple("Job", "std_err_file") - job.std_err_file = "error_file.0" - job_progress = namedtuple("JobProgres", ["jobs"]) - job_progress.steps = [job] - return job_progress @pytest.mark.flaky(reruns=5) @pytest.mark.integration_test @pytest.mark.fails_on_macos_github_workflow @pytest.mark.xdist_group(name="starts_everest") -def test_https_requests(copy_math_func_test_data_to_tmp): +async def test_https_requests(copy_math_func_test_data_to_tmp): everest_config = EverestConfig.load_file("config_minimal_slow.yml") expected_server_status = ServerStatus.never_run assert expected_server_status == everserver_status(everest_config)["status"] - wait_for_context() - ert_config = ErtConfig.with_plugins().from_dict( - generate_everserver_ert_config(everest_config) - ) makedirs_if_needed(everest_config.output_dir, roll_if_exists=True) - with open_storage(ert_config.ens_path, "w") as storage: - start_server(everest_config, ert_config, storage) - try: - wait_for_server(everest_config, 120) - except SystemExit as e: - context_stop_and_wait() - raise e + await 
start_server(everest_config) + try: + wait_for_server(everest_config, 120) + except SystemExit as e: + raise e + + server_status = everserver_status(everest_config) + assert ServerStatus.running == server_status["status"] + + url, cert, auth = ServerConfig.get_server_context(everest_config.output_dir) + result = requests.get(url, verify=cert, auth=auth, proxies=PROXY) # noqa: ASYNC210 + assert result.status_code == 200 # Request has succeeded + + # Test http request fail + url = url.replace("https", "http") + with pytest.raises(Exception): # noqa B017 + response = requests.get(url, verify=cert, auth=auth, proxies=PROXY) # noqa: ASYNC210 + response.raise_for_status() + + # Test request with wrong password fails + url, cert, _ = ServerConfig.get_server_context(everest_config.output_dir) + usr = "admin" + password = "wrong_password" + with pytest.raises(Exception): # noqa B017 + result = requests.get(url, verify=cert, auth=(usr, password), proxies=PROXY) # noqa: ASYNC210 + result.raise_for_status() + + # Test stopping server + assert server_is_running( + *ServerConfig.get_server_context(everest_config.output_dir) + ) + if stop_server(everest_config): + wait_for_server_to_stop(everest_config, 60) server_status = everserver_status(everest_config) - assert ServerStatus.running == server_status["status"] - - url, cert, auth = ServerConfig.get_server_context(everest_config.output_dir) - result = requests.get(url, verify=cert, auth=auth, proxies=PROXY) - assert result.status_code == 200 # Request has succeeded - - # Test http request fail - url = url.replace("https", "http") - with pytest.raises(Exception): # noqa B017 - response = requests.get(url, verify=cert, auth=auth, proxies=PROXY) - response.raise_for_status() - - # Test request with wrong password fails - url, cert, _ = ServerConfig.get_server_context(everest_config.output_dir) - usr = "admin" - password = "wrong_password" - with pytest.raises(Exception): # noqa B017 - result = requests.get(url, verify=cert, auth=(usr, password), proxies=PROXY) - result.raise_for_status() - - # Test stopping server - assert server_is_running( + + # Possible the case completed while waiting for the server to stop + assert server_status["status"] in [ + ServerStatus.stopped, + ServerStatus.completed, + ] + assert not server_is_running( *ServerConfig.get_server_context(everest_config.output_dir) ) - - if stop_server(everest_config): - wait_for_server_to_stop(everest_config, 60) - context_stop_and_wait() - server_status = everserver_status(everest_config) - - # Possible the case completed while waiting for the server to stop - assert server_status["status"] in [ - ServerStatus.stopped, - ServerStatus.completed, - ] - assert not server_is_running( - *ServerConfig.get_server_context(everest_config.output_dir) - ) - else: - context_stop_and_wait() - server_status = everserver_status(everest_config) - assert ServerStatus.stopped == server_status["status"] + else: + server_status = everserver_status(everest_config) + assert ServerStatus.stopped == server_status["status"] def test_server_status(copy_math_func_test_data_to_tmp): @@ -162,85 +134,13 @@ def test_server_status(copy_math_func_test_data_to_tmp): @patch("everest.detached.server_is_running", return_value=False) -def test_wait_for_server( - server_is_running_mock, caplog, copy_test_data_to_tmp, monkeypatch -): - monkeypatch.chdir("detached") - config = EverestConfig.load_file("valid_yaml_config.yml") +def test_wait_for_server(server_is_running_mock, caplog, monkeypatch): + config = 
EverestConfig.with_defaults() - with caplog.at_level(logging.DEBUG), pytest.raises(RuntimeError): - wait_for_server(config, timeout=1, context=None) + with pytest.raises(RuntimeError, match="Failed to start .* timeout"): + wait_for_server(config, timeout=1) assert not caplog.messages - context = MockContext() - with caplog.at_level(logging.DEBUG), pytest.raises(SystemExit): - wait_for_server(config, timeout=120, context=context) - - expected_error_msg = ( - 'Error when parsing config_file:"DISTANCE3" ' - "Keyword:ARGLIST must have at least 1 arguments.\n" - "Error message: ext_joblist_get_job_copy: " - "asked for job:distance3 which does not exist\n" - "Error message: Program received signal:6" - ) - - assert expected_error_msg in "\n".join(caplog.messages) - - server_status = everserver_status(config) - assert server_status["status"] == ServerStatus.failed - assert server_status["message"] == expected_error_msg - - -@patch("everest.detached.server_is_running", return_value=False) -@pytest.mark.usefixtures("change_to_tmpdir") -def test_wait_for_handles_failed_job_race_condition_failed_job_to_waiting( - server_is_running_mock, caplog -): - shutil.copytree(relpath("test_data", "detached"), ".", dirs_exist_ok=True) - config = EverestConfig.load_file("valid_yaml_config.yml") - - class _MockContext(MockContext): - @staticmethod - def job_progress(*args): - return None - - @staticmethod - def get_job_state(*args): - return JobState.WAITING - - with caplog.at_level(logging.ERROR), pytest.raises(RuntimeError): - wait_for_server(config, timeout=1, context=_MockContext()) - - assert ( - "Race condition in wait_for_server, job did fail but is now in WAITING" - in caplog.messages - ) - - -@patch("everest.detached.server_is_running", return_value=False) -@pytest.mark.usefixtures("change_to_tmpdir") -def test_wait_for_handles_failed_job_race_condition_failed_job_removed_from_scheduler( - server_is_running_mock, caplog -): - shutil.copytree(relpath("test_data", "detached"), ".", dirs_exist_ok=True) - config = EverestConfig.load_file("valid_yaml_config.yml") - - class _MockContext(MockContext): - @staticmethod - def job_progress(*args): - return None - - @staticmethod - def get_job_state(*args): - raise IndexError("Some trackback") - - with caplog.at_level(logging.ERROR), pytest.raises(SystemExit): - wait_for_server(config, timeout=1, context=_MockContext()) - - assert any( - "Race condition in wait_for_server, failed job removed from scheduler" - for x in caplog.messages - ) def _get_reference_config(): @@ -279,11 +179,10 @@ def _get_reference_config(): def test_detached_mode_config_base(copy_math_func_test_data_to_tmp): - everest_config, reference = _get_reference_config() - ert_config = generate_everserver_ert_config(everest_config) + everest_config, _ = _get_reference_config() + queue_config = get_server_queue_options(everest_config) - assert ert_config is not None - assert ert_config == reference + assert queue_config == LocalQueueOptions(max_running=1) @pytest.mark.parametrize( @@ -305,35 +204,20 @@ def test_everserver_queue_config_equal_to_run_config( if name is not None: simulator_config.update({"name": name}) everest_config.simulator = SimulatorConfig(**simulator_config) - server_ert_config = generate_everserver_ert_config(everest_config) + server_queue_option = get_server_queue_options(everest_config) ert_config = _everest_to_ert_config_dict(everest_config) - server_queue_option = server_ert_config["QUEUE_OPTION"] run_queue_option = ert_config["QUEUE_OPTION"] - assert ert_config["QUEUE_SYSTEM"] 
== server_ert_config["QUEUE_SYSTEM"] + assert ert_config["QUEUE_SYSTEM"] == server_queue_option.name assert ( next(filter(lambda x: "MAX_RUNNING" in x, reversed(run_queue_option)))[-1] == cores ) - assert ( - next(filter(lambda x: "MAX_RUNNING" in x, reversed(server_queue_option)))[-1] - == 1 - ) + assert server_queue_option.max_running == 1 if name is not None: - assert next(filter(lambda x: name in x, run_queue_option)) == next( - filter(lambda x: name in x, server_queue_option) - ) - - -def test_detached_mode_config_debug(copy_math_func_test_data_to_tmp): - everest_config, reference = _get_reference_config() - ert_config = generate_everserver_ert_config(everest_config, debug_mode=True) - - reference["SIMULATION_JOB"][0].append("--debug") - - assert ert_config is not None - assert ert_config == reference + option = next(filter(lambda x: name in x, run_queue_option)) + assert option[-1] == name == getattr(server_queue_option, option[1].lower()) @pytest.mark.parametrize("queue_system", ["lsf", "slurm"]) @@ -344,9 +228,8 @@ def test_detached_mode_config_only_sim(copy_math_func_test_data_to_tmp, queue_sy queue_options = [(queue_system.upper(), "MAX_RUNNING", 1)] reference.setdefault("QUEUE_OPTION", []).extend(queue_options) everest_config.simulator = SimulatorConfig(**{CK.QUEUE_SYSTEM: queue_system}) - ert_config = generate_everserver_ert_config(everest_config) - assert ert_config is not None - assert ert_config == reference + queue_config = get_server_queue_options(everest_config) + assert str(queue_config.name.name).lower() == queue_system def test_detached_mode_config_error(copy_math_func_test_data_to_tmp): @@ -357,24 +240,8 @@ def test_detached_mode_config_error(copy_math_func_test_data_to_tmp): everest_config, _ = _get_reference_config() everest_config.server = ServerConfig(name="server", queue_system="lsf") - with pytest.raises(ValueError): - generate_everserver_ert_config(everest_config) - - -def test_detached_mode_config_queue_name(copy_math_func_test_data_to_tmp): - everest_config, reference = _get_reference_config() - - queue_name = "put_me_in_the_queue" - reference["QUEUE_SYSTEM"] = QueueSystem.LSF - queue_options = [(QueueSystem.LSF, "LSF_QUEUE", queue_name)] - - reference.setdefault("QUEUE_OPTION", []).extend(queue_options) - everest_config.simulator = SimulatorConfig(queue_system="lsf") - everest_config.server = ServerConfig(queue_system="lsf", name=queue_name) - - ert_config = generate_everserver_ert_config(everest_config) - assert ert_config is not None - assert ert_config == reference + with pytest.raises(ValueError, match="so must the everest server"): + get_server_queue_options(everest_config) @pytest.mark.parametrize( @@ -406,100 +273,24 @@ def test_find_queue_system(config: EverestConfig, expected_result): assert result == expected_result -def test_find_queue_system_error(): - config = EverestConfig.with_defaults(**{"server": {CK.QUEUE_SYSTEM: "lsf"}}) - - with pytest.raises(ValueError): - _find_res_queue_system(config) - - -@pytest.mark.parametrize("queue_options", [[], [("EVEREST_KEY", "RES_KEY")]]) -@pytest.mark.parametrize("queue_system", ["LSF", "SLURM", "SOME_NEW_QUEUE_SYSTEM"]) -def test_generate_queue_options_no_config(queue_options, queue_system): +def test_generate_queue_options_no_config(): config = EverestConfig.with_defaults(**{}) - res_queue_name = "SOME_ERT_KEY" # LSF_QUEUE_KEY for LSF - assert [(queue_system, "MAX_RUNNING", 1)] == _generate_queue_options( - config, queue_options, res_queue_name, queue_system - ) - - 
-@pytest.mark.parametrize("queue_options", [[], [("exclude_host", "RES_KEY")]]) -@pytest.mark.parametrize("queue_system", ["LSF", "SLURM", "SOME_NEW_QUEUE_SYSTEM"]) -def test_generate_queue_options_only_name(queue_options, queue_system): - config = EverestConfig.with_defaults(**{"server": {"name": "my_custom_queue_name"}}) - res_queue_name = "SOME_ERT_KEY" # LSF_QUEUE_KEY for LSF - assert _generate_queue_options( - config, queue_options, res_queue_name, queue_system - ) == [ - ( - queue_system, - res_queue_name, - "my_custom_queue_name", - ), - ] - - -@pytest.mark.parametrize( - "queue_options, expected_result", - [ - ([], []), - ( - [("options", "RES_KEY")], - [ - ( - "SOME_QUEUE_SYSTEM", - "RES_KEY", - "ever_opt_1", - ), - ], - ), - ], -) -def test_generate_queue_options_only_options(queue_options, expected_result): - config = EverestConfig.with_defaults(**{"server": {"options": "ever_opt_1"}}) - res_queue_name = "NOT_RELEVANT_IN_THIS_CONTEXT" - queue_system = "SOME_QUEUE_SYSTEM" - assert ( - _generate_queue_options(config, queue_options, res_queue_name, queue_system) - == expected_result - ) + assert get_server_queue_options(config) == LocalQueueOptions(max_running=1) @pytest.mark.parametrize( "queue_options, expected_result", [ ( - [], - [ - ( - "SLURM", - "MAX_RUNNING", - 1, - ) - ], + {"options": "ever_opt_1", "queue_system": "slurm"}, + SlurmQueueOptions(max_running=1), ), ( - [("options", "RES_KEY")], - [ - ( - "SLURM", - "MAX_RUNNING", - 1, - ), - ( - "SLURM", - "RES_KEY", - "ever_opt_1", - ), - ], + {"options": "ever_opt_1", "queue_system": "lsf"}, + LsfQueueOptions(max_running=1, lsf_resource="ever_opt_1"), ), ], ) def test_generate_queue_options_use_simulator_values(queue_options, expected_result): - config = EverestConfig.with_defaults(**{"simulator": {"options": "ever_opt_1"}}) - res_queue_name = "NOT_RELEVANT_IN_THIS_CONTEXT" - queue_system = "SLURM" - assert ( - _generate_queue_options(config, queue_options, res_queue_name, queue_system) - == expected_result - ) + config = EverestConfig.with_defaults(**{"simulator": queue_options}) + assert get_server_queue_options(config) == expected_result diff --git a/tests/everest/test_everest_output.py b/tests/everest/test_everest_output.py index 6719795b07c..4ac98fe9dad 100644 --- a/tests/everest/test_everest_output.py +++ b/tests/everest/test_everest_output.py @@ -1,7 +1,6 @@ import fnmatch import os import shutil -from unittest.mock import patch import pytest @@ -9,7 +8,7 @@ from ert.run_models.everest_run_model import EverestRunModel from ert.storage import open_storage from everest.config import EverestConfig -from everest.detached import generate_everserver_ert_config, start_server +from everest.detached import start_server from everest.simulator.everest_to_ert import _everest_to_ert_config_dict from everest.strings import ( DEFAULT_OUTPUT_DIR, @@ -42,8 +41,7 @@ def test_that_one_experiment_creates_one_ensemble_per_batch( @pytest.mark.integration_test -@patch("ert.simulator.BatchSimulator.start", return_value=None) -def test_everest_output(start_mock, copy_mocked_test_data_to_tmp): +def test_everest_output(copy_mocked_test_data_to_tmp): config_folder = os.getcwd() config = EverestConfig.load_file("mocked_test_case.yml") everest_output_dir = config.output_dir @@ -74,13 +72,8 @@ def useless_cb(*args, **kwargs): assert "storage" not in initial_folders assert DETACHED_NODE_DIR not in initial_folders - ert_config = ErtConfig.with_plugins().from_dict( - generate_everserver_ert_config(config) - ) makedirs_if_needed(config.output_dir, 
roll_if_exists=True) - with open_storage(ert_config.ens_path, "w") as storage: - start_server(config, ert_config, storage) - start_mock.assert_called_once() + start_server(config) (path, folders, files) = next(os.walk(config_folder)) # Check we are looking at the config folder @@ -97,26 +90,18 @@ def useless_cb(*args, **kwargs): # Check storage folder no longer created in the config folder assert "storage" not in final_folders makedirs_if_needed(config.output_dir, roll_if_exists=True) - with open_storage(ert_config.ens_path, "w") as storage: - start_server(config, ert_config, storage) - assert start_mock.call_count == 2 + start_server(config) final_files = os.listdir(config_folder) # verify two everest_output dirs present assert len(fnmatch.filter(final_files, "everest_output*")) == 2 -@patch("ert.simulator.BatchSimulator.start", return_value=None) -def test_save_running_config(start_mock, copy_math_func_test_data_to_tmp): +async def test_save_running_config(copy_math_func_test_data_to_tmp): file_name = "config_minimal.yml" config = EverestConfig.load_file(file_name) - ert_config = ErtConfig.with_plugins().from_dict( - generate_everserver_ert_config(config) - ) makedirs_if_needed(config.output_dir, roll_if_exists=True) - with open_storage(ert_config.ens_path, "w") as storage: - start_server(config, ert_config, storage) - start_mock.assert_called_once() + await start_server(config) saved_config_path = os.path.join(config.output_dir, file_name) diff --git a/tests/everest/test_logging.py b/tests/everest/test_logging.py index 764859d43d3..7269457f633 100644 --- a/tests/everest/test_logging.py +++ b/tests/everest/test_logging.py @@ -2,14 +2,10 @@ import pytest -from ert.config import ErtConfig -from ert.storage import open_storage +from ert.scheduler.event import FinishedEvent from everest.config import EverestConfig, ServerConfig from everest.detached import ( - context_stop_and_wait, - generate_everserver_ert_config, start_server, - wait_for_context, wait_for_server, ) from everest.util import makedirs_if_needed @@ -23,26 +19,26 @@ def string_exists_in_file(file_path, string): return string in txt -@pytest.mark.flaky(reruns=5) +@pytest.mark.timeout(60) # Simulation might not finish @pytest.mark.integration_test @pytest.mark.xdist_group(name="starts_everest") -@pytest.mark.fails_on_macos_github_workflow -def test_logging_setup(copy_math_func_test_data_to_tmp): +async def test_logging_setup(copy_math_func_test_data_to_tmp): + async def server_running(): + while True: + event = await driver.event_queue.get() + if isinstance(event, FinishedEvent) and event.iens == 0: + return + everest_config = EverestConfig.load_file(CONFIG_FILE) - wait_for_context() - ert_config = ErtConfig.with_plugins().from_dict( - generate_everserver_ert_config(everest_config, True) - ) makedirs_if_needed(everest_config.output_dir, roll_if_exists=True) - with open_storage(ert_config.ens_path, "w") as storage: - start_server(everest_config, ert_config, storage) - try: - wait_for_server(everest_config, 120) - wait_for_context() - except SystemExit as e: - context_stop_and_wait() - raise e + driver = await start_server(everest_config, debug=True) + try: + wait_for_server(everest_config, 120) + except SystemExit as e: + raise e + await server_running() + everest_output_path = os.path.join(os.getcwd(), "everest_output") everest_logs_dir_path = everest_config.log_dir @@ -53,20 +49,12 @@ def test_logging_setup(copy_math_func_test_data_to_tmp): everest_log_path = os.path.join(everest_logs_dir_path, "everest.log") 
forward_model_log_path = os.path.join(everest_logs_dir_path, "forward_models.log") simulation_log_path = os.path.join(everest_logs_dir_path, "simulations.log") - everest_server_stderr_path = os.path.join( - everest_logs_dir_path, "everest_server.stderr.0" - ) - everest_server_stdout_path = os.path.join( - everest_logs_dir_path, "everest_server.stdout.0" - ) assert os.path.exists(everest_output_path) assert os.path.exists(everest_logs_dir_path) assert os.path.exists(forward_model_log_path) assert os.path.exists(simulation_log_path) assert os.path.exists(everest_log_path) - assert os.path.exists(everest_server_stderr_path) - assert os.path.exists(everest_server_stdout_path) assert os.path.exists(endpoint_log_path) assert string_exists_in_file(everest_log_path, "everest DEBUG:")
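Reviewer note: the behaviour of the new get_server_queue_options() helper is easiest to see from the two cases asserted by the updated tests in test_detached.py. The snippet below simply restates those test expectations and assumes nothing beyond them.

    from ert.config.queue_config import LocalQueueOptions, LsfQueueOptions
    from everest.config import EverestConfig
    from everest.detached import get_server_queue_options

    # No server or simulator section: the Everest server falls back to the local
    # queue, capped at a single running job.
    assert get_server_queue_options(
        EverestConfig.with_defaults(**{})
    ) == LocalQueueOptions(max_running=1)

    # No server section but a simulator section: the server inherits the simulator's
    # queue system and options (here LSF, with the options string as lsf_resource).
    config = EverestConfig.with_defaults(
        **{"simulator": {"options": "ever_opt_1", "queue_system": "lsf"}}
    )
    assert get_server_queue_options(config) == LsfQueueOptions(
        max_running=1, lsf_resource="ever_opt_1"
    )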