Skip to content

Commit

Permalink
Refactor serialization buffer management and support memory tracking. (
Browse files Browse the repository at this point in the history
…#5231)

[SC-50984](https://app.shortcut.com/tiledb-inc/story/50984/evaluate-proper-solution-for-serialization-code-base-to-use-tracked-buffers)

This PR adds support for tracking memory used by serialization buffers.
The first step towards implementing this is the replacement of the
`Buffer` class with the new `SerializationBuffer` class, which has the
following characteristics:

* Like `Buffer`, `SerializationBuffer` supports both owned and non-owned
memory buffers. This is necessary to maintain the semantics of the
`tiledb_buffer_t` C API. `SerializationBuffer` also allows swapping
between owned and non-owned modes.
* `SerializationBuffer` does not support the `write` function which
incrementally adds data by growing the buffer. Instead, it supports only
the `assign` operation, which replaces the whole existing data of the
buffer. `assign` copies the given data to a buffer owned by
`SerializationBuffer`, unless a special marker value is passed, which
changes it to just store a span to the buffer.
* There is also the `assign_null_terminated` method that sets an owned
buffer and adds a null terminator at the end. This is used by JSON
serialization for some reason (is it actually needed?).
* `SerializationBuffer` does not store an offset and does not expose a
`read` function that copies to a user-provided buffer and advances the
offset. Instead the class is implicitly convertible to a const span of
bytes, which gives access to the entire buffer at once.
* `SerializationBuffer`s are typically immutable between `assign`ments,
but some cases require subsequent modification. To cover these, the
`owned_mutable_span` function returns a mutable span to the buffer, but
will throw if used in a non-owned buffer.
* `SerializationBuffer` supports being used with polymorphic allocators,
which enables integration with the memory tracking system. A new
`MemoryTracker` was added on `ContextResources` to track
serialization-related allocations.

`BufferList` was also updated to store `SerializationBuffer`s. Creations
of serialization buffers and buffer lists were updated to pass an
allocator, and C API handles for both types were likewise updated.

I also took up on the opportunity and converted all deserialization APIs
that accepted a `Buffer` to accept a `span<const char>`, because they
did not need any other facilities of `SerializationBuffer`.

---
TYPE: NO_HISTORY

---------

Co-authored-by: Luc Rancourt <[email protected]>
  • Loading branch information
teo-tsirpanis and KiterLuc authored Sep 6, 2024
1 parent 9ebf9cd commit ef7aba6
Show file tree
Hide file tree
Showing 53 changed files with 1,146 additions and 1,067 deletions.
1 change: 0 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ set(TILEDB_UNIT_TEST_SOURCES
src/test-cppapi-consolidation-plan.cc
src/unit-average-cell-size.cc
src/unit-backwards_compat.cc
src/unit-bufferlist.cc
src/unit-capi-any.cc
src/unit-capi-as_built.cc
src/unit-capi-array.cc
Expand Down
16 changes: 14 additions & 2 deletions test/src/test-cppapi-consolidation-plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
#include "test/support/src/helpers.h"
#include "tiledb/api/c_api/buffer/buffer_api_internal.h"
#include "tiledb/api/c_api/config/config_api_internal.h"
#include "tiledb/api/c_api/context/context_api_internal.h"
#include "tiledb/sm/c_api/tiledb_serialization.h"
#include "tiledb/sm/c_api/tiledb_struct_def.h"
#include "tiledb/sm/cpp_api/tiledb"
#include "tiledb/sm/cpp_api/tiledb_experimental"
#include "tiledb/sm/enums/serialization_type.h"
Expand Down Expand Up @@ -185,8 +187,18 @@ tiledb::sm::ConsolidationPlan CppConsolidationPlanFx::call_handler(
uint64_t fragment_size,
const Array& array,
tiledb::sm::SerializationType stype) {
auto req_buf = tiledb_buffer_handle_t::make_handle();
auto resp_buf = tiledb_buffer_handle_t::make_handle();
auto req_buf = tiledb_buffer_handle_t::make_handle(
ctx_.ptr()
.get()
->resources()
.serialization_memory_tracker()
->get_resource(tiledb::sm::MemoryType::SERIALIZATION_BUFFER));
auto resp_buf = tiledb_buffer_handle_t::make_handle(
ctx_.ptr()
.get()
->resources()
.serialization_memory_tracker()
->get_resource(tiledb::sm::MemoryType::SERIALIZATION_BUFFER));

tiledb::sm::serialization::serialize_consolidation_plan_request(
fragment_size, cfg_.ptr()->config(), stype, req_buf->buffer());
Expand Down
4 changes: 2 additions & 2 deletions test/src/unit-capi-array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2719,7 +2719,7 @@ TEST_CASE_METHOD(
start_timestamp,
end_timestamp,
tiledb::sm::SerializationType::CAPNP,
&buff->buffer());
buff->buffer());
rc = tiledb_handle_array_delete_fragments_timestamps_request(
ctx_,
array,
Expand Down Expand Up @@ -2755,7 +2755,7 @@ TEST_CASE_METHOD(
array->array_->config(),
fragments,
tiledb::sm::SerializationType::CAPNP,
&buff->buffer());
buff->buffer());
rc = tiledb_handle_array_delete_fragments_list_request(
ctx_,
array,
Expand Down
42 changes: 20 additions & 22 deletions test/src/unit-enumerations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ struct EnumerationFx {
template <typename T>
bool vec_cmp(std::vector<T> v1, std::vector<T> v2);

void flatten_buffer_list(BufferList& blist, Buffer& buf);

void rm_array();

shared_ptr<MemoryTracker> memory_tracker_;
Expand Down Expand Up @@ -2986,17 +2984,19 @@ shared_ptr<ArraySchema> EnumerationFx::ser_des_array_schema(
shared_ptr<const ArraySchema> schema,
bool client_side,
SerializationType stype) {
Buffer buf;
SerializationBuffer buf{tiledb::test::get_test_memory_tracker()->get_resource(
MemoryType::SERIALIZATION_BUFFER)};
throw_if_not_ok(serialization::array_schema_serialize(
*(schema.get()), stype, &buf, client_side));
*(schema.get()), stype, buf, client_side));
return serialization::array_schema_deserialize(stype, buf, memory_tracker_);
}

shared_ptr<ArraySchemaEvolution> EnumerationFx::ser_des_array_schema_evolution(
ArraySchemaEvolution* ase, bool client_side, SerializationType stype) {
Buffer buf;
SerializationBuffer buf{tiledb::test::get_test_memory_tracker()->get_resource(
MemoryType::SERIALIZATION_BUFFER)};
throw_if_not_ok(serialization::array_schema_evolution_serialize(
ase, stype, &buf, client_side));
ase, stype, buf, client_side));

ArraySchemaEvolution* ret;
throw_if_not_ok(serialization::array_schema_evolution_deserialize(
Expand All @@ -3007,21 +3007,27 @@ shared_ptr<ArraySchemaEvolution> EnumerationFx::ser_des_array_schema_evolution(

void EnumerationFx::ser_des_query(
Query* q_in, Query* q_out, bool client_side, SerializationType stype) {
Buffer buf;
BufferList blist;
auto tracker = tiledb::test::get_test_memory_tracker();
BufferList blist{tracker};

throw_if_not_ok(
serialization::query_serialize(q_in, stype, client_side, &blist));
serialization::query_serialize(q_in, stype, client_side, blist));

flatten_buffer_list(blist, buf);
const auto nbytes = blist.total_size();
SerializationBuffer buf{
nbytes, tracker->get_resource(MemoryType::SERIALIZATION_BUFFER)};

blist.reset_offset();
blist.read(buf.owned_mutable_span().data(), nbytes);

throw_if_not_ok(serialization::query_deserialize(
buf,
stype,
client_side,
nullptr,
q_out,
&(ctx_.resources().compute_tp())));
&(ctx_.resources().compute_tp()),
tracker));
}

void EnumerationFx::ser_des_array(
Expand All @@ -3030,8 +3036,9 @@ void EnumerationFx::ser_des_array(
Array* out,
bool client_side,
SerializationType stype) {
Buffer buf;
throw_if_not_ok(serialization::array_serialize(in, stype, &buf, client_side));
SerializationBuffer buf{tiledb::test::get_test_memory_tracker()->get_resource(
MemoryType::SERIALIZATION_BUFFER)};
throw_if_not_ok(serialization::array_serialize(in, stype, buf, client_side));
serialization::array_deserialize(
out, stype, buf, ctx.resources(), memory_tracker_);
}
Expand Down Expand Up @@ -3077,15 +3084,6 @@ bool EnumerationFx::vec_cmp(std::vector<T> v1, std::vector<T> v2) {
return true;
}

void EnumerationFx::flatten_buffer_list(BufferList& blist, Buffer& buf) {
const auto nbytes = blist.total_size();
throw_if_not_ok(buf.realloc(nbytes));

blist.reset_offset();
throw_if_not_ok(blist.read(buf.data(), nbytes));
buf.set_size(nbytes);
}

void EnumerationFx::rm_array() {
bool is_dir;
throw_if_not_ok(ctx_.resources().vfs().is_dir(uri_, &is_dir));
Expand Down
40 changes: 30 additions & 10 deletions test/src/unit-request-handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,12 @@ TEST_CASE_METHOD(
auto ctx = tiledb::Context();
auto array = tiledb::Array(ctx, uri_.to_string(), TILEDB_READ);
auto stype = TILEDB_CAPNP;
auto req_buf = tiledb_buffer_handle_t::make_handle();
auto resp_buf = tiledb_buffer_handle_t::make_handle();
auto req_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
MemoryType::SERIALIZATION_BUFFER));
auto resp_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
MemoryType::SERIALIZATION_BUFFER));

auto rval = tiledb_handle_load_array_schema_request(
nullptr,
Expand Down Expand Up @@ -320,8 +324,12 @@ TEST_CASE_METHOD(
auto ctx = tiledb::Context();
auto array = tiledb::Array(ctx, uri_.to_string(), TILEDB_READ);
auto stype = TILEDB_CAPNP;
auto req_buf = tiledb_buffer_handle_t::make_handle();
auto resp_buf = tiledb_buffer_handle_t::make_handle();
auto req_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));
auto resp_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));

