Skip to content

Commit

Permalink
LIU-420: Action sourcery code review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
myxie committed Nov 21, 2024
1 parent 9998d50 commit 24f82eb
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 50 deletions.
6 changes: 6 additions & 0 deletions daliuge-engine/dlg/deploy/create_dlg_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,15 @@ def create_experiment_group(parser: optparse.OptionParser):
return group

def create_job_group():
"""
TODO: LIU-424
"""
pass

def create_graph_group():
"""
TODO: LIU-424
"""
pass


Expand Down
102 changes: 58 additions & 44 deletions daliuge-engine/dlg/deploy/slurm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import subprocess
import shutil
import tempfile
import string
from dlg import remote
from dlg.runtime import __git_version__ as git_commit

Expand Down Expand Up @@ -101,8 +102,7 @@ def __init__(
self.exec_prefix = config["exec_prefix"]
self.username = config['user'] if 'user' in config else sys.exit(1)
if not self.username:
print("Username not configured in INI file.")
sys.exit(1)
print("Username not configured in INI file, using local username...")
else:
# Setup SLURM environment variables using config
config = ConfigFactory.create_config(facility=facility, user=username)
Expand Down Expand Up @@ -164,6 +164,13 @@ def __init__(


def create_session_suffix(self, suffix=None):
"""
Create a suffix to identify the session. If no suffix is specified, use the
current datetime setting.
:param: suffix, used to specify a non-datetime suffix.
:return: the final suffix
"""
if not suffix:
return datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
else:
Expand All @@ -179,35 +186,28 @@ def get_session_dirname(self):


def apply_slurm_template(self, template_str, session_id, dlg_root):
import string
"""
Given a string from a template file, use a string.Template object to perform
safe substution on the string and replace $VALUES with the correct value
specified.
"""
intermed_slurm = string.Template(template_str)
return intermed_slurm.safe_substitute(session_id=session_id, dlg_root=dlg_root)
ims = intermed_slurm.safe_substitute(session_id=session_id, dlg_root=dlg_root)
print("Creating job description")
return ims + "\n\n" + dlg_exec_str

def create_job_desc(self, physical_graph_file):
"""
Creates the slurm script from a physical graph
def create_paramater_mapping(self, session_dir, physical_graph_file):
"""
session_dir = "{0}/workspace/{1}".format(
self.dlg_root, self.get_session_dirname()
)

pardict = dict()
Map the runtime or configured parameters to the session environment and SLURM
script paramteres, in anticipation of using substition.
"""
pardict = {}
pardict["SESSION_ID"] = os.path.split(session_dir)[-1]
pardict["MODULES"] = self.modules
pardict["DLG_ROOT"] = self.dlg_root
pardict["EXEC_PREFIX"] = self.exec_prefix
slurm_str = dlg_exec_str
if self._slurm_template:
intermed_slurm = self.apply_slurm_template(
self._slurm_template,
pardict["SESSION_ID"],
pardict["DLG_ROOT"]
)
print("Creating job description")
slurm_str = intermed_slurm + "\n\n" + dlg_exec_str
else:
pardict["NUM_NODES"] = str(self._num_nodes)
pardict["JOB_DURATION"] = label_job_dur(self._job_dur)
pardict["NUM_NODES"] = str(self._num_nodes)
pardict["JOB_DURATION"] = label_job_dur(self._job_dur)

pardict["VENV"] = self.venv
pardict["PIP_NAME"] = self._pip_name
Expand Down Expand Up @@ -238,12 +238,31 @@ def create_job_desc(self, physical_graph_file):
pardict["CHECK_WITH_SESSION"] = (
"--check_with_session" if self._check_with_session else ""
)
return pardict

def create_job_desc(self, physical_graph_file):
"""
Creates the slurm script from a physical graph
This uses string.Template to apply substitutions that are linked to the
parameters defined at runtime. These parameters map to $VALUEs in a pre-defined
execution command that contains the necessary parameters to run DALiuGE through
SLURM.
"""

session_dir = "{0}/workspace/{1}".format(
self.dlg_root, self.get_session_dirname()
)
pardict = self.create_paramater_mapping(session_dir, physical_graph_file)

if self._slurm_template:
import string
job_desc = string.Template(slurm_str).safe_substitute(pardict)
else:
job_desc = init_tpl.safe_substitute(pardict)
return job_desc
slurm_str = self.apply_slurm_template(self._slurm_template,
pardict['SESSION_ID'],
pardict['DLG_ROOT'])
return string.Template(slurm_str).safe_substitute(pardict)

return init_tpl.safe_substitute(pardict)


def mk_session_dir(self, dlg_root: str = ""):
"""
Expand Down Expand Up @@ -279,15 +298,24 @@ def mk_session_dir(self, dlg_root: str = ""):
print(
f"ERROR: Unable to create {session_dir} on {self.username}@{self.host}, {str(e)}"
)
return None

return session_dir

def submit_job(self):
"""
Submits the slurm script to the requested facility
:returns: jobId, the id of the SLURM job create on the facility.
None if a remote directory could not be created or if an error occurs
during connection.
"""
jobId = None
session_dir = self.mk_session_dir()
if not session_dir:
print("No session_dir created.")
return jobId

physical_graph_file_name = "{0}/{1}".format(session_dir, self._pip_name)
if self._physical_graph_template_file:
if self._remote:
Expand All @@ -305,8 +333,7 @@ def submit_job(self):

job_file_name = "{0}/jobsub.sh".format(session_dir)
job_desc = self.create_job_desc(physical_graph_file_name)
print(job_desc)
# sys.exit()

if self._remote:
print(f"Creating SLURM script remotely: {job_file_name}")
tjob = tempfile.mktemp()
Expand Down Expand Up @@ -339,16 +366,3 @@ def submit_job(self):
else:
print(f"Created job submission script {job_file_name}")
return jobId


# class ConfigManager:
# """
# """

# def process_config():
# pass

# def create_slurm_script():
# """
# """
# job_desc = init_tpl.safe_substitute(pardict)
8 changes: 2 additions & 6 deletions daliuge-engine/test/deploy/test_slurm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
# Note this test will only run with a full installation of DALiuGE.
pexpect = pytest.importorskip("dlg.dropmake.pg_generator")


try:
from importlib.resources import files
except ModuleNotFoundError:
from importlib_resources import files # type: ignore
import dlg.deploy.configs as configs
from dlg.deploy.create_dlg_job import process_config
import daliuge_tests.engine.graphs as test_graphs


from dlg.deploy.slurm_client import SlurmClient
import json

Expand Down Expand Up @@ -64,7 +65,6 @@ def test_client_with_configfile(self):
- That we produce the same as the CLI with the same parameters
- That we can use the INI file to produce alternative parameters
"""
from dlg.deploy.create_dlg_job import process_config
pg = files(test_graphs) / "SLURM_HelloWorld_simplePG.graph"
cfg_file = Path(__file__).parent / "setonix.ini"
cfg = process_config(cfg_file)
Expand All @@ -81,8 +81,6 @@ def test_client_with_configfile(self):
job_desc = client.create_job_desc(pg)
curr_file = Path(__file__)
compare_script = curr_file.parent / "slurm_script.sh"
with open('slurm_script.sh', 'w') as fp:
fp.write(job_desc)
with compare_script.open() as fp:
script = fp.read()
self.assertEqual(script, job_desc)
Expand All @@ -107,8 +105,6 @@ def test_client_with_slurm_template(self):
job_desc = client.create_job_desc(pg)
curr_file = Path(__file__)
compare_script = curr_file.parent / "slurm_script_from_template.sh"
with open('output.sh', 'w') as fp:
fp.write(job_desc)
with compare_script.open() as fp:
script = fp.read()
self.assertEqual(script, job_desc)

0 comments on commit 24f82eb

Please sign in to comment.