Skip to content
Open
27 changes: 27 additions & 0 deletions google/cloud/storage/async/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/storage/async/client.h"
#include "google/cloud/storage/async/read_all.h"
#include "google/cloud/storage/internal/async/connection_impl.h"
#include "google/cloud/storage/internal/async/connection_tracing.h"
#include "google/cloud/storage/internal/async/default_options.h"
Expand Down Expand Up @@ -112,6 +113,32 @@ future<StatusOr<ReadPayload>> AsyncClient::ReadObjectRange(
internal::MergeOptions(std::move(opts), connection_->options())});
}

future<StatusOr<ReadPayload>> AsyncClient::ReadAll(
BucketName const& bucket_name, std::string object_name, Options opts) {
auto request = google::storage::v2::ReadObjectRequest{};
request.set_bucket(bucket_name.FullName());
request.set_object(std::move(object_name));
return ReadAll(std::move(request), std::move(opts));
}

future<StatusOr<ReadPayload>> AsyncClient::ReadAll(
google::storage::v2::ReadObjectRequest request, Options opts) {
request.clear_read_offset();
request.clear_read_limit();
auto reader_future = ReadObject(std::move(request), std::move(opts));
return reader_future.then(
[](future<StatusOr<std::pair<AsyncReader, AsyncToken>>> f) {
auto r = f.get();
if (!r) return make_ready_future(StatusOr<ReadPayload>(r.status()));
return ReadAll(std::move(r->first), std::move(r->second));
});
}

future<StatusOr<ReadPayload>> AsyncClient::ReadAll(AsyncReader reader,
AsyncToken token) {
return storage_experimental::ReadAll(std::move(reader), std::move(token));
Comment thread
shubham-up-47 marked this conversation as resolved.
}

