diff --git a/helio b/helio index 3b4cbb6fb928..3aa2abe16f3f 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 3b4cbb6fb9288d85ecb46e8f64a077938c5c12f6 +Subproject commit 3aa2abe16f3fe0e05fa837598556e46688f6b044 diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9a83d83064a7..f7bbd89d80b5 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -112,6 +112,11 @@ if (WITH_AWS) add_definitions(-DWITH_AWS) endif() +if (WITH_AWS_CLOUD) + SET(AWS_LIB ${AWS_LIB} aws_cloud_lib) + add_definitions(-DWITH_AWS_CLOUD) +endif() + if (WITH_GCP) SET(GCP_LIB gcp_lib) add_definitions(-DWITH_GCP) diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index 73f4b7662cca..39e6c51306a6 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -521,9 +521,11 @@ error_code AzureSnapshotStorage::CheckPath(const std::string& path) { return {}; } -#ifdef WITH_AWS +#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS) + AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata, bool sign_payload) { +#ifdef WITH_AWS shard_set->pool()->GetNextProactor()->Await([&] { if (!ec2_metadata) { setenv("AWS_EC2_METADATA_DISABLED", "true", 0); @@ -546,10 +548,32 @@ AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool htt std::make_shared(endpoint, https); s3_ = std::make_shared(credentials_provider, endpoint_provider, s3_conf); }); +#endif +} + +AwsS3SnapshotStorage::~AwsS3SnapshotStorage() { +#if defined(WITH_AWS_CLOUD) && !defined(WITH_AWS) + util::http::TlsClient::FreeContext(ctx_); +#endif +} + +error_code AwsS3SnapshotStorage::Init(unsigned connect_ms) { +#if defined(WITH_AWS_CLOUD) && !defined(WITH_AWS) + error_code ec = creds_provider_.Init(connect_ms); + if (ec) + return ec; + + ctx_ = util::http::TlsClient::CreateSslContext(); + if (!ctx_) { + return make_error_code(std::errc::operation_not_permitted); + } +#endif + return {}; } io::Result, GenericError> AwsS3SnapshotStorage::OpenWriteFile( const std::string& path) { +#ifdef WITH_AWS optional> bucket_path = GetBucketPath(path); if (!bucket_path) { return nonstd::make_unexpected(GenericError("Invalid S3 path")); @@ -572,16 +596,26 @@ io::Result, GenericError> AwsS3SnapshotStorage::Op result = std::pair(f, FileType::CLOUD); }); fb.Join(); + return result; +#else + return nonstd::make_unexpected(GenericError("AWS support not compiled in")); +#endif } -io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) { +io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile( + [[maybe_unused]] const std::string& path) { +#ifdef WITH_AWS + VLOG(1) << "Opening S3 read file: " << path; std::optional> bucket_path = GetBucketPath(path); if (!bucket_path) { return nonstd::make_unexpected(GenericError("Invalid S3 path")); } auto [bucket, key] = *bucket_path; return new aws::S3ReadFile(bucket, key, s3_); +#else + return nonstd::make_unexpected(GenericError("AWS support not compiled in")); +#endif } io::Result AwsS3SnapshotStorage::LoadPath(std::string_view dir, @@ -608,6 +642,8 @@ io::Result AwsS3SnapshotStorage::LoadPath(std::string io::Result, GenericError> AwsS3SnapshotStorage::ExpandFromPath( const string& load_path) { + VLOG(1) << "Expanding S3 path: " << load_path; + optional> bucket_path = GetBucketPath(load_path); if (!bucket_path) { return nonstd::make_unexpected( @@ -653,6 +689,8 @@ error_code AwsS3SnapshotStorage::CheckPath(const std::string& path) { io::Result, GenericError> AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view prefix) { + VLOG(1) << "Listing S3 objects in bucket: " << bucket_name << " with prefix: " << prefix; + // Each list objects request has a 1000 object limit, so page through the // objects if needed. std::string continuation_token; @@ -661,6 +699,7 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view // We use a random proactor because this function might be called from the main thread. fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor(); +#if defined(WITH_AWS) do { Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(std::string(bucket_name)); @@ -716,9 +755,30 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view outcome.GetError().GetExceptionName()}); } } while (!continuation_token.empty()); +#elif defined(WITH_AWS_CLOUD) + string adjusted_prefix; + if (!prefix.empty()) { + adjusted_prefix = prefix.back() == '/' ? prefix : absl::StrCat(prefix, "/"); + } + + error_code ec = proactor->Await([&]() -> error_code { + cloud::aws::S3Storage s3(&creds_provider_, ctx_, proactor); + return s3.List(bucket_name, adjusted_prefix, false, 1000, + [&keys](const cloud::StorageListItem& item) { + keys.emplace_back(string(item.key), item.mtime_ns); + }); + }); + + if (ec) { + return nonstd::make_unexpected(GenericError(ec, "Failed list objects in S3 bucket")); + } +#else + return nonstd::make_unexpected(GenericError("AWS support not compiled in")); +#endif return keys; } -#endif + +#endif // WITH_AWS_CLOUD || WITH_AWS #ifdef __linux__ io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h index d650302dab2d..b4c936c69f0f 100644 --- a/src/server/detail/snapshot_storage.h +++ b/src/server/detail/snapshot_storage.h @@ -7,6 +7,11 @@ #include #endif +#ifdef WITH_AWS_CLOUD +#include "util/cloud/aws/aws_creds_provider.h" +#include "util/cloud/aws/s3_storage.h" +#endif + #ifdef WITH_GCP #include "util/cloud/gcp/gcp_creds_provider.h" #include "util/cloud/gcp/gcs.h" @@ -161,11 +166,14 @@ class AzureSnapshotStorage : public SnapshotStorage { SSL_CTX* ctx_ = NULL; }; -#ifdef WITH_AWS +#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS) class AwsS3SnapshotStorage : public SnapshotStorage { public: AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata, bool sign_payload); + ~AwsS3SnapshotStorage(); + + std::error_code Init(unsigned connect_ms); io::Result, GenericError> OpenWriteFile( const std::string& path) override; @@ -189,7 +197,12 @@ class AwsS3SnapshotStorage : public SnapshotStorage { io::Result, GenericError> ListObjects(std::string_view bucket_name, std::string_view prefix); +#ifdef WITH_AWS std::shared_ptr s3_; +#elif WITH_AWS_CLOUD + util::cloud::aws::AwsCredsProvider creds_provider_; + SSL_CTX* ctx_ = nullptr; +#endif }; #endif diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 12e2980886c7..8eada9fc1ad8 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -297,11 +297,20 @@ string UnknownCmd(string cmd, CmdArgList args) { std::shared_ptr CreateCloudSnapshotStorage(std::string_view uri) { if (detail::IsS3Path(uri)) { +#if defined(WITH_AWS) || defined(WITH_AWS_CLOUD) #ifdef WITH_AWS shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); }); - return std::make_shared( +#endif + auto aws = std::make_shared( absl::GetFlag(FLAGS_s3_endpoint), absl::GetFlag(FLAGS_s3_use_https), absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload)); + auto ec = shard_set->pool()->GetNextProactor()->Await( + [&] { return aws->Init(detail::kBucketConnectMs); }); + if (ec) { + LOG(ERROR) << "Failed to initialize AWS S3 snapshot storage: " << ec.message(); + exit(1); + } + return aws; #else LOG(ERROR) << "Compiled without AWS support"; exit(1);