Skip to content

Commit

Permalink
Update code and tests to use GlobusComputeExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
christopherwharrop-noaa committed Nov 22, 2024
1 parent b78385f commit 2c8bffe
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 136 deletions.
2 changes: 1 addition & 1 deletion apps/mpas/bin/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def main(user_config_file: Path) -> None:
# Load Parsl resource configs
resource_config_file = mpas_app / "config" / "resources.yaml"
yaml_config = chiltepin.configure.parse_file(resource_config_file)
resources = chiltepin.configure.load(yaml_config[machine])
resources = chiltepin.configure.load(yaml_config[machine]["resources"], resources=["service", "compute", "mpi"])
with parsl.load(resources):

# Instantiate LimitedArea object
Expand Down
41 changes: 24 additions & 17 deletions src/chiltepin/configure.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Any, Dict
from typing import Any, Dict, List

import yaml
from parsl.config import Config
from parsl.executors import GlobusComputeExecutor, HighThroughputExecutor, MPIExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import SlurmProvider
from globus_compute_sdk import Executor


# Define function to parse yaml config
Expand Down Expand Up @@ -180,7 +181,7 @@ def make_globus_compute_executor(
"""
e = GlobusComputeExecutor(
label=name,
endpoint_id=config["endpoint id"],
executor=Executor(endpoint_id=config["endpoint id"]),
user_endpoint_config={
"engine": config.get("engine", "GlobusComputeEngine"),
"max_mpi_apps": config.get("max mpi apps", 1),
Expand All @@ -198,7 +199,7 @@ def make_globus_compute_executor(
return e


def load(config: Dict[str, Any]) -> Config:
def load(config: Dict[str, Any], resources: List[str]|None=None) -> Config:
"""Construct a list of Executors from the input configuration dictionary
The list is wrapped in the Parsl Config that this function returns, which can be passed to parsl.load()
Expand All @@ -212,24 +213,30 @@ def load(config: Dict[str, Any]) -> Config:
YAML configuration block that contains the configuration for a list of
resources
resources: List[str] | None
A list of the labels of the resources to load. The default is None.
If None, all resources are loaded. Otherwise, only the resources whose
labels (the keys of config) appear in the list are loaded.
Returns
-------
Config
"""
executors = []
for name, spec in config["resources"].items():
match spec["engine"]:
case "HTEX":
# Make a HighThroughputExecutor
executors.append(make_htex_executor(name, spec))
case "MPI":
# Make an MPIExecutor
executors.append(make_mpi_executor(name, spec))
case "GlobusComputeEngine":
# Make a GlobusComputeExecutor for non-MPI jobs
executors.append(make_globus_compute_executor(name, spec))
case "GlobusMPIEngine":
# Make a GlobusComputeExecutor for MPI jobs
executors.append(make_globus_compute_executor(name, spec))
for name, spec in config.items():
if resources is None or name in resources:
match spec["engine"]:
case "HTEX":
# Make a HighThroughputExecutor
executors.append(make_htex_executor(name, spec))
case "MPI":
# Make an MPIExecutor
executors.append(make_mpi_executor(name, spec))
case "GlobusComputeEngine":
# Make a GlobusComputeExecutor for non-MPI jobs
executors.append(make_globus_compute_executor(name, spec))
case "GlobusMPIEngine":
# Make a GlobusComputeExecutor for MPI jobs
executors.append(make_globus_compute_executor(name, spec))
return Config(executors)
27 changes: 18 additions & 9 deletions tests/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,25 @@ hercules:
account: "gsd-hpcs"
<<: *hercules-env
gc-mpi:
engine: MPI
engine: GlobusMPIEngine
endpoint id: "{{ mpi_endpoint_id }}"
cores per node: 80
nodes per block: 3
max mpi apps: 2
partition: "hercules"
account: "gsd-hpcs"
<<: *hercules-env
gc-service:
engine: HTEX
engine: GlobusComputeEngine
endpoint id: "{{ service_endpoint_id }}"
cores per node: 1
nodes per block: 1
partition: "service"
account: "gsd-hpcs"
<<: *hercules-env
gc-compute:
engine: HTEX
engine: GlobusComputeEngine
endpoint id: "{{ compute_endpoint_id }}"
cores per node: 80
nodes per block: 1
partition: "hercules"
Expand Down Expand Up @@ -101,22 +104,25 @@ hera:
account: "gsd-hpcs"
<<: *hera-env
gc-mpi:
engine: MPI
engine: GlobusMPIEngine
endpoint id: "{{ mpi_endpoint_id }}"
cores per node: 40
nodes per block: 3
max mpi apps: 2
partition: "hera"
account: "gsd-hpcs"
<<: *hera-env
gc-service:
engine: HTEX
engine: GlobusComputeEngine
endpoint id: "{{ service_endpoint_id }}"
cores per node: 1
nodes per block: 1
partition: "service"
account: "gsd-hpcs"
<<: *hera-env
gc-compute:
engine: HTEX
engine: GlobusComputeEngine
endpoint id: "{{ compute_endpoint_id }}"
cores per node: 40
nodes per block: 1
partition: "hera"
Expand Down Expand Up @@ -148,22 +154,25 @@ docker:
account: ""
<<: *docker-env
gc-mpi:
engine: MPI
engine: GlobusMPIEngine
endpoint id: "{{ mpi_endpoint_id }}"
cores per node: 8
nodes per block: 3
max mpi apps: 2
partition: "slurmpar"
account: ""
<<: *docker-env
gc-service:
engine: HTEX
engine: GlobusComputeEngine
endpoint id: "{{ service_endpoint_id }}"
cores per node: 1
nodes per block: 1
partition: "slurmpar"
account: ""
<<: *docker-env
gc-compute:
engine: HTEX
engine: GlobusComputeEngine
endpoint id: "{{ compute_endpoint_id }}"
cores per node: 8
nodes per block: 1
partition: "slurmpar"
Expand Down
61 changes: 38 additions & 23 deletions tests/test_globus_compute_hello.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import pathlib
import subprocess

import parsl
from chiltepin.tasks import python_task
import pytest
from globus_compute_sdk import Executor
from jinja2 import Environment, FileSystemLoader
from jinja2 import BaseLoader, Environment, FileSystemLoader
import yaml

import chiltepin.configure

Expand All @@ -17,18 +20,19 @@ def config(config_file, platform):
f"export PYTHONPATH={pwd.parent.resolve()}"
)
resources = yaml_config[platform]["resources"]
return {"resources": resources}
return resources


def test_endpoint_configure(config):
pwd = pathlib.Path(__file__).parent.resolve()
endpoint = "gc-service"
p = subprocess.run(
[
"globus-compute-endpoint",
"-c",
f"{pwd}/globus_compute",
"configure",
"gc-service",
f"{endpoint}",
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
Expand All @@ -39,28 +43,28 @@ def test_endpoint_configure(config):

# Customize endpoint configs for this platform using Jinja2 templates
jinja_env = Environment(loader=FileSystemLoader(f"{pwd}/templates/"))
for endpoint in ["gc-service"]:
template = jinja_env.get_template(f"{endpoint}.yaml")
content = template.render(
partition=config["resources"][endpoint]["partition"],
account=config["resources"][endpoint]["account"],
worker_init=";".join(config["resources"][endpoint]["environment"])
)
with open(
f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8"
) as gc_config:
gc_config.write(content)
template = jinja_env.get_template(f"{endpoint}.yaml")
content = template.render(
partition=config[endpoint]["partition"],
account=config[endpoint]["account"],
worker_init=";".join(config[endpoint]["environment"])
)
with open(
f"{pwd}/globus_compute/{endpoint}/config.yaml", mode="w", encoding="utf-8"
) as gc_config:
gc_config.write(content)


def test_endpoint_start():
pwd = pathlib.Path(__file__).parent.resolve()
endpoint = "gc-service"
p = subprocess.run(
[
"globus-compute-endpoint",
"-c",
f"{pwd}/globus_compute",
"start",
"gc-service",
f"{endpoint}",
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
Expand All @@ -70,37 +74,47 @@ def test_endpoint_start():
assert p.returncode == 0


def test_hello_endpoint():
def test_hello_endpoint(config):
@python_task
def hello():
return "Hello"

pwd = pathlib.Path(__file__).parent.resolve()
endpoint = "gc-service"
p = subprocess.run(
f"globus-compute-endpoint -c {pwd}/globus_compute list | grep gc-service | cut -d' ' -f 2",
f"globus-compute-endpoint -c {pwd}/globus_compute list | grep {endpoint} | cut -d' ' -f 2",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=60,
shell=True,
)
assert p.returncode == 0

hello_endpoint_id = p.stdout.strip()
assert len(hello_endpoint_id) == 36
with Executor(endpoint_id=hello_endpoint_id) as gce:
future = gce.submit(hello)
assert future.result() == "Hello"

config_string = yaml.dump(config)
template = Environment(loader=BaseLoader()).from_string(config_string)
content = template.render(
service_endpoint_id=hello_endpoint_id
)

content_yaml = yaml.safe_load(content)
resources = chiltepin.configure.load(content_yaml, resources=[endpoint])
with parsl.load(resources):
future = hello(executor=endpoint)
assert future.result() == "Hello"

def test_endpoint_stop():
pwd = pathlib.Path(__file__).parent.resolve()
endpoint = "gc-service"
p = subprocess.run(
[
"globus-compute-endpoint",
"-c",
f"{pwd}/globus_compute",
"stop",
"gc-service",
f"{endpoint}",
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
Expand All @@ -112,14 +126,15 @@ def test_endpoint_stop():

def test_endpoint_delete():
pwd = pathlib.Path(__file__).parent.resolve()
endpoint = "gc-service"
p = subprocess.run(
[
"globus-compute-endpoint",
"-c",
f"{pwd}/globus_compute",
"delete",
"--yes",
"gc-service",
f"{endpoint}",
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
Expand Down
Loading

0 comments on commit 2c8bffe

Please sign in to comment.