Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libflux: add flux_watcher_is_active() #6436

Merged
merged 10 commits into from
Nov 14, 2024
1 change: 1 addition & 0 deletions doc/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ MAN3_FILES_SECONDARY = \
man3/flux_reactor_active_decref.3 \
man3/flux_fd_watcher_get_fd.3 \
man3/flux_watcher_stop.3 \
man3/flux_watcher_is_active.3 \
man3/flux_watcher_destroy.3 \
man3/flux_watcher_next_wakeup.3 \
man3/flux_handle_watcher_get_flux.3 \
Expand Down
5 changes: 5 additions & 0 deletions doc/man3/flux_watcher_start.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ SYNOPSIS

void flux_watcher_stop (flux_watcher_t *w);

bool flux_watcher_is_active (flux_watcher_t *w);

void flux_watcher_destroy (flux_watcher_t *w);

double flux_watcher_next_wakeup (flux_watcher_t *w);
Expand All @@ -31,6 +33,9 @@ so that it stops receiving events. If :var:`w` is already inactive, the call
has no effect. This may be called from within a :type:`flux_watcher_f`
callback.

:func:`flux_watcher_is_active` returns a true value if the watcher is active
(i.e. it has been started and not yet stopped) and false otherwise.

:func:`flux_watcher_destroy` destroys a :type:`flux_watcher_t` object :var:`w`,
after stopping it. It is not safe to destroy a watcher object within a
:type:`flux_watcher_f` callback.
Expand Down
1 change: 1 addition & 0 deletions doc/manpages.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@
('man3/flux_timer_watcher_create', 'flux_timer_watcher_reset', 'set/reset a timer', [author], 3),
('man3/flux_timer_watcher_create', 'flux_timer_watcher_create', 'set/reset a timer', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_stop', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_is_active', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_destroy', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_next_wakeup', 'start/stop/destroy/query reactor watcher', [author], 3),
('man3/flux_watcher_start', 'flux_watcher_start', 'start/stop/destroy/query reactor watcher', [author], 3),
Expand Down
4 changes: 1 addition & 3 deletions src/common/libflux/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
#include <errno.h>
#include <stdbool.h>
#include <jansson.h>
#include <flux/core.h>

#include "ccan/str/str.h"
#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libhostlist/hostlist.h"
#include "src/common/libidset/idset.h"
#include "src/common/libutil/errprintf.h"

#include "attr.h"
#include "rpc.h"

enum {
FLUX_ATTRFLAG_IMMUTABLE = 1,
};
Expand Down
2 changes: 0 additions & 2 deletions src/common/libflux/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
* attributes programmatically.
*/

#include "handle.h"

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
2 changes: 0 additions & 2 deletions src/common/libflux/barrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
#ifndef _FLUX_CORE_BARRIER_H
#define _FLUX_CORE_BARRIER_H

#include "handle.h"

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
3 changes: 1 addition & 2 deletions src/common/libflux/composite_future.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"

#include "future.h"

