diff --git a/apps/mpas/bin/experiment.py b/apps/mpas/bin/experiment.py
index 972285ec..55d76406 100644
--- a/apps/mpas/bin/experiment.py
+++ b/apps/mpas/bin/experiment.py
@@ -46,7 +46,7 @@ def main(user_config_file: Path) -> None:
     # Load Parsl resource configs
     resource_config_file = mpas_app / "config" / "resources.yaml"
     yaml_config = chiltepin.configure.parse_file(resource_config_file)
-    resources = chiltepin.configure.load(yaml_config[machine])
+    resources = chiltepin.configure.load(yaml_config[machine]["resources"], resources=["service", "compute", "mpi"])
 
     with parsl.load(resources):
         # Instantiate LimitedArea object
diff --git a/src/chiltepin/configure.py b/src/chiltepin/configure.py
index 1cf918b9..8e612d62 100644
--- a/src/chiltepin/configure.py
+++ b/src/chiltepin/configure.py
@@ -1,10 +1,11 @@
-from typing import Any, Dict
+from typing import Any, Dict, List
 
 import yaml
 from parsl.config import Config
 from parsl.executors import GlobusComputeExecutor, HighThroughputExecutor, MPIExecutor
 from parsl.launchers import SimpleLauncher
 from parsl.providers import SlurmProvider
+from globus_compute_sdk import Executor
 
 
 # Define function to parse yaml config
@@ -180,7 +181,7 @@ def make_globus_compute_executor(
     """
     e = GlobusComputeExecutor(
         label=name,
-        endpoint_id=config["endpoint id"],
+        executor=Executor(endpoint_id=config["endpoint id"]),
         user_endpoint_config={
             "engine": config.get("engine", "GlobusComputeEngine"),
             "max_mpi_apps": config.get("max mpi apps", 1),
@@ -198,7 +199,7 @@
     return e
 
 
-def load(config: Dict[str, Any]) -> Config:
+def load(config: Dict[str, Any], resources: List[str] | None = None) -> Config:
     """Construct a list of Executors from the input configuration dictionary
 
     The list returned by this function can be used to construct a Parsl Config
@@ -212,24 +213,30 @@
         YAML configuration block that contains the configuration for a list
         of resources
 
+    resources: List[str] | None
+        A list of the labels of the resources to load. The default is None.
+        If None, all resources are loaded. Otherwise the resources whose
+        labels are in the list will be loaded.
+
     Returns
     -------
     Config
     """
     executors = []
-    for name, spec in config["resources"].items():
-        match spec["engine"]:
-            case "HTEX":
-                # Make a HighThroughputExecutor
-                executors.append(make_htex_executor(name, spec))
-            case "MPI":
-                # Make an MPIExecutor
-                executors.append(make_mpi_executor(name, spec))
-            case "GlobusComputeEngine":
-                # Make a GlobusComputeExecutor for non-MPI jobs
-                executors.append(make_globus_compute_executor(name, spec))
-            case "GlobusMPIEngine":
-                # Make a GlobusComputeExecutor for MPI jobs
-                executors.append(make_globus_compute_executor(name, spec))
+    for name, spec in config.items():
+        if resources is None or name in resources:
+            match spec["engine"]:
+                case "HTEX":
+                    # Make a HighThroughputExecutor
+                    executors.append(make_htex_executor(name, spec))
+                case "MPI":
+                    # Make an MPIExecutor
+                    executors.append(make_mpi_executor(name, spec))
+                case "GlobusComputeEngine":
+                    # Make a GlobusComputeExecutor for non-MPI jobs
+                    executors.append(make_globus_compute_executor(name, spec))
+                case "GlobusMPIEngine":
+                    # Make a GlobusComputeExecutor for MPI jobs
+                    executors.append(make_globus_compute_executor(name, spec))
 
     return Config(executors)
diff --git a/tests/config.yaml b/tests/config.yaml
index 79230f07..e416b939 100644
--- a/tests/config.yaml
+++ b/tests/config.yaml
@@ -54,7 +54,8 @@ hercules:
       account: "gsd-hpcs"
       <<: *hercules-env
     gc-mpi:
-      engine: MPI
+      engine: GlobusMPIEngine
+      endpoint id: "{{ mpi_endpoint_id }}"
       cores per node: 80
       nodes per block: 3
       max mpi apps: 2
@@ -62,14 +63,16 @@ hercules:
       account: "gsd-hpcs"
       <<: *hercules-env
     gc-service:
-      engine: HTEX
+      engine: GlobusComputeEngine
+      endpoint id: "{{ service_endpoint_id }}"
       cores per node: 1
       nodes per block: 1
       partition: "service"
       account: "gsd-hpcs"
       <<: *hercules-env
     gc-compute:
-      engine: HTEX
+      engine: GlobusComputeEngine
+      endpoint id: "{{ compute_endpoint_id }}"
       cores per node: 80
       nodes per block: 1
       partition: "hercules"
@@ -101,7 +104,8 @@ hera:
       account: "gsd-hpcs"
       <<: *hera-env
     gc-mpi:
-      engine: MPI
+      engine: GlobusMPIEngine
+      endpoint id: "{{ mpi_endpoint_id }}"
       cores per node: 40
       nodes per block: 3
       max mpi apps: 2
@@ -109,14 +113,16 @@ hera:
       account: "gsd-hpcs"
       <<: *hera-env
     gc-service:
-      engine: HTEX
+      engine: GlobusComputeEngine
+      endpoint id: "{{ service_endpoint_id }}"
       cores per node: 1
       nodes per block: 1
       partition: "service"
       account: "gsd-hpcs"
       <<: *hera-env
     gc-compute:
-      engine: HTEX
+      engine: GlobusComputeEngine
+      endpoint id: "{{ compute_endpoint_id }}"
       cores per node: 40
       nodes per block: 1
       partition: "hera"
@@ -148,7 +154,8 @@ docker:
       account: ""
       <<: *docker-env
     gc-mpi:
-      engine: MPI
+      engine: GlobusMPIEngine
+      endpoint id: "{{ mpi_endpoint_id }}"
       cores per node: 8
       nodes per block: 3
       max mpi apps: 2
@@ -156,14 +163,16 @@ docker:
       account: ""
       <<: *docker-env
     gc-service:
-      engine: HTEX
+      engine: GlobusComputeEngine
+      endpoint id: "{{ service_endpoint_id }}"
       cores per node: 1
       nodes per block: 1
       partition: "slurmpar"
       account: ""
       <<: *docker-env
     gc-compute:
-      engine: HTEX
+      engine: GlobusComputeEngine
+      endpoint id: "{{ compute_endpoint_id }}"
       cores per node: 8
       nodes per block: 1
       partition: "slurmpar"
diff --git a/tests/test_globus_compute_hello.py b/tests/test_globus_compute_hello.py
index a111854e..b34f241e 100644
--- a/tests/test_globus_compute_hello.py
+++ b/tests/test_globus_compute_hello.py
@@ -1,9 +1,12 @@
 import pathlib
 import subprocess
 
+import parsl
+from chiltepin.tasks import python_task
 import pytest
 from globus_compute_sdk import Executor
-from jinja2 import Environment, FileSystemLoader
+from jinja2 import BaseLoader, Environment, FileSystemLoader
+import yaml
 
 import chiltepin.configure
 
@@ -17,18 +20,19 @@ def config(config_file, platform):
         f"export PYTHONPATH={pwd.parent.resolve()}"
     )
     resources = yaml_config[platform]["resources"]
-    return {"resources": resources}
+    return resources
 
 
 def test_endpoint_configure(config):
     pwd = pathlib.Path(__file__).parent.resolve()
+    endpoint = "gc-service"
     p = subprocess.run(
         [
             "globus-compute-endpoint",
             "-c",
             f"{pwd}/globus_compute",
            "configure",
-            "gc-service",
+            f"{endpoint}",
         ],
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
@@ -39,28 +43,28 @@
 
     # Customize endpoint configs for this platform using Jinja2 templates
     jinja_env = Environment(loader=FileSystemLoader(f"{pwd}/templates/"))
-    for endpoint in ["gc-service"]:
-        template = jinja_env.get_template(f"{endpoint}.yaml")
-        content = template.render(
-            partition=config["resources"][endpoint]["partition"],
-            account=config["resources"][endpoint]["account"],
-            worker_init=";".join(config["resources"][endpoint]["environment"])
-        )
-        with open(
-            f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8"
-        ) as gc_config:
-            gc_config.write(content)
+    template = jinja_env.get_template(f"{endpoint}.yaml")
+    content = template.render(
+        partition=config[endpoint]["partition"],
+        account=config[endpoint]["account"],
+        worker_init=";".join(config[endpoint]["environment"])
+    )
+    with open(
+        f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8"
+    ) as gc_config:
+        gc_config.write(content)
 
 
 def test_endpoint_start():
     pwd = pathlib.Path(__file__).parent.resolve()
+    endpoint = "gc-service"
     p = subprocess.run(
         [
             "globus-compute-endpoint",
             "-c",
             f"{pwd}/globus_compute",
             "start",
-            "gc-service",
+            f"{endpoint}",
         ],
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
@@ -70,13 +74,15 @@
     assert p.returncode == 0
 
 
-def test_hello_endpoint():
+def test_hello_endpoint(config):
+    @python_task
     def hello():
         return "Hello"
 
     pwd = pathlib.Path(__file__).parent.resolve()
+    endpoint = "gc-service"
     p = subprocess.run(
-        f"globus-compute-endpoint -c {pwd}/globus_compute list | grep gc-service | cut -d' ' -f 2",
+        f"globus-compute-endpoint -c {pwd}/globus_compute list | grep {endpoint} | cut -d' ' -f 2",
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
         text=True,
@@ -84,23 +90,31 @@ def hello():
         shell=True,
     )
     assert p.returncode == 0
-
     hello_endpoint_id = p.stdout.strip()
     assert len(hello_endpoint_id) == 36
-    with Executor(endpoint_id=hello_endpoint_id) as gce:
-        future = gce.submit(hello)
-        assert future.result() == "Hello"
+    config_string = yaml.dump(config)
+    template = Environment(loader=BaseLoader()).from_string(config_string)
+    content = template.render(
+        service_endpoint_id=hello_endpoint_id
+    )
+
+    content_yaml = yaml.safe_load(content)
+    resources = chiltepin.configure.load(content_yaml, resources=[endpoint])
+    with parsl.load(resources):
+        future = hello(executor=endpoint)
+        assert future.result() == "Hello"
 
 
 def test_endpoint_stop():
     pwd = pathlib.Path(__file__).parent.resolve()
+    endpoint = "gc-service"
     p = subprocess.run(
         [
             "globus-compute-endpoint",
             "-c",
             f"{pwd}/globus_compute",
             "stop",
-            "gc-service",
+            f"{endpoint}",
         ],
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
@@ -112,6 +126,7 @@
 
 def test_endpoint_delete():
     pwd = pathlib.Path(__file__).parent.resolve()
+    endpoint = "gc-service"
     p = subprocess.run(
         [
             "globus-compute-endpoint",
@@ -119,7 +134,7 @@ def test_endpoint_delete():
             f"{pwd}/globus_compute",
             "delete",
             "--yes",
-            "gc-service",
+            f"{endpoint}",
         ],
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
diff --git a/tests/test_globus_compute_mpi.py b/tests/test_globus_compute_mpi.py
index 8ccd7501..98426f21 100644
--- a/tests/test_globus_compute_mpi.py
+++ b/tests/test_globus_compute_mpi.py
@@ -4,9 +4,12 @@
 import subprocess
 from datetime import datetime as dt
 
+from chiltepin.tasks import bash_task
 import pytest
 from globus_compute_sdk import Executor, MPIFunction, ShellFunction
-from jinja2 import Environment, FileSystemLoader
+from jinja2 import BaseLoader, Environment, FileSystemLoader
+import yaml
+import parsl
 
 import chiltepin.configure
 
@@ -23,7 +26,7 @@ def config(config_file, platform):
         f"export PYTHONPATH={pwd.parent.resolve()}"
     )
     resources = yaml_config[platform]["resources"]
-    return {"resources": resources}
+    return resources
 
 
 # Local function to get endpoint ids
@@ -105,9 +108,9 @@ def test_endpoint_configure(config):
     for endpoint in ["gc-compute", "gc-mpi"]:
         template = jinja_env.get_template(f"{endpoint}.yaml")
         content = template.render(
-            partition=config["resources"][endpoint]["partition"],
-            account=config["resources"][endpoint]["account"],
-            worker_init=";".join(config["resources"][endpoint]["environment"])
+            partition=config[endpoint]["partition"],
+            account=config[endpoint]["account"],
+            worker_init=";".join(config[endpoint]["environment"])
         )
         with open(
             f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8"
@@ -156,28 +159,26 @@ def test_endpoint_start():
 def test_endpoint_mpi_hello(config):
     pwd = pathlib.Path(__file__).parent.resolve()
 
-    # Define a ShellFunction to compile the MPI code
-    compile_func = ShellFunction(
-        """
+    # Define a bash task to compile the MPI code
+    @bash_task
+    def compile_func(dirpath, stdout=None, stderr=None):
+        return f"""
        cd {dirpath}
        $CHILTEPIN_MPIF90 -o mpi_hello.exe mpi_hello.f90
-        """,
-        stdout=os.path.join(pwd, "globus_compute_mpi_hello_compile.out"),
-        stderr=os.path.join(pwd, "globus_compute_mpi_hello_compile.err"),
-    )
+        """
 
-    # Define a MPIFunction to run the MPI program
-    hello_func = MPIFunction(
-        """true; # Force the default prefix to launch a no-op
+    # Define a bash task to run the MPI program
+    @bash_task
+    def hello_func(dirpath, stdout=None, stderr=None, parsl_resource_specification=None):
+        return f"""
        cd {dirpath}
        $PARSL_MPI_PREFIX --overcommit ./mpi_hello.exe
-        """,
-        stdout=os.path.join(pwd, "globus_compute_mpi_hello_run.out"),
-        stderr=os.path.join(pwd, "globus_compute_mpi_hello_run.err"),
-    )
+        """
 
     # Get a listing of the endpoints
     gc_mpi_endpoint_id, gc_compute_endpoint_id = _get_endpoint_ids()
+    assert len(gc_mpi_endpoint_id) == 36
+    assert len(gc_compute_endpoint_id) == 36
 
     # Remove any previous output if necessary
     if os.path.exists(pwd / "globus_compute_mpi_hello_compile.out"):
@@ -189,28 +190,38 @@
     if os.path.exists(pwd / "globus_compute_mpi_hello_run.err"):
         os.remove(pwd / "globus_compute_mpi_hello_run.err")
 
-    # Compile the MPI program in the gc-compute endpoint
-    with Executor(endpoint_id=gc_compute_endpoint_id) as gce:
-        future = gce.submit(
-            compile_func,
-            dirpath=pwd,
+    config_string = yaml.dump(config)
+    template = Environment(loader=BaseLoader()).from_string(config_string)
+    content = template.render(
+        compute_endpoint_id=gc_compute_endpoint_id,
+        mpi_endpoint_id=gc_mpi_endpoint_id,
+    )
+
+    content_yaml = yaml.safe_load(content)
+    resources = chiltepin.configure.load(content_yaml, resources=["gc-compute", "gc-mpi"])
+    with parsl.load(resources):
+        future = compile_func(
+            pwd,
+            stdout=os.path.join(pwd, "globus_compute_mpi_hello_compile.out"),
+            stderr=os.path.join(pwd, "globus_compute_mpi_hello_compile.err"),
+            executor="gc-compute"
         )
         r = future.result()
-        assert r.returncode == 0
-
-    # Run the MPI program in the gc-mpi endpoint
-    with Executor(endpoint_id=gc_mpi_endpoint_id) as gce:
-        gce.resource_specification = {
-            "num_nodes": 3,  # Number of nodes required for the application instance
-            "num_ranks": 6,  # Number of ranks in total
-            "ranks_per_node": 2,  # Number of ranks / application elements to be launched per node
-        }
-        future = gce.submit(
-            hello_func,
-            dirpath=pwd,
+        assert r == 0
+
+        future = hello_func(
+            pwd,
+            stdout=os.path.join(pwd, "globus_compute_mpi_hello_run.out"),
+            stderr=os.path.join(pwd, "globus_compute_mpi_hello_run.err"),
+            executor="gc-mpi",
+            parsl_resource_specification = {
+                "num_nodes": 3,  # Number of nodes required for the application instance
+                "num_ranks": 6,  # Number of ranks in total
+                "ranks_per_node": 2,  # Number of ranks / application elements to be launched per node
+            },
        )
        r = future.result()
-        assert r.returncode == 0
+        assert r == 0
 
     # Check output
     with open(pwd / "globus_compute_mpi_hello_run.out", "r") as f:
@@ -222,38 +233,26 @@
 def test_endpoint_mpi_pi(config):
     pwd = pathlib.Path(__file__).parent.resolve()
 
-    # Define a ShellFunction to compile the MPI code
-    compile_func = ShellFunction(
-        """
+    # Define a bash task to compile the MPI code
+    @bash_task
+    def compile_func(dirpath, stdout=None, stderr=None):
+        return f"""
        cd {dirpath}
        $CHILTEPIN_MPIF90 -o mpi_pi.exe mpi_pi.f90
-        """,
-        stdout=os.path.join(pwd, "globus_compute_mpi_pi_compile.out"),
-        stderr=os.path.join(pwd, "globus_compute_mpi_pi_compile.err"),
-    )
-
-    # Define a MPIFunction to run the MPI program
-    pi1_func = MPIFunction(
-        """true; # Force the default prefix to launch a no-op
-        cd {dirpath}
-        $PARSL_MPI_PREFIX --overcommit ./mpi_pi.exe
-        """,
-        stdout=os.path.join(pwd, "globus_compute_mpi_pi1_run.out"),
-        stderr=os.path.join(pwd, "globus_compute_mpi_pi1_run.err"),
-    )
+        """
 
-    # Define a MPIFunction to run the MPI program
-    pi2_func = MPIFunction(
-        """true; # Force the default prefix to launch a no-op
+    # Define a bash task to run the MPI program
+    @bash_task
+    def pi_func(dirpath, stdout=None, stderr=None, parsl_resource_specification=None):
+        return f"""
        cd {dirpath}
        $PARSL_MPI_PREFIX --overcommit ./mpi_pi.exe
-        """,
-        stdout=os.path.join(pwd, "globus_compute_mpi_pi2_run.out"),
-        stderr=os.path.join(pwd, "globus_compute_mpi_pi2_run.err"),
-    )
+        """
 
     # Get a listing of the endpoints
     gc_mpi_endpoint_id, gc_compute_endpoint_id = _get_endpoint_ids()
+    assert len(gc_mpi_endpoint_id) == 36
+    assert len(gc_compute_endpoint_id) == 36
 
     # Remove any previous output if necessary
     if os.path.exists(pwd / "globus_compute_mpi_pi_compile.out"):
@@ -269,42 +268,54 @@
     if os.path.exists(pwd / "globus_compute_mpi_pi2_run.err"):
         os.remove(pwd / "globus_compute_mpi_pi2_run.err")
 
-    # Compile the MPI program in the gc-compute endpoint
-    with Executor(endpoint_id=gc_compute_endpoint_id) as gce:
-        future = gce.submit(
-            compile_func,
-            dirpath=pwd,
-        )
-        r = future.result()
-        assert r.returncode == 0
+    config_string = yaml.dump(config)
+    template = Environment(loader=BaseLoader()).from_string(config_string)
+    content = template.render(
+        compute_endpoint_id=gc_compute_endpoint_id,
+        mpi_endpoint_id=gc_mpi_endpoint_id,
+    )
 
-    # Run the MPI program in the gc-mpi endpoint
-    with Executor(endpoint_id=gc_mpi_endpoint_id) as gce:
+    content_yaml = yaml.safe_load(content)
+    resources = chiltepin.configure.load(content_yaml, resources=["gc-compute", "gc-mpi"])
+    with parsl.load(resources):
         cores_per_node = 8
-        gce.resource_specification = {
+        future = compile_func(
+            pwd,
+            stdout=os.path.join(pwd, "globus_compute_mpi_pi_compile.out"),
+            stderr=os.path.join(pwd, "globus_compute_mpi_pi_compile.err"),
+            executor="gc-compute",
+        )
+        r = future.result()
+        assert r == 0
+
+        future1 = pi_func(
+            pwd,
+            stdout=os.path.join(pwd, "globus_compute_mpi_pi1_run.out"),
+            stderr=os.path.join(pwd, "globus_compute_mpi_pi1_run.err"),
+            executor="gc-mpi",
+            parsl_resource_specification = {
             "num_nodes": 2,  # Number of nodes required for the application instance
             "num_ranks": 2 * cores_per_node,  # Number of ranks in total
             "ranks_per_node": cores_per_node,  # Number of ranks / application elements to be launched per node
-        }
-        future1 = gce.submit(
-            pi1_func,
-            dirpath=pwd,
+            },
         )
 
-        gce.resource_specification = {
+        future2 = pi_func(
+            pwd,
+            stdout=os.path.join(pwd, "globus_compute_mpi_pi2_run.out"),
+            stderr=os.path.join(pwd, "globus_compute_mpi_pi2_run.err"),
+            executor="gc-mpi",
+            parsl_resource_specification = {
             "num_nodes": 1,  # Number of nodes required for the application instance
             "num_ranks": cores_per_node,  # Number of ranks in total
             "ranks_per_node": cores_per_node,  # Number of ranks / application elements to be launched per node
-        }
-        future2 = gce.submit(
-            pi2_func,
-            dirpath=pwd,
+            },
         )
 
         r1 = future1.result()
-        assert r1.returncode == 0
+        assert r1 == 0
         r2 = future2.result()
-        assert r2.returncode == 0
+        assert r2 == 0
 
     # Extract the hostnames used by pi1
     with open(pwd / "globus_compute_mpi_pi1_run.out", "r") as f:
@@ -375,7 +386,7 @@ def test_endpoint_stop():
 
 
 # Test endpoint delete
-def test_endpoint_delete(config):
+def test_endpoint_delete():
     pwd = pathlib.Path(__file__).parent.resolve()
 
     # Delete gc-compute endpoint
diff --git a/tests/test_parsl_mpi_hello.py b/tests/test_parsl_mpi_hello.py
index b4ef64c9..86d86e5d 100644
--- a/tests/test_parsl_mpi_hello.py
+++ b/tests/test_parsl_mpi_hello.py
@@ -76,7 +76,7 @@ def config(config_file, platform):
     yaml_config[platform]["resources"]["mpi"]["environment"].append(
         f"export PYTHONPATH={pwd.parent.resolve()}"
     )
-    resources = chiltepin.configure.load(yaml_config[platform])
+    resources = chiltepin.configure.load(yaml_config[platform]["resources"], resources=["compute", "mpi"])
     with parsl.load(resources):
         yield {"resources": resources}
     parsl.clear()
diff --git a/tests/test_qg_install.py b/tests/test_qg_install.py
index d9287c64..c00a68fc 100644
--- a/tests/test_qg_install.py
+++ b/tests/test_qg_install.py
@@ -20,7 +20,7 @@ def config(config_file, platform):
     yaml_config[platform]["resources"]["compute"]["environment"].append(
         f"export PYTHONPATH={pwd.parent.resolve()}"
     )
-    resources = chiltepin.configure.load(yaml_config[platform])
+    resources = chiltepin.configure.load(yaml_config[platform]["resources"], resources=["service", "compute"])
     with parsl.load(resources):
         yield {"resources": resources}
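
Reviewer note (not part of the patch): a minimal sketch of how the reworked configure.load() and the templated "endpoint id" fields are intended to be used together, following the pattern in the updated tests. The config path, platform name, and endpoint UUID below are placeholders, not values taken from this change.

    import pathlib

    import parsl
    import yaml
    from jinja2 import BaseLoader, Environment

    import chiltepin.configure
    from chiltepin.tasks import python_task

    # Parse a resource config like tests/config.yaml (path and platform are hypothetical)
    yaml_config = chiltepin.configure.parse_file(pathlib.Path("tests/config.yaml"))
    resource_block = yaml_config["hercules"]["resources"]

    # Render the "{{ service_endpoint_id }}" placeholder with a known endpoint UUID;
    # entries that are not selected below may be left unrendered since they are skipped
    rendered = (
        Environment(loader=BaseLoader())
        .from_string(yaml.dump(resource_block))
        .render(service_endpoint_id="00000000-0000-0000-0000-000000000000")
    )
    resource_block = yaml.safe_load(rendered)

    # Load only the executors named in resources=; omitting it loads every entry
    resources = chiltepin.configure.load(resource_block, resources=["gc-service"])

    @python_task
    def hello():
        return "Hello"

    with parsl.load(resources):
        # Route the task to the executor label selected above
        print(hello(executor="gc-service").result())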