diff --git a/src/tests/ftest/util/job_manager_utils.py b/src/tests/ftest/util/job_manager_utils.py index 2b5f2cd6c26..afca5df86f1 100644 --- a/src/tests/ftest/util/job_manager_utils.py +++ b/src/tests/ftest/util/job_manager_utils.py @@ -473,6 +473,7 @@ def __init__(self, job, subprocess=False, mpi_type="openmpi"): self.tmpdir_base = FormattedParameter("--mca orte_tmpdir_base {}", None) self.args = BasicParameter(None, None) self.mpi_type = mpi_type + self.hostlist = FormattedParameter("-hosts {}", None) def assign_hosts(self, hosts, path=None, slots=None, hostfile=True): """Assign the hosts to use with the command (-f). diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 2d1c11e722e..60351cb2163 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -25,10 +25,11 @@ from soak_utils import (SoakTestError, add_pools, build_job_script, cleanup_dfuse, create_app_cmdline, create_dm_cmdline, create_fio_cmdline, create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline, - create_racer_cmdline, ddhhmmss_format, get_harassers, - launch_exclude_reintegrate, launch_extend, launch_reboot, - launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, - reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check) + create_racer_cmdline, ddhhmmss_format, debug_logging, get_harassers, + get_id, get_job_logs, job_cleanup, launch_exclude_reintegrate, + launch_extend, launch_jobscript, launch_reboot, launch_server_stop_start, + launch_snapshot, launch_vmd_identify_check, reserved_file_copy, + run_event_check, run_metrics_check, run_monitor_check) class SoakTestBase(TestWithServers): @@ -78,7 +79,11 @@ def __init__(self, *args, **kwargs): self.soak_log_dir = None self.soak_dir = None self.enable_scrubber = False + self.job_scheduler = None + self.joblist = None + self.enable_debug_msg = False self.enable_rebuild_logmasks = False + self.down_nodes = None def setUp(self): """Define test setup to be done.""" @@ -97,30 +102,29 @@ def setUp(self): self.sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop) # Initialize dmg cmd self.dmg_command = self.get_dmg_command() - # Fail if slurm partition is not defined - # NOTE: Slurm reservation and partition are created before soak runs. - # CI uses partition=daos_client and no reservation. - # A21 uses partition=normal/default and reservation=daos-test. - # Partition and reservation names are updated in the yaml file. - # It is assumed that if there is no reservation (CI only), then all - # the nodes in the partition will be used for soak. - if not self.host_info.clients.partition.name: - raise SoakTestError( - "<>") - self.srun_params = {"partition": self.host_info.clients.partition.name} - if self.host_info.clients.partition.reservation: - self.srun_params["reservation"] = self.host_info.clients.partition.reservation - # Include test node for log cleanup; remove from client list + self.job_scheduler = self.params.get("job_scheduler", "/run/*", default="slurm") + # soak jobs do not run on the local node local_host_list = include_local_host(None) - self.slurm_exclude_nodes.add(local_host_list) if local_host_list[0] in self.hostlist_clients: self.hostlist_clients.remove((local_host_list[0])) if not self.hostlist_clients: - self.fail( - "There are no valid nodes in this partition to run " - "soak. 
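# --- Illustrative sketch (not part of the diff): how the new Mpirun "hostlist"
# option is expected to render once a value is assigned. FormattedParameter is
# provided by the DAOS ftest framework; the _FormattedParameter class below is a
# simplified stand-in written for this sketch only.
class _FormattedParameter:
    """Minimal stand-in for command_utils_base.FormattedParameter."""

    def __init__(self, str_format, default=None):
        self._str_format = str_format
        self.value = default

    def update(self, value):
        """Assign a new value to the parameter."""
        self.value = value

    def __str__(self):
        # an unset parameter contributes nothing to the command line
        return "" if self.value is None else self._str_format.format(self.value)


hostlist = _FormattedParameter("-hosts {}", None)
print(repr(str(hostlist)))       # '' - omitted while unset
hostlist.update("$HOSTLIST")
print(str(hostlist))             # -hosts $HOSTLIST - expanded by the generated job script at run time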
Check partition {} for valid nodes".format( - self.host_info.clients.partition.name)) + self.fail("There are no valid nodes to run soak") + if self.job_scheduler == "slurm": + # Fail if slurm partition is not defined + # NOTE: Slurm reservation and partition are created before soak runs. + # CI uses partition=daos_client and no reservation. + # A21 uses partition=normal/default and reservation=daos-test. + # Partition and reservation names are updated in the yaml file. + # It is assumed that if there is no reservation (CI only), then all + # the nodes in the partition will be used for soak. + if not self.host_info.clients.partition.name: + raise SoakTestError( + "<>") + self.srun_params = {"partition": self.host_info.clients.partition.name} + if self.host_info.clients.partition.reservation: + self.srun_params["reservation"] = self.host_info.clients.partition.reservation + # Include test node for log cleanup; remove from client list + self.slurm_exclude_nodes.add(local_host_list) def pre_tear_down(self): """Tear down any test-specific steps prior to running tearDown(). @@ -133,7 +137,7 @@ def pre_tear_down(self): self.log.info("<> at %s", time.ctime()) errors = [] # clear out any jobs in squeue; - if self.failed_job_id_list: + if self.failed_job_id_list and self.job_scheduler == "slurm": job_id = " ".join([str(job) for job in self.failed_job_id_list]) self.log.info("<>", job_id) cmd = "scancel --partition {} -u {} {}".format( @@ -144,7 +148,8 @@ def pre_tear_down(self): if self.all_failed_jobs: errors.append("SOAK FAILED: The following jobs failed {} ".format( " ,".join(str(j_id) for j_id in self.all_failed_jobs))) - + # cleanup any remaining jobs + job_cleanup(self.log, self.hostlist_clients) # verify reserved container data if self.resv_cont: final_resv_file = os.path.join(self.test_dir, "final", "resv_file") @@ -284,6 +289,122 @@ def harasser_job_done(self, args): self.harasser_results[args["name"]] = args["status"] self.harasser_args[args["name"]] = args["vars"] + def schedule_jobs(self, node_list): + """Schedule jobs with internal scheduler. 
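# --- Illustrative sketch (not part of the diff): the scheduler-specific setup
# branch reduced to a standalone helper. PartitionInfo and the sample values are
# hypothetical stand-ins for self.host_info.clients.partition; only the branching
# logic mirrors the patch (slurm needs a partition and an optional reservation,
# the internal scheduler needs neither).
from collections import namedtuple

PartitionInfo = namedtuple("PartitionInfo", ["name", "reservation"])


def build_srun_params(job_scheduler, partition):
    """Return slurm srun parameters, or None when the internal scheduler is used."""
    if job_scheduler != "slurm":
        return None
    if not partition.name:
        raise RuntimeError("slurm partition is not defined")
    srun_params = {"partition": partition.name}
    if partition.reservation:
        srun_params["reservation"] = partition.reservation
    return srun_params


print(build_srun_params("slurm", PartitionInfo("daos_client", None)))   # {'partition': 'daos_client'}
print(build_srun_params("internal", PartitionInfo(None, None)))         # None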
+ + Args: + node_list (list): list of nodes to use in jobs + """ + debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs ENTERED ") + job_queue = multiprocessing.Queue() + jobid_list = [] + jobs_not_done = [] + # remove any nodes marked as DOWN + node_list.difference_update(self.down_nodes) + lib_path = os.getenv("LD_LIBRARY_PATH") + path = os.getenv("PATH") + v_env = os.getenv("VIRTUAL_ENV") + env = ";".join([f"export LD_LIBRARY_PATH={lib_path}", + f"export PATH={path}"]) + if v_env: + env = ";".join([env, f"export VIRTUAL_ENV={v_env}"]) + for job_dict in self.joblist: + jobid_list.append(job_dict["jobid"]) + jobs_not_done.append(job_dict["jobid"]) + self.log.info("Submitting %s jobs at %s", str(len(jobid_list)), time.ctime()) + job_threads = [] + while True: + if time.time() > self.end_time or len(jobs_not_done) == 0: + break + job_results = {} + # verify that there are enough nodes to run remaining jobs + if len(job_threads) == 0: + for job_dict in self.joblist: + job_id = job_dict["jobid"] + if job_id in jobs_not_done: + node_count = job_dict["nodesperjob"] + if len(node_list) < node_count: + # cancel job + self.soak_results.update({job_id: "CANCELLED"}) + self.log.info( + "FINAL STATE: soak job %s completed with : %s at %s", + job_id, + "CANCELLED", + time.ctime()) + jobs_not_done.remove(job_id) + for job_dict in self.joblist: + job_id = job_dict["jobid"] + if job_id in jobid_list: + node_count = job_dict["nodesperjob"] + if len(node_list) >= node_count: + debug_logging( + self.log, self.enable_debug_msg, f"DBG: node_count {node_count}") + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list initial/queue {node_list}") + job_node_list = node_list[:node_count] + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list before launch_job {node_list}") + script = job_dict["jobscript"] + timeout = job_dict["jobtimeout"] + log = job_dict["joblog"] + error_log = job_dict["joberrlog"] + method = launch_jobscript + params = (self.log, job_queue, job_id, job_node_list, + env, script, log, error_log, timeout, self) + name = f"SOAK JOB {job_id}" + _thread = threading.Thread( + target=method, args=params, name=name, daemon=True) + job_threads.append(_thread) + jobid_list.remove(job_id) + node_list = node_list[node_count:] + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list after launch_job {node_list}") + + # Start this job + _thread.start() + + # If we don't process any results this time, we'll sleep before checking again + do_sleep = True + + # Keep reference only to threads that are still running + _alive_threads = [] + for job in job_threads: + if job.is_alive(): + _alive_threads.append(job) + continue + # join finished threads to be safe + job.join() + # Don't sleep - starting scheduling immediately + do_sleep = False + job_threads = _alive_threads + + # Process results, if any + while not job_queue.empty(): + job_results = job_queue.get() + # Results to return in queue + node_list.update(job_results["host_list"]) + self.down_nodes.update(job_results["down_nodes"]) + debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results") + self.soak_results[job_results["handle"]] = job_results["state"] + job_done_id = job_results["handle"] + jobs_not_done.remove(job_done_id) + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list returned from queue {node_list}") + + # Sleep to avoid spin lock + if do_sleep: + time.sleep(3) + + debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs EXITED ") + 
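# --- Illustrative sketch (not part of the diff): the thread-per-job scheduling
# pattern used by schedule_jobs() above, reduced to a runnable toy. The real code
# launches jobs through launch_jobscript()/run_remote and uses a
# multiprocessing.Queue; here a fake worker sleeps briefly and reports COMPLETED
# through a queue.Queue. The "jobid"/"nodesperjob" keys and the result fields
# match the patch; the node names are made up.
import queue
import threading
import time


def _fake_worker(job_queue, job_id, nodes):
    """Stand-in for launch_jobscript: run the job, then return its nodes via the queue."""
    time.sleep(0.1)
    job_queue.put({"handle": job_id, "state": "COMPLETED", "host_list": nodes, "down_nodes": []})


def schedule(joblist, node_pool):
    """Run every job, handing out nodes and reclaiming them as results come back."""
    job_queue = queue.Queue()
    pending = list(joblist)
    not_done = {job["jobid"] for job in joblist}
    results = {}
    while not_done:
        # start any pending job whose node count can currently be satisfied
        for job in list(pending):
            node_count = job["nodesperjob"]
            if len(node_pool) >= node_count:
                nodes, node_pool = node_pool[:node_count], node_pool[node_count:]
                threading.Thread(
                    target=_fake_worker, args=(job_queue, job["jobid"], nodes), daemon=True).start()
                pending.remove(job)
        # drain finished jobs and return their nodes to the pool
        while not job_queue.empty():
            result = job_queue.get()
            node_pool += result["host_list"]
            results[result["handle"]] = result["state"]
            not_done.discard(result["handle"])
        time.sleep(0.1)   # avoid a spin lock, as in the patch
    return results


jobs = [{"jobid": 1, "nodesperjob": 2}, {"jobid": 2, "nodesperjob": 3}]
print(schedule(jobs, ["node1", "node2", "node3"]))   # {1: 'COMPLETED', 2: 'COMPLETED'}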
def job_setup(self, jobs, pool): """Create the cmdline needed to launch job. @@ -292,28 +413,27 @@ def job_setup(self, jobs, pool): pool (obj): TestPool obj Returns: - job_cmdlist: list of sbatch scripts that can be launched - by slurm job manager + job_cmdlist: list of dictionary of jobs that can be launched """ - job_cmdlist = [] self.log.info("<> at %s", self.test_name, time.ctime()) for job in jobs: - jobscript = [] + # list of all job scripts + jobscripts = [] + # command is a list of [sbatch_cmds, log_name] to create a single job script commands = [] - nodesperjob = self.params.get( - "nodesperjob", "/run/" + job + "/*", [1]) - taskspernode = self.params.get( - "taskspernode", "/run/" + job + "/*", [1]) + total_nodes = NodeSet(self.hostlist_clients) + if self.down_nodes: + total_nodes.difference_update(self.down_nodes) + nodesperjob = self.params.get("nodesperjob", "/run/" + job + "/*", [1]) + taskspernode = self.params.get("taskspernode", "/run/" + job + "/*", [1]) for npj in list(nodesperjob): # nodesperjob = -1 indicates to use all nodes in client hostlist if npj < 0: - npj = len(self.hostlist_clients) - if len(self.hostlist_clients) / npj < 1: - raise SoakTestError( - "<> at %s", self.test_name, time.ctime()) job_id_list = [] - # before submitting the jobs to the queue, check the job timeout; + # before starting jobs, check the job timeout; if time.time() > self.end_time: self.log.info("<< SOAK test timeout in Job Startup>>") return job_id_list - # job_cmdlist is a list of batch script files - for script in job_cmdlist: - try: - job_id = slurm_utils.run_slurm_script(self.log, str(script)) - except slurm_utils.SlurmFailed as error: - self.log.error(error) - # Force the test to exit with failure - job_id = None - if job_id: - self.log.info( - "<> at %s", - job_id, script, time.ctime()) - slurm_utils.register_for_job_results(job_id, self, max_wait=self.test_timeout) - # keep a list of the job_id's - job_id_list.append(int(job_id)) - else: - # one of the jobs failed to queue; exit on first fail for now. - err_msg = f"Slurm failed to submit job for {script}" - job_id_list = [] - raise SoakTestError(f"<>") + if self.job_scheduler == "slurm": + for job_dict in self.joblist: + script = job_dict["jobscript"] + try: + job_id = slurm_utils.run_slurm_script(self.log, str(script)) + except slurm_utils.SlurmFailed as error: + self.log.error(error) + # Force the test to exit with failure + job_id = None + if job_id: + self.log.info( + "<> at %s", job_id, script, time.ctime()) + slurm_utils.register_for_job_results(job_id, self, max_wait=self.test_timeout) + # Update Job_List with the job_id + job_dict["job_id"] = int(job_id) + job_id_list.append(int(job_id)) + else: + # one of the jobs failed to queue; exit on first fail for now. 
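# --- Illustrative sketch (not part of the diff): how job_setup() expands the yaml
# nodesperjob/taskspernode lists into per-job entries. The dictionary keys are the
# ones consumed by schedule_jobs(); "jobid" is added later by job_startup() via
# get_id(). The script and log names are placeholders, since this sketch does not
# call build_job_script().
from itertools import product


def expand_job(job_name, nodesperjob, taskspernode, available_nodes, timeout=10):
    """Return one job dict per (nodes-per-job, tasks-per-node) combination."""
    joblist = []
    for npj, ppn in product(nodesperjob, taskspernode):
        if npj < 0:
            # nodesperjob = -1 means use every available client node
            npj = available_nodes
        if available_nodes < npj:
            raise RuntimeError(f"{job_name} requires {npj} nodes; only {available_nodes} available")
        joblist.append({
            "jobscript": f"jobscript_{job_name}_{npj}x{ppn}.sh",
            "nodesperjob": npj,
            "jobtimeout": timeout,
            "joblog": f"{job_name}_RHOST_JOBID.log",      # RHOST/JOBID are filled in at launch time
            "joberrlog": f"{job_name}_RHOST_JOBID.err",
        })
    return joblist


for entry in expand_job("ior_easy", [-1, 2], [16], available_nodes=8):
    print(entry)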
+ err_msg = f"Job failed to run for {script}" + job_id_list = [] + raise SoakTestError(f"<>") + else: + for job_dict in self.joblist: + job_dict["jobid"] = get_id() + job_id_list.append(job_dict["jobid"]) + node_list = NodeSet(self.hostlist_clients) + node_list.difference_update(self.down_nodes) + # self.schedule_jobs() + method = self.schedule_jobs + name = "Job Scheduler" + params = (node_list, ) + scheduler = threading.Thread( + target=method, args=params, name=name, daemon=True) + scheduler.start() + return job_id_list def job_completion(self, job_id_list): @@ -385,8 +530,9 @@ def job_completion(self, job_id_list): failed_job_id_list: IDs of each job that failed in slurm """ - self.log.info( - "<> at %s", self.test_name, time.ctime()) + # pylint: disable=too-many-nested-blocks + + self.log.info("<> at %s", self.test_name, time.ctime()) harasser_interval = 0 failed_harasser_msg = None harasser_timer = time.time() @@ -395,21 +541,28 @@ def job_completion(self, job_id_list): since = journalctl_time() # loop time exists after the first pass; no harassers in the first pass if self.harasser_loop_time and self.harassers: - harasser_interval = self.harasser_loop_time / ( - len(self.harassers) + 1) + harasser_interval = self.harasser_loop_time / (len(self.harassers) + 1) # If there is nothing to do; exit if job_id_list: # wait for all the jobs to finish while len(self.soak_results) < len(job_id_list): - # wait for the jobs to complete. - # enter tearDown before hitting the avocado timeout + debug_logging( + self.log, self.enable_debug_msg, f"DBG: SOAK RESULTS 1 {self.soak_results}") + # wait for the jobs to complete unless test_timeout occurred if time.time() > self.end_time: - self.log.info( - "<< SOAK test timeout in Job Completion at %s >>", - time.ctime()) - for job in job_id_list: - if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed: - self.fail(f"Error canceling Job {job}") + self.log.info("<< SOAK test timeout in Job Completion at %s >>", time.ctime()) + if self.job_scheduler == "slurm": + for job in job_id_list: + if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed: + self.fail(f"Error canceling Job {job}") + else: + # update soak_results to include job id NOT run and set state = CANCELLED + for job in job_id_list: + if job not in self.soak_results: + self.soak_results.update({job: "CANCELLED"}) + self.log.info("FINAL STATE: soak job %s completed with : %s at %s", + job, "CANCELLED", time.ctime()) + break # monitor events every 15 min if datetime.now() > check_time: run_monitor_check(self) @@ -444,27 +597,14 @@ def job_completion(self, job_id_list): if failed_harasser_msg is not None: self.all_failed_harassers.append(failed_harasser_msg) # check for JobStatus = COMPLETED or CANCELLED (i.e. 
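# --- Illustrative sketch (not part of the diff): the completion wait performed by
# job_completion(), as a standalone helper. soak_results is filled in
# asynchronously (slurm callbacks or schedule_jobs); end_time is the soak test
# deadline. All names here are local to the sketch.
import time


def wait_for_jobs(job_id_list, soak_results, end_time, poll=1):
    """Wait until every job reports a state; mark stragglers CANCELLED at the deadline."""
    while len(soak_results) < len(job_id_list):
        if time.time() > end_time:
            # past the soak deadline: any job that never reported is recorded as CANCELLED
            for job in job_id_list:
                soak_results.setdefault(job, "CANCELLED")
            break
        time.sleep(poll)
    return soak_results


print(wait_for_jobs([1, 2, 3], {1: "COMPLETED"}, end_time=time.time() - 1))
# {1: 'COMPLETED', 2: 'CANCELLED', 3: 'CANCELLED'}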
TEST TO) + debug_logging( + self.log, self.enable_debug_msg, f"DBG: SOAK RESULTS 2 {self.soak_results}") for job, result in list(self.soak_results.items()): if result in ["COMPLETED", "CANCELLED"]: job_id_list.remove(int(job)) else: - self.log.info( - "<< Job %s failed with status %s>>", job, result) - # gather all the logfiles for this pass and cleanup test nodes - cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/" - cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}" - if self.enable_remote_logging: - # Limit fan out to reduce burden on filesystem - result = run_remote(self.log, self.hostlist_clients, cmd, timeout=600, fanout=64) - if result.passed: - result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600) - if not result.passed: - self.log.error("Remote copy failed on %s", str(result.failed_hosts)) - # copy the local files; local host not included in hostlist_client - if not run_local(self.log, cmd, timeout=600).passed: - self.log.info("Local copy failed: %s", cmd) - if not run_local(self.log, cmd2, timeout=600).passed: - self.log.info("Local copy failed: %s", cmd2) + self.log.info("<< Job %s failed with status %s>>", job, result) + get_job_logs(self) self.soak_results = {} return job_id_list @@ -487,7 +627,8 @@ def execute_jobs(self, jobs, pools): SoakTestError """ - job_script_list = [] + jobid_list = [] + self.joblist = [] # Update the remote log directories from new loop/pass sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop) outputsoaktest_dir = self.outputsoak_dir + "/pass" + str(self.loop) @@ -507,18 +648,15 @@ def execute_jobs(self, jobs, pools): else: self.soak_log_dir = sharedsoaktest_dir # create the batch scripts - job_script_list = self.job_setup(jobs, pools) - # randomize job list - random.seed(4) - random.shuffle(job_script_list) + self.job_setup(jobs, pools) # Gather the job_ids - job_id_list = self.job_startup(job_script_list) + jobid_list = self.job_startup() # Initialize the failed_job_list to job_list so that any # unexpected failures will clear the squeue in tearDown - self.failed_job_id_list = job_id_list + self.failed_job_id_list = jobid_list # Wait for jobs to finish and cancel/kill jobs if necessary - self.failed_job_id_list = self.job_completion(job_id_list) + self.failed_job_id_list = self.job_completion(jobid_list) # Log the failing job ID if self.failed_job_id_list: self.log.info( @@ -537,6 +675,7 @@ def run_soak(self, test_param): """ self.soak_results = {} + self.joblist = [] self.pool = [] self.container = [] self.harasser_results = {} @@ -547,6 +686,8 @@ def run_soak(self, test_param): self.soak_errors = [] self.check_errors = [] self.used = [] + self.down_nodes = NodeSet() + self.enable_debug_msg = self.params.get("enable_debug_msg", "/run/*", default=False) self.mpi_module = self.params.get("mpi_module", "/run/*", default="mpi/mpich-x86_64") self.mpi_module_use = self.params.get( "mpi_module_use", "/run/*", default="/usr/share/modulefiles") @@ -559,7 +700,7 @@ def run_soak(self, test_param): resv_bytes = self.params.get("resv_bytes", test_param + "*", 500000000) ignore_soak_errors = self.params.get("ignore_soak_errors", test_param + "*", False) self.enable_il = self.params.get("enable_intercept_lib", test_param + "*", False) - self.sudo_cmd = "sudo" if enable_sudo else "" + self.sudo_cmd = "sudo -n" if enable_sudo else "" self.enable_remote_logging = self.params.get( "enable_remote_logging", os.path.join(test_param, "*"), False) self.enable_scrubber = self.params.get( diff --git 
a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index c527e67fea8..429a09d8de9 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -5,16 +5,19 @@ """ # pylint: disable=too-many-lines +import getpass import os import random import re +import stat import threading import time -from itertools import product +from itertools import count, product -import slurm_utils from avocado.core.exceptions import TestFail from avocado.utils.distro import detect +from ClusterShell.NodeSet import NodeSet +from command_utils import command_as_user from command_utils_base import EnvironmentVariables from daos_racer_utils import DaosRacerCommand from data_mover_utils import DcpCommand, FsCopy @@ -23,7 +26,7 @@ from duns_utils import format_path from exception_utils import CommandFailure from fio_utils import FioCommand -from general_utils import (DaosTestError, check_ping, check_ssh, get_host_data, get_log_file, +from general_utils import (DaosTestError, check_ping, check_ssh, get_journalctl, get_log_file, get_random_bytes, get_random_string, list_to_str, pcmd, run_command, run_pcmd, wait_for_result) from ior_utils import IorCommand @@ -32,10 +35,11 @@ from mdtest_utils import MdtestCommand from oclass_utils import extract_redundancy_factor from pydaos.raw import DaosApiError, DaosSnapshot -from run_utils import run_remote +from run_utils import run_local, run_remote from test_utils_container import add_container H_LOCK = threading.Lock() +id_counter = count(start=1) def ddhhmmss_format(seconds): @@ -56,6 +60,27 @@ def ddhhmmss_format(seconds): "%H:%M:%S", time.gmtime(seconds % 86400))) +def get_id(): + """Increment a counter to generate job ids + + Returns: + int : next counter value + """ + return next(id_counter) + + +def debug_logging(log, enable_debug_msg, log_msg): + """Enable debug messages in log file. + + Args: + log (logger): logger for the messages produced by this method + enable_debug_msg (boolean): If true, the debug message will be written to log + log_msg (str): debug message to write to log + """ + if enable_debug_msg: + log.debug(log_msg) + + def add_pools(self, pool_names, ranks=None): """Create a list of pools that the various tests use for storage. @@ -181,7 +206,7 @@ def run_event_check(self, since, until): hosts = list(set(self.hostlist_servers)) if events: for journalctl_type in ["kernel", "daos_server"]: - for output in get_journalctl(self, hosts, since, until, journalctl_type): + for output in get_journalctl(hosts, since, until, journalctl_type): for event in events: lines = output["data"].splitlines() for line in lines: @@ -195,7 +220,7 @@ def run_event_check(self, since, until): return events_found -def get_journalctl(self, hosts, since, until, journalctl_type, logging=False): +def get_journalctl_logs(self, hosts, since, until, journalctl_type): """Run the journalctl on daos servers. 
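# --- Illustrative sketch (not part of the diff): the itertools.count pattern
# behind get_id() and the gated debug_logging() helper, demonstrated with the
# standard logging module instead of the avocado test logger.
import logging
from itertools import count

id_counter = count(start=1)


def get_id():
    """Return the next internal job id (1, 2, 3, ...)."""
    return next(id_counter)


def debug_logging(log, enable_debug_msg, log_msg):
    """Write log_msg at debug level only when debug messages are enabled."""
    if enable_debug_msg:
        log.debug(log_msg)


logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("soak")
print(get_id(), get_id(), get_id())                      # 1 2 3
debug_logging(log, True, "DBG: internal job id assigned")
debug_logging(log, False, "suppressed when enable_debug_msg is False")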
Args: @@ -211,18 +236,14 @@ def get_journalctl(self, hosts, since, until, journalctl_type, logging=False): "data": data requested for the group of hosts """ - command = "{} /usr/bin/journalctl --system -t {} --since=\"{}\" --until=\"{}\"".format( - self.sudo_cmd, journalctl_type, since, until) - err = "Error gathering system log events" - results = get_host_data(hosts, command, "journalctl", err) + results = get_journalctl(hosts, since, until, journalctl_type) name = f"journalctl_{journalctl_type}.log" destination = self.outputsoak_dir - if logging: - for result in results: - for host in result["hosts"]: - log_name = name + "-" + str(host) - self.log.info("Logging %s output to %s", command, log_name) - write_logfile(result["data"], log_name, destination) + for result in results: + for host in result["hosts"]: + log_name = name + "-" + str(host) + self.log.info("Logging output to %s", log_name) + write_logfile(result["data"], log_name, destination) return results @@ -233,7 +254,7 @@ def get_daos_server_logs(self): self (obj): soak obj """ daos_dir = self.outputsoak_dir + "/daos_server_logs" - logs_dir = os.environ.get("DAOS_TEST_LOG_DIR", "/var/tmp/daos_testing/") + logs_dir = self.test_env.log_dir + "/*log*" hosts = self.hostlist_servers if not os.path.exists(daos_dir): os.mkdir(daos_dir) @@ -244,6 +265,34 @@ def get_daos_server_logs(self): raise SoakTestError(f"<>") from error +def get_job_logs(self): + """Gather all job logs for the current pass of soak.""" + + # gather all the logfiles for this pass and cleanup client nodes + cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/" + cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}" + if self.enable_remote_logging: + # Limit fan out to reduce burden on filesystem + result = run_remote(self.log, self.hostlist_clients, cmd, timeout=600, fanout=64) + if result.passed: + result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600) + if not result.passed: + self.log.error("Remote copy failed on %s", str(result.failed_hosts)) + # copy script files from shared dir + sharedscr_dir = self.sharedsoak_dir + "/pass" + str(self.loop) + cmd3 = f"/usr/bin/rsync -avtr --min-size=1B {sharedscr_dir} {self.outputsoak_dir}/" + cmd4 = f"/usr/bin/rm -rf {sharedscr_dir}" + if not run_local(self.log, cmd3, timeout=600).passed: + self.log.info("Script file copy failed with %s", cmd3) + if not run_local(self.log, cmd4, timeout=600).passed: + self.log.info("Script file copy failed with %s", cmd4) + # copy the local files; local host not included in hostlist_client + if not run_local(self.log, cmd, timeout=600).passed: + self.log.info("Local copy failed: %s", cmd) + if not run_local(self.log, cmd2, timeout=600).passed: + self.log.info("Local copy failed: %s", cmd2) + + def run_monitor_check(self): """Monitor server cpu, memory usage periodically. @@ -340,6 +389,108 @@ def wait_for_pool_rebuild(self, pool, name): return rebuild_status +def job_cleanup(log, hosts): + """Cleanup after job is done. 
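# --- Illustrative sketch (not part of the diff): the gather-then-remove pattern
# used by get_job_logs(). The real code runs these commands through
# run_remote/run_local from the ftest framework; subprocess is used here so the
# sketch stands alone, and the paths are hypothetical.
import subprocess


def gather_logs(soak_log_dir, output_dir):
    """Copy the per-pass job logs into the output directory, then clean the source."""
    copy_cmd = ["/usr/bin/rsync", "-avtr", "--min-size=1B", soak_log_dir, f"{output_dir}/"]
    remove_cmd = ["/usr/bin/rm", "-rf", soak_log_dir]
    # only remove the source tree when the copy succeeded
    if subprocess.run(copy_cmd, check=False).returncode == 0:
        subprocess.run(remove_cmd, check=False)
    else:
        print(f"Local copy failed: {' '.join(copy_cmd)}")


# gather_logs("/var/tmp/soak/pass1", "/home/user/soak_output")   # example invocation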
+ + Args: + log (logger): logger for the messages produced by this method + hosts (list): list of node to pass to job script + """ + current_user = getpass.getuser() + for job in ["mpirun", "palsd", "dfuse"]: + cmd = [f"/usr/bin/bash -c 'for pid in $(pgrep -u {current_user} {job})", + "do kill -HUP $pid", + "done'"] + run_remote( + log, hosts, ";".join(cmd), verbose=False, timeout=600, task_debug=False, stderr=False) + if job == "dfuse": + cmd2 = [ + "/usr/bin/bash -c 'for dir in $(find /tmp/soak_dfuse_*/)", + "do fusermount3 -uz $dir", + "rm -rf $dir", + "done'"] + run_remote(log, hosts, ";".join(cmd2), verbose=False, timeout=600, task_debug=False, + stderr=False) + + +def launch_jobscript( + log, job_queue, job_id, host_list, env, script, job_log, error_log, timeout, test): + """Launch the job script on remote node. + + Args: + log (logger): logger for the messages produced by this method + job_queue (Queue): job queue to post status of job + job_id (int): unique job identifier + host_list (list): list of node to pass to job script + env (str): environment variables for job script + script (str): full path to job script + job_log (str): job std out + error_log (str): job std error + timeout (int): job timeout + test (TestObj): soak test obj + """ + + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} ENTERED launch_jobscript") + job_results = [] + node_results = [] + down_nodes = NodeSet() + state = "UNKNOWN" + if time.time() >= test.end_time: + results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + return + if isinstance(host_list, str): + # assume one host in list + hosts = host_list + rhost = host_list + else: + hosts = ",".join(sorted(host_list)) + rhost = NodeSet(hosts)[0] + job_log1 = job_log.replace("JOBID", str(job_id)) + error_log1 = error_log.replace("JOBID", str(job_id)) + joblog = job_log1.replace("RHOST", str(rhost)) + errorlog = error_log1.replace("RHOST", str(rhost)) + cmd = ";".join([env, f"{script} {hosts} {job_id} {joblog} {errorlog}"]) + job_results = run_remote( + log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False) + if job_results: + if job_results.timeout: + state = "TIMEOUT" + elif job_results.passed: + state = "COMPLETED" + elif not job_results.passed: + state = "FAILED" + else: + state = "UNKNOWN" + # attempt to cleanup any leftover job processes on timeout + job_cleanup(log, hosts) + if time.time() >= test.end_time: + results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + # give time to update the queue before exiting + time.sleep(0.5) + return + + # check if all nodes are available + cmd = f"ls {test.test_env.log_dir}" + node_results = run_remote(log, NodeSet(hosts), cmd, verbose=False) + if node_results.failed_hosts: + for node in node_results.failed_hosts: + host_list.remove(node) + down_nodes.update(node) + log.info(f"DBG: Node {node} is marked as DOWN in job {job_id}") + + log.info("FINAL STATE: soak job %s completed with : %s at %s", job_id, state, time.ctime()) + results = {"handle": job_id, "state": state, "host_list": host_list, "down_nodes": down_nodes} + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + # give time to update the queue before exiting + time.sleep(0.5) + 
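# --- Illustrative sketch (not part of the diff): how launch_jobscript() maps the
# remote command result onto a job state and reports it back through the queue.
# RemoteResult is a minimal stand-in for the object returned by
# run_utils.run_remote; only the fields used above (timeout, passed) are modeled.
import queue
from collections import namedtuple

RemoteResult = namedtuple("RemoteResult", ["timeout", "passed"])


def job_state(result):
    """Translate the remote result into the state recorded by the scheduler."""
    if result is None:
        return "UNKNOWN"
    if result.timeout:
        return "TIMEOUT"
    return "COMPLETED" if result.passed else "FAILED"


def report(job_queue, job_id, result, host_list, down_nodes):
    """Post the result dictionary consumed by schedule_jobs()."""
    job_queue.put({
        "handle": job_id,
        "state": job_state(result),
        "host_list": host_list,     # nodes returned to the scheduler's pool
        "down_nodes": down_nodes,   # nodes that failed the post-job health check
    })


job_queue = queue.Queue()
report(job_queue, 12, RemoteResult(timeout=False, passed=True), ["node1", "node2"], [])
print(job_queue.get())   # {'handle': 12, 'state': 'COMPLETED', ...}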
return + + def launch_snapshot(self, pool, name): """Create a basic snapshot of the reserved pool. @@ -504,16 +655,16 @@ def launch_reboot(self, pools, name, results, args): self.log.info( "<<>>\n", self.loop, name, ranks, time.ctime()) # reboot host in 1 min - result = run_remote(self.log, reboot_host, "sudo shutdown -r +1") + result = run_remote(self.log, reboot_host, command_as_user("shutdown -r +1", "root")) if result.passed: status = True else: - self.log.error(f"<<>>\n", self.loop, name, reboot_host, time.ctime()) - cmd_results = run_remote(self.log, reboot_host, "sudo systemctl restart daos_server") + cmd_results = run_remote( + self.log, reboot_host, command_as_user("systemctl restart daos_server", "root")) if cmd_results.passed: self.dmg_command.system_query() for pool in pools: @@ -861,16 +1013,19 @@ def start_dfuse(self, pool, container, name=None, job_spec=None): dfuselog = os.path.join( self.soak_log_dir, self.test_name + "_" + name + "_`hostname -s`_" - "" + "${SLURM_JOB_ID}_" + "daos_dfuse.log") - dfuse_env = f"export D_LOG_FILE_APPEND_PID=1;export D_LOG_MASK=ERR;export D_LOG_FILE={dfuselog}" - module_load = f"module use {self.mpi_module_use};module load {self.mpi_module}" + "" + "${JOB_ID}_" + "daos_dfuse.log") + dfuse_env = ";".join( + ["export D_LOG_FILE_APPEND_PID=1", + "export D_LOG_MASK=ERR", + f"export D_LOG_FILE={dfuselog}"]) + module_load = ";".join([f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"]) dfuse_start_cmds = [ - "clush -S -w $SLURM_JOB_NODELIST \"mkdir -p {}\"".format(dfuse.mount_dir.value), - "clush -S -w $SLURM_JOB_NODELIST \"cd {};{};{};{}\"".format( + "clush -S -w $HOSTLIST \"mkdir -p {}\"".format(dfuse.mount_dir.value), + "clush -S -w $HOSTLIST \"cd {};{};{};{}\"".format( dfuse.mount_dir.value, dfuse_env, module_load, str(dfuse)), "sleep 10", - "clush -S -w $SLURM_JOB_NODELIST \"df -h {}\"".format(dfuse.mount_dir.value), + "clush -S -w $HOSTLIST \"df -h {}\"".format(dfuse.mount_dir.value), ] return dfuse, dfuse_start_cmds @@ -892,8 +1047,8 @@ def stop_dfuse(dfuse, vol=False): dfuse.mount_dir.value)]) dfuse_stop_cmds.extend([ - "clush -S -w $SLURM_JOB_NODELIST \"fusermount3 -uz {0}\"".format(dfuse.mount_dir.value), - "clush -S -w $SLURM_JOB_NODELIST \"rm -rf {0}\"".format(dfuse.mount_dir.value)]) + f'clush -S -w $HOSTLIST "fusermount3 -uz {dfuse.mount_dir.value}"', + f'clush -S -w $HOSTLIST "rm -rf {dfuse.mount_dir.value}"']) return dfuse_stop_cmds @@ -989,7 +1144,7 @@ def create_ior_cmdline(self, job_spec, pool, ppn, nodesperjob, oclass_list=None, file_dir_oclass[0], nodesperjob * ppn, nodesperjob, ppn) daos_log = os.path.join( self.soak_log_dir, self.test_name + "_" + log_name - + "_`hostname -s`_${SLURM_JOB_ID}_daos.log") + + "_`hostname -s`_${JOB_ID}_daos.log") env = ior_cmd.get_default_env("mpirun", log_file=daos_log) env["D_LOG_FILE_APPEND_PID"] = "1" sbatch_cmds = [f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"] @@ -1009,17 +1164,21 @@ def create_ior_cmdline(self, job_spec, pool, ppn, nodesperjob, oclass_list=None, # add envs if api is HDF5-VOL if api == "HDF5-VOL": vol = True + cont_props = container.properties.value + env["HDF5_DAOS_FILE_PROP"] = '"' + cont_props.replace(",", ";") + '"' + env["HDF5_DAOS_OBJ_CLASS"] = file_dir_oclass[0] env["HDF5_VOL_CONNECTOR"] = "daos" env["HDF5_PLUGIN_PATH"] = str(plugin_path) mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.assign_environment(env, True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") 
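# --- Illustrative sketch (not part of the diff): why the reboot/restart harassers
# now build their commands through command_as_user(..., "root") instead of a
# hard-coded "sudo". The real helper lives in command_utils; _command_as_user is a
# simplified stand-in assuming it prepends a non-interactive sudo when the target
# user differs from the current one.
import getpass


def _command_as_user(command, user):
    """Stand-in: prefix the command with sudo -n when it must run as another user."""
    if not user or user == getpass.getuser():
        return command
    prefix = "sudo -n" if user == "root" else f"sudo -n -u {user}"
    return f"{prefix} {command}"


print(_command_as_user("shutdown -r +1", "root"))                 # sudo -n shutdown -r +1
print(_command_as_user("systemctl restart daos_server", "root"))  # sudo -n systemctl restart daos_server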
sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") if api in ["HDF5-VOL", "POSIX", "POSIX-LIBPIL4DFS", "POSIX-LIBIOIL"]: sbatch_cmds.extend(stop_dfuse(dfuse, vol)) commands.append([sbatch_cmds, log_name]) - self.log.info(f"<>:") + self.log.info(f"<>: ") for cmd in sbatch_cmds: self.log.info(cmd) return commands @@ -1059,13 +1218,13 @@ def create_macsio_cmdline(self, job_spec, pool, ppn, nodesperjob): job_spec, api, file_oclass, nodesperjob * ppn, nodesperjob, ppn) daos_log = os.path.join( self.soak_log_dir, self.test_name - + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_daos.log") + + "_" + log_name + "_`hostname -s`_${JOB_ID}_daos.log") macsio_log = os.path.join( self.soak_log_dir, self.test_name - + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_macsio-log.log") + + "_" + log_name + "_`hostname -s`_${JOB_ID}_macsio-log.log") macsio_timing_log = os.path.join( self.soak_log_dir, self.test_name - + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_macsio-timing.log") + + "_" + log_name + "_`hostname -s`_${JOB_ID}_macsio-timing.log") macsio.log_file_name.update(macsio_log) macsio.timings_file_name.update(macsio_timing_log) env = macsio.env.copy() @@ -1087,12 +1246,13 @@ def create_macsio_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.working_dir.update(dfuse.mount_dir.value) mpirun_cmd.assign_environment(env, True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") if api in ["HDF5-VOL"]: sbatch_cmds.extend(stop_dfuse(dfuse, vol=True)) commands.append([sbatch_cmds, log_name]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in sbatch_cmds: self.log.info(cmd) return commands @@ -1158,7 +1318,7 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob): ppn) daos_log = os.path.join( self.soak_log_dir, self.test_name + "_" + log_name - + "_`hostname -s`_${SLURM_JOB_ID}_daos.log") + + "_`hostname -s`_${JOB_ID}_daos.log") env = mdtest_cmd.get_default_env("mpirun", log_file=daos_log) env["D_LOG_FILE_APPEND_PID"] = "1" sbatch_cmds = [f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"] @@ -1178,12 +1338,13 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.assign_environment(env, True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") if api in ["POSIX", "POSIX-LIBPIL4DFS", "POSIX-LIBIOIL"]: sbatch_cmds.extend(stop_dfuse(dfuse)) commands.append([sbatch_cmds, log_name]) - self.log.info(f"<>:") + self.log.info(f"<>: ") for cmd in sbatch_cmds: self.log.info(cmd) return commands @@ -1213,7 +1374,7 @@ def create_racer_cmdline(self, job_spec): racer_log = os.path.join( self.soak_log_dir, self.test_name + "_" + job_spec + "_`hostname -s`_" - "${SLURM_JOB_ID}_" + "racer_log") + "${JOB_ID}_" + "racer_log") daos_racer.env["D_LOG_FILE"] = get_log_file(racer_log) log_name = job_spec cmds = [] @@ -1221,7 +1382,7 @@ def create_racer_cmdline(self, job_spec): cmds.append("status=$?") # add exit code commands.append([cmds, log_name]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in cmds: self.log.info(cmd) return commands @@ -1307,7 +1468,7 @@ def create_fio_cmdline(self, job_spec, pool): cmds.append("cd -") cmds.extend(stop_dfuse(dfuse)) commands.append([cmds, log_name]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in cmds: self.log.info(cmd) return commands @@ -1341,14 +1502,14 @@ 
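# --- Illustrative sketch (not part of the diff): the per-job log names now use the
# scheduler-neutral ${JOB_ID} (exported by the generated job script) in place of
# ${SLURM_JOB_ID}. Python only assembles the template; the backticks and ${JOB_ID}
# are expanded by the shell when the job script runs. The directory and test name
# are placeholders.
import os

soak_log_dir = "/tmp/soak_logs"     # placeholder for self.soak_log_dir
test_name = "soak_stress"           # placeholder for self.test_name
log_name = "ior_easy"

daos_log = os.path.join(
    soak_log_dir, test_name + "_" + log_name + "_`hostname -s`_${JOB_ID}_daos.log")
print(daos_log)   # /tmp/soak_logs/soak_stress_ior_easy_`hostname -s`_${JOB_ID}_daos.log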
def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob): # ${DAOS_TEST_APP_SRC}/suse => apps built with suse and gnu-mpich # pylint: disable-next=wrong-spelling-in-comment,fixme # ${DAOS_TEST_APP_SRC}/suse/intelmpi => apps built with suse and intelmpi - if "suse" in detect().name.lower(): + if "suse" in detect().name.lower() and os.environ.get("DAOS_TEST_MODE") is None: os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "suse") - if "mpi/latest" in mpi_module: + if "mpi/latest" in mpi_module and os.environ.get("DAOS_TEST_MODE") is None: os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "intelmpi") os.environ["I_MPI_OFI_LIBRARY_INTERNAL"] = "0" app_cmd = os.path.expandvars(self.params.get("cmdline", app_params, default=None)) if app_cmd is None: - self.log.info(f"<<{job_spec} command line not specified in yaml; job will not be run>>") + self.log.info(f"<<{job_spec} command line not specified in yaml>>") return commands oclass_list = self.params.get("oclass", app_params) for file_oclass, dir_oclass in oclass_list: @@ -1378,6 +1539,7 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.assign_environment(env, True) mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") if api in ["POSIX", "POSIX-LIBIOIL", "POSIX-LIBPIL4DFS"]: mpirun_cmd.working_dir.update(dfuse.mount_dir.value) cmdline = str(mpirun_cmd) @@ -1386,7 +1548,7 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob): if api in ["POSIX", "POSIX-LIBIOIL", "POSIX-LIBPIL4DFS"]: sbatch_cmds.extend(stop_dfuse(dfuse)) commands.append([sbatch_cmds, log_name]) - self.log.info(f"<<{job_spec.upper()} cmdlines>>:") + self.log.info(f"<<{job_spec.upper()} cmdlines>>: ") for cmd in sbatch_cmds: self.log.info("%s", cmd) if mpi_module != self.mpi_module: @@ -1428,7 +1590,7 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): dcp_cmd.set_params(src=src_file, dst=dst_file) env_vars = { "D_LOG_FILE": os.path.join(self.soak_log_dir, self.test_name + "_" - + log_name + "_`hostname -s`_${SLURM_JOB_ID}_daos.log"), + + log_name + "_`hostname -s`_${JOB_ID}_daos.log"), "D_LOG_FILE_APPEND_PID": "1" } mpirun_cmd = Mpirun(dcp_cmd, mpi_type=self.mpi_module) @@ -1436,6 +1598,7 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.assign_environment(EnvironmentVariables(env_vars), True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") @@ -1443,7 +1606,7 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): dm_commands = create_ior_cmdline( self, ior_spec, pool, ppn, nodesperjob, [[file_oclass, dir_oclass]], cont_2) sbatch_cmds.extend(dm_commands[0][0]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in sbatch_cmds: self.log.info("%s", cmd) commands.append([sbatch_cmds, log_name]) @@ -1451,52 +1614,115 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): def build_job_script(self, commands, job, nodesperjob, ppn): - """Create a slurm batch script that will execute a list of cmdlines. + """Generate a script that will execute a list of commands. 
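# --- Illustrative sketch (not part of the diff): the app-directory adjustment in
# create_app_cmdline(). os.path.join(os.sep, "suse") yields "/suse", so each suffix
# is appended as a new path component, and the new DAOS_TEST_MODE guard skips the
# adjustment entirely when that variable is set. All values below are placeholders.
import os

distro_name = "opensuse leap 15.4"                    # placeholder for detect().name.lower()
mpi_module = "mpi/latest"                             # placeholder for the yaml mpi_module value
os.environ["DAOS_TEST_APP_DIR"] = "/opt/daos/apps"    # placeholder

if "suse" in distro_name and os.environ.get("DAOS_TEST_MODE") is None:
    os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "suse")
if "mpi/latest" in mpi_module and os.environ.get("DAOS_TEST_MODE") is None:
    os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "intelmpi")
    os.environ["I_MPI_OFI_LIBRARY_INTERNAL"] = "0"

print(os.environ["DAOS_TEST_APP_DIR"])   # /opt/daos/apps/suse/intelmpi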
Args: - self (obj): soak obj - commands(list): command lines and cmd specific log_name - job(str): the job name that will be defined in the slurm script + path (str): where to write the script file + name (str): job name + output (str): where to put the output (full path) + nodecount (int): number of compute nodes to execute on + cmds (list): shell commands that are to be executed + uniq (str): a unique string to append to the job and log files + sbatch_params (dict, optional): dictionary containing other less often used parameters to + sbatch, e.g. mem:100. Defaults to None. + + Raises: + SoakTestError: if missing require parameters for the job script Returns: - script_list: list of slurm batch scripts + str: the full path of the script """ - job_timeout = self.params.get("job_timeout", "/run/" + job + "/*", 10) - self.log.info("<> at %s", time.ctime()) + self.log.info("<> at %s", time.ctime()) script_list = [] - # if additional cmds are needed in the batch script + # Additional commands needed in the job script prepend_cmds = ["set +e", "echo Job_Start_Time `date \\+\"%Y-%m-%d %T\"`", "daos pool query {} ".format(self.pool[1].identifier), "daos pool query {} ".format(self.pool[0].identifier)] + append_cmds = ["daos pool query {} ".format(self.pool[1].identifier), "daos pool query {} ".format(self.pool[0].identifier), "echo Job_End_Time `date \\+\"%Y-%m-%d %T\"`"] exit_cmd = ["exit $status"] - # Create the sbatch script for each list of cmdlines + for cmd, log_name in commands: - if isinstance(cmd, str): - cmd = [cmd] - output = os.path.join( - self.soak_log_dir, self.test_name + "_" + log_name + "_%N_" + "%j_") - error = os.path.join(str(output) + "ERROR_") - sbatch = { - "time": str(job_timeout) + ":00", - "exclude": str(self.slurm_exclude_nodes), - "error": str(error), - "export": "ALL", - "exclusive": None, - "ntasks": str(nodesperjob * ppn) - } - # include the cluster specific params - sbatch.update(self.srun_params) unique = get_random_string(5, self.used) - script = slurm_utils.write_slurm_script( - self.soak_log_dir, job, output, nodesperjob, - prepend_cmds + cmd + append_cmds + exit_cmd, unique, sbatch) - script_list.append(script) self.used.append(unique) + if isinstance(cmd, str): + cmd = [cmd] + if self.job_scheduler == "slurm": + job_timeout = self.params.get("job_timeout", "/run/" + job + "/*", 10) + job_log = os.path.join( + self.soak_log_dir, self.test_name + "_" + log_name + "_%N_" + "%j_") + output = job_log + unique + error = job_log + "ERROR_" + unique + sbatch_params = { + "time": str(job_timeout) + ":00", + "exclude": str(self.slurm_exclude_nodes), + "error": str(error), + "export": "ALL", + "exclusive": None, + "ntasks": str(nodesperjob * ppn) + } + # include the cluster specific params + sbatch_params.update(self.srun_params) + else: + job_log = os.path.join( + self.soak_log_dir, self.test_name + "_" + log_name + "_RHOST" + "_JOBID_") + output = job_log + unique + error = job_log + "ERROR_" + unique + + job_cmds = prepend_cmds + cmd + append_cmds + exit_cmd + # Write script file to shared dir + sharedscript_dir = self.sharedsoak_dir + "/pass" + str(self.loop) + scriptfile = sharedscript_dir + '/jobscript' + "_" + str(unique) + ".sh" + with open(scriptfile, 'w') as script_file: + script_file.write("#!/bin/bash\n#\n") + if self.job_scheduler == "slurm": + # write the slurm directives in the job script + script_file.write("#SBATCH --job-name={}\n".format(job)) + script_file.write("#SBATCH --nodes={}\n".format(nodesperjob)) + script_file.write("#SBATCH 
--distribution=cyclic\n") + script_file.write("#SBATCH --output={}\n".format(output)) + if sbatch_params: + for key, value in list(sbatch_params.items()): + if value is not None: + script_file.write("#SBATCH --{}={}\n".format(key, value)) + else: + script_file.write("#SBATCH --{}\n".format(key)) + script_file.write("\n") + script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n") + script_file.write(" echo \"VIRTUAL_ENV not defined\" \n") + script_file.write("else \n") + script_file.write(" source $VIRTUAL_ENV/bin/activate \n") + script_file.write("fi \n") + script_file.write("HOSTLIST=`nodeset -e -S \",\" $SLURM_JOB_NODELIST` \n") + script_file.write("JOB_ID=$SLURM_JOB_ID \n") + script_file.write("echo \"SLURM NODES: $SLURM_JOB_NODELIST \" \n") + script_file.write("echo \"NODE COUNT: $SLURM_JOB_NUM_NODES \" \n") + script_file.write("echo \"JOB ID: $JOB_ID \" \n") + script_file.write("echo \"HOSTLIST: $HOSTLIST \" \n") + script_file.write("\n") + else: + script_file.write("HOSTLIST=$1 \n") + script_file.write("JOB_ID=$2 \n") + script_file.write("JOB_LOG=$3 \n") + script_file.write("JOB_ERROR_LOG=$4 \n") + script_file.write("echo \"JOB NODES: $HOSTLIST \" \n") + script_file.write("echo \"JOB ID: $JOB_ID \" \n") + script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n") + script_file.write(" echo \"VIRTUAL_ENV not defined\" \n") + script_file.write("else \n") + script_file.write(" source $VIRTUAL_ENV/bin/activate \n") + script_file.write("fi \n") + script_file.write("exec 1> $JOB_LOG \n") + script_file.write("exec 2> $JOB_ERROR_LOG \n") + + for cmd in list(job_cmds): + script_file.write(cmd + "\n") + os.chmod(scriptfile, stat.S_IXUSR | stat.S_IRUSR) + script_list.append([scriptfile, output, error]) return script_list
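# --- Illustrative sketch (not part of the diff): a trimmed, standalone version of
# the non-slurm branch of build_job_script(), showing the bash wrapper that the
# internal scheduler invokes as "<script> <hostlist> <job_id> <log> <error_log>".
# The job command, output path, and date format are simplified placeholders.
import os
import stat

script_lines = [
    "#!/bin/bash",
    "#",
    "HOSTLIST=$1",
    "JOB_ID=$2",
    "JOB_LOG=$3",
    "JOB_ERROR_LOG=$4",
    'echo "JOB NODES: $HOSTLIST "',
    'echo "JOB ID: $JOB_ID "',
    "exec 1> $JOB_LOG",
    "exec 2> $JOB_ERROR_LOG",
    'echo Job_Start_Time `date +"%Y-%m-%d %T"`',
    "hostname",            # placeholder for the real soak job commands
    "status=$?",
    'echo Job_End_Time `date +"%Y-%m-%d %T"`',
    "exit $status",
]

scriptfile = "/tmp/jobscript_example.sh"               # real scripts go to the shared pass directory
with open(scriptfile, "w") as script_file:
    script_file.write("\n".join(script_lines) + "\n")
os.chmod(scriptfile, stat.S_IXUSR | stat.S_IRUSR)      # owner read + execute, as in the patch
print(open(scriptfile).read())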