Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Google Cloud storage support #300

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,7 @@ cmake-build-*
third-party/folly/
.cache
*.sublime-*

.clang-format
.editorconfig
*.vim
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,12 @@ replication_test: cloud/replication_test.o $(TEST_LIBRARY) $(LIBRARY)
cloud_file_system_test: cloud/cloud_file_system_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

gcp_file_system_test: cloud/gcp/gcp_file_system_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

gcp_db_cloud_test: cloud/gcp/gcp_db_cloud_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

cloud_manifest_test: cloud/cloud_manifest_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down Expand Up @@ -2618,4 +2624,4 @@ list_all_tests:
ROCKS_DEP_RULES=$(filter-out clean format check-format check-buck-targets check-headers check-sources jclean jtest package analyze tags rocksdbjavastatic% unity.% unity_test checkout_folly, $(MAKECMDGOALS))
ifneq ("$(ROCKS_DEP_RULES)", "")
-include $(DEPFILES)
endif
endif
10 changes: 9 additions & 1 deletion TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"//folly/synchronization:distributed_mutex",
], headers=None, link_whole=False, extra_test_libs=False)

<<<<<<< HEAD
cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"cache/cache.cc",
"cache/cache_entry_roles.cc",
Expand Down Expand Up @@ -4983,6 +4982,15 @@ cpp_unittest_wrapper(name="cloud_file_system_test",
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])

