Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot checkpointing #710

Merged
merged 9 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ raft-core-integration-test
raft-core-unit-test
raft-uv-integration-test
raft-uv-unit-test
.cache/
compile_commands.json
2 changes: 1 addition & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ unit_test_SOURCES += \
test/unit/test_sm.c \
test/unit/test_tuple.c \
test/unit/test_vfs.c \
test/unit/test_vfs_extra.c \
test/unit/test_vfs2.c \
test/unit/main.c
unit_test_CFLAGS = $(AM_CFLAGS) -Wno-unknown-warning-option -Wno-uninitialized -Wno-maybe-uninitialized -Wno-float-equal -Wno-conversion
Expand All @@ -195,7 +196,6 @@ integration_test_SOURCES = \
test/integration/test_node.c \
test/integration/test_role_management.c \
test/integration/test_server.c \
test/integration/test_vfs.c \
test/integration/main.c
integration_test_CFLAGS = $(AM_CFLAGS) -Wno-conversion
integration_test_LDFLAGS = $(AM_LDFLAGS) -no-install
Expand Down
15 changes: 2 additions & 13 deletions src/dqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,12 @@
struct dqlite_buffer bufs[],
unsigned n)
{
int rv;
if (n != 2) {
return -1;
}

rv = VfsDiskSnapshotDb(vfs, filename, &bufs[0]);
if (rv != 0) {
return rv;
}

rv = VfsDiskSnapshotWal(vfs, filename, &bufs[1]);
return rv;
return VfsSnapshotDisk(vfs, filename, bufs, n);

Check warning on line 60 in src/dqlite.c

View check run for this annotation

Codecov / codecov/patch

src/dqlite.c#L60

Added line #L60 was not covered by tests
}

int dqlite_vfs_num_pages(sqlite3_vfs *vfs, const char *filename, unsigned *n)
{
return VfsDatabaseNumPages(vfs, filename, n);
return VfsDatabaseNumPages(vfs, filename, false, n);

Check warning on line 65 in src/dqlite.c

View check run for this annotation

Codecov / codecov/patch

src/dqlite.c#L65

Added line #L65 was not covered by tests
}

int dqlite_vfs_shallow_snapshot(sqlite3_vfs *vfs,
Expand Down
39 changes: 12 additions & 27 deletions src/fsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,46 +385,34 @@ static int encodeDatabase(struct db *db,
{
struct snapshotDatabase header;
sqlite3_vfs *vfs;
uint32_t database_size = 0;
uint8_t *page;
char *cursor;
struct dqlite_buffer *bufs = (struct dqlite_buffer *)r_bufs;
int rv;

header.filename = db->filename;
header.main_size = (n - 1) * (uint64_t)db->config->page_size;
/* The database is checkpointed before writing it to disk. As such,
* wal_size is always 0. */
header.wal_size = 0;

vfs = sqlite3_vfs_find(db->config->name);
rv = VfsShallowSnapshot(vfs, db->filename, &bufs[1], n - 1);
if (rv != 0) {
goto err;
}

/* Extract the database size from the first page. */
page = bufs[1].base;
database_size += (uint32_t)(page[28] << 24);
database_size += (uint32_t)(page[29] << 16);
database_size += (uint32_t)(page[30] << 8);
database_size += (uint32_t)(page[31]);

header.main_size =
(uint64_t)database_size * (uint64_t)db->config->page_size;
header.wal_size = bufs[n - 1].len;

/* Database header. */
bufs[0].len = snapshotDatabase__sizeof(&header);
bufs[0].base = sqlite3_malloc64(bufs[0].len);
if (bufs[0].base == NULL) {
rv = RAFT_NOMEM;
goto err_after_snapshot;
goto err;
}
cursor = bufs[0].base;
snapshotDatabase__encode(&header, &cursor);

return 0;

err_after_snapshot:
/* Free the wal buffer */
sqlite3_free(bufs[n - 1].base);
err:
assert(rv != 0);
return rv;
Expand Down Expand Up @@ -490,7 +478,7 @@ static unsigned dbNumPages(struct db *db)
uint32_t n;

vfs = sqlite3_vfs_find(db->config->name);
rv = VfsDatabaseNumPages(vfs, db->filename, &n);
rv = VfsDatabaseNumPages(vfs, db->filename, true, &n);
assert(rv == 0);
return n;
}
Expand All @@ -504,7 +492,7 @@ static unsigned snapshotNumBufs(struct fsm *f)

QUEUE_FOREACH(head, &f->registry->dbs)
{
n += 2; /* database header & wal */
n += 1; /* database header */
db = QUEUE_DATA(head, struct db, queue);
n += dbNumPages(db); /* 1 buffer per page (zero copy) */
}
Expand All @@ -514,13 +502,12 @@ static unsigned snapshotNumBufs(struct fsm *f)

/* An example array of snapshot buffers looks like this:
*
* bufs: SH DH1 P1 P2 P3 WAL1 DH2 P1 P2 WAL2
* index: 0 1 2 3 4 5 6 7 8 9
* bufs: SH DH1 P1 P2 P3 DH2 P1 P2
* index: 0 1 2 3 4 5 6 7
*
* SH: Snapshot Header
* DHx: Database Header
* Px: Database Page (not to be freed)
* WALx: a WAL
* */
static void freeSnapshotBufs(struct fsm *f,
struct raft_buffer bufs[],
Expand Down Expand Up @@ -548,9 +535,7 @@ static void freeSnapshotBufs(struct fsm *f,
/* i is the index of the database header */
sqlite3_free(bufs[i].base);
/* advance i to the index of the next database header (if any) */
i += 1 /* db header */ + dbNumPages(db) + 1 /* WAL */;
/* free WAL buffer */
sqlite3_free(bufs[i - 1].base);
i += 1 /* db header */ + dbNumPages(db);
}
}

Expand Down Expand Up @@ -601,8 +586,8 @@ static int fsm__snapshot(struct raft_fsm *fsm,
QUEUE_FOREACH(head, &f->registry->dbs)
{
db = QUEUE_DATA(head, struct db, queue);
/* database_header + num_pages + wal */
unsigned n = 1 + dbNumPages(db) + 1;
/* database_header + num_pages */
unsigned n = 1 + dbNumPages(db);
rv = encodeDatabase(db, &(*bufs)[i], n);
if (rv != 0) {
goto err_after_encode_header;
Expand Down
3 changes: 1 addition & 2 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ static int openConnection(const char *filename,
goto err;
}

rc = sqlite3_exec(*conn, "PRAGMA wal_autocheckpoint=0", NULL, NULL,
&msg);
rc = sqlite3_wal_autocheckpoint(*conn, 0);
if (rc != SQLITE_OK) {
tracef("wal autocheckpoint off failed %d", rc);
goto err;
Expand Down
Loading
Loading