auto rval = tiledb_handle_query_plan_request(
nullptr,
Expand Down Expand Up @@ -365,8 +373,12 @@ TEST_CASE_METHOD(
auto ctx = tiledb::Context();
auto array = tiledb::Array(ctx, uri_.to_string(), TILEDB_READ);
auto stype = TILEDB_CAPNP;
auto req_buf = tiledb_buffer_handle_t::make_handle();
auto resp_buf = tiledb_buffer_handle_t::make_handle();
auto req_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));
auto resp_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));

auto rval = tiledb_handle_consolidation_plan_request(
nullptr,
Expand Down Expand Up @@ -523,8 +535,12 @@ HandleLoadArraySchemaRequestFx::call_handler(
// objects.
auto ctx = tiledb::Context();
auto array = tiledb::Array(ctx, uri_.to_string(), TILEDB_READ);
auto req_buf = tiledb_buffer_handle_t::make_handle();
auto resp_buf = tiledb_buffer_handle_t::make_handle();
auto req_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));
auto resp_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));

serialization::serialize_load_array_schema_request(
cfg_, req, stype, req_buf->buffer());
Expand Down Expand Up @@ -574,8 +590,12 @@ QueryPlan HandleQueryPlanRequestFx::call_handler(
SerializationType stype, Query& query) {
auto ctx = tiledb::Context();
auto array = tiledb::Array(ctx, uri_.to_string(), TILEDB_READ);
auto req_buf = tiledb_buffer_handle_t::make_handle();
auto resp_buf = tiledb_buffer_handle_t::make_handle();
auto req_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));
auto resp_buf = tiledb_buffer_handle_t::make_handle(
ctx.ptr()->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));

