Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ if (WITH_AWS)
add_definitions(-DWITH_AWS)
endif()

if (WITH_AWS_CLOUD)
SET(AWS_LIB ${AWS_LIB} aws_cloud_lib)
add_definitions(-DWITH_AWS_CLOUD)
endif()

if (WITH_GCP)
SET(GCP_LIB gcp_lib)
add_definitions(-DWITH_GCP)
Expand Down
66 changes: 63 additions & 3 deletions src/server/detail/snapshot_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,11 @@ error_code AzureSnapshotStorage::CheckPath(const std::string& path) {
return {};
}

#ifdef WITH_AWS
#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS)

AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https,
bool ec2_metadata, bool sign_payload) {
#ifdef WITH_AWS
Comment thread
romange marked this conversation as resolved.
shard_set->pool()->GetNextProactor()->Await([&] {
if (!ec2_metadata) {
setenv("AWS_EC2_METADATA_DISABLED", "true", 0);
Expand All @@ -546,10 +548,32 @@ AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool htt
std::make_shared<aws::S3EndpointProvider>(endpoint, https);
s3_ = std::make_shared<Aws::S3::S3Client>(credentials_provider, endpoint_provider, s3_conf);
});
#endif
}

AwsS3SnapshotStorage::~AwsS3SnapshotStorage() {
#if defined(WITH_AWS_CLOUD) && !defined(WITH_AWS)
util::http::TlsClient::FreeContext(ctx_);
Comment thread
romange marked this conversation as resolved.
#endif
}

error_code AwsS3SnapshotStorage::Init(unsigned connect_ms) {
#if defined(WITH_AWS_CLOUD) && !defined(WITH_AWS)
error_code ec = creds_provider_.Init(connect_ms);
if (ec)
return ec;

ctx_ = util::http::TlsClient::CreateSslContext();
Comment thread
romange marked this conversation as resolved.
if (!ctx_) {
return make_error_code(std::errc::operation_not_permitted);
}
#endif
return {};
}

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::OpenWriteFile(
const std::string& path) {
#ifdef WITH_AWS
Comment thread
romange marked this conversation as resolved.
Comment thread
romange marked this conversation as resolved.
optional<pair<string, string>> bucket_path = GetBucketPath(path);
if (!bucket_path) {
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
Expand All @@ -572,16 +596,26 @@ io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::Op
result = std::pair<io::Sink*, uint8_t>(f, FileType::CLOUD);
});
fb.Join();

return result;
#else
return nonstd::make_unexpected(GenericError("AWS support not compiled in"));
#endif
}