/* Type-specific data for a composite future:
*/
struct composite_future {
Expand Down
1 change: 0 additions & 1 deletion src/common/libflux/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "src/common/libutil/errprintf.h"
#include "ccan/str/str.h"

#include "conf.h"
#include "conf_private.h"

struct builtin {
Expand Down
1 change: 0 additions & 1 deletion src/common/libflux/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#ifndef _FLUX_CORE_CONF_H
#define _FLUX_CORE_CONF_H

#include <flux/core.h>
#include <stdarg.h>

#ifdef __cplusplus
Expand Down
2 changes: 1 addition & 1 deletion src/common/libflux/connector_interthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
#include <unistd.h>
#include <poll.h>
#include <pthread.h>

#include <flux/core.h>

#include "src/common/libutil/errprintf.h"
#include "src/common/libutil/aux.h"
#include "ccan/list/list.h"
#include "ccan/str/str.h"

#include "message_private.h" // for access to msg->aux
#include "msg_deque.h"

Expand Down
1 change: 0 additions & 1 deletion src/common/libflux/connector_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#endif
#include <unistd.h>
#include <poll.h>

#include <flux/core.h>

#include "ccan/str/str.h"
Expand Down
2 changes: 0 additions & 2 deletions src/common/libflux/disconnect.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
#ifndef _FLUX_CORE_DISCONNECT_H
#define _FLUX_CORE_DISCONNECT_H

#include "message.h"

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
8 changes: 6 additions & 2 deletions src/common/libflux/ev_flux.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
#endif
#include <stddef.h>
#include <stdbool.h>

#include "handle.h"
#include <flux/core.h>

#include "src/common/libev/ev.h"

Expand Down Expand Up @@ -97,6 +96,11 @@ void ev_flux_stop (struct ev_loop *loop, struct ev_flux *w)
ev_idle_stop (loop, &w->idle_w);
}

bool ev_flux_is_active (struct ev_flux *w)
{
return ev_is_active (&w->prepare_w);
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
Expand Down
1 change: 1 addition & 0 deletions src/common/libflux/ev_flux.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ struct ev_flux {
int ev_flux_init (struct ev_flux *w, ev_flux_f cb, flux_t *h, int events);
void ev_flux_start (struct ev_loop *loop, struct ev_flux *w);
void ev_flux_stop (struct ev_loop *loop, struct ev_flux *w);
bool ev_flux_is_active (struct ev_flux *w);

#endif /* !_EV_FLUX_H */
5 changes: 1 addition & 4 deletions src/common/libflux/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
#include <stdbool.h>
#include <stdarg.h>
#include <jansson.h>
#include <flux/core.h>

#include "ccan/base64/base64.h"

#include "event.h"
#include "rpc.h"
#include "message.h"

flux_future_t *flux_event_subscribe_ex (flux_t *h,
const char *topic,
int flags)
Expand Down
3 changes: 0 additions & 3 deletions src/common/libflux/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
#ifndef _FLUX_CORE_EVENT_H
#define _FLUX_CORE_EVENT_H

#include "message.h"
#include "future.h"

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
7 changes: 1 addition & 6 deletions src/common/libflux/flog.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
#include <unistd.h>
#include <assert.h>
#include <inttypes.h>

#include "flog.h"
#include "attr.h"
#include "message.h"
#include "request.h"
#include "rpc.h"
#include <flux/core.h>

#include "src/common/libutil/wallclock.h"
#include "src/common/libutil/stdlog.h"
Expand Down
2 changes: 0 additions & 2 deletions src/common/libflux/flog.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#include <stdlib.h>
#include <errno.h>

#include "handle.h"

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/common/libflux/flux.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
#define _FLUX_CORE_FLUX_H

#include "types.h"
#include "message.h"
#include "handle.h"
#include "reactor.h"
#include "msg_handler.h"
#include "connector.h"
#include "message.h"
#include "msglist.h"
#include "request.h"
#include "response.h"
Expand Down
6 changes: 1 addition & 5 deletions src/common/libflux/fripp.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
#include <syslog.h>
#include <time.h>
#include <unistd.h>

#include "attr.h"
#include "flog.h"
#include "fripp.h"
#include "reactor.h"
#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libutil/iterators.h"
Expand Down
4 changes: 1 addition & 3 deletions src/common/libflux/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
#include <stdlib.h>
#include <errno.h>
#include <assert.h>
#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libutil/aux.h"

#include "future.h"
#include "flog.h"

struct now_context {
flux_t *h; // (optional) cloned flux_t handle
flux_reactor_t *r; // reactor created for this get/check
Expand Down
8 changes: 0 additions & 8 deletions src/common/libflux/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@
#ifndef _FLUX_CORE_FUTURE_H
#define _FLUX_CORE_FUTURE_H

#include "reactor.h"
#include "types.h"
#include "handle.h"
#include "msg_handler.h"
#include "flog.h"

#ifdef __cplusplus
extern "C" {
#endif
Expand All @@ -25,8 +19,6 @@ extern "C" {
* See flux_future_then(3).
*/

typedef struct flux_future flux_future_t;

typedef void (*flux_continuation_f)(flux_future_t *f, void *arg);

int flux_future_then (flux_future_t *f, double timeout,
Expand Down
36 changes: 29 additions & 7 deletions src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <caliper/cali.h>
#include <sys/syscall.h>
#endif
#include <flux/core.h>

#include "src/common/libflux/plugin_private.h"
#include "src/common/librouter/rpc_track.h"
Expand All @@ -38,13 +39,6 @@
#include "ccan/array_size/array_size.h"
#include "ccan/str/str.h"

#include "handle.h"
#include "reactor.h"
#include "connector.h"
#include "message.h"
#include "msg_handler.h" // for flux_sleep_on ()
#include "flog.h"
#include "conf.h"
#include "msg_deque.h"
#include "message_private.h" // to check msg refcount in flux_send_new ()

Expand Down Expand Up @@ -404,6 +398,34 @@ void flux_close (flux_t *h)
flux_handle_destroy (h);
}

int flux_set_reactor (flux_t *h, flux_reactor_t *r)
{
if (flux_aux_get (h, "flux::reactor")) {
errno = EEXIST;
return -1;
}
if (flux_aux_set (h, "flux::reactor", r, NULL) < 0)
return -1;
return 0;
}

flux_reactor_t *flux_get_reactor (flux_t *h)
{
flux_reactor_t *r = flux_aux_get (h, "flux::reactor");
if (!r) {
if ((r = flux_reactor_create (0))) {
if (flux_aux_set (h,
"flux::reactor",
r,
(flux_free_f)flux_reactor_destroy) < 0) {
flux_reactor_destroy (r);
r = NULL;
}
}
}
return r;
}

/* Create an idset that is configured as an allocator per idset_alloc(3) to
* be used as a matchtag allocator. Remove FLUX_MATCHTAG_NONE from the pool.
*/
Expand Down
12 changes: 7 additions & 5 deletions src/common/libflux/handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@
#include <stdbool.h>
#include <string.h>

#include "types.h"
#include "message.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct flux_handle flux_t;

typedef struct {
int request_tx;
int request_rx;
Expand Down Expand Up @@ -100,6 +95,13 @@ void flux_decref(flux_t *h);
*/
flux_t *flux_clone (flux_t *orig);

/* Get/set the reactor associated with the handle.
* A reactor is created with flags=0 if 'get' is called before one is set.
* 'set' fails if there is already a reactor.
*/
flux_reactor_t *flux_get_reactor (flux_t *h);
int flux_set_reactor (flux_t *h, flux_reactor_t *r);

/* Drop connection to broker and re-establish, if supported by connector.
*/
int flux_reconnect (flux_t *h);
Expand Down
3 changes: 1 addition & 2 deletions src/common/libflux/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@
#include <fnmatch.h>
#include <inttypes.h>
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libutil/aux.h"
#include "src/common/libutil/errno_safe.h"
#include "ccan/array_size/array_size.h"
#include "ccan/str/str.h"

#include "message.h"

#include "message_private.h"
#include "message_iovec.h"
#include "message_route.h"
Expand Down
3 changes: 0 additions & 3 deletions src/common/libflux/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include "types.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct flux_msg flux_msg_t;

enum {
FLUX_MSGTYPE_REQUEST = 0x01,
FLUX_MSGTYPE_RESPONSE = 0x02,
Expand Down
2 changes: 1 addition & 1 deletion src/common/libflux/message_iovec.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include <arpa/inet.h>
#include <assert.h>
#include <inttypes.h>
#include <flux/core.h>

#include "message.h"
#include "message_private.h"
#include "message_route.h"
#include "message_proto.h"
Expand Down
Loading
Loading