Skip to content

Commit

Permalink
Merge pull request #6209 from grondo/flux-jobs-include
Browse files Browse the repository at this point in the history
flux-jobs: add `-i, --include=HOSTS|RANKS` option
  • Loading branch information
mergify[bot] authored Aug 12, 2024
2 parents 7d18dd7 + b2868cc commit d7bd7d4
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 3 deletions.
13 changes: 13 additions & 0 deletions doc/man1/flux-jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ OPTIONS
List jobs in a specific queue or queues. Multiple queues may be separated
by a comma or by using the :option:`-q, --queue` option multiple times.

.. option:: -i, --include=HOSTS|RANKS

List only jobs where the assigned resources intersect with the supplied
argument, which may be specified either as an RFC 22 idset of broker ranks
or an RFC 29 hostlist of host names. It is not an error to specify ranks or
hosts which do not exist.

.. option:: -c, --count=N

Limit output to N jobs (default 1000)
Expand Down Expand Up @@ -618,6 +625,12 @@ RESOURCES

.. include:: common/resources.rst

FLUX RFC
========

| :doc:`rfc:spec_22`
| :doc:`rfc:spec_29`
SEE ALSO
========

Expand Down
26 changes: 23 additions & 3 deletions src/bindings/python/flux/job/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,15 @@ def job_list(
since=0.0,
name=None,
queue=None,
constraint=None,
):
# N.B. an "and" operation with no values returns everything
constraint = {"and": []}
if constraint is None:
# N.B. an "and" operation with no values returns everything
constraint = {"and": []}
else:
# O/w, a provided constraint will be anded with other parameters below:
constraint = {"and": [constraint]}

if userid != flux.constants.FLUX_USERID_UNKNOWN:
constraint["and"].append({"userid": [userid]})
if name:
Expand Down Expand Up @@ -83,7 +89,13 @@ def job_list(


def job_list_inactive(
flux_handle, since=0.0, max_entries=1000, attrs=["all"], name=None, queue=None
flux_handle,
since=0.0,
max_entries=1000,
attrs=["all"],
name=None,
queue=None,
constraint=None,
):
"""Same as ``flux.job.list.job_list``, but lists only inactive jobs."""
return job_list(
Expand All @@ -95,6 +107,7 @@ def job_list_inactive(
since=since,
name=name,
queue=queue,
constraint=constraint,
)


Expand Down Expand Up @@ -207,6 +220,10 @@ class JobList:
:since: Limit jobs to those that have been active since a given timestamp.
:name: Limit jobs to those with a specific name.
:queue: Limit jobs to those submitted to a specific queue or queues
:constraint: An RFC 31 Constraint object describing a job-list constraint
as documented in RFC 43 Constraint Operators section. This constraint
may then be joined with other constraints provided by above parameters
via the ``and`` operator.
"""

# pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -240,6 +257,7 @@ def __init__(
since=0.0,
name=None,
queue=None,
constraint=None,
):
self.handle = flux_handle
self.attrs = list(attrs)
Expand All @@ -255,6 +273,7 @@ def __init__(
for x in fname.split(","):
self.add_filter(x)
self.set_user(user)
self.constraint = constraint

def set_user(self, user):
"""Only return jobs for user (may be a username or userid)"""
Expand Down Expand Up @@ -306,6 +325,7 @@ def fetch_jobs(self):
since=self.since,
name=self.name,
queue=self.queue,
constraint=self.constraint,
)

def jobs(self):
Expand Down
21 changes: 21 additions & 0 deletions src/cmd/flux-jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import sys

import flux.constants
from flux.hostlist import Hostlist
from flux.idset import IDset
from flux.job import JobID, JobInfo, JobInfoFormat, JobList, job_fields_to_attrs
from flux.job.stats import JobStats
from flux.util import (
Expand Down Expand Up @@ -156,6 +158,16 @@ def fetch_jobs_flux(args, fields, flux_handle=None):
if not args.filter:
args.filter = {"pending", "running"}

constraint = None
if args.include:
try:
constraint = {"ranks": [IDset(args.include).encode()]}
except ValueError:
try:
constraint = {"hostlist": [Hostlist(args.include).encode()]}
except ValueError:
raise ValueError(f"-i/--include: invalid targets: {args.include}")

jobs_rpc = JobList(
flux_handle,
ids=args.jobids,
Expand All @@ -166,6 +178,7 @@ def fetch_jobs_flux(args, fields, flux_handle=None):
since=since,
name=args.name,
queue=args.queue,
constraint=constraint,
)

jobs = jobs_rpc.jobs()
Expand Down Expand Up @@ -270,6 +283,14 @@ def parse_args():
metavar="QUEUE,...",
help="Limit output to specific queue or queues",
)
parser.add_argument(
"-i",
"--include",
type=str,
metavar="HOSTS|RANKS",
help="Limit output to jobs that were allocated to the specified "
+ "HOSTS or RANKS provided as a hostlist or idset.",
)
parser.add_argument(
"-o",
"--format",
Expand Down
12 changes: 12 additions & 0 deletions t/python/t0013-job-list.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,18 @@ def test_18_list_id_fail(self):
with self.assertRaises(FileNotFoundError):
rpc_handle.get_jobinfo()

def test_19_list_inactive_constraints(self):
for job in flux.job.job_list_inactive(
self.fh, constraint={"name": ["sleep"]}
).get_jobinfos():
self.assertEqual(job.name, "sleep")

def test_20_list_constraints(self):
for job in flux.job.job_list(
self.fh, constraint={"name": ["sleep"]}
).get_jobinfos():
self.assertEqual(job.name, "sleep")


if __name__ == "__main__":
from subflux import rerun_under_flux
Expand Down
22 changes: 22 additions & 0 deletions t/t2800-jobs-cmd.t
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,28 @@ test_expect_success 'flux-jobs --count works' '
test $count -eq 8
'

#
# -i, --include tests
#
test_expect_success 'flux-jobs -i, --include works with ranks' '
for rank in $(flux jobs -ai 0 -no {ranks}); do
test $rank -eq 0
done
'
test_expect_success 'flux-jobs -i, --include works with ranks' '
for rank in $(flux jobs -ai 0,3 -no {ranks}); do
test $rank -eq 0 -o $rank -eq 3
done
'
test_expect_success 'flux jobs -i, --include works with hosts' '
for host in $(flux jobs -ai $(hostname) -no {nodelist}); do
test $host = $(hostname)
done
'
test_expect_success 'flux jobs -i, --include fails with bad idset/hostlist' '
test_must_fail flux jobs -ai "foo["
'

#
# test specific IDs
#
Expand Down

0 comments on commit d7bd7d4

Please sign in to comment.