Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fragment list consolidation for tiledb cloud arrays #5059

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions test/src/unit-capi-consolidation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7169,7 +7169,7 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
ConsolidationFx,
"C API: Test consolidation, dense split fragments",
"[capi][consolidation][dense][split-fragments][non-rest]") {
"[capi][consolidation][dense][split-fragments][rest]") {
remove_dense_array();
create_dense_array();
write_dense_subarray(1, 2, 1, 2);
Expand Down Expand Up @@ -7253,7 +7253,7 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
ConsolidationFx,
"C API: Test consolidation, sparse split fragments",
"[capi][consolidation][sparse][split-fragments][non-rest]") {
"[capi][consolidation][sparse][split-fragments][rest]") {
remove_sparse_array();
create_sparse_array();
write_sparse_row(0);
Expand Down
30 changes: 30 additions & 0 deletions test/support/src/serialization_wrappers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@

#include "test/support/src/helpers.h"
#include "tiledb/api/c_api/array_schema/array_schema_api_internal.h"
#include "tiledb/api/c_api/buffer/buffer_api_internal.h"
#include "tiledb/api/c_api/context/context_api_internal.h"
#include "tiledb/sm/c_api/tiledb.h"
#include "tiledb/sm/c_api/tiledb_serialization.h"
#include "tiledb/sm/c_api/tiledb_struct_def.h"
#include "tiledb/sm/serialization/consolidation.h"
#include "tiledb/sm/serialization/query.h"

#ifdef TILEDB_SERIALIZATION
Expand Down Expand Up @@ -219,3 +222,30 @@ void tiledb_subarray_serialize(
*subarray = deserialized_subarray;
#endif
}

/**
 * Round-trips an array consolidation request through serialization.
 *
 * The context's config and the optional input fragment list are serialized
 * into a temporary buffer, which is then deserialized again. The config
 * produced by deserialization is not used; only the fragment URIs are
 * handed back to the caller.
 *
 * @param ctx Context whose config is serialized into the request.
 * @param serialize_type Serialization format used in both directions.
 * @param array_uri Array URI passed to the deserialization routine.
 * @param fragment_uris_in Fragment URIs to serialize.
 * @param fragment_uris_out Receives the deserialized fragment URIs when the
 *     deserialized request carries a fragment list; otherwise it is left
 *     untouched. May be null, in which case nothing is written.
 */
void tiledb_array_consolidation_request_wrapper(
    tiledb_ctx_t* ctx,
    tiledb_serialization_type_t serialize_type,
    const std::string& array_uri,
    const std::vector<std::string>* fragment_uris_in,
    std::vector<std::string>* fragment_uris_out) {
  const auto type = static_cast<tiledb::sm::SerializationType>(serialize_type);

  auto buffer = tiledb_buffer_handle_t::make_handle();
  try {
    // Serialize and Deserialize
    serialization::array_consolidation_request_serialize(
        ctx->config(), fragment_uris_in, type, &(buffer->buffer()));

    auto [config, fragment_uris_deser] =
        serialization::array_consolidation_request_deserialize(
            array_uri, type, buffer->buffer());

    if (fragment_uris_out != nullptr && fragment_uris_deser.has_value()) {
      *fragment_uris_out = fragment_uris_deser.value();
    }
  } catch (...) {
    // Release the temporary buffer handle before propagating the error so a
    // failing (de)serialization does not leak it.
    tiledb_buffer_handle_t::break_handle(buffer);
    throw;
  }

  tiledb_buffer_handle_t::break_handle(buffer);
}
9 changes: 9 additions & 0 deletions test/support/src/serialization_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#define TILEDB_TEST_SERIALIZATION_WRAPPERS_H

#include <string>
#include <vector>

#include "tiledb/sm/c_api/tiledb.h"
#include "tiledb/sm/c_api/tiledb_serialization.h"
Expand Down Expand Up @@ -141,4 +142,12 @@ int tiledb_fragment_info_serialize(
*/
void tiledb_subarray_serialize(
tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray);

/**
 * Serializes an array consolidation request and deserializes it again.
 *
 * @param ctx Context whose config is serialized into the request.
 * @param serialize_type Serialization format used for both directions.
 * @param array_uri Array URI passed to the deserialization routine.
 * @param fragment_uris_in Fragment URIs to serialize with the request.
 * @param fragment_uris_out Receives the fragment URIs recovered by
 *     deserialization, when the deserialized request contains any.
 */
void tiledb_array_consolidation_request_wrapper(
tiledb_ctx_t* ctx,
tiledb_serialization_type_t serialize_type,
const std::string& array_uri,
const std::vector<std::string>* fragment_uris_in,
std::vector<std::string>* fragment_uris_out);

#endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H
48 changes: 46 additions & 2 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1709,14 +1709,32 @@ int32_t tiledb_array_create(

// Consolidates the array at `array_uri`.
//
// Uses `config` when it is non-null, otherwise the context's config. Throws
// api::CAPIStatusException when the URI is invalid, or when the URI is a
// tiledb:// URI and the config selects FRAGMENT consolidation mode (callers
// are directed to tiledb_array_consolidate_fragments instead). Returns
// TILEDB_OK when no exception is thrown.
int32_t tiledb_array_consolidate(
tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) {
// Validate input arguments
api::ensure_context_is_valid(ctx);
api::ensure_config_is_valid_if_present(config);

auto uri = tiledb::sm::URI(array_uri);
if (uri.is_invalid()) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input array uri");
}

// Fall back to the context's config when the caller did not supply one.
auto input_config = (config == nullptr) ? ctx->config() : config->config();
// Reject FRAGMENT-mode consolidation of remote (tiledb://) arrays here.
if (uri.is_tiledb() &&
tiledb::sm::Consolidator::mode_from_config(input_config) ==
tiledb::sm::ConsolidationMode::FRAGMENT) {
throw api::CAPIStatusException(
"Please use tiledb_array_consolidate_fragments API for consolidating "
"fragments on remote arrays.");
}

// No encryption key is passed from this entry point (NO_ENCRYPTION, null
// key, zero length).
tiledb::sm::Consolidator::array_consolidate(
ctx->resources(),
array_uri,
tiledb::sm::EncryptionType::NO_ENCRYPTION,
nullptr,
0,
// NOTE(review): the next two lines look like the before/after of the same
// argument in the diff view; presumably only `input_config` remains after
// the change -- confirm against the actual file.
(config == nullptr) ? ctx->config() : config->config(),
input_config,
ctx->storage_manager());
return TILEDB_OK;
}
Expand All @@ -1727,7 +1745,33 @@ int32_t tiledb_array_consolidate_fragments(
const char** fragment_uris,
const uint64_t num_fragments,
tiledb_config_t* config) {
// Sanity checks
// Validate input arguments
api::ensure_context_is_valid(ctx);
api::ensure_config_is_valid_if_present(config);

if (fragment_uris == nullptr) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input fragment list");
}

auto uri = tiledb::sm::URI(array_uri);
if (uri.is_invalid()) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input array uri");
}

if (num_fragments < 1) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input number of fragments");
}