future<StatusOr<std::pair<AsyncWriter, AsyncToken>>>
AsyncClient::StartAppendableObjectUpload(BucketName const& bucket_name,
std::string object_name,
Expand Down
77 changes: 77 additions & 0 deletions google/cloud/storage/async/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,83 @@ class AsyncClient {
google::storage::v2::ReadObjectRequest request, std::int64_t offset,
std::int64_t limit, Options opts = {});

/**
* Reads the full contents of an object.
*
* When satisfied, the returned future has the full contents of the given
* object.
*
* Be aware that this will accumulate all the bytes in memory. For large
* objects, this function may fail with `StatusCode::kResourceExhausted` if
* the system runs out of memory. If you need to process large objects,
* consider using `ReadObject()` instead.
*
* @par Example
* @snippet storage_async_samples.cc read-all
*
* @par Idempotency
* This is a read-only operation and is always idempotent. Once the download
* starts, this operation will automatically resume the download if is
* interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this.
*
* @param bucket_name the name of the bucket that contains the object.
* @param object_name the name of the object to be read.
* @param opts options controlling the behavior of this RPC, for example
* the application may change the retry policy.
*/
future<StatusOr<ReadPayload>> ReadAll(BucketName const& bucket_name,
std::string object_name,
Options opts = {});

/**
* Reads the full contents of an object.
*
* When satisfied, the returned future has the full contents of the given
* object.
*
* Be aware that this will accumulate all the bytes in memory. For large
* objects, this function may fail with `StatusCode::kResourceExhausted` if
* the system runs out of memory. If you need to process large objects,
* consider using `ReadObject()` instead.
*
* @par Example
* @snippet storage_async_samples.cc read-all
*
* @par Idempotency
* This is a read-only operation and is always idempotent. Once the download
* starts, this operation will automatically resume the download if is
* interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this.
*
* @param request the request contents, it must include the bucket name and
* object names. Many other fields are optional. Any values for
* `read_offset()` and `read_limit()` are ignored. To read a range of
* the object use `ReadObjectRange()`.
* @param opts options controlling the behavior of this RPC, for example
* the application may change the retry policy.
*/
future<StatusOr<ReadPayload>> ReadAll(
google::storage::v2::ReadObjectRequest request, Options opts = {});

/**
* Reads the full contents of an object from an `AsyncReader`.
*
* This function consumes the reader and token to read all the data from the
* underlying stream and accumulates it in memory.
*
* Be aware that this will accumulate all the bytes in memory. For large
* objects, this function may fail with `StatusCode::kResourceExhausted` if
* the system runs out of memory.
*
* @par Idempotency
* This operation will automatically resume the download if it is
* interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this.
*
* @param reader The asynchronous reader to consume.
* @param token The token to start reading.
*/
static future<StatusOr<ReadPayload>> ReadAll(AsyncReader reader,
AsyncToken token);

/*
[start-appendable-object-upload]
Initiates a [resumable upload][resumable-link] for an appendable object.
Expand Down
110 changes: 110 additions & 0 deletions google/cloud/storage/async/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ struct TestOption {
using Type = std::string;
};

auto MakeTestReaderConnection(std::vector<std::string> chunks) {
auto reader_impl = std::make_unique<MockAsyncReaderConnection>();
::testing::InSequence seq;
for (auto& chunk : chunks) {
EXPECT_CALL(*reader_impl, Read).WillOnce([c = std::move(chunk)] {
return make_ready_future(
AsyncReaderConnection::ReadResponse{ReadPayload(std::move(c))});
});
}
EXPECT_CALL(*reader_impl, Read).WillOnce([] {
return make_ready_future(AsyncReaderConnection::ReadResponse{Status{}});
});
return reader_impl;
}

auto TestProtoObject() {
google::storage::v2::Object result;
result.set_bucket("projects/_/buckets/test-bucket");
Expand Down Expand Up @@ -379,6 +394,101 @@ TEST(AsyncClient, ReadObject2) {
"empty response", [](auto const& p) { return p.size(); }, 0)));
}

TEST(AsyncClient, ReadAll1) {
auto constexpr kExpectedRequest = R"pb(
bucket: "projects/_/buckets/test-bucket"
object: "test-object"
)pb";
auto mock = std::make_shared<MockAsyncConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(
Return(Options{}.set<TestOption<0>>("O0").set<TestOption<1>>("O1")));

EXPECT_CALL(*mock, ReadObject)
.WillOnce([&](AsyncConnection::ReadObjectParams const& p) {
EXPECT_THAT(p.options.get<TestOption<0>>(), "O0");
EXPECT_THAT(p.options.get<TestOption<1>>(), "O1-function");
EXPECT_THAT(p.options.get<TestOption<2>>(), "O2-function");
auto expected = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected));
EXPECT_THAT(p.request, IsProtoEqual(expected));
return make_ready_future(
make_status_or(std::unique_ptr<AsyncReaderConnection>(
MakeTestReaderConnection({"test-", "payload"}))));
});

auto client = AsyncClient(mock);
auto payload = client
.ReadAll(BucketName("test-bucket"), "test-object",
Options{}
.set<TestOption<1>>("O1-function")
.set<TestOption<2>>("O2-function"))
.get();
ASSERT_STATUS_OK(payload);
EXPECT_THAT(payload->contents(), ElementsAre("test-", "payload"));
}

TEST(AsyncClient, ReadAll2) {
auto constexpr kOriginalRequest = R"pb(
bucket: "test-only-invalid"
object: "test-object"
generation: 42
read_offset: 123
read_limit: 456
)pb";
auto constexpr kExpectedRequest =
R"pb(
bucket: "test-only-invalid" object: "test-object" generation: 42
)pb";
auto mock = std::make_shared<MockAsyncConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(
Return(Options{}.set<TestOption<0>>("O0").set<TestOption<1>>("O1")));

EXPECT_CALL(*mock, ReadObject)
.WillOnce([&](AsyncConnection::ReadObjectParams const& p) {
EXPECT_THAT(p.options.get<TestOption<0>>(), "O0");
EXPECT_THAT(p.options.get<TestOption<1>>(), "O1-function");
EXPECT_THAT(p.options.get<TestOption<2>>(), "O2-function");
auto expected = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected));
EXPECT_THAT(p.request, IsProtoEqual(expected));
return make_ready_future(
make_status_or(std::unique_ptr<AsyncReaderConnection>(
MakeTestReaderConnection({"payload"}))));
});

auto client = AsyncClient(mock);
auto request = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kOriginalRequest, &request));
auto payload =
client
.ReadAll(std::move(request), Options{}
.set<TestOption<1>>("O1-function")
.set<TestOption<2>>("O2-function"))
.get();
ASSERT_STATUS_OK(payload);
EXPECT_THAT(payload->contents(), ElementsAre("payload"));
}

TEST(AsyncClient, ReadAllFromReader) {
auto mock = std::make_shared<MockAsyncConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(
Return(Options{}.set<TestOption<0>>("O0").set<TestOption<1>>("O1")));

auto reader_impl = MakeTestReaderConnection({"test-", "payload"});
auto* reader_impl_ptr = reader_impl.get();
auto reader = AsyncReader(std::move(reader_impl));
auto token = storage_internal::MakeAsyncToken(reader_impl_ptr);

auto client = AsyncClient(mock);
Comment thread
shubham-up-47 marked this conversation as resolved.
Outdated
auto payload =
AsyncClient::ReadAll(std::move(reader), std::move(token)).get();
ASSERT_STATUS_OK(payload);
EXPECT_THAT(payload->contents(), ElementsAre("test-", "payload"));
}

TEST(AsyncClient, StartAppendableObjectUpload1) {
auto constexpr kExpectedRequest = R"pb(
write_object_spec {
Expand Down
8 changes: 4 additions & 4 deletions google/cloud/storage/examples/storage_async_samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,10 @@ void ReadAll(google::cloud::storage_experimental::AsyncClient& client,
std::string object_name) -> google::cloud::future<std::uint64_t> {
// For small objects, consider `ReadAll()` which accumulates all the
// contents in memory using background threads.
auto payload = (co_await gcs_ex::ReadAll(client.ReadObject(
gcs_ex::BucketName(std::move(bucket_name)),
std::move(object_name))))
.value();
auto payload =
(co_await client.ReadAll(gcs_ex::BucketName(std::move(bucket_name)),
std::move(object_name)))
.value();
std::uint64_t count = 0;
for (auto const& buffer : payload.contents()) {
count += std::count(buffer.begin(), buffer.end(), '\n');
Expand Down