From 89e2fc6c5af6ef29b361d70d74e94eff73db588f Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 2 Apr 2026 23:47:32 +0300 Subject: [PATCH 1/2] chore: add WITH_AWS_CLOUD support for S3 snapshot storage Add alternative S3 implementation using helio's native S3Storage and AwsCredsProvider when WITH_AWS_CLOUD is ON, removing the dependency on the AWS C++ SDK. Changes: - Add AwsS3SnapshotStorage::Init() to initialize credentials and SSL context (matching GCS/Azure pattern) - Implement ListObjects using cloud::aws::S3Storage::List - Add destructor to free SSL context - Wire up Init call in CreateCloudSnapshotStorage - Update CMakeLists.txt to link aws_cloud_lib - Update helio submodule with S3Storage virtual-hosted-style URLs and SigV4 signing fixes Note: this PR substitutes only the ListObjects implementation, and only when WITH_AWS is disabled and WITH_AWS_CLOUD is enabled; in that configuration OpenReadFile and OpenWriteFile still return an "AWS support not compiled in" error. Signed-off-by: Roman Gershman --- helio | 2 +- src/server/CMakeLists.txt | 5 ++ src/server/detail/snapshot_storage.cc | 68 ++++++++++++++++++++++++++- src/server/detail/snapshot_storage.h | 15 +++++- src/server/server_family.cc | 11 ++++- 5 files changed, 96 insertions(+), 5 deletions(-) diff --git a/helio b/helio index 3b4cbb6fb928..3aa2abe16f3f 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 3b4cbb6fb9288d85ecb46e8f64a077938c5c12f6 +Subproject commit 3aa2abe16f3fe0e05fa837598556e46688f6b044 diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9a83d83064a7..f7bbd89d80b5 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -112,6 +112,11 @@ if (WITH_AWS) add_definitions(-DWITH_AWS) endif() +if (WITH_AWS_CLOUD) + SET(AWS_LIB ${AWS_LIB} aws_cloud_lib) + add_definitions(-DWITH_AWS_CLOUD) +endif() + if (WITH_GCP) SET(GCP_LIB gcp_lib) add_definitions(-DWITH_GCP) diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index 73f4b7662cca..c3a0b4a60735 100644 --- a/src/server/detail/snapshot_storage.cc +++ 
b/src/server/detail/snapshot_storage.cc @@ -521,10 +521,12 @@ error_code AzureSnapshotStorage::CheckPath(const std::string& path) { return {}; } -#ifdef WITH_AWS +#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS) + AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata, bool sign_payload) { shard_set->pool()->GetNextProactor()->Await([&] { +#ifdef WITH_AWS if (!ec2_metadata) { setenv("AWS_EC2_METADATA_DISABLED", "true", 0); } @@ -545,11 +547,33 @@ AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool htt std::shared_ptr endpoint_provider = std::make_shared(endpoint, https); s3_ = std::make_shared(credentials_provider, endpoint_provider, s3_conf); +#endif }); } +AwsS3SnapshotStorage::~AwsS3SnapshotStorage() { +#if defined(WITH_AWS_CLOUD) && !defined(WITH_AWS) + util::http::TlsClient::FreeContext(ctx_); +#endif +} + +error_code AwsS3SnapshotStorage::Init(unsigned connect_ms) { +#if defined(WITH_AWS_CLOUD) && !defined(WITH_AWS) + error_code ec = creds_provider_.Init(connect_ms); + if (ec) + return ec; + + ctx_ = util::http::TlsClient::CreateSslContext(); + if (!ctx_) { + return make_error_code(std::errc::operation_not_permitted); + } +#endif + return {}; +} + io::Result, GenericError> AwsS3SnapshotStorage::OpenWriteFile( const std::string& path) { +#ifdef WITH_AWS optional> bucket_path = GetBucketPath(path); if (!bucket_path) { return nonstd::make_unexpected(GenericError("Invalid S3 path")); @@ -572,16 +596,26 @@ io::Result, GenericError> AwsS3SnapshotStorage::Op result = std::pair(f, FileType::CLOUD); }); fb.Join(); + return result; +#else + return nonstd::make_unexpected(GenericError("AWS support not compiled in")); +#endif } io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) { + VLOG(1) << "Opening S3 read file: " << path; + +#ifdef WITH_AWS std::optional> bucket_path = GetBucketPath(path); if (!bucket_path) { return nonstd::make_unexpected(GenericError("Invalid S3 
path")); } auto [bucket, key] = *bucket_path; return new aws::S3ReadFile(bucket, key, s3_); +#else + return nonstd::make_unexpected(GenericError("AWS support not compiled in")); +#endif } io::Result AwsS3SnapshotStorage::LoadPath(std::string_view dir, @@ -608,6 +642,8 @@ io::Result AwsS3SnapshotStorage::LoadPath(std::string io::Result, GenericError> AwsS3SnapshotStorage::ExpandFromPath( const string& load_path) { + VLOG(1) << "Expanding S3 path: " << load_path; + optional> bucket_path = GetBucketPath(load_path); if (!bucket_path) { return nonstd::make_unexpected( @@ -653,6 +689,8 @@ error_code AwsS3SnapshotStorage::CheckPath(const std::string& path) { io::Result, GenericError> AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view prefix) { + VLOG(1) << "Listing S3 objects in bucket: " << bucket_name << " with prefix: " << prefix; + // Each list objects request has a 1000 object limit, so page through the // objects if needed. std::string continuation_token; @@ -661,6 +699,7 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view // We use a random proactor because this function might be called from the main thread. 
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor(); +#if defined(WITH_AWS) do { Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(std::string(bucket_name)); @@ -716,9 +755,34 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view outcome.GetError().GetExceptionName()}); } } while (!continuation_token.empty()); +#elif defined(WITH_AWS_CLOUD) + string adjusted_prefix; + if (!prefix.empty()) { + if (prefix.back() == '/') { + adjusted_prefix = prefix; + } else { + adjusted_prefix = absl::StrCat(prefix, "/"); + } + } + + error_code ec = proactor->Await([&]() -> error_code { + cloud::aws::S3Storage s3(&creds_provider_, ctx_, proactor); + return s3.List(bucket_name, adjusted_prefix, false, 1000, + [&keys](const cloud::StorageListItem& item) { + keys.emplace_back(string(item.key), item.mtime_ns); + }); + }); + + if (ec) { + return nonstd::make_unexpected(GenericError(ec, "Failed list objects in S3 bucket")); + } +#else + return nonstd::make_unexpected(GenericError("AWS support not compiled in")); +#endif return keys; } -#endif + +#endif // WITH_AWS_CLOUD || WITH_AWS #ifdef __linux__ io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h index d650302dab2d..b4c936c69f0f 100644 --- a/src/server/detail/snapshot_storage.h +++ b/src/server/detail/snapshot_storage.h @@ -7,6 +7,11 @@ #include #endif +#ifdef WITH_AWS_CLOUD +#include "util/cloud/aws/aws_creds_provider.h" +#include "util/cloud/aws/s3_storage.h" +#endif + #ifdef WITH_GCP #include "util/cloud/gcp/gcp_creds_provider.h" #include "util/cloud/gcp/gcs.h" @@ -161,11 +166,14 @@ class AzureSnapshotStorage : public SnapshotStorage { SSL_CTX* ctx_ = NULL; }; -#ifdef WITH_AWS +#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS) class AwsS3SnapshotStorage : public SnapshotStorage { public: AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata, 
bool sign_payload); + ~AwsS3SnapshotStorage(); + + std::error_code Init(unsigned connect_ms); io::Result, GenericError> OpenWriteFile( const std::string& path) override; @@ -189,7 +197,12 @@ class AwsS3SnapshotStorage : public SnapshotStorage { io::Result, GenericError> ListObjects(std::string_view bucket_name, std::string_view prefix); +#ifdef WITH_AWS std::shared_ptr s3_; +#elif WITH_AWS_CLOUD + util::cloud::aws::AwsCredsProvider creds_provider_; + SSL_CTX* ctx_ = nullptr; +#endif }; #endif diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 12e2980886c7..8eada9fc1ad8 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -297,11 +297,20 @@ string UnknownCmd(string cmd, CmdArgList args) { std::shared_ptr CreateCloudSnapshotStorage(std::string_view uri) { if (detail::IsS3Path(uri)) { +#if defined(WITH_AWS) || defined(WITH_AWS_CLOUD) #ifdef WITH_AWS shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); }); - return std::make_shared( +#endif + auto aws = std::make_shared( absl::GetFlag(FLAGS_s3_endpoint), absl::GetFlag(FLAGS_s3_use_https), absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload)); + auto ec = shard_set->pool()->GetNextProactor()->Await( + [&] { return aws->Init(detail::kBucketConnectMs); }); + if (ec) { + LOG(ERROR) << "Failed to initialize AWS S3 snapshot storage: " << ec.message(); + exit(1); + } + return aws; #else LOG(ERROR) << "Compiled without AWS support"; exit(1); From a530e6648f9dac9da64e9bb0626ed0ac0bcb0592 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sat, 4 Apr 2026 11:35:12 +0300 Subject: [PATCH 2/2] chore: comments --- src/server/detail/snapshot_storage.cc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index c3a0b4a60735..39e6c51306a6 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -525,8 
+525,8 @@ error_code AzureSnapshotStorage::CheckPath(const std::string& path) { AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata, bool sign_payload) { - shard_set->pool()->GetNextProactor()->Await([&] { #ifdef WITH_AWS + shard_set->pool()->GetNextProactor()->Await([&] { if (!ec2_metadata) { setenv("AWS_EC2_METADATA_DISABLED", "true", 0); } @@ -547,8 +547,8 @@ AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool htt std::shared_ptr endpoint_provider = std::make_shared(endpoint, https); s3_ = std::make_shared(credentials_provider, endpoint_provider, s3_conf); -#endif }); +#endif } AwsS3SnapshotStorage::~AwsS3SnapshotStorage() { @@ -603,10 +603,10 @@ io::Result, GenericError> AwsS3SnapshotStorage::Op #endif } -io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) { - VLOG(1) << "Opening S3 read file: " << path; - +io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile( + [[maybe_unused]] const std::string& path) { #ifdef WITH_AWS + VLOG(1) << "Opening S3 read file: " << path; std::optional> bucket_path = GetBucketPath(path); if (!bucket_path) { return nonstd::make_unexpected(GenericError("Invalid S3 path")); @@ -758,11 +758,7 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view #elif defined(WITH_AWS_CLOUD) string adjusted_prefix; if (!prefix.empty()) { - if (prefix.back() == '/') { - adjusted_prefix = prefix; - } else { - adjusted_prefix = absl::StrCat(prefix, "/"); - } + adjusted_prefix = prefix.back() == '/' ? prefix : absl::StrCat(prefix, "/"); } error_code ec = proactor->Await([&]() -> error_code {