Skip to content

Commit

Permalink
content: flush before checkpointing
Browse files Browse the repository at this point in the history
Problem: Before checkpointing, users need to remember to call
content.flush, to ensure data has been flushed to the backing store.
It is easy to forget this.

Within the content module, call content.flush before checkpointing.

Fixes flux-framework#6242
  • Loading branch information
chu11 committed Sep 4, 2024
1 parent 846bfe1 commit 3689335
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/common/libcontent/content.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
enum {
CONTENT_FLAG_CACHE_BYPASS = 1,/* request direct to backing store */
CONTENT_FLAG_UPSTREAM = 2, /* make request of upstream TBON peer */
CONTENT_FLAG_NO_FLUSH = 4, /* do not flush content (checkpoint) */
};

/* Send request to load blob by hash or blobref.
Expand Down
7 changes: 6 additions & 1 deletion src/common/libkvs/kvs_checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <time.h>

#include "kvs_checkpoint.h"
#include "src/common/libcontent/content.h"

flux_future_t *kvs_checkpoint_commit (flux_t *h,
const char *key,
Expand All @@ -29,7 +30,8 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h,
{
flux_future_t *f = NULL;
const char *topic = "content.checkpoint-put";
int valid_flags = KVS_CHECKPOINT_FLAG_CACHE_BYPASS;
int valid_flags = (KVS_CHECKPOINT_FLAG_CACHE_BYPASS
| KVS_CHECKPOINT_FLAG_NO_FLUSH);
int target_flags = 0;

if (!h || !rootref || (flags & ~valid_flags)) {
Expand All @@ -42,6 +44,9 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h,
timestamp = flux_reactor_now (flux_get_reactor (h));
if (flags & KVS_CHECKPOINT_FLAG_CACHE_BYPASS)
topic = "content-backing.checkpoint-put";
else if (flags & KVS_CHECKPOINT_FLAG_NO_FLUSH)
/* NO_FLUSH only in content module, not backing */
target_flags |= CONTENT_FLAG_NO_FLUSH;

if (!(f = flux_rpc_pack (h,
topic,
Expand Down
2 changes: 2 additions & 0 deletions src/common/libkvs/kvs_checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
enum {
/* request direct to backing store */
KVS_CHECKPOINT_FLAG_CACHE_BYPASS = 1,
/* skip content flush before checkpoint */
KVS_CHECKPOINT_FLAG_NO_FLUSH = 2,
};

#define KVS_DEFAULT_CHECKPOINT "kvs-primary"
Expand Down
102 changes: 95 additions & 7 deletions src/modules/content/checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libcontent/content.h"

#include "checkpoint.h"
#include "cache.h"
Expand Down Expand Up @@ -196,6 +197,84 @@ static int checkpoint_put_forward (struct content_checkpoint *checkpoint,
return -1;
}

static void checkpoint_content_flush_continuation (flux_future_t *f, void *arg)
{
struct content_checkpoint *checkpoint = arg;
const flux_msg_t *msg = flux_future_aux_get (f, "msg");
const char *key;
json_t *value;
int flags = 0;
const char *errstr = NULL;

assert (msg);

if (flux_rpc_get (f, NULL) < 0) {
errstr = "error flushing content";
goto error;
}

if (flux_request_unpack (msg,
NULL,
"{s:s s:o s?i}",
"key", &key,
"value", &value,
"flags", &flags) < 0)
goto error;

if (checkpoint_put_forward (checkpoint,
msg,
key,
value,
flags,
&errstr) < 0)
goto error;

flux_future_destroy (f);
return;

error:
if (flux_respond_error (checkpoint->h, msg, errno, errstr) < 0)
flux_log_error (checkpoint->h,
"error responding to checkpoint-put request");
flux_future_destroy (f);
}

static int checkpoint_content_flush (struct content_checkpoint *checkpoint,
const flux_msg_t *msg,
const char **errstr)
{
const char *topic = "content.flush";
uint32_t rank;
flux_future_t *f = NULL;

if (flux_get_rank (checkpoint->h, &rank) < 0) {
(*errstr) = "error retrieving rank";
return -1;
}

if (!(f = flux_rpc (checkpoint->h, topic, NULL, rank, 0))
|| flux_future_then (f,
-1,
checkpoint_content_flush_continuation,
checkpoint) < 0)
goto error;

if (flux_future_aux_set (f,
"msg",
(void *)flux_msg_incref (msg),
(flux_free_f)flux_msg_decref) < 0) {
flux_msg_decref (msg);
goto error;
}

return 0;

error:
(*errstr) = "error starting content.flush RPC";
flux_future_destroy (f);
return -1;
}

void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
Expand All @@ -220,13 +299,22 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
"flags", &flags) < 0)
goto error;

if (checkpoint_put_forward (checkpoint,
msg,
key,
value,
flags,
&errstr) < 0)
goto error;
if (content_cache_backing_loaded (checkpoint->cache)
&& !(flags & CONTENT_FLAG_NO_FLUSH)) {
if (checkpoint_content_flush (checkpoint,
msg,
&errstr) < 0)
goto error;
}
else {
if (checkpoint_put_forward (checkpoint,
msg,
key,
value,
flags,
&errstr) < 0)
goto error;
}

return;

Expand Down

0 comments on commit 3689335

Please sign in to comment.