From 52d95e1f9eb4ab0daa2bb94ec0ebda64ae95e0ea Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 11 Jul 2024 22:34:22 +0300 Subject: [PATCH] chore: implement object listing (#293) --- examples/gcs_demo.cc | 25 +++++---- strings/CMakeLists.txt | 2 +- strings/escaping.cc | 41 ++++++++++++++ strings/escaping.h | 13 +++++ util/cloud/gcp/CMakeLists.txt | 2 +- util/cloud/gcp/gcs.cc | 102 ++++++++++++++++++++++++++++------ util/cloud/gcp/gcs.h | 10 +++- 7 files changed, 166 insertions(+), 29 deletions(-) create mode 100644 strings/escaping.cc create mode 100644 strings/escaping.h diff --git a/examples/gcs_demo.cc b/examples/gcs_demo.cc index 8539b19e..5aeedd4b 100644 --- a/examples/gcs_demo.cc +++ b/examples/gcs_demo.cc @@ -14,10 +14,11 @@ using namespace util; using absl::GetFlag; ABSL_FLAG(string, bucket, "", ""); +ABSL_FLAG(string, prefix, "", ""); + ABSL_FLAG(uint32_t, connect_ms, 2000, ""); ABSL_FLAG(bool, epoll, false, "Whether to use epoll instead of io_uring"); - void Run(SSL_CTX* ctx) { fb2::ProactorBase* pb = fb2::ProactorBase::me(); cloud::GCPCredsProvider provider; @@ -28,11 +29,18 @@ void Run(SSL_CTX* ctx) { cloud::GCS gcs(&provider, ctx, pb); ec = gcs.Connect(connect_ms); CHECK(!ec) << "Could not connect " << ec; - auto cb = [](std::string_view bname) { - CONSOLE_INFO << bname; - }; - ec = gcs.ListBuckets(cb); + string prefix = GetFlag(FLAGS_prefix); + if (!prefix.empty()) { + auto cb = [](cloud::GCS::ObjectItem item) { + cout << "Object: " << item.key << ", size: " << item.size << endl; + }; + ec = gcs.List(GetFlag(FLAGS_bucket), prefix, false, cb); + } else { + auto cb = [](std::string_view bname) { CONSOLE_INFO << bname; }; + + ec = gcs.ListBuckets(cb); + } CHECK(!ec) << ec.message(); } @@ -53,12 +61,9 @@ int main(int argc, char** argv) { pp->Run(); - SSL_CTX* ctx = util::http::TlsClient::CreateSslContext(); - pp->GetNextProactor()->Await([ctx] { - Run(ctx); - }); + SSL_CTX* ctx = util::http::TlsClient::CreateSslContext(); + pp->GetNextProactor()->Await([ctx] { Run(ctx); }); util::http::TlsClient::FreeContext(ctx); - return 0; } diff --git a/strings/CMakeLists.txt b/strings/CMakeLists.txt index a638f878..579d8d9b 100644 --- a/strings/CMakeLists.txt +++ b/strings/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(strings_lib human_readable.cc) +add_library(strings_lib escaping.cc human_readable.cc) cxx_link(strings_lib base absl::strings absl::str_format) cxx_test(strings_test strings_lib LABELS CI) \ No newline at end of file diff --git a/strings/escaping.cc b/strings/escaping.cc new file mode 100644 index 00000000..84cdae14 --- /dev/null +++ b/strings/escaping.cc @@ -0,0 +1,41 @@ +// Copyright 2024, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#include + +namespace strings { +using namespace std; + +inline bool IsValidUrlChar(char ch) { + return absl::ascii_isalnum(ch) || ch == '-' || ch == '_' || ch == '_' || ch == '.' || ch == '!' || + ch == '~' || ch == '*' || ch == '(' || ch == ')'; +} + +static size_t InternalUrlEncode(absl::string_view src, char* dest) { + static const char digits[] = "0123456789ABCDEF"; + + char* start = dest; + for (char ch_c : src) { + unsigned char ch = static_cast(ch_c); + if (IsValidUrlChar(ch)) { + *dest++ = ch_c; + } else { + *dest++ = '%'; + *dest++ = digits[(ch >> 4) & 0x0F]; + *dest++ = digits[ch & 0x0F]; + } + } + *dest = 0; + + return static_cast(dest - start); +} + +void AppendUrlEncoded(const std::string_view src, string* dest) { + size_t sz = dest->size(); + dest->resize(dest->size() + src.size() * 3 + 1); + char* next = &dest->front() + sz; + size_t written = InternalUrlEncode(src, next); + dest->resize(sz + written); +} + +} // namespace strings \ No newline at end of file diff --git a/strings/escaping.h b/strings/escaping.h new file mode 100644 index 00000000..bf9434b9 --- /dev/null +++ b/strings/escaping.h @@ -0,0 +1,13 @@ +// Copyright 2024, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +namespace strings { + +void AppendUrlEncoded(const std::string_view src, std::string* dest); + +} // namespace strings \ No newline at end of file diff --git a/util/cloud/gcp/CMakeLists.txt b/util/cloud/gcp/CMakeLists.txt index 2ce50be3..d19f6d62 100644 --- a/util/cloud/gcp/CMakeLists.txt +++ b/util/cloud/gcp/CMakeLists.txt @@ -1,3 +1,3 @@ add_library(gcp_lib gcs.cc) -cxx_link(gcp_lib http_client_lib TRDP::rapidjson) +cxx_link(gcp_lib http_client_lib strings_lib TRDP::rapidjson) diff --git a/util/cloud/gcp/gcs.cc b/util/cloud/gcp/gcs.cc index 0977f027..c5202291 100644 --- a/util/cloud/gcp/gcs.cc +++ b/util/cloud/gcp/gcs.cc @@ -14,6 +14,7 @@ #include "io/file.h" #include "io/file_util.h" #include "io/line_reader.h" +#include "strings/escaping.h" using namespace std; namespace h2 = boost::beast::http; @@ -31,17 +32,19 @@ auto Unexpected(std::errc code) { return nonstd::make_unexpected(make_error_code(code)); } -#define RETURN_UNEXPECTED(x) do { \ - auto ec = (x); \ - if (ec) \ - return nonstd::make_unexpected(ec); \ -} while(false) +#define RETURN_UNEXPECTED(x) \ + do { \ + auto ec = (x); \ + if (ec) \ + return nonstd::make_unexpected(ec); \ + } while (false) -#define RETURN_ERROR(x) do { \ - auto ec = (x); \ - if (ec) \ - return ec; \ -} while (false) +#define RETURN_ERROR(x) \ + do { \ + auto ec = (x); \ + if (ec) \ + return ec; \ + } while (false) string AuthHeader(string_view access_token) { return absl::StrCat("Bearer ", access_token); @@ -185,7 +188,7 @@ io::Result SendWithToken(GCPCredsProvider* provider, http::Clien EmptyParserPtr parser(new h2::response_parser()); RETURN_UNEXPECTED(client->ReadHeader(parser.get())); - VLOG(1) << "RespHeader" << i << ": " << parser.get(); + VLOG(1) << "RespHeader" << i << ": " << parser->get(); if (parser->get().result() == h2::status::ok) { return parser; @@ -204,6 +207,11 @@ io::Result SendWithToken(GCPCredsProvider* provider, http::Clien return nonstd::make_unexpected(ec); } +#define FETCH_ARRAY_MEMBER(val) \ + if (!(val).IsArray()) \ + return make_error_code(errc::bad_message); \ + auto array = val.GetArray() + } // namespace error_code GCPCredsProvider::Init(unsigned connect_ms, fb2::ProactorBase* pb) { @@ -335,11 +343,7 @@ error_code GCS::ListBuckets(ListBucketCb cb) { if (it == doc.MemberEnd()) break; - const auto& val = it->value; - if (!val.IsArray()) { - return make_error_code(errc::bad_message); - } - auto array = val.GetArray(); + FETCH_ARRAY_MEMBER(it->value); for (size_t i = 0; i < array.Size(); ++i) { const auto& item = array[i]; @@ -359,5 +363,71 @@ error_code GCS::ListBuckets(ListBucketCb cb) { return {}; } +error_code GCS::List(string_view bucket, string_view prefix, bool recursive, + ListObjectCb cb) { + CHECK(!bucket.empty()); + + string url = "/storage/v1/b/"; + absl::StrAppend(&url, bucket, "/o?maxResults=200&prefix="); + strings::AppendUrlEncoded(prefix, &url); + if (!recursive) { + absl::StrAppend(&url, "&delimiter=%2f"); + } + auto http_req = PrepareRequest(h2::verb::get, url, creds_provider_.access_token()); + + rj::Document doc; + while (true) { + io::Result parse_res = + SendWithToken(&creds_provider_, client_.get(), &http_req); + if (!parse_res) + return parse_res.error(); + EmptyParserPtr empty_parser = std::move(*parse_res); + h2::response_parser resp(std::move(*empty_parser)); + RETURN_ERROR(client_->Recv(&resp)); + + auto msg = resp.release(); + + doc.ParseInsitu(&msg.body().front()); + if (doc.HasParseError()) { + return make_error_code(errc::bad_message); + } + + auto it = doc.FindMember("items"); + if (it != doc.MemberEnd()) { + FETCH_ARRAY_MEMBER(it->value); + + for (size_t i = 0; i < array.Size(); ++i) { + const auto& item = array[i]; + auto it = item.FindMember("name"); + CHECK(it != item.MemberEnd()); + absl::string_view key_name(it->value.GetString(), it->value.GetStringLength()); + it = item.FindMember("size"); + CHECK(it != item.MemberEnd()); + absl::string_view sz_str(it->value.GetString(), it->value.GetStringLength()); + size_t item_size = 0; + CHECK(absl::SimpleAtoi(sz_str, &item_size)); + cb(ObjectItem{item_size, key_name, false}); + } + } + it = doc.FindMember("prefixes"); + if (it != doc.MemberEnd()) { + FETCH_ARRAY_MEMBER(it->value); + for (size_t i = 0; i < array.Size(); ++i) { + const auto& item = array[i]; + absl::string_view str(item.GetString(), item.GetStringLength()); + cb(ObjectItem{0, str, true}); + } + } + + it = doc.FindMember("nextPageToken"); + if (it == doc.MemberEnd()) { + break; + } + absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()}; + http_req.target(absl::StrCat(url, "&pageToken=", page_token)); + } + return {}; +} + } // namespace cloud } // namespace util \ No newline at end of file diff --git a/util/cloud/gcp/gcs.h b/util/cloud/gcp/gcs.h index 382b6093..a2c5cbb7 100644 --- a/util/cloud/gcp/gcs.h +++ b/util/cloud/gcp/gcs.h @@ -70,7 +70,14 @@ class GCPCredsProvider { class GCS { public: using BucketItem = std::string_view; + struct ObjectItem { + size_t size; + std::string_view key; + bool is_prefix; + }; + using ListBucketCb = std::function; + using ListObjectCb = std::function; GCS(GCPCredsProvider* creds_provider, SSL_CTX* ssl_cntx, fb2::ProactorBase* pb); ~GCS(); @@ -78,7 +85,8 @@ class GCS { std::error_code Connect(unsigned msec); std::error_code ListBuckets(ListBucketCb cb); - + std::error_code List(std::string_view bucket, std::string_view prefix, bool recursive, + ListObjectCb cb); private: GCPCredsProvider& creds_provider_; SSL_CTX* ssl_ctx_;