cpp_unittest_wrapper(name="gcp_file_system_test",
srcs=["cloud/gcp/gcp_file_system_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])

cpp_unittest_wrapper(name="gcp_db_cloud_test",
srcs=["cloud/gcp/gcp_db_cloud_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])

cpp_unittest_wrapper(name="cloud_manifest_test",
srcs=["cloud/cloud_manifest_test.cc"],
Expand Down
10 changes: 10 additions & 0 deletions build_tools/build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,16 @@ if [ "${USE_AWS}XXX" = "1XXX" ]; then
COMMON_FLAGS="$COMMON_FLAGS $S3_CCFLAGS"
PLATFORM_LDFLAGS="$S3_LDFLAGS $PLATFORM_LDFLAGS"
fi

if [ "${USE_GCP}XXX" = "1XXX" ]; then
GCP_SDK=/usr/local
GCI=${GCP_SDK}/include/
GCS_CCFLAGS="$GCS_CCFLAGS -I$GCI -DUSE_GCP"
GCS_LDFLAGS="$GCS_LDFLAGS -lgoogle_cloud_cpp_common -lgoogle_cloud_cpp_storage -labsl_bad_variant_access -labsl_bad_optional_access"
COMMON_FLAGS="$COMMON_FLAGS $GCS_CCFLAGS"
PLATFORM_LDFLAGS="$GCS_LDFLAGS $PLATFORM_LDFLAGS"
fi

#
# Support the Kafka WAL storing if the env variable named USE_KAFKA
# is set to 1. Setting it to any other value or not setting it at all means
Expand Down
63 changes: 63 additions & 0 deletions cloud/cloud_file_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
#include <unordered_map>

#include "cloud/aws/aws_file_system.h"
#include "rocksdb/cloud/cloud_file_system_impl.h"
#include "cloud/cloud_log_controller_impl.h"
#include "cloud/cloud_manifest.h"
#include "cloud/db_cloud_impl.h"
#include "cloud/filename.h"
#include "cloud/gcp/gcp_file_system.h"
#include "env/composite_env_wrapper.h"
#include "options/configurable_helper.h"
#include "options/options_helper.h"
Expand Down Expand Up @@ -448,7 +450,30 @@ Status CloudFileSystemEnv::NewAwsFileSystem(
return NewAwsFileSystem(base_fs, options, logger, cfs);
}

Status CloudFileSystemEnv::NewGcpFileSystem(
const std::shared_ptr<FileSystem>& base_fs,
const std::string& src_cloud_bucket, const std::string& src_cloud_object,
const std::string& src_cloud_region, const std::string& dest_cloud_bucket,
const std::string& dest_cloud_object, const std::string& dest_cloud_region,
const CloudFileSystemOptions& cloud_options,
const std::shared_ptr<Logger>& logger, CloudFileSystem** cfs) {
CloudFileSystemOptions options = cloud_options;
if (!src_cloud_bucket.empty())
options.src_bucket.SetBucketName(src_cloud_bucket);
if (!src_cloud_object.empty())
options.src_bucket.SetObjectPath(src_cloud_object);
if (!src_cloud_region.empty()) options.src_bucket.SetRegion(src_cloud_region);
if (!dest_cloud_bucket.empty())
options.dest_bucket.SetBucketName(dest_cloud_bucket);
if (!dest_cloud_object.empty())
options.dest_bucket.SetObjectPath(dest_cloud_object);
if (!dest_cloud_region.empty())
options.dest_bucket.SetRegion(dest_cloud_region);
return NewGcpFileSystem(base_fs, options, logger, cfs);
}

int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
(void) arg; // Suppress unused parameter warning
int count = 0;
// Register the FileSystem types
library.AddFactory<FileSystem>(
Expand All @@ -462,6 +487,11 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
});
count++;

#ifdef USE_GCP
count += CloudFileSystemImpl::RegisterGcpObjects(library, arg);
#endif

#ifdef USE_AWS
count += CloudFileSystemImpl::RegisterAwsObjects(library, arg);

// Register the Cloud Log Controllers
Expand All @@ -477,6 +507,7 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
return guard->get();
});
count++;
#endif

return count;
}
Expand Down Expand Up @@ -638,6 +669,38 @@ Status CloudFileSystemEnv::NewAwsFileSystem(
}
#endif

#ifndef USE_GCP
Status CloudFileSystemEnv::NewGcpFileSystem(
const std::shared_ptr<FileSystem>& /*base_fs*/,
const CloudFileSystemOptions& /*options*/,
const std::shared_ptr<Logger>& /*logger*/, CloudFileSystem** /*cfs*/) {
return Status::NotSupported("RocksDB Cloud not compiled with GCP support");
}
#else
Status CloudFileSystemEnv::NewGcpFileSystem(
const std::shared_ptr<FileSystem>& base_fs,
const CloudFileSystemOptions& options,
const std::shared_ptr<Logger>& logger, CloudFileSystem** cfs) {
CloudFileSystemEnv::RegisterCloudObjects();
// Dump out cloud fs options
options.Dump(logger.get());

Status st = GcpFileSystem::NewGcpFileSystem(base_fs, options, logger, cfs);
if (st.ok()) {
// store a copy to the logger
auto* cloud = static_cast<CloudFileSystemImpl*>(*cfs);
cloud->info_log_ = logger;

// start the purge thread only if there is a destination bucket
if (options.dest_bucket.IsValid() && options.run_purger) {
cloud->purge_thread_ = std::thread([cloud] { cloud->Purger(); });
}
}

return st;
}
#endif

std::unique_ptr<Env> CloudFileSystemEnv::NewCompositeEnv(
Env* env, const std::shared_ptr<FileSystem>& fs) {
return std::make_unique<CompositeEnvWrapper>(env, fs);
Expand Down
2 changes: 1 addition & 1 deletion cloud/cloud_file_system_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2283,7 +2283,7 @@ IOStatus CloudFileSystemImpl::FindAllLiveFiles(
// filename will be remapped correctly based on current_epoch of
// cloud_manifest
*manifest_file =
RemapFilename(ManifestFileWithEpoch("" /* dbname */, "" /* epoch */));
RemapFilename(ManifestFileWithEpoch("" /* epoch */));

RemapFileNumbers(file_nums, live_sst_files);

Expand Down
28 changes: 28 additions & 0 deletions cloud/cloud_file_system_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
// Copyright (c) 2017 Rockset
#ifndef ROCKSDB_LITE

#ifdef USE_AWS

#include "rocksdb/cloud/cloud_file_system.h"

Expand All @@ -11,6 +14,8 @@
#include "test_util/testharness.h"
#include "util/string_util.h"

#include <aws/core/Aws.h>

namespace ROCKSDB_NAMESPACE {

TEST(CloudFileSystemTest, TestBucket) {
Expand Down Expand Up @@ -242,5 +247,28 @@ TEST(CloudFileSystemTest, ConfigureKafkaController) {

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
Aws::InitAPI(Aws::SDKOptions());
return RUN_ALL_TESTS();
}

#else // USE_AWS

#include <stdio.h>

int main(int, char**) {
fprintf(stderr,
"SKIPPED as DBCloud is supported only when USE_AWS is defined.\n");
return 0;
}
#endif // USE_AWS

#else // ROCKSDB_LITE

#include <stdio.h>

int main(int, char**) {
fprintf(stderr, "SKIPPED as DBCloud is not supported in ROCKSDB_LITE\n");
return 0;
}

#endif // ROCKSDB_LITE
10 changes: 9 additions & 1 deletion cloud/db_cloud_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1766,8 +1766,13 @@ TEST_F(CloudTest, CheckpointToCloud) {

auto checkpoint_bucket = cloud_fs_options_.dest_bucket;

std::string ckpt_from_object_path =
cloud_fs_options_.dest_bucket.GetObjectPath();
ckpt_from_object_path += "_from";
cloud_fs_options_.src_bucket = BucketOptions();
cloud_fs_options_.src_bucket.SetObjectPath(ckpt_from_object_path);
cloud_fs_options_.dest_bucket = BucketOptions();
cloud_fs_options_.dest_bucket.SetObjectPath(ckpt_from_object_path);

// Create a DB with two files
OpenDB();
Expand All @@ -1783,6 +1788,9 @@ TEST_F(CloudTest, CheckpointToCloud) {
CloseDB();

DestroyDir(dbname_);
GetCloudFileSystem()->GetStorageProvider()->EmptyBucket(
checkpoint_bucket.GetBucketName(),
cloud_fs_options_.dest_bucket.GetObjectPath());

cloud_fs_options_.src_bucket = checkpoint_bucket;

Expand Down Expand Up @@ -3251,4 +3259,4 @@ int main(int, char**) {
return 0;
}

#endif // !ROCKSDB_LITE
#endif // !ROCKSDB_LITE
28 changes: 27 additions & 1 deletion cloud/filename.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,32 @@ inline bool IsCloudManifestFile(const std::string& pathname) {
return false;
}

inline std::string ReduceSlashes(const std::string& pathname)
{
std::string result;
const char slash = '/';

bool previous_was_slash = false;
for (char c : pathname)
{
if (c == slash)
{
if (!previous_was_slash)
{
result += c;
previous_was_slash = true;
}
}
else
{
result += c;
previous_was_slash = false;
}
}

return result;
}

enum class RocksDBFileType {
kSstFile,
kLogFile,
Expand Down Expand Up @@ -228,4 +254,4 @@ inline RocksDBFileType GetFileType(const std::string& fname_with_epoch) {
return RocksDBFileType::kUnknown;
}

} // namespace
} // namespace
Loading