Skip to content

Commit 64b1301

Browse files
committed
chore: add WITH_AWS_CLOUD support for S3 snapshot storage
Add alternative S3 implementation using helio's native S3Storage and AwsCredsProvider when WITH_AWS_CLOUD is ON, removing the dependency on the AWS C++ SDK. Changes: - Add AwsS3SnapshotStorage::Init() to initialize credentials and SSL context (matching GCS/Azure pattern) - Implement ListObjects using cloud::aws::S3Storage::List - Add destructor to free SSL context - Wire up Init call in CreateCloudSnapshotStorage - Update CMakeLists.txt to link aws_cloud_lib - Update helio submodule with S3Storage virtual-hosted-style URLs and SigV4 signing fixes Signed-off-by: Roman Gershman <roman@dragonflydb.io>
1 parent 48f8d11 commit 64b1301

5 files changed

Lines changed: 99 additions & 5 deletions

File tree

src/server/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ if (WITH_AWS)
112112
add_definitions(-DWITH_AWS)
113113
endif()
114114

115+
if (WITH_AWS_CLOUD)
116+
SET(AWS_LIB ${AWS_LIB} aws_cloud_lib)
117+
add_definitions(-DWITH_AWS_CLOUD)
118+
endif()
119+
115120
if (WITH_GCP)
116121
SET(GCP_LIB gcp_lib)
117122
add_definitions(-DWITH_GCP)

src/server/detail/snapshot_storage.cc

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
#include "util/aws/s3_write_file.h"
2020
#endif
2121

22+
#ifdef WITH_AWS_CLOUD
23+
#include "util/cloud/aws/aws_creds_provider.h"
24+
#include "util/cloud/aws/s3_storage.h"
25+
#endif
26+
2227
#ifdef WITH_GCP
2328
#include "util/cloud/gcp/gcs_file.h"
2429
#endif
@@ -520,10 +525,11 @@ error_code AzureSnapshotStorage::CheckPath(const std::string& path) {
520525
return {};
521526
}
522527

523-
#ifdef WITH_AWS
528+
#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS)
524529
AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https,
525530
bool ec2_metadata, bool sign_payload) {
526531
shard_set->pool()->GetNextProactor()->Await([&] {
532+
#ifdef WITH_AWS
527533
if (!ec2_metadata) {
528534
setenv("AWS_EC2_METADATA_DISABLED", "true", 0);
529535
}
@@ -544,11 +550,30 @@ AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool htt
544550
std::shared_ptr<Aws::S3::S3EndpointProviderBase> endpoint_provider =
545551
std::make_shared<aws::S3EndpointProvider>(endpoint, https);
546552
s3_ = std::make_shared<Aws::S3::S3Client>(credentials_provider, endpoint_provider, s3_conf);
553+
#endif
547554
});
548555
}
549556

557+
AwsS3SnapshotStorage::~AwsS3SnapshotStorage() {
558+
#ifdef WITH_AWS_CLOUD
559+
util::http::TlsClient::FreeContext(ctx_);
560+
#endif
561+
}
562+
563+
error_code AwsS3SnapshotStorage::Init(unsigned connect_ms) {
564+
#ifdef WITH_AWS_CLOUD
565+
error_code ec = creds_provider_.Init(connect_ms);
566+
if (ec)
567+
return ec;
568+
569+
ctx_ = util::http::TlsClient::CreateSslContext();
570+
#endif
571+
return {};
572+
}
573+
550574
io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::OpenWriteFile(
551575
const std::string& path) {
576+
#ifdef WITH_AWS
552577
optional<pair<string, string>> bucket_path = GetBucketPath(path);
553578
if (!bucket_path) {
554579
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
@@ -571,16 +596,26 @@ io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::Op
571596
result = std::pair<io::Sink*, uint8_t>(f, FileType::CLOUD);
572597
});
573598
fb.Join();
599+
574600
return result;
601+
#else
602+
return nonstd::make_unexpected(GenericError("AWS support not compiled in"));
603+
#endif
575604
}
576605

577606
io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) {
607+
VLOG(1) << "Opening S3 read file: " << path;
608+
609+
#ifdef WITH_AWS
578610
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(path);
579611
if (!bucket_path) {
580612
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
581613
}
582614
auto [bucket, key] = *bucket_path;
583615
return new aws::S3ReadFile(bucket, key, s3_);
616+
#else
617+
return nonstd::make_unexpected(GenericError("AWS support not compiled in"));
618+
#endif
584619
}
585620