serialization::serialize_query_plan_request(
cfg_, query, stype, req_buf->buffer());
Expand Down
29 changes: 16 additions & 13 deletions tiledb/api/c_api/buffer/buffer_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,20 @@
**/

#include "tiledb/api/c_api_support/c_api_support.h"
#include "tiledb/common/memory_tracker.h"

#include "buffer_api_external.h"
#include "buffer_api_internal.h"

namespace tiledb::api {

capi_return_t tiledb_buffer_alloc(tiledb_buffer_handle_t** buffer) {
capi_return_t tiledb_buffer_alloc(
tiledb_ctx_handle_t* ctx, tiledb_buffer_handle_t** buffer) {
ensure_context_is_valid(ctx);
ensure_output_pointer_is_valid(buffer);
*buffer = tiledb_buffer_handle_t::make_handle();
*buffer = tiledb_buffer_handle_t::make_handle(
ctx->resources().serialization_memory_tracker()->get_resource(
tiledb::sm::MemoryType::SERIALIZATION_BUFFER));
return TILEDB_OK;
}

Expand Down Expand Up @@ -71,8 +76,10 @@ capi_return_t tiledb_buffer_get_data(
ensure_output_pointer_is_valid(data);
ensure_output_pointer_is_valid(num_bytes);

*data = buffer->buffer().data();
*num_bytes = buffer->buffer().size();
span<const char> span = buffer->buffer();

*data = const_cast<char*>(span.data());
*num_bytes = span.size();

return TILEDB_OK;
}
Expand All @@ -81,14 +88,9 @@ capi_return_t tiledb_buffer_set_data(
tiledb_buffer_t* buffer, void* data, uint64_t size) {
ensure_buffer_is_valid(buffer);

// Create a temporary Buffer object as a wrapper.
tiledb::sm::Buffer tmp_buffer(data, size);

// Swap with the given buffer.
buffer->buffer().swap(tmp_buffer);

// 'tmp_buffer' now destructs, freeing the old allocation (if any) of the
// given buffer.
buffer->buffer().assign(
tiledb::sm::SerializationBuffer::NonOwned,
span<char>(static_cast<char*>(data), size));

return TILEDB_OK;
}
Expand All @@ -97,9 +99,10 @@ capi_return_t tiledb_buffer_set_data(

using tiledb::api::api_entry_context;
using tiledb::api::api_entry_void;
using tiledb::api::api_entry_with_context;

CAPI_INTERFACE(buffer_alloc, tiledb_ctx_t* ctx, tiledb_buffer_t** buffer) {
return api_entry_context<tiledb::api::tiledb_buffer_alloc>(ctx, buffer);
return api_entry_with_context<tiledb::api::tiledb_buffer_alloc>(ctx, buffer);
}

CAPI_INTERFACE_VOID(buffer_free, tiledb_buffer_t** buffer) {
Expand Down
30 changes: 19 additions & 11 deletions tiledb/api/c_api/buffer/buffer_api_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,32 @@ struct tiledb_buffer_handle_t
static constexpr std::string_view object_type_name{"buffer"};

private:
tiledb::sm::Buffer buffer_;
tiledb::sm::SerializationBuffer buffer_;
tiledb::sm::Datatype datatype_;

public:
explicit tiledb_buffer_handle_t()
: buffer_()
explicit tiledb_buffer_handle_t(decltype(buffer_)::allocator_type allocator)
: buffer_(allocator)
, datatype_(tiledb::sm::Datatype::UINT8) {
}

explicit tiledb_buffer_handle_t(void* data, uint64_t size)
: buffer_(data, size)
explicit tiledb_buffer_handle_t(
size_t size, decltype(buffer_)::allocator_type allocator)
: buffer_(size, allocator)
, datatype_(tiledb::sm::Datatype::UINT8) {
}

explicit tiledb_buffer_handle_t(
const void* data,
uint64_t size,
decltype(buffer_)::allocator_type allocator)
: buffer_(allocator)
, datatype_(tiledb::sm::Datatype::UINT8) {
buffer_.assign(
tiledb::sm::SerializationBuffer::NonOwned,
span(static_cast<const char*>(data), size));
}

inline void set_datatype(tiledb::sm::Datatype datatype) {
datatype_ = datatype;
}
Expand All @@ -67,15 +79,11 @@ struct tiledb_buffer_handle_t
return datatype_;
}

inline void set_buffer(tiledb::sm::Buffer& buffer) {
buffer_ = buffer;
}

[[nodiscard]] inline tiledb::sm::Buffer& buffer() {
[[nodiscard]] inline tiledb::sm::SerializationBuffer& buffer() {
return buffer_;
}

[[nodiscard]] inline const tiledb::sm::Buffer& buffer() const {
[[nodiscard]] inline const tiledb::sm::SerializationBuffer& buffer() const {
return buffer_;
}
};
Expand Down
Loading

0 comments on commit ef7aba6

Please sign in to comment.