for (size_t i = 0; i < num_fragments; i++) {
if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid uri(s) in input fragment "
"list");
}
}

// Convert the list of fragments to a vector
std::vector<std::string> uris;
Expand Down
93 changes: 51 additions & 42 deletions tiledb/sm/consolidator/consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ void Consolidator::array_consolidate(
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
}

// Check if array exists
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

if (array_uri.is_tiledb()) {
throw_if_not_ok(
resources.rest_client()->post_consolidation_to_rest(array_uri, config));
} else {
// Check if array exists
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

// Get encryption key from config
std::string encryption_key_from_cfg;
if (!encryption_key) {
Expand Down Expand Up @@ -212,46 +212,55 @@ void Consolidator::fragments_consolidate(
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
}

// Check if array exists
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}
if (array_uri.is_tiledb()) {
throw_if_not_ok(resources.rest_client()->post_consolidation_to_rest(
array_uri, config, &fragment_uris));
} else {
// Check if array exists
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

// Get encryption key from config
std::string encryption_key_from_cfg;
if (!encryption_key) {
bool found = false;
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
assert(found);
}
// Get encryption key from config
std::string encryption_key_from_cfg;
if (!encryption_key) {
bool found = false;
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
assert(found);
}

if (!encryption_key_from_cfg.empty()) {
encryption_key = encryption_key_from_cfg.c_str();
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
std::string encryption_type_from_cfg;
bool found = false;
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
assert(found);
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
throw_if_not_ok(st);
encryption_type = et.value();

if (!encryption_key_from_cfg.empty()) {
encryption_key = encryption_key_from_cfg.c_str();
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
std::string encryption_type_from_cfg;
bool found = false;
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
assert(found);
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
throw_if_not_ok(st);
encryption_type = et.value();

if (!EncryptionKey::is_valid_key_length(
encryption_type,
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
encryption_key = nullptr;
key_length = 0;
if (!EncryptionKey::is_valid_key_length(
encryption_type,
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
encryption_key = nullptr;
key_length = 0;
}
}
}

// Consolidate
auto consolidator = Consolidator::create(
resources, ConsolidationMode::FRAGMENT, config, storage_manager);
auto fragment_consolidator =
dynamic_cast<FragmentConsolidator*>(consolidator.get());
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
array_name, encryption_type, encryption_key, key_length, fragment_uris));
// Consolidate
auto consolidator = Consolidator::create(
resources, ConsolidationMode::FRAGMENT, config, storage_manager);
auto fragment_consolidator =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed dynamic_cast that should be avoided when possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not do that now. If we're going to do this, let's change all the places that use Consolidator::create for something better and consistent.

Copy link
Member Author

@ypatia ypatia Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why shouldn't we do that immediate improvement? This is the only occurrence of a dynamic_cast in consolidation code, and the reason is that consolidate_fragments is a method of FragmentConsolidator and not of its parent class Consolidator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing offline, I am reverting this change, although I think it improves the current situation, to get this long-standing PR finally unblocked and merged.

dynamic_cast<FragmentConsolidator*>(consolidator.get());
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
array_name,
encryption_type,
encryption_key,
key_length,
fragment_uris));
}
}