io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) {
io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(
[[maybe_unused]] const std::string& path) {
#ifdef WITH_AWS
VLOG(1) << "Opening S3 read file: " << path;
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(path);
if (!bucket_path) {
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
}
auto [bucket, key] = *bucket_path;
return new aws::S3ReadFile(bucket, key, s3_);
#else
return nonstd::make_unexpected(GenericError("AWS support not compiled in"));
#endif
}

io::Result<std::string, GenericError> AwsS3SnapshotStorage::LoadPath(std::string_view dir,
Expand All @@ -608,6 +642,8 @@ io::Result<std::string, GenericError> AwsS3SnapshotStorage::LoadPath(std::string

io::Result<vector<string>, GenericError> AwsS3SnapshotStorage::ExpandFromPath(
const string& load_path) {
VLOG(1) << "Expanding S3 path: " << load_path;

optional<pair<string, string>> bucket_path = GetBucketPath(load_path);
if (!bucket_path) {
return nonstd::make_unexpected(
Expand Down Expand Up @@ -653,6 +689,8 @@ error_code AwsS3SnapshotStorage::CheckPath(const std::string& path) {

io::Result<std::vector<AwsS3SnapshotStorage::SnapStat>, GenericError>
AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view prefix) {
VLOG(1) << "Listing S3 objects in bucket: " << bucket_name << " with prefix: " << prefix;

// Each list objects request has a 1000 object limit, so page through the
// objects if needed.
std::string continuation_token;
Expand All @@ -661,6 +699,7 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view
// We use a random proactor because this function might be called from the main thread.
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();

#if defined(WITH_AWS)
do {
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(std::string(bucket_name));
Expand Down Expand Up @@ -716,9 +755,30 @@ AwsS3SnapshotStorage::ListObjects(std::string_view bucket_name, std::string_view
outcome.GetError().GetExceptionName()});
}
} while (!continuation_token.empty());
#elif defined(WITH_AWS_CLOUD)
string adjusted_prefix;
if (!prefix.empty()) {
adjusted_prefix = prefix.back() == '/' ? prefix : absl::StrCat(prefix, "/");
}

error_code ec = proactor->Await([&]() -> error_code {
cloud::aws::S3Storage s3(&creds_provider_, ctx_, proactor);
return s3.List(bucket_name, adjusted_prefix, false, 1000,
[&keys](const cloud::StorageListItem& item) {
keys.emplace_back(string(item.key), item.mtime_ns);
});
});

if (ec) {
return nonstd::make_unexpected(GenericError(ec, "Failed list objects in S3 bucket"));
}
#else
return nonstd::make_unexpected(GenericError("AWS support not compiled in"));
#endif
return keys;
}
#endif

#endif // WITH_AWS_CLOUD || WITH_AWS

#ifdef __linux__
io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
Expand Down
15 changes: 14 additions & 1 deletion src/server/detail/snapshot_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
#include <aws/s3/S3Client.h>
#endif

#ifdef WITH_AWS_CLOUD
#include "util/cloud/aws/aws_creds_provider.h"
#include "util/cloud/aws/s3_storage.h"
#endif

#ifdef WITH_GCP
#include "util/cloud/gcp/gcp_creds_provider.h"
#include "util/cloud/gcp/gcs.h"
Expand Down Expand Up @@ -161,11 +166,14 @@ class AzureSnapshotStorage : public SnapshotStorage {
SSL_CTX* ctx_ = NULL;
};

#ifdef WITH_AWS
#if defined(WITH_AWS_CLOUD) || defined(WITH_AWS)
class AwsS3SnapshotStorage : public SnapshotStorage {
public:
AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata,
bool sign_payload);
~AwsS3SnapshotStorage();

std::error_code Init(unsigned connect_ms);

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenWriteFile(
const std::string& path) override;
Expand All @@ -189,7 +197,12 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
io::Result<std::vector<SnapStat>, GenericError> ListObjects(std::string_view bucket_name,
std::string_view prefix);

#ifdef WITH_AWS
std::shared_ptr<Aws::S3::S3Client> s3_;
#elif WITH_AWS_CLOUD
util::cloud::aws::AwsCredsProvider creds_provider_;
SSL_CTX* ctx_ = nullptr;
#endif
};

#endif
Expand Down
11 changes: 10 additions & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,20 @@ string UnknownCmd(string cmd, CmdArgList args) {

std::shared_ptr<detail::SnapshotStorage> CreateCloudSnapshotStorage(std::string_view uri) {
if (detail::IsS3Path(uri)) {
#if defined(WITH_AWS) || defined(WITH_AWS_CLOUD)
#ifdef WITH_AWS
shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
return std::make_shared<detail::AwsS3SnapshotStorage>(
#endif
auto aws = std::make_shared<detail::AwsS3SnapshotStorage>(
Comment thread
romange marked this conversation as resolved.
absl::GetFlag(FLAGS_s3_endpoint), absl::GetFlag(FLAGS_s3_use_https),
absl::GetFlag(FLAGS_s3_ec2_metadata), absl::GetFlag(FLAGS_s3_sign_payload));
auto ec = shard_set->pool()->GetNextProactor()->Await(
[&] { return aws->Init(detail::kBucketConnectMs); });
if (ec) {
LOG(ERROR) << "Failed to initialize AWS S3 snapshot storage: " << ec.message();
exit(1);
}
return aws;
#else
LOG(ERROR) << "Compiled without AWS support";
exit(1);
Expand Down
Loading