Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Expose WAL compression option #797

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[submodule "rocksdb"]
path = librocksdb_sys/rocksdb
url = https://github.com/tikv/rocksdb.git
branch = 6.29.tikv
url = https://github.com/hbisheng/rocksdb.git
branch = fix-pri

[submodule "titan"]
path = librocksdb_sys/libtitan_sys/titan
url = https://github.com/tikv/titan.git
branch = master
url = https://github.com/hbisheng/titan.git
branch = fix-pri
2 changes: 1 addition & 1 deletion librocksdb_sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn main() {

build.cpp(true).file("crocksdb/c.cc");
if env::var("CARGO_CFG_TARGET_OS").unwrap() != "windows" {
build.flag("-std=c++11");
build.flag("-std=c++17");
build.flag("-fno-rtti");
}
link_cpp(&mut build);
Expand Down
140 changes: 23 additions & 117 deletions librocksdb_sys/crocksdb/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
#include "rocksdb/universal_compaction.h"
#include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/backup_engine.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/debug.h"
Expand All @@ -75,8 +75,8 @@
#endif

using rocksdb::BackgroundErrorReason;
using rocksdb::BackupableDBOptions;
using rocksdb::BackupEngine;
using rocksdb::BackupEngineOptions;
using rocksdb::BackupInfo;
using rocksdb::BlockBasedTableOptions;
using rocksdb::BlockCipher;
Expand Down Expand Up @@ -524,41 +524,10 @@ struct crocksdb_filterpolicy_t : public FilterPolicy {
void* state_;
void (*destructor_)(void*);
const char* (*name_)(void*);
char* (*create_)(void*, const char* const* key_array,
const size_t* key_length_array, int num_keys,
size_t* filter_length);
unsigned char (*key_match_)(void*, const char* key, size_t length,
const char* filter, size_t filter_length);
void (*delete_filter_)(void*, const char* filter, size_t filter_length);

virtual ~crocksdb_filterpolicy_t() { (*destructor_)(state_); }

virtual const char* Name() const override { return (*name_)(state_); }

virtual void CreateFilter(const Slice* keys, int n,
std::string* dst) const override {
std::vector<const char*> key_pointers(n);
std::vector<size_t> key_sizes(n);
for (int i = 0; i < n; i++) {
key_pointers[i] = keys[i].data();
key_sizes[i] = keys[i].size();
}
size_t len;
char* filter = (*create_)(state_, &key_pointers[0], &key_sizes[0], n, &len);
dst->append(filter, len);

if (delete_filter_ != nullptr) {
(*delete_filter_)(state_, filter, len);
} else {
free(filter);
}
}

virtual bool KeyMayMatch(const Slice& key,
const Slice& filter) const override {
return (*key_match_)(state_, key.data(), key.size(), filter.data(),
filter.size());
}
};

struct crocksdb_mergeoperator_t : public MergeOperator {
Expand Down Expand Up @@ -853,7 +822,7 @@ crocksdb_backup_engine_t* crocksdb_backup_engine_open(
const crocksdb_options_t* options, const char* path, char** errptr) {
BackupEngine* be;
if (SaveError(errptr, BackupEngine::Open(options->rep.env,
BackupableDBOptions(path), &be))) {
BackupEngineOptions(path), &be))) {
return nullptr;
}
crocksdb_backup_engine_t* result = new crocksdb_backup_engine_t;
Expand Down Expand Up @@ -1189,7 +1158,8 @@ void crocksdb_write_multi_batch(crocksdb_t* db,
for (size_t i = 0; i < batch_size; i++) {
ws.push_back(&batches[i]->rep);
}
SaveError(errptr, db->rep->MultiBatchWrite(options->rep, std::move(ws)));
SaveError(errptr,
db->rep->MultiBatchWrite(options->rep, std::move(ws), nullptr));
}

void crocksdb_write_multi_batch_callback(
Expand Down Expand Up @@ -2097,14 +2067,6 @@ void crocksdb_block_based_options_set_block_cache(
}
}

void crocksdb_block_based_options_set_block_cache_compressed(
crocksdb_block_based_table_options_t* options,
crocksdb_cache_t* block_cache_compressed) {
if (block_cache_compressed) {
options->rep.block_cache_compressed = block_cache_compressed->rep;
}
}

void crocksdb_block_based_options_set_whole_key_filtering(
crocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.whole_key_filtering = v;
Expand All @@ -2120,11 +2082,6 @@ void crocksdb_block_based_options_set_index_type(
options->rep.index_type = static_cast<BlockBasedTableOptions::IndexType>(v);
}

void crocksdb_block_based_options_set_hash_index_allow_collision(
crocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.hash_index_allow_collision = v;
}

void crocksdb_block_based_options_set_optimize_filters_for_memory(
crocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.optimize_filters_for_memory = v;
Expand Down Expand Up @@ -3048,6 +3005,10 @@ void crocksdb_options_set_wal_size_limit_mb(crocksdb_options_t* opt,
opt->rep.WAL_size_limit_MB = limit;
}

void crocksdb_options_set_wal_compression(crocksdb_options_t* opt, int compression_type) {
opt->rep.wal_compression = static_cast<CompressionType>(compression_type);
}

void crocksdb_options_set_manifest_preallocation_size(crocksdb_options_t* opt,
size_t v) {
opt->rep.manifest_preallocation_size = v;
Expand All @@ -3068,11 +3029,6 @@ void crocksdb_options_set_is_fd_close_on_exec(crocksdb_options_t* opt,
opt->rep.is_fd_close_on_exec = v;
}

void crocksdb_options_set_skip_log_error_on_recovery(crocksdb_options_t* opt,
unsigned char v) {
opt->rep.skip_log_error_on_recovery = v;
}

void crocksdb_options_set_stats_dump_period_sec(crocksdb_options_t* opt,
unsigned int v) {
opt->rep.stats_dump_period_sec = v;
Expand Down Expand Up @@ -3197,16 +3153,6 @@ int crocksdb_options_get_max_background_compactions(
return opt->rep.max_background_compactions;
}

void crocksdb_options_set_base_background_compactions(crocksdb_options_t* opt,
int n) {
opt->rep.base_background_compactions = n;
}

int crocksdb_options_get_base_background_compactions(
const crocksdb_options_t* opt) {
return opt->rep.base_background_compactions;
}

void crocksdb_options_set_max_background_flushes(crocksdb_options_t* opt,
int n) {
opt->rep.max_background_flushes = n;
Expand Down Expand Up @@ -3234,14 +3180,6 @@ void crocksdb_options_set_recycle_log_file_num(crocksdb_options_t* opt,
opt->rep.recycle_log_file_num = v;
}

void crocksdb_options_set_soft_rate_limit(crocksdb_options_t* opt, double v) {
opt->rep.soft_rate_limit = v;
}

void crocksdb_options_set_hard_rate_limit(crocksdb_options_t* opt, double v) {
opt->rep.hard_rate_limit = v;
}

void crocksdb_options_set_soft_pending_compaction_bytes_limit(
crocksdb_options_t* opt, size_t v) {
opt->rep.soft_pending_compaction_bytes_limit = v;
Expand All @@ -3262,11 +3200,6 @@ size_t crocksdb_options_get_hard_pending_compaction_bytes_limit(
return opt->rep.hard_pending_compaction_bytes_limit;
}

void crocksdb_options_set_rate_limit_delay_max_milliseconds(
crocksdb_options_t* opt, unsigned int v) {
opt->rep.rate_limit_delay_max_milliseconds = v;
}

void crocksdb_options_set_max_manifest_file_size(crocksdb_options_t* opt,
size_t v) {
opt->rep.max_manifest_file_size = v;
Expand Down Expand Up @@ -3609,12 +3542,14 @@ void crocksdb_options_set_track_and_verify_wals_in_manifest(
}

unsigned char crocksdb_load_latest_options(
const char* dbpath, crocksdb_env_t* env, crocksdb_options_t* db_options,
const char* dbpath, crocksdb_options_t* db_options,
crocksdb_column_family_descriptor*** cf_descs, size_t* cf_descs_len,
unsigned char ignore_unknown_options, char** errptr) {
std::vector<ColumnFamilyDescriptor> tmp_cf_descs;
Status s = rocksdb::LoadLatestOptions(dbpath, env->rep, &db_options->rep,
&tmp_cf_descs, ignore_unknown_options);
rocksdb::ConfigOptions config_options;
config_options.ignore_unknown_options = ignore_unknown_options;
Status s = rocksdb::LoadLatestOptions(rocksdb::ConfigOptions(), dbpath,
&db_options->rep, &tmp_cf_descs);

*errptr = nullptr;
if (s.IsNotFound()) return false;
Expand Down Expand Up @@ -3810,17 +3745,12 @@ unsigned char crocksdb_compactionfiltercontext_is_bottommost_level(
return context->rep.is_bottommost_level;
}

void crocksdb_compactionfiltercontext_file_numbers(
crocksdb_compactionfiltercontext_t* context, const uint64_t** buffer,
size_t* len) {
*buffer = context->rep.file_numbers.data();
*len = context->rep.file_numbers.size();
}

crocksdb_table_properties_t* crocksdb_compactionfiltercontext_table_properties(
crocksdb_compactionfiltercontext_t* context, size_t offset) {
return (crocksdb_table_properties_t*)context->rep.table_properties[offset]
.get();
crocksdb_table_properties_collection_t*
crocksdb_compactionfiltercontext_input_table_properties(
crocksdb_compactionfiltercontext_t* context) {
return (
crocksdb_table_properties_collection_t*)(&context->rep
.input_table_properties);
}

const char* crocksdb_compactionfiltercontext_start_key(
Expand Down Expand Up @@ -3879,25 +3809,6 @@ crocksdb_comparator_t* crocksdb_comparator_create(

void crocksdb_comparator_destroy(crocksdb_comparator_t* cmp) { delete cmp; }

crocksdb_filterpolicy_t* crocksdb_filterpolicy_create(
void* state, void (*destructor)(void*),
char* (*create_filter)(void*, const char* const* key_array,
const size_t* key_length_array, int num_keys,
size_t* filter_length),
unsigned char (*key_may_match)(void*, const char* key, size_t length,
const char* filter, size_t filter_length),
void (*delete_filter)(void*, const char* filter, size_t filter_length),
const char* (*name)(void*)) {
crocksdb_filterpolicy_t* result = new crocksdb_filterpolicy_t;
result->state_ = state;
result->destructor_ = destructor;
result->create_ = create_filter;
result->key_match_ = key_may_match;
result->delete_filter_ = delete_filter;
result->name_ = name;
return result;
}

void crocksdb_filterpolicy_destroy(crocksdb_filterpolicy_t* filter) {
delete filter;
}
Expand All @@ -3910,11 +3821,8 @@ struct FilterPolicyWrapper : public crocksdb_filterpolicy_t {
std::string full_name_;
~FilterPolicyWrapper() override { delete rep_; }
const char* Name() const override { return full_name_.c_str(); }
void CreateFilter(const Slice* keys, int n, std::string* dst) const override {
return rep_->CreateFilter(keys, n, dst);
}
bool KeyMayMatch(const Slice& key, const Slice& filter) const override {
return rep_->KeyMayMatch(key, filter);
const char* CompatibilityName() const override {
return rep_->CompatibilityName();
}
// No need to override GetFilterBitsBuilder if this one is overridden
FilterBitsBuilder* GetBuilderWithContext(
Expand All @@ -3938,7 +3846,6 @@ crocksdb_filterpolicy_t* crocksdb_filterpolicy_create_bloom_format(
wrapper->full_name_ += ".FullBloom";
}
wrapper->state_ = nullptr;
wrapper->delete_filter_ = nullptr;
wrapper->destructor_ = &FilterPolicyWrapper::DoNothing;
return wrapper;
}
Expand All @@ -3961,7 +3868,6 @@ crocksdb_filterpolicy_t* crocksdb_filterpolicy_create_ribbon(
wrapper->full_name_ = wrapper->rep_->Name();
wrapper->full_name_ += ".Ribbon";
wrapper->state_ = nullptr;
wrapper->delete_filter_ = nullptr;
wrapper->destructor_ = &FilterPolicyWrapper::DoNothing;
return wrapper;
}
Expand Down Expand Up @@ -5821,7 +5727,7 @@ struct ExternalSstFileModifier {
auto ioptions = *cfd->ioptions();
auto table_opt =
TableReaderOptions(ioptions, desc.options.prefix_extractor,
env_options_, cfd->internal_comparator());
env_options_, cfd->internal_comparator(), 0);
// Get around global seqno check.
table_opt.largest_seqno = kMaxSequenceNumber;
status = ioptions.table_factory->NewTableReader(
Expand Down
Loading