From 6c54491bb26892ae8df7facb7c86ec0b62637393 Mon Sep 17 00:00:00 2001
From: Christopher Harrop <35781497+christopherwharrop-noaa@users.noreply.github.com>
Date: Fri, 22 Nov 2024 10:55:58 -0700
Subject: [PATCH] Use GlobusComputeExecutor (#136)

* Set Globus Compute endpoint environment in worker_init

* Update code and tests to use GlobusComputeExecutor

* Remove the MPAS app lint check from the main workflow, since it runs in
  its own workflow

* Fix linting

* Install the "test" variant in CI workflows so that pytest is available

---
 .github/workflows/docker-slurm.yaml    |   2 +-
 .github/workflows/mpas-app.yaml        |   4 +-
 .github/workflows/package-cleanup.yaml |  64 ----
 README.md                              |  16 +-
 apps/mpas/bin/experiment.py            |   5 +-
 pyproject.toml                         |  37 ++-
 src/chiltepin/configure.py             |  41 +--
 tests/config.yaml                      |  75 +++++
 .../{compute.yaml => gc-compute.yaml}  |   0
 tests/templates/{mpi.yaml => gc-mpi.yaml} |   0
 .../{service.yaml => gc-service.yaml}  |   0
 tests/test_globus_compute_hello.py     |  63 ++--
 tests/test_globus_compute_mpi.py       | 282 ++++++++++--------
 tests/test_parsl_mpi_hello.py          |   4 +-
 tests/test_qg_install.py               |   4 +-
 15 files changed, 341 insertions(+), 256 deletions(-)
 delete mode 100644 .github/workflows/package-cleanup.yaml
 rename tests/templates/{compute.yaml => gc-compute.yaml} (100%)
 rename tests/templates/{mpi.yaml => gc-mpi.yaml} (100%)
 rename tests/templates/{service.yaml => gc-service.yaml} (100%)

diff --git a/.github/workflows/docker-slurm.yaml b/.github/workflows/docker-slurm.yaml
index be470551..4ffd8c23 100644
--- a/.github/workflows/docker-slurm.yaml
+++ b/.github/workflows/docker-slurm.yaml
@@ -73,7 +73,7 @@ jobs:
       name: Install chiltepin package
       run: |
         docker exec frontend bash -l -c "cd work ; module use /opt/spack-stack/envs/unified-env/install/modulefiles/Core ; module load stack-gcc; module load stack-openmpi; module load stack-python ; python -m venv .chiltepin"
-        docker exec frontend bash -l -c "cd work ; source .chiltepin/bin/activate ; pip --use-deprecated=legacy-resolver install -e ."
+        docker exec frontend bash -l -c "cd work ; source .chiltepin/bin/activate ; pip --use-deprecated=legacy-resolver install -e .[test]"

     - name: Run test suite
       env:
diff --git a/.github/workflows/mpas-app.yaml b/.github/workflows/mpas-app.yaml
index 76a9ab24..da04507a 100644
--- a/.github/workflows/mpas-app.yaml
+++ b/.github/workflows/mpas-app.yaml
@@ -1,6 +1,8 @@
 name: MPAS App

 on:
+  push:
+    branches: [ develop, main ]
   pull_request:
     branches: [ develop, main ]
     types: [ labeled ]
@@ -72,7 +74,7 @@ jobs:
       name: Install chiltepin package
       run: |
         docker exec frontend bash -l -c "cd work ; module use /opt/spack-stack/envs/unified-env/install/modulefiles/Core ; module load stack-gcc; module load stack-openmpi; module load stack-python ; python -m venv .chiltepin"
-        docker exec frontend bash -l -c "cd work ; source .chiltepin/bin/activate ; pip --use-deprecated=legacy-resolver install -e ."
+ docker exec frontend bash -l -c "cd work ; source .chiltepin/bin/activate ; pip --use-deprecated=legacy-resolver install -e .[test]" - name: Run mpas workflow run: | diff --git a/.github/workflows/package-cleanup.yaml b/.github/workflows/package-cleanup.yaml deleted file mode 100644 index e67d732b..00000000 --- a/.github/workflows/package-cleanup.yaml +++ /dev/null @@ -1,64 +0,0 @@ -name: PackageCleanup - -on: - push: - branches: [ develop, main ] - pull_request: - branches: [ develop, main ] - workflow_dispatch: - -jobs: - cleanup-packages: - runs-on: ubuntu-latest - permissions: - packages: write - contents: read - steps: - - - name: Remove untagged versions of exascaleworkflowsandbox/spack-stack-gnu-openmpi - uses: actions/delete-package-versions@v5 - with: - package-name: 'exascaleworkflowsandbox/spack-stack-gnu-openmpi' - package-type: 'container' - min-versions-to-keep: 0 - delete-only-untagged-versions: 'true' - - - name: Remove untagged versions of exascaleworkflowsandbox/frontend - uses: actions/delete-package-versions@v5 - with: - package-name: 'exascaleworkflowsandbox/frontend' - package-type: 'container' - min-versions-to-keep: 0 - delete-only-untagged-versions: 'true' - - - name: Remove untagged versions of exascaleworkflowsandbox/master - uses: actions/delete-package-versions@v5 - with: - package-name: 'exascaleworkflowsandbox/master' - package-type: 'container' - min-versions-to-keep: 0 - delete-only-untagged-versions: 'true' - - - name: Remove untagged versions of exascaleworkflowsandbox/node - uses: actions/delete-package-versions@v5 - with: - package-name: 'exascaleworkflowsandbox/node' - package-type: 'container' - min-versions-to-keep: 0 - delete-only-untagged-versions: 'true' - - - name: Remove untagged versions of exascaleworkflowsandbox/spack-stack-gnu-openmpi-cache-amd64 - uses: actions/delete-package-versions@v5 - with: - package-name: 'exascaleworkflowsandbox/spack-stack-gnu-openmpi-cache-amd64' - package-type: 'container' - min-versions-to-keep: 0 - delete-only-untagged-versions: 'true' - - - name: Remove untagged versions of exascaleworkflowsandbox/spack-stack-gnu-openmpi-cache-arm64 - uses: actions/delete-package-versions@v5 - with: - package-name: 'exascaleworkflowsandbox/spack-stack-gnu-openmpi-cache-arm64' - package-type: 'container' - min-versions-to-keep: 0 - delete-only-untagged-versions: 'true' diff --git a/README.md b/README.md index d1f527aa..9215ea2d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -[![ExascaleSandboxTests](https://github.com/NOAA-GSL/ExascaleWorkflowSandbox/actions/workflows/docker-slurm.yml/badge.svg)](https://github.com/NOAA-GSL/ExascaleWorkflowSandbox/actions/workflows/docker-slurm.yml) +[![ExascaleSandboxTests](https://github.com/NOAA-GSL/ExascaleWorkflowSandbox/actions/workflows/docker-slurm.yaml/badge.svg)](https://github.com/NOAA-GSL/ExascaleWorkflowSandbox/actions/workflows/docker-slurm.yaml) +[![MPAS App](https://github.com/NOAA-GSL/ExascaleWorkflowSandbox/actions/workflows/mpas-app.yaml/badge.svg)](https://github.com/NOAA-GSL/ExascaleWorkflowSandbox/actions/workflows/mpas-app.yaml) ``` This repository is a scientific product and is not official communication @@ -34,7 +35,7 @@ Python >= 3.9 is available. ``` python -m venv .chiltepin source .chiltepin/bin/activate -pip --use-deprecated=legacy-resolver install -e . # Do not forget the dot at the end +pip --use-deprecated=legacy-resolver install -e .[test] ``` Alternatively, a conda environment (anaconda3, miniconda3, miniforge, etc.) 
@@ -44,18 +45,17 @@ of certain known (and accepted) dependency conflicts that must be ignored.
 ```
 conda create -n "chiltepin" python=3.10
 source activate chiltepin
-pip --use-deprecated=legacy-resolver install -e . # Do not forget the dot at the end
+pip --use-deprecated=legacy-resolver install -e .[test]
 ```

+NOTE: The `[test]` extra ensures that the dependencies required for running the tests are installed.
+
 NOTE: There may be some warnings about incompatible package versions similar to the following:

 ```
 ERROR: pip's legacy dependency resolver does not consider dependency conflicts when selecting packages. This behaviour is the source of the following dependency conflicts.
-globus-compute-sdk 2.22.0 requires dill==0.3.6; python_version >= "3.11", but you'll have dill 0.3.8 which is incompatible.
 globus-identity-mapping 0.3.0 requires typing-extensions<4.10,>=4.9, but you'll have typing-extensions 4.12.2 which is incompatible.
-globus-compute-endpoint 2.22.0 requires jsonschema<4.20,>=4.19.0, but you'll have jsonschema 4.22.0 which is incompatible.
-globus-compute-endpoint 2.22.0 requires parsl==2024.3.18, but you'll have parsl 2024.6.3 which is incompatible.
 ```

 Those dependency conflicts can be safely ignored.
@@ -113,10 +113,12 @@ container environment), and run the tests

 ```
 cd chiltepin
-pip --use-deprecated=legacy-resolver install -e .
+pip --use-deprecated=legacy-resolver install -e .[test]
 pytest --assert=plain --config=tests/config.yaml --platform=docker
 ```

+NOTE: The `[test]` extra ensures that the dependencies required for running the tests are installed.
+
 NOTE: Depending on how many cores your machine has and how many you've
 allocated to Docker, you may need to modify the `cores per node` setting in
 the configuration yaml file to match your machine's specifications to get all
 tests to pass.
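The remaining changes rework how executors are constructed: `chiltepin.configure.load` now takes the `resources` sub-dictionary of a platform's configuration directly, plus an optional `resources` filter naming which executor labels to build (see `src/chiltepin/configure.py` below). As a quick orientation before the code changes, here is a minimal sketch of the resulting call pattern, assembled from the call sites in this patch; the platform name and resource labels are illustrative:

```
import parsl

import chiltepin.configure

# Parse the YAML resource configuration and build a parsl.Config containing
# only the named executors. Passing resources=None (the default) builds
# every resource defined for the platform.
yaml_config = chiltepin.configure.parse_file("tests/config.yaml")
resources = chiltepin.configure.load(
    yaml_config["docker"]["resources"],
    resources=["service", "compute"],
)
with parsl.load(resources):
    pass  # submit Chiltepin tasks here
```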
diff --git a/apps/mpas/bin/experiment.py b/apps/mpas/bin/experiment.py index 972285ec..a2a2907a 100644 --- a/apps/mpas/bin/experiment.py +++ b/apps/mpas/bin/experiment.py @@ -46,7 +46,10 @@ def main(user_config_file: Path) -> None: # Load Parsl resource configs resource_config_file = mpas_app / "config" / "resources.yaml" yaml_config = chiltepin.configure.parse_file(resource_config_file) - resources = chiltepin.configure.load(yaml_config[machine]) + resources = chiltepin.configure.load( + yaml_config[machine]["resources"], + resources=["service", "compute", "mpi"], + ) with parsl.load(resources): # Instantiate LimitedArea object diff --git a/pyproject.toml b/pyproject.toml index 98b4b40f..bfd00a50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,23 +1,42 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + [project] name = "chiltepin" version = "0.0.1" dependencies = [ - "pytest", - "pytest-flake8", - "pytest-black", - "pytest-isort", "globus-compute-sdk>=2.30.1", "globus-compute-endpoint>=2.30.1", "parsl[monitoring] @ git+https://github.com/Parsl/parsl.git@globus_compute_executor.py", "uwtools @ git+https://github.com/ufs-community/uwtools@v2.4.2#subdirectory=src", ] +requires-python = ">=3.9.0" +authors = [ + {name = "Christopher Harrop", email = "Christopher.W.Harrop@noaa.gov"} +] +maintainers = [ + {name = "Christopher Harrop", email = "Christopher.W.Harrop@noaa.gov"} +] +description = "Federated NWP Workflow Tools" +readme = "README.md" +license = {file = "LICENSE"} +keywords = ["federated", "workflow"] +classifiers = [ + "Development Status :: 4 - Beta", + "Programming Language :: Python", +] -#[project.scripts] -#chiltepin = "chiltepin.cli:main" +[project.optional-dependencies] +test = [ + "pytest", + "pytest-flake8", + "pytest-black", + "pytest-isort", +] -[build-system] -requires = ["setuptools>=61.0"] -build-backend = "setuptools.build_meta" +[project.scripts] +chiltepin = "chiltepin.cli:main" [tool.pytest.ini_options] addopts = [ diff --git a/src/chiltepin/configure.py b/src/chiltepin/configure.py index 1cf918b9..c6012db1 100644 --- a/src/chiltepin/configure.py +++ b/src/chiltepin/configure.py @@ -1,6 +1,7 @@ -from typing import Any, Dict +from typing import Any, Dict, List import yaml +from globus_compute_sdk import Executor from parsl.config import Config from parsl.executors import GlobusComputeExecutor, HighThroughputExecutor, MPIExecutor from parsl.launchers import SimpleLauncher @@ -180,7 +181,7 @@ def make_globus_compute_executor( """ e = GlobusComputeExecutor( label=name, - endpoint_id=config["endpoint id"], + executor=Executor(endpoint_id=config["endpoint id"]), user_endpoint_config={ "engine": config.get("engine", "GlobusComputeEngine"), "max_mpi_apps": config.get("max mpi apps", 1), @@ -198,7 +199,7 @@ def make_globus_compute_executor( return e -def load(config: Dict[str, Any]) -> Config: +def load(config: Dict[str, Any], resources: List[str] | None = None) -> Config: """Construct a list of Executors from the input configuration dictionary The list returned by this function can be used to construct a Parsl Config @@ -212,24 +213,30 @@ def load(config: Dict[str, Any]) -> Config: YAML configuration block that contains the configuration for a list of resources + resources: List[str] | None + A list of the labels of the resources to load. The default is None. + If None, all resources are loaded. Otherwise the resources whose + labels are in the list will be loaded. 
+ Returns ------- Config """ executors = [] - for name, spec in config["resources"].items(): - match spec["engine"]: - case "HTEX": - # Make a HighThroughputExecutor - executors.append(make_htex_executor(name, spec)) - case "MPI": - # Make an MPIExecutor - executors.append(make_mpi_executor(name, spec)) - case "GlobusComputeEngine": - # Make a GlobusComputeExecutor for non-MPI jobs - executors.append(make_globus_compute_executor(name, spec)) - case "GlobusMPIEngine": - # Make a GlobusComputeExecutor for MPI jobs - executors.append(make_globus_compute_executor(name, spec)) + for name, spec in config.items(): + if resources is None or name in resources: + match spec["engine"]: + case "HTEX": + # Make a HighThroughputExecutor + executors.append(make_htex_executor(name, spec)) + case "MPI": + # Make an MPIExecutor + executors.append(make_mpi_executor(name, spec)) + case "GlobusComputeEngine": + # Make a GlobusComputeExecutor for non-MPI jobs + executors.append(make_globus_compute_executor(name, spec)) + case "GlobusMPIEngine": + # Make a GlobusComputeExecutor for MPI jobs + executors.append(make_globus_compute_executor(name, spec)) return Config(executors) diff --git a/tests/config.yaml b/tests/config.yaml index aadf8a41..e416b939 100644 --- a/tests/config.yaml +++ b/tests/config.yaml @@ -53,6 +53,31 @@ hercules: partition: "hercules" account: "gsd-hpcs" <<: *hercules-env + gc-mpi: + engine: GlobusMPIEngine + endpoint id: "{{ mpi_endpoint_id }}" + cores per node: 80 + nodes per block: 3 + max mpi apps: 2 + partition: "hercules" + account: "gsd-hpcs" + <<: *hercules-env + gc-service: + engine: GlobusComputeEngine + endpoint id: "{{ service_endpoint_id }}" + cores per node: 1 + nodes per block: 1 + partition: "service" + account: "gsd-hpcs" + <<: *hercules-env + gc-compute: + engine: GlobusComputeEngine + endpoint id: "{{ compute_endpoint_id }}" + cores per node: 80 + nodes per block: 1 + partition: "hercules" + account: "gsd-hpcs" + <<: *hercules-env hera: resources: @@ -78,6 +103,31 @@ hera: partition: "hera" account: "gsd-hpcs" <<: *hera-env + gc-mpi: + engine: GlobusMPIEngine + endpoint id: "{{ mpi_endpoint_id }}" + cores per node: 40 + nodes per block: 3 + max mpi apps: 2 + partition: "hera" + account: "gsd-hpcs" + <<: *hera-env + gc-service: + engine: GlobusComputeEngine + endpoint id: "{{ service_endpoint_id }}" + cores per node: 1 + nodes per block: 1 + partition: "service" + account: "gsd-hpcs" + <<: *hera-env + gc-compute: + engine: GlobusComputeEngine + endpoint id: "{{ compute_endpoint_id }}" + cores per node: 40 + nodes per block: 1 + partition: "hera" + account: "gsd-hpcs" + <<: *hera-env docker: resources: @@ -103,3 +153,28 @@ docker: partition: "slurmpar" account: "" <<: *docker-env + gc-mpi: + engine: GlobusMPIEngine + endpoint id: "{{ mpi_endpoint_id }}" + cores per node: 8 + nodes per block: 3 + max mpi apps: 2 + partition: "slurmpar" + account: "" + <<: *docker-env + gc-service: + engine: GlobusComputeEngine + endpoint id: "{{ service_endpoint_id }}" + cores per node: 1 + nodes per block: 1 + partition: "slurmpar" + account: "" + <<: *docker-env + gc-compute: + engine: GlobusComputeEngine + endpoint id: "{{ compute_endpoint_id }}" + cores per node: 8 + nodes per block: 1 + partition: "slurmpar" + account: "" + <<: *docker-env diff --git a/tests/templates/compute.yaml b/tests/templates/gc-compute.yaml similarity index 100% rename from tests/templates/compute.yaml rename to tests/templates/gc-compute.yaml diff --git a/tests/templates/mpi.yaml b/tests/templates/gc-mpi.yaml 
similarity index 100% rename from tests/templates/mpi.yaml rename to tests/templates/gc-mpi.yaml diff --git a/tests/templates/service.yaml b/tests/templates/gc-service.yaml similarity index 100% rename from tests/templates/service.yaml rename to tests/templates/gc-service.yaml diff --git a/tests/test_globus_compute_hello.py b/tests/test_globus_compute_hello.py index 73690ac5..2d351603 100644 --- a/tests/test_globus_compute_hello.py +++ b/tests/test_globus_compute_hello.py @@ -1,30 +1,37 @@ import pathlib import subprocess +import parsl import pytest -from globus_compute_sdk import Executor -from jinja2 import Environment, FileSystemLoader +import yaml +from jinja2 import BaseLoader, Environment, FileSystemLoader import chiltepin.configure +from chiltepin.tasks import python_task # Set up fixture to initialize and cleanup Parsl @pytest.fixture(scope="module") def config(config_file, platform): + pwd = pathlib.Path(__file__).parent.resolve() yaml_config = chiltepin.configure.parse_file(config_file) + yaml_config[platform]["resources"]["gc-service"]["environment"].append( + f"export PYTHONPATH={pwd.parent.resolve()}" + ) resources = yaml_config[platform]["resources"] - return {"resources": resources} + return resources def test_endpoint_configure(config): pwd = pathlib.Path(__file__).parent.resolve() + endpoint = "gc-service" p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "configure", - "service", + f"{endpoint}", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -35,28 +42,28 @@ def test_endpoint_configure(config): # Customize endpoint configs for this platform using Jinja2 templates jinja_env = Environment(loader=FileSystemLoader(f"{pwd}/templates/")) - for endpoint in ["service"]: - template = jinja_env.get_template(f"{endpoint}.yaml") - content = template.render( - partition=config["resources"][endpoint]["partition"], - account=config["resources"][endpoint]["account"], - worker_init=f"export PYTHONPATH={pwd.parent.resolve()}", - ) - with open( - f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8" - ) as gc_config: - gc_config.write(content) + template = jinja_env.get_template(f"{endpoint}.yaml") + content = template.render( + partition=config[endpoint]["partition"], + account=config[endpoint]["account"], + worker_init=";".join(config[endpoint]["environment"]), + ) + with open( + f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8" + ) as gc_config: + gc_config.write(content) def test_endpoint_start(): pwd = pathlib.Path(__file__).parent.resolve() + endpoint = "gc-service" p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "start", - "service", + f"{endpoint}", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -66,13 +73,15 @@ def test_endpoint_start(): assert p.returncode == 0 -def test_hello_endpoint(): +def test_hello_endpoint(config): + @python_task def hello(): return "Hello" pwd = pathlib.Path(__file__).parent.resolve() + endpoint = "gc-service" p = subprocess.run( - f"globus-compute-endpoint -c {pwd}/globus_compute list | grep service | cut -d' ' -f 2", + f"globus-compute-endpoint -c {pwd}/globus_compute list | grep {endpoint} | cut -d' ' -f 2", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, @@ -80,23 +89,30 @@ def hello(): shell=True, ) assert p.returncode == 0 - hello_endpoint_id = p.stdout.strip() assert len(hello_endpoint_id) == 36 - with Executor(endpoint_id=hello_endpoint_id) as gce: - future = gce.submit(hello) + + config_string = 
yaml.dump(config) + template = Environment(loader=BaseLoader()).from_string(config_string) + content = template.render(service_endpoint_id=hello_endpoint_id) + + content_yaml = yaml.safe_load(content) + resources = chiltepin.configure.load(content_yaml, resources=[endpoint]) + with parsl.load(resources): + future = hello(executor=endpoint) assert future.result() == "Hello" def test_endpoint_stop(): pwd = pathlib.Path(__file__).parent.resolve() + endpoint = "gc-service" p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "stop", - "service", + f"{endpoint}", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -108,6 +124,7 @@ def test_endpoint_stop(): def test_endpoint_delete(): pwd = pathlib.Path(__file__).parent.resolve() + endpoint = "gc-service" p = subprocess.run( [ "globus-compute-endpoint", @@ -115,7 +132,7 @@ def test_endpoint_delete(): f"{pwd}/globus_compute", "delete", "--yes", - "service", + f"{endpoint}", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, diff --git a/tests/test_globus_compute_mpi.py b/tests/test_globus_compute_mpi.py index fb384a2e..704aad88 100644 --- a/tests/test_globus_compute_mpi.py +++ b/tests/test_globus_compute_mpi.py @@ -4,21 +4,28 @@ import subprocess from datetime import datetime as dt +import parsl import pytest -from globus_compute_sdk import Executor, MPIFunction, ShellFunction -from jinja2 import Environment, FileSystemLoader +import yaml +from jinja2 import BaseLoader, Environment, FileSystemLoader import chiltepin.configure +from chiltepin.tasks import bash_task # Set up fixture to initialize and cleanup Parsl @pytest.fixture(scope="module") def config(config_file, platform): + pwd = pathlib.Path(__file__).parent.resolve() yaml_config = chiltepin.configure.parse_file(config_file) + yaml_config[platform]["resources"]["gc-compute"]["environment"].append( + f"export PYTHONPATH={pwd.parent.resolve()}" + ) + yaml_config[platform]["resources"]["gc-mpi"]["environment"].append( + f"export PYTHONPATH={pwd.parent.resolve()}" + ) resources = yaml_config[platform]["resources"] - environment = "\n".join(resources["mpi"]["environment"]) - - return {"resources": resources, "environment": environment} + return resources # Local function to get endpoint ids @@ -40,35 +47,35 @@ def _get_endpoint_ids(): ) assert p.returncode == 0 - # Get the uuid of the mpi endpoint - mpi_endpoint_regex = re.compile(r"\| ([0-9a-f\-]{36}) \| Running\s+\| mpi\s+\|") + # Get the uuid of the gc-mpi endpoint + mpi_endpoint_regex = re.compile(r"\| ([0-9a-f\-]{36}) \| Running\s+\| gc-mpi\s+\|") match = mpi_endpoint_regex.search(p.stdout) - mpi_endpoint_id = match.group(1) - assert len(mpi_endpoint_id) == 36 + gc_mpi_endpoint_id = match.group(1) + assert len(gc_mpi_endpoint_id) == 36 - # Get the uuid of the compute endpoint + # Get the uuid of the gc-compute endpoint compute_endpoint_regex = re.compile( - r"\| ([0-9a-f\-]{36}) \| Running\s+\| compute\s+\|" + r"\| ([0-9a-f\-]{36}) \| Running\s+\| gc-compute\s+\|" ) match = compute_endpoint_regex.search(p.stdout) - compute_endpoint_id = match.group(1) - assert len(compute_endpoint_id) == 36 + gc_compute_endpoint_id = match.group(1) + assert len(gc_compute_endpoint_id) == 36 - return mpi_endpoint_id, compute_endpoint_id + return gc_mpi_endpoint_id, gc_compute_endpoint_id # Test endpoint configure def test_endpoint_configure(config): pwd = pathlib.Path(__file__).parent.resolve() - # Configure compute endpoint + # Configure gc-compute endpoint p = subprocess.run( [ "globus-compute-endpoint", "-c", 
f"{pwd}/globus_compute", "configure", - "compute", + "gc-compute", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -77,14 +84,14 @@ def test_endpoint_configure(config): ) assert p.returncode == 0 - # Configure MPI endpoint + # Configure gc-mpi endpoint p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "configure", - "mpi", + "gc-mpi", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -95,12 +102,12 @@ def test_endpoint_configure(config): # Customize endpoint configs for this platform using Jinja2 templates jinja_env = Environment(loader=FileSystemLoader(f"{pwd}/templates/")) - for endpoint in ["compute", "mpi"]: + for endpoint in ["gc-compute", "gc-mpi"]: template = jinja_env.get_template(f"{endpoint}.yaml") content = template.render( - partition=config["resources"][endpoint]["partition"], - account=config["resources"][endpoint]["account"], - worker_init=f"export PYTHONPATH={pwd.parent.resolve()}", + partition=config[endpoint]["partition"], + account=config[endpoint]["account"], + worker_init=";".join(config[endpoint]["environment"]), ) with open( f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8" @@ -112,14 +119,14 @@ def test_endpoint_configure(config): def test_endpoint_start(): pwd = pathlib.Path(__file__).parent.resolve() - # Start compute endpoint + # Start gc-compute endpoint p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "start", - "compute", + "gc-compute", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -128,14 +135,14 @@ def test_endpoint_start(): ) assert p.returncode == 0 - # Start MPI endpoint + # Start gc-mpi endpoint p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "start", - "mpi", + "gc-mpi", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -149,30 +156,35 @@ def test_endpoint_start(): def test_endpoint_mpi_hello(config): pwd = pathlib.Path(__file__).parent.resolve() - # Define a ShellFunction to compile the MPI code - compile_func = ShellFunction( - """ - {env} + # Define a bash task to compile the MPI code + @bash_task + def compile_func( + dirpath, + stdout=None, + stderr=None, + ): + return f""" cd {dirpath} $CHILTEPIN_MPIF90 -o mpi_hello.exe mpi_hello.f90 - """, - stdout=os.path.join(pwd, "globus_compute_mpi_hello_compile.out"), - stderr=os.path.join(pwd, "globus_compute_mpi_hello_compile.err"), - ) + """ - # Define a MPIFunction to run the MPI program - hello_func = MPIFunction( - """true; # Force the default prefix to launch a no-op - {env} + # Define a bash task to run the MPI program + @bash_task + def hello_func( + dirpath, + stdout=None, + stderr=None, + parsl_resource_specification=None, + ): + return f""" cd {dirpath} $PARSL_MPI_PREFIX --overcommit ./mpi_hello.exe - """, - stdout=os.path.join(pwd, "globus_compute_mpi_hello_run.out"), - stderr=os.path.join(pwd, "globus_compute_mpi_hello_run.err"), - ) + """ # Get a listing of the endpoints - mpi_endpoint_id, compute_endpoint_id = _get_endpoint_ids() + gc_mpi_endpoint_id, gc_compute_endpoint_id = _get_endpoint_ids() + assert len(gc_mpi_endpoint_id) == 36 + assert len(gc_compute_endpoint_id) == 36 # Remove any previous output if necessary if os.path.exists(pwd / "globus_compute_mpi_hello_compile.out"): @@ -184,30 +196,41 @@ def test_endpoint_mpi_hello(config): if os.path.exists(pwd / "globus_compute_mpi_hello_run.err"): os.remove(pwd / "globus_compute_mpi_hello_run.err") - # Compile the MPI program in the compute endpoint - with 
Executor(endpoint_id=compute_endpoint_id) as gce: - future = gce.submit( - compile_func, - env=config["environment"], - dirpath=pwd, + config_string = yaml.dump(config) + template = Environment(loader=BaseLoader()).from_string(config_string) + content = template.render( + compute_endpoint_id=gc_compute_endpoint_id, + mpi_endpoint_id=gc_mpi_endpoint_id, + ) + + content_yaml = yaml.safe_load(content) + resources = chiltepin.configure.load( + content_yaml, + resources=["gc-compute", "gc-mpi"], + ) + with parsl.load(resources): + future = compile_func( + pwd, + stdout=os.path.join(pwd, "globus_compute_mpi_hello_compile.out"), + stderr=os.path.join(pwd, "globus_compute_mpi_hello_compile.err"), + executor="gc-compute", ) r = future.result() - assert r.returncode == 0 - - # Run the MPI program in the MPI endpoint - with Executor(endpoint_id=mpi_endpoint_id) as gce: - gce.resource_specification = { - "num_nodes": 3, # Number of nodes required for the application instance - "num_ranks": 6, # Number of ranks in total - "ranks_per_node": 2, # Number of ranks / application elements to be launched per node - } - future = gce.submit( - hello_func, - env=config["environment"], - dirpath=pwd, + assert r == 0 + + future = hello_func( + pwd, + stdout=os.path.join(pwd, "globus_compute_mpi_hello_run.out"), + stderr=os.path.join(pwd, "globus_compute_mpi_hello_run.err"), + executor="gc-mpi", + parsl_resource_specification={ + "num_nodes": 3, # Number of nodes required for the application instance + "num_ranks": 6, # Number of ranks in total + "ranks_per_node": 2, # Number of ranks / application elements to be launched per node + }, ) r = future.result() - assert r.returncode == 0 + assert r == 0 # Check output with open(pwd / "globus_compute_mpi_hello_run.out", "r") as f: @@ -219,41 +242,26 @@ def test_endpoint_mpi_hello(config): def test_endpoint_mpi_pi(config): pwd = pathlib.Path(__file__).parent.resolve() - # Define a ShellFunction to compile the MPI code - compile_func = ShellFunction( - """ - {env} + # Define a bash task to compile the MPI code + @bash_task + def compile_func(dirpath, stdout=None, stderr=None): + return f""" cd {dirpath} $CHILTEPIN_MPIF90 -o mpi_pi.exe mpi_pi.f90 - """, - stdout=os.path.join(pwd, "globus_compute_mpi_pi_compile.out"), - stderr=os.path.join(pwd, "globus_compute_mpi_pi_compile.err"), - ) - - # Define a MPIFunction to run the MPI program - pi1_func = MPIFunction( - """true; # Force the default prefix to launch a no-op - {env} - cd {dirpath} - $PARSL_MPI_PREFIX --overcommit ./mpi_pi.exe - """, - stdout=os.path.join(pwd, "globus_compute_mpi_pi1_run.out"), - stderr=os.path.join(pwd, "globus_compute_mpi_pi1_run.err"), - ) + """ - # Define a MPIFunction to run the MPI program - pi2_func = MPIFunction( - """true; # Force the default prefix to launch a no-op - {env} + # Define a bash task to run the MPI program + @bash_task + def pi_func(dirpath, stdout=None, stderr=None, parsl_resource_specification=None): + return f""" cd {dirpath} $PARSL_MPI_PREFIX --overcommit ./mpi_pi.exe - """, - stdout=os.path.join(pwd, "globus_compute_mpi_pi2_run.out"), - stderr=os.path.join(pwd, "globus_compute_mpi_pi2_run.err"), - ) + """ # Get a listing of the endpoints - mpi_endpoint_id, compute_endpoint_id = _get_endpoint_ids() + gc_mpi_endpoint_id, gc_compute_endpoint_id = _get_endpoint_ids() + assert len(gc_mpi_endpoint_id) == 36 + assert len(gc_compute_endpoint_id) == 36 # Remove any previous output if necessary if os.path.exists(pwd / "globus_compute_mpi_pi_compile.out"): @@ -269,45 +277,57 @@ def 
test_endpoint_mpi_pi(config): if os.path.exists(pwd / "globus_compute_mpi_pi2_run.err"): os.remove(pwd / "globus_compute_mpi_pi2_run.err") - # Compile the MPI program in the compute endpoint - with Executor(endpoint_id=compute_endpoint_id) as gce: - future = gce.submit( - compile_func, - env=config["environment"], - dirpath=pwd, - ) - r = future.result() - assert r.returncode == 0 + config_string = yaml.dump(config) + template = Environment(loader=BaseLoader()).from_string(config_string) + content = template.render( + compute_endpoint_id=gc_compute_endpoint_id, + mpi_endpoint_id=gc_mpi_endpoint_id, + ) - # Run the MPI program in the MPI endpoint - with Executor(endpoint_id=mpi_endpoint_id) as gce: + content_yaml = yaml.safe_load(content) + resources = chiltepin.configure.load( + content_yaml, + resources=["gc-compute", "gc-mpi"], + ) + with parsl.load(resources): cores_per_node = 8 - gce.resource_specification = { - "num_nodes": 2, # Number of nodes required for the application instance - "num_ranks": 2 * cores_per_node, # Number of ranks in total - "ranks_per_node": cores_per_node, # Number of ranks / application elements to be launched per node - } - future1 = gce.submit( - pi1_func, - env=config["environment"], - dirpath=pwd, + future = compile_func( + pwd, + stdout=os.path.join(pwd, "globus_compute_mpi_pi_compile.out"), + stderr=os.path.join(pwd, "globus_compute_mpi_pi_compile.err"), + executor="gc-compute", + ) + r = future.result() + assert r == 0 + + future1 = pi_func( + pwd, + stdout=os.path.join(pwd, "globus_compute_mpi_pi1_run.out"), + stderr=os.path.join(pwd, "globus_compute_mpi_pi1_run.err"), + executor="gc-mpi", + parsl_resource_specification={ + "num_nodes": 2, # Number of nodes required for the application instance + "num_ranks": 2 * cores_per_node, # Number of ranks in total + "ranks_per_node": cores_per_node, # Number of ranks / application elements to be launched per node + }, ) - gce.resource_specification = { - "num_nodes": 1, # Number of nodes required for the application instance - "num_ranks": cores_per_node, # Number of ranks in total - "ranks_per_node": cores_per_node, # Number of ranks / application elements to be launched per node - } - future2 = gce.submit( - pi2_func, - env=config["environment"], - dirpath=pwd, + future2 = pi_func( + pwd, + stdout=os.path.join(pwd, "globus_compute_mpi_pi2_run.out"), + stderr=os.path.join(pwd, "globus_compute_mpi_pi2_run.err"), + executor="gc-mpi", + parsl_resource_specification={ + "num_nodes": 1, # Number of nodes required for the application instance + "num_ranks": cores_per_node, # Number of ranks in total + "ranks_per_node": cores_per_node, # Number of ranks / application elements to be launched per node + }, ) r1 = future1.result() - assert r1.returncode == 0 + assert r1 == 0 r2 = future2.result() - assert r2.returncode == 0 + assert r2 == 0 # Extract the hostnames used by pi1 with open(pwd / "globus_compute_mpi_pi1_run.out", "r") as f: @@ -344,14 +364,14 @@ def test_endpoint_mpi_pi(config): def test_endpoint_stop(): pwd = pathlib.Path(__file__).parent.resolve() - # Stop the compute endpoint + # Stop the gc-compute endpoint p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "stop", - "compute", + "gc-compute", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -360,14 +380,14 @@ def test_endpoint_stop(): ) assert p.returncode == 0 - # Stop the MPI endpoint + # Stop the gc-mpi endpoint p = subprocess.run( [ "globus-compute-endpoint", "-c", f"{pwd}/globus_compute", "stop", - "mpi", + "gc-mpi", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -378,10 +398,10 @@ def test_endpoint_stop(): # Test endpoint delete -def test_endpoint_delete(config): +def test_endpoint_delete(): pwd = pathlib.Path(__file__).parent.resolve() - # Delete compute endpoint + # Delete gc-compute endpoint p = subprocess.run( [ "globus-compute-endpoint", @@ -389,7 +409,7 @@ def test_endpoint_delete(): f"{pwd}/globus_compute", "delete", "--yes", - "compute", + "gc-compute", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -398,7 +418,7 @@ def test_endpoint_delete(): ) assert p.returncode == 0 - # Delete MPI endpoint + # Delete gc-mpi endpoint p = subprocess.run( [ "globus-compute-endpoint", @@ -406,7 +426,7 @@ def test_endpoint_delete(): f"{pwd}/globus_compute", "delete", "--yes", - "mpi", + "gc-mpi", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, diff --git a/tests/test_parsl_mpi_hello.py b/tests/test_parsl_mpi_hello.py index b4ef64c9..4b2f47f1 100644 --- a/tests/test_parsl_mpi_hello.py +++ b/tests/test_parsl_mpi_hello.py @@ -76,7 +76,9 @@ def config(config_file, platform): yaml_config[platform]["resources"]["mpi"]["environment"].append( f"export PYTHONPATH={pwd.parent.resolve()}" ) - resources = chiltepin.configure.load(yaml_config[platform]) + resources = chiltepin.configure.load( + yaml_config[platform]["resources"], resources=["compute", "mpi"] + ) with parsl.load(resources): yield {"resources": resources} parsl.clear() diff --git a/tests/test_qg_install.py b/tests/test_qg_install.py index d9287c64..d8537403 100644 --- a/tests/test_qg_install.py +++ b/tests/test_qg_install.py @@ -20,7 +20,9 @@ def config(config_file, platform): yaml_config[platform]["resources"]["compute"]["environment"].append( f"export PYTHONPATH={pwd.parent.resolve()}" ) - resources = chiltepin.configure.load(yaml_config[platform]) + resources = chiltepin.configure.load( + yaml_config[platform]["resources"], resources=["service", "compute"] + ) with parsl.load(resources): yield {"resources": resources}