586621
io::Result<std::string, GenericError> AwsS3SnapshotStorage::LoadPath(std::string_view dir,
@@ -607,6 +642,8 @@ io::Result<std::string, GenericError> AwsS3SnapshotStorage::LoadPath(std::string
607642

608643
io::Result<vector<string>, GenericError> AwsS3SnapshotStorage::ExpandFromPath(
609644
const string& load_path) {
645+
VLOG(1) << "Expanding S3 path: " << load_path;
646+
610647
optional<pair<string, string>> bucket_path = GetBucketPath(load_path);
611648
if (!bucket_path) {
612649
return nonstd::make_unexpected(
@@ -652,6 +689,8 @@ error_code AwsS3SnapshotStorage::CheckPath(const std::string& path) {
652689

653690
io::Result<std::vector<AwsS3SnapshotStorage::SnapStat>, GenericError>
654691
AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view prefix) {
692+
VLOG(1) << "Listing S3 objects in bucket: " << bucket_name << " with prefix: " << prefix;
693+
655694
// Each list objects request has a 1000 object limit, so page through the
656695
// objects if needed.
657696
std::string continuation_token;
@@ -660,6 +699,7 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view
660699
// We use a random proactor because this function might be called from the main thread.
661700
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
662701

702+
#if defined(WITH_AWS)
663703
do {
664704
Aws::S3::Model::ListObjectsV2Request request;
665705
request.SetBucket(std::string(bucket_name));
@@ -715,9 +755,34 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view
715755
outcome.GetError().GetExceptionName()});
716756
}
717757
} while (!continuation_token.empty());
758+
#elif defined(WITH_AWS_CLOUD)
759+
string adjusted_prefix;
760+
if (!prefix.empty()) {
761+
if (prefix.back() == '/') {
762+
adjusted_prefix = prefix;
763+
} else {
764+
adjusted_prefix = absl::StrCat(prefix, "/");
765+
}
766+
}
767+
768+
error_code ec = proactor->Await([&]() -> error_code {
769+
cloud::aws::S3Storage s3(&creds_provider_, ctx_, proactor);
770+
return s3.List(bucket_name, adjusted_prefix, false, 1000,
771+
[&keys](const cloud::StorageListItem& item) {
772+
keys.emplace_back(string(item.key), item.mtime_ns);
773+
});
774+
});
775+
776+
if (ec) {
777+
return nonstd::make_unexpected(GenericError(ec, "Failed list objects in S3 bucket"));
778+
}
779+
#else
780+
return nonstd::make_unexpected(GenericError("AWS support not compiled in"));
781+
#endif
718782
return keys;
719783
}
720-
#endif
784+
785+
#endif // WITH_AWS_CLOUD || WITH_AWS
721786

722787
#ifdef __linux__
723788
io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {

src/server/detail/snapshot_storage.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
#include <aws/s3/S3Client.h>
88
#endif
99

10+
#ifdef WITH_AWS_CLOUD
11+
#include "util/cloud/aws/aws_creds_provider.h"
12+
#include "util/cloud/aws/s3_storage.h"
13+
#endif
14+
1015
#ifdef WITH_GCP
1116
#include "util/cloud/gcp/gcp_creds_provider.h"
1217
#include "util/cloud/gcp/gcs.h"
@@ -161,11 +166,14 @@ class AzureSnapshotStorage : public SnapshotStorage {
161166
SSL_CTX* ctx_ = NULL;
162167
};
163168

164-
#ifdef WITH_AWS
169+
#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS)
165170
class AwsS3SnapshotStorage : public SnapshotStorage {
166171
public:
167172
AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata,
168173
bool sign_payload);
174+
~AwsS3SnapshotStorage();
175+
176+
std::error_code Init(unsigned connect_ms);
169177

170178
io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenWriteFile(
171179
const std::string& path) override;
@@ -189,7 +197,14 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
189197
io::Result<std::vector<SnapStat>, GenericError> ListObjects(std::string_view bucket_name,
190198
std::string_view prefix);
191199

200+
#ifdef WITH_AWS
192201
std::shared_ptr<Aws::S3::S3Client> s3_;
202+
#endif
203+
204+
#ifdef WITH_AWS_CLOUD
205+
util::cloud::aws::AwsCredsProvider creds_provider_;
206+
SSL_CTX* ctx_ = nullptr;
207+
#endif
193208
};
194209

195210
#endif

src/server/server_family.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,20 @@ string UnknownCmd(string cmd, CmdArgList args) {
295295

296296
std::shared_ptr<detail::SnapshotStorage> CreateCloudSnapshotStorage(std::string_view uri) {
297297
if (detail::IsS3Path(uri)) {
298+
#if defined(WITH_AWS) || defined(WITH_AWS_CLOUD)
298299
#ifdef WITH_AWS
299300
shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
300-
return std::make_shared<detail::AwsS3SnapshotStorage>(
301+
#endif
302+
auto aws = std::make_shared<detail::AwsS3SnapshotStorage>(
301303
absl::GetFlag(FLAGS_s3_endpoint), absl::GetFlag(FLAGS_s3_use_https),
302304
absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload));
305+
auto ec = shard_set->pool()->GetNextProactor()->Await(
306+
[&] { return aws->Init(detail::kBucketConnectMs); });
307+
if (ec) {
308+
LOG(ERROR) << "Failed to initialize AWS S3 snapshot storage: " << ec.message();
309+
exit(1);
310+
}
311+
return aws;
303312
#else
304313
LOG(ERROR) << "Compiled without AWS support";
305314
exit(1);

0 commit comments

Comments
 (0)