void Consolidator::write_consolidated_commits_file(
Expand Down
3 changes: 2 additions & 1 deletion tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ class RestClient {
}

/// Operation disabled in base class.
///
/// Always throws RestClientDisabledException; the arguments are unused.
///
/// NOTE(review): the new trailing parameter has a default argument on a
/// virtual function. Default arguments are chosen from the static type at
/// the call site, so the override must keep the same `nullptr` default
/// (RestClientRemote's declaration currently does).
///
/// NOTE(review): the first signature line below looks like the pre-change
/// line from the diff view; presumably only the three-parameter form
/// remains -- confirm against the actual file.
inline virtual Status post_consolidation_to_rest(const URI&, const Config&) {
inline virtual Status post_consolidation_to_rest(
const URI&, const Config&, const std::vector<std::string>* = nullptr) {
throw RestClientDisabledException();
}

Expand Down
9 changes: 6 additions & 3 deletions tiledb/sm/rest/rest_client_remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1532,10 +1532,13 @@ Status RestClientRemote::ensure_json_null_delimited_string(Buffer* buffer) {
}

Status RestClientRemote::post_consolidation_to_rest(
const URI& uri, const Config& config) {
const URI& uri,
const Config& config,
const std::vector<std::string>* fragment_uris) {
Buffer buff;
RETURN_NOT_OK(serialization::array_consolidation_request_serialize(
config, serialization_type_, &buff));
serialization::array_consolidation_request_serialize(
config, fragment_uris, serialization_type_, &buff);

// Wrap in a list
BufferList serialized;
RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
Expand Down
6 changes: 5 additions & 1 deletion tiledb/sm/rest/rest_client_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,14 @@ class RestClientRemote : public RestClient {
*
* @param uri Array URI
* @param config config
* @param fragment_uris The uris of the fragments to be consolidated if this
* is a request for fragment list consolidation
* @return
*/
Status post_consolidation_to_rest(
const URI& uri, const Config& config) override;
const URI& uri,
const Config& config,
const std::vector<std::string>* fragment_uris = nullptr) override;

/**
* Post array vacuum request to the REST server.
Expand Down
Loading
Loading