diff --git a/google/cloud/storage/async/object_descriptor.cc b/google/cloud/storage/async/object_descriptor.cc index 71c92eb6c526a..90877d414f9f6 100644 --- a/google/cloud/storage/async/object_descriptor.cc +++ b/google/cloud/storage/async/object_descriptor.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/async/object_descriptor.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/internal/make_status.h" namespace google { @@ -26,6 +27,11 @@ absl::optional ObjectDescriptor::metadata() const { std::pair ObjectDescriptor::Read(std::int64_t offset, std::int64_t limit) { + std::int64_t max_range = + impl_->options().get(); + if (limit > max_range) { + impl_->MakeSubsequentStream(); + } auto reader = impl_->Read({offset, limit}); auto token = storage_internal::MakeAsyncToken(reader.get()); return {AsyncReader(std::move(reader)), std::move(token)}; diff --git a/google/cloud/storage/async/object_descriptor_connection.h b/google/cloud/storage/async/object_descriptor_connection.h index 834be752c22c8..b1a602fad426d 100644 --- a/google/cloud/storage/async/object_descriptor_connection.h +++ b/google/cloud/storage/async/object_descriptor_connection.h @@ -59,6 +59,8 @@ class ObjectDescriptorConnection { * Starts a new range read in the current descriptor. */ virtual std::unique_ptr Read(ReadParams p) = 0; + + virtual void MakeSubsequentStream() = 0; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/async/object_descriptor_test.cc b/google/cloud/storage/async/object_descriptor_test.cc index afd43f31e4c70..0851439821e7c 100644 --- a/google/cloud/storage/async/object_descriptor_test.cc +++ b/google/cloud/storage/async/object_descriptor_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/async/object_descriptor.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/mocks/mock_async_object_descriptor_connection.h" #include "google/cloud/storage/mocks/mock_async_reader_connection.h" #include "google/cloud/testing_util/status_matchers.h" @@ -147,6 +148,48 @@ TEST(ObjectDescriptor, ReadLast) { EXPECT_FALSE(token.valid()); } +TEST(ObjectDescriptor, ReadExceedsMaxRange) { + auto mock = std::make_shared(); + auto constexpr kMaxRange = 1024; + EXPECT_CALL(*mock, options) + .WillRepeatedly( + Return(Options{}.set( + kMaxRange))); + + EXPECT_CALL(*mock, MakeSubsequentStream).Times(1); + + EXPECT_CALL(*mock, Read) + .WillOnce([&](ReadParams p) -> std::unique_ptr { + EXPECT_EQ(p.start, 100); + EXPECT_EQ(p.length, kMaxRange + 1); + auto reader = std::make_unique(); + EXPECT_CALL(*reader, Read) + .WillOnce([] { + return make_ready_future( + ReadResponse(ReadPayload(std::string("some data")))); + }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + return reader; + }); + + auto tested = ObjectDescriptor(mock); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = tested.Read(100, kMaxRange + 1); + ASSERT_TRUE(token.valid()); + + auto r1 = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(r1); + ReadPayload payload; + std::tie(payload, token) = *std::move(r1); + EXPECT_THAT(payload.contents(), ElementsAre("some data")); + + auto r2 = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(r2); + std::tie(payload, token) = *std::move(r2); + EXPECT_FALSE(token.valid()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_experimental diff --git a/google/cloud/storage/async/options.h b/google/cloud/storage/async/options.h index 31f63dc083ced..acadb8f29a46d 100644 --- a/google/cloud/storage/async/options.h +++ b/google/cloud/storage/async/options.h @@ -83,7 +83,7 @@ struct UseMD5ValueOption { * @endcode */ struct MaximumRangeSizeOption { - using Type = std::uint64_t; + using Type = std::int64_t; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/async/resume_policy.cc b/google/cloud/storage/async/resume_policy.cc index 07224a9cab5e8..24e086b965038 100644 --- a/google/cloud/storage/async/resume_policy.cc +++ b/google/cloud/storage/async/resume_policy.cc @@ -25,7 +25,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy { public: explicit LimitedErrorCountResumePolicyImpl(int maximum_resumes) : maximum_resumes_(maximum_resumes) {} - ~LimitedErrorCountResumePolicyImpl() override = default; + + std::unique_ptr clone() const override { + return std::make_unique(*this); + } void OnStartSuccess() override { // For this policy we are only interested in the number of failures. @@ -46,7 +49,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy { class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy { public: StopOnConsecutiveErrorsResumePolicyImpl() = default; - ~StopOnConsecutiveErrorsResumePolicyImpl() override = default; + + std::unique_ptr clone() const override { + return std::make_unique(*this); + } void OnStartSuccess() override { next_action_ = kContinue; } Action OnFinish(Status const&) override { @@ -59,8 +65,6 @@ class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy { } // namespace -ResumePolicy::~ResumePolicy() = default; - ResumePolicyFactory LimitedErrorCountResumePolicy(int maximum_resumes) { return [maximum_resumes]() -> std::unique_ptr { return std::make_unique(maximum_resumes); diff --git a/google/cloud/storage/async/resume_policy.h b/google/cloud/storage/async/resume_policy.h index 8b6a40e37fc07..cd2f5a4c87296 100644 --- a/google/cloud/storage/async/resume_policy.h +++ b/google/cloud/storage/async/resume_policy.h @@ -32,7 +32,9 @@ class ResumePolicy { public: enum Action { kStop, kContinue }; - virtual ~ResumePolicy() = 0; + virtual ~ResumePolicy() = default; + + virtual std::unique_ptr clone() const = 0; /** * Notifies the policy about successful connections. diff --git a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc index 78595dfda6bca..379eb343bdc8d 100644 --- a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc +++ b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc @@ -62,6 +62,10 @@ class AsyncObjectDescriptorConnectionTracing return MakeTracingReaderConnection(span_, std::move(result)); } + void MakeSubsequentStream() override { + return impl_->MakeSubsequentStream(); + }; + private: opentelemetry::nostd::shared_ptr span_; std::shared_ptr impl_; diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 3df8a3c848ebe..cebca0a2bbfc5 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -34,20 +34,30 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( OpenStreamFactory make_stream, google::storage::v2::BidiReadObjectSpec read_object_spec, std::shared_ptr stream, Options options) - : resume_policy_(std::move(resume_policy)), + : resume_policy_prototype_(std::move(resume_policy)), make_stream_(std::move(make_stream)), read_object_spec_(std::move(read_object_spec)), - stream_(std::move(stream)), - options_(std::move(options)) {} + options_(std::move(options)) { + streams_.push_back( + Stream{std::move(stream), {}, resume_policy_prototype_->clone()}); +} -ObjectDescriptorImpl::~ObjectDescriptorImpl() { stream_->Cancel(); } +ObjectDescriptorImpl::~ObjectDescriptorImpl() { + for (auto const& stream : streams_) { + stream.stream->Cancel(); + } +} void ObjectDescriptorImpl::Start( google::storage::v2::BidiReadObjectResponse first_response) { OnRead(std::move(first_response)); } -void ObjectDescriptorImpl::Cancel() { stream_->Cancel(); } +void ObjectDescriptorImpl::Cancel() { + for (auto const& stream : streams_) { + stream.stream->Cancel(); + } +} absl::optional ObjectDescriptorImpl::metadata() const { @@ -55,6 +65,19 @@ absl::optional ObjectDescriptorImpl::metadata() return metadata_; } +void ObjectDescriptorImpl::MakeSubsequentStream() { + auto request = google::storage::v2::BidiReadObjectRequest{}; + + *request.mutable_read_object_spec() = read_object_spec_; + auto stream_result = make_stream_(std::move(request)).get(); + + std::unique_lock lk(mu_); + streams_.push_back(Stream{ + std::move(stream_result->stream), {}, resume_policy_prototype_->clone()}); + lk.unlock(); + OnRead(std::move(stream_result->first_response)); +} + std::unique_ptr ObjectDescriptorImpl::Read(ReadParams p) { std::shared_ptr hash_function = @@ -69,7 +92,7 @@ ObjectDescriptorImpl::Read(ReadParams p) { std::unique_lock lk(mu_); auto const id = ++read_id_generator_; - active_ranges_.emplace(id, range); + streams_.back().active_ranges.emplace(id, range); auto& read_range = *next_request_.add_read_ranges(); read_range.set_read_id(id); read_range.set_read_offset(p.start); @@ -85,8 +108,10 @@ ObjectDescriptorImpl::Read(ReadParams p) { } void ObjectDescriptorImpl::Flush(std::unique_lock lk) { - if (write_pending_ || next_request_.read_ranges().empty()) return; - write_pending_ = true; + if (streams_.back().write_pending || next_request_.read_ranges().empty()) { + return; + } + streams_.back().write_pending = true; google::storage::v2::BidiReadObjectRequest request; request.Swap(&next_request_); @@ -102,7 +127,7 @@ void ObjectDescriptorImpl::Flush(std::unique_lock lk) { void ObjectDescriptorImpl::OnWrite(bool ok) { std::unique_lock lk(mu_); if (!ok) return DoFinish(std::move(lk)); - write_pending_ = false; + streams_.back().write_pending = false; Flush(std::move(lk)); } @@ -146,9 +171,11 @@ void ObjectDescriptorImpl::OnRead( void ObjectDescriptorImpl::CleanupDoneRanges( std::unique_lock const&) { - for (auto i = active_ranges_.begin(); i != active_ranges_.end();) { + if (streams_.empty()) return; + auto& active_ranges = streams_.back().active_ranges; + for (auto i = active_ranges.begin(); i != active_ranges.end();) { if (i->second->IsDone()) { - i = active_ranges_.erase(i); + i = active_ranges.erase(i); } else { ++i; } @@ -185,12 +212,12 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { ApplyRedirectErrors(read_object_spec_, proto_status); auto request = google::storage::v2::BidiReadObjectRequest{}; *request.mutable_read_object_spec() = read_object_spec_; - for (auto const& kv : active_ranges_) { + for (auto const& kv : streams_.back().active_ranges) { auto range = kv.second->RangeForResume(kv.first); if (!range) continue; *request.add_read_ranges() = *std::move(range); } - write_pending_ = true; + streams_.back().write_pending = true; lk.unlock(); make_stream_(std::move(request)).then([w = WeakFromThis()](auto f) { if (auto self = w.lock()) self->OnResume(f.get()); @@ -200,7 +227,8 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { void ObjectDescriptorImpl::OnResume(StatusOr result) { if (!result) return OnFinish(std::move(result).status()); std::unique_lock lk(mu_); - stream_ = std::move(result->stream); + streams_.push_back( + Stream{std::move(result->stream), {}, resume_policy_prototype_->clone()}); // TODO(#15105) - this should be done without release the lock. Flush(std::move(lk)); OnRead(std::move(result->first_response)); @@ -211,7 +239,6 @@ bool ObjectDescriptorImpl::IsResumable( for (auto const& any : proto_status.details()) { auto error = google::storage::v2::BidiReadObjectError{}; if (!any.UnpackTo(&error)) continue; - auto ranges = CopyActiveRanges(); for (auto const& range : CopyActiveRanges()) { for (auto const& range_error : error.read_range_errors()) { if (range.first != range_error.read_id()) continue; @@ -221,8 +248,8 @@ bool ObjectDescriptorImpl::IsResumable( CleanupDoneRanges(std::unique_lock(mu_)); return true; } - - return resume_policy_->OnFinish(status) == + std::unique_lock lk(mu_); + return streams_.back().resume_policy->OnFinish(status) == storage_experimental::ResumePolicy::kContinue; } diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 55d6d292c7485..f4c467de5c42e 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -38,6 +38,14 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN class ObjectDescriptorImpl : public storage_experimental::ObjectDescriptorConnection, public std::enable_shared_from_this { + private: + struct Stream { + std::shared_ptr stream; + std::unordered_map> active_ranges; + std::unique_ptr resume_policy; + bool write_pending = false; + }; + public: ObjectDescriptorImpl( std::unique_ptr resume_policy, @@ -62,6 +70,8 @@ class ObjectDescriptorImpl std::unique_ptr Read( ReadParams p) override; + void MakeSubsequentStream() override; + private: std::weak_ptr WeakFromThis() { return shared_from_this(); @@ -70,14 +80,16 @@ class ObjectDescriptorImpl // This may seem expensive, but it is less bug-prone than iterating over // the map with the lock held. auto CopyActiveRanges(std::unique_lock const&) const { - return active_ranges_; + return streams_.back().active_ranges; } auto CopyActiveRanges() const { return CopyActiveRanges(std::unique_lock(mu_)); } - auto CurrentStream(std::unique_lock) const { return stream_; } + auto CurrentStream(std::unique_lock) const { + return streams_.back().stream; + } void Flush(std::unique_lock lk); void OnWrite(bool ok); @@ -92,19 +104,17 @@ class ObjectDescriptorImpl bool IsResumable(Status const& status, google::rpc::Status const& proto_status); - std::unique_ptr resume_policy_; + std::unique_ptr resume_policy_prototype_; OpenStreamFactory make_stream_; mutable std::mutex mu_; google::storage::v2::BidiReadObjectSpec read_object_spec_; - std::shared_ptr stream_; absl::optional metadata_; std::int64_t read_id_generator_ = 0; - bool write_pending_ = false; google::storage::v2::BidiReadObjectRequest next_request_; - std::unordered_map> active_ranges_; Options options_; + std::vector streams_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc index 31da8c814417d..595dd34231b6d 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc @@ -1190,6 +1190,190 @@ TEST(ObjectDescriptorImpl, RecoverFromPartialFailure) { EXPECT_THAT(s3r1.get(), VariantWith(PermanentError())); } +/// @test Verify that we can create a subsequent stream and read from it. +TEST(ObjectDescriptorImpl, ReadWithSubsequentStream) { + // Setup + auto constexpr kResponse0 = R"pb( + metadata { + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + generation: 42 + } + read_handle { handle: "handle-12345" } + )pb"; + auto constexpr kRequest1 = R"pb( + read_ranges { read_id: 1 read_offset: 100 read_length: 100 } + )pb"; + auto constexpr kResponse1 = R"pb( + object_data_ranges { + range_end: true + read_range { read_id: 1 read_offset: 100 } + checksummed_data { content: "payload-for-stream-1" } + } + )pb"; + auto constexpr kRequest2 = R"pb( + read_ranges { read_id: 2 read_offset: 200 read_length: 200 } + )pb"; + auto constexpr kResponse2 = R"pb( + object_data_ranges { + range_end: true + read_range { read_id: 2 read_offset: 200 } + checksummed_data { content: "payload-for-stream-2" } + } + )pb"; + + AsyncSequencer sequencer; + + // First stream setup + auto stream1 = std::make_unique(); + EXPECT_CALL(*stream1, Write) + .WillOnce([&](Request const& request, grpc::WriteOptions) { + auto expected = Request{}; + EXPECT_TRUE(TextFormat::ParseFromString(kRequest1, &expected)); + EXPECT_THAT(request, IsProtoEqual(expected)); + return sequencer.PushBack("Write[1]").then([](auto f) { + return f.get(); + }); + }); + EXPECT_CALL(*stream1, Read) + .WillOnce([&]() { + return sequencer.PushBack("Read[1]").then([&](auto) { + auto response = Response{}; + EXPECT_TRUE(TextFormat::ParseFromString(kResponse1, &response)); + return absl::make_optional(response); + }); + }) + .WillOnce([&]() { + return sequencer.PushBack("Read[1.eos]").then([&](auto) { + return absl::optional{}; + }); + }); + EXPECT_CALL(*stream1, Finish).WillOnce([&]() { + return sequencer.PushBack("Finish[1]").then([](auto) { + return PermanentError(); + }); + }); + EXPECT_CALL(*stream1, Cancel).Times(1); + + // Second stream setup + auto stream2 = std::make_unique(); + EXPECT_CALL(*stream2, Write) + .WillOnce([&](Request const& request, grpc::WriteOptions) { + auto expected = Request{}; + EXPECT_TRUE(TextFormat::ParseFromString(kRequest2, &expected)); + EXPECT_THAT(request, IsProtoEqual(expected)); + return sequencer.PushBack("Write[2]").then([](auto f) { + return f.get(); + }); + }); + EXPECT_CALL(*stream2, Read) + .WillOnce([&]() { + return sequencer.PushBack("Read[2]").then([&](auto) { + auto response = Response{}; + EXPECT_TRUE(TextFormat::ParseFromString(kResponse2, &response)); + return absl::make_optional(response); + }); + }) + .WillOnce([&]() { + return sequencer.PushBack("Read[2.eos]").then([](auto) { + return absl::optional{}; + }); + }); + EXPECT_CALL(*stream2, Finish).WillOnce([&]() { + return sequencer.PushBack("Finish[2]").then([](auto) { return Status{}; }); + }); + EXPECT_CALL(*stream2, Cancel).Times(1); + + // Mock factory for subsequent streams + MockFactory factory; + EXPECT_CALL(factory, Call).WillOnce([&](Request const& request) { + EXPECT_TRUE(request.read_object_spec().has_read_handle()); + EXPECT_EQ(request.read_object_spec().read_handle().handle(), + "handle-12345"); + auto stream_result = OpenStreamResult{ + std::make_shared(std::move(stream2)), Response{}}; + return make_ready_future(make_status_or(std::move(stream_result))); + }); + + // Create the ObjectDescriptorImpl + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream1))); + + auto response0 = Response{}; + EXPECT_TRUE(TextFormat::ParseFromString(kResponse0, &response0)); + tested->Start(std::move(response0)); + + auto read1 = sequencer.PopFrontWithName(); + EXPECT_EQ(read1.second, "Read[1]"); + // Start a read on the first stream + auto reader1 = tested->Read({100, 100}); + auto future1 = reader1->Read(); + // The implementation starts a read loop eagerly after Start(), and then + // the call to tested->Read() schedules a write. + auto write1 = sequencer.PopFrontWithName(); + EXPECT_EQ(write1.second, "Write[1]"); + write1.first.set_value(true); + + // Now we can satisfy the read. This will deliver the data to the reader. + read1.first.set_value(true); + + EXPECT_THAT(future1.get(), + VariantWith(ResultOf( + "contents are", + [](storage_experimental::ReadPayload const& p) { + return p.contents(); + }, + ElementsAre(absl::string_view{"payload-for-stream-1"})))); + + EXPECT_THAT(reader1->Read().get(), VariantWith(IsOk())); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read[1.eos]"); + next.first.set_value(true); + + // Create and switch to a new stream. This happens before the first + // stream is finished. + tested->MakeSubsequentStream(); + + // The events are interleaved. Based on the log, Finish[1] comes first. + auto finish1 = sequencer.PopFrontWithName(); + EXPECT_EQ(finish1.second, "Finish[1]"); + + auto read2 = sequencer.PopFrontWithName(); + EXPECT_EQ(read2.second, "Read[2]"); + finish1.first.set_value(true); + + // Start a read on the second stream + auto reader2 = tested->Read({200, 200}); + auto future2 = reader2->Read(); + + auto write2 = sequencer.PopFrontWithName(); + EXPECT_EQ(write2.second, "Write[2]"); + write2.first.set_value(true); + + read2.first.set_value(true); + + EXPECT_THAT(future2.get(), + VariantWith(ResultOf( + "contents are", + [](storage_experimental::ReadPayload const& p) { + return p.contents(); + }, + ElementsAre(absl::string_view{"payload-for-stream-2"})))); + + EXPECT_THAT(reader2->Read().get(), VariantWith(IsOk())); + + auto read2_eos = sequencer.PopFrontWithName(); + EXPECT_EQ(read2_eos.second, "Read[2.eos]"); + read2_eos.first.set_value(true); + + auto finish2 = sequencer.PopFrontWithName(); + EXPECT_EQ(finish2.second, "Finish[2]"); + finish2.first.set_value(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc b/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc index 39f70fd93011d..3754120c02ed9 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc @@ -41,7 +41,7 @@ class ObjectDescriptorReaderTracing : public ObjectDescriptorReader { ~ObjectDescriptorReaderTracing() override = default; future Read() override { - auto span = internal::MakeSpan("storage::AsyncConnection::ReadObjectRange"); + auto span = internal::MakeSpan("storage::AsyncConnection::ReadRange"); internal::OTelScope scope(span); return ObjectDescriptorReader::Read().then( [span = std::move(span), diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc index b02e7e1783536..c82015c8bf8e6 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc @@ -62,7 +62,7 @@ TEST(ObjectDescriptorReaderTracing, Read) { auto spans = span_catcher->GetSpans(); EXPECT_THAT( spans, ElementsAre( - AllOf(SpanNamed("storage::AsyncConnection::ReadObjectRange"), + AllOf(SpanNamed("storage::AsyncConnection::ReadRange"), SpanHasEvents(AllOf( EventNamed("gl-cpp.read-range"), SpanEventAttributesAre( @@ -84,7 +84,7 @@ TEST(ObjectDescriptorReaderTracing, ReadError) { EXPECT_THAT( spans, ElementsAre(AllOf( - SpanNamed("storage::AsyncConnection::ReadObjectRange"), + SpanNamed("storage::AsyncConnection::ReadRange"), SpanHasAttributes( OTelAttribute("gl-cpp.status_code", "NOT_FOUND")), SpanHasEvents(AllOf(EventNamed("gl-cpp.read-range"), diff --git a/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h b/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h index 49b645120634c..90e6efda7b9f3 100644 --- a/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h +++ b/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h @@ -31,6 +31,7 @@ class MockAsyncObjectDescriptorConnection (const, override)); MOCK_METHOD(std::unique_ptr, Read, (ReadParams), (override)); + MOCK_METHOD(void, MakeSubsequentStream, (), (override)); MOCK_METHOD(Options, options, (), (const, override)); }; diff --git a/google/cloud/storage/testing/mock_resume_policy.h b/google/cloud/storage/testing/mock_resume_policy.h index b90f60fab4f64..0183f26185ea2 100644 --- a/google/cloud/storage/testing/mock_resume_policy.h +++ b/google/cloud/storage/testing/mock_resume_policy.h @@ -26,6 +26,9 @@ namespace testing { class MockResumePolicy : public storage_experimental::ResumePolicy { public: ~MockResumePolicy() override = default; + + MOCK_METHOD(std::unique_ptr, clone, (), + (const, override)); MOCK_METHOD(void, OnStartSuccess, (), (override)); MOCK_METHOD(ResumePolicy::Action, OnFinish, (Status const&), (override)); }; diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 7a7516d1e3633..9fc98936868af 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,6 +17,7 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" @@ -918,13 +919,24 @@ TEST_F(AsyncClientIntegrationTest, Open) { auto constexpr kSize = 8 * 1024; auto constexpr kStride = 2 * kSize; + auto constexpr kBlockCount = 4; + auto const block = MakeRandomData(kSize); - auto os = client.WriteObject(bucket_name(), object_name); - for (char c : {'0', '1', '2', '3', '4'}) { - os << std::string(kStride, c); + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); } - os.Close(); - ASSERT_STATUS_OK(os.metadata()); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); auto spec = google::storage::v2::BidiReadObjectSpec{}; spec.set_bucket(BucketName(bucket_name()).FullName()); @@ -933,13 +945,7 @@ TEST_F(AsyncClientIntegrationTest, Open) { ASSERT_STATUS_OK(descriptor); AsyncReader r0; - AsyncReader r1; - AsyncReader r2; AsyncToken t0; - AsyncToken t1; - AsyncToken t2; - std::tie(r1, t1) = descriptor->Read(1 * kStride, kSize); - std::tie(r2, t2) = descriptor->Read(1 * kStride, kSize); auto actual0 = std::string{}; std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); while (t0.valid()) { @@ -952,9 +958,64 @@ TEST_F(AsyncClientIntegrationTest, Open) { t0 = std::move(t); } - EXPECT_EQ(actual0, std::string(kSize, '0')); + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); +} + +TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient( + TestOptions().set(1024)); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 2048; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); client.DeleteObject(bucket_name(), object_name, - storage::Generation(os.metadata()->generation())); + storage::Generation(metadata->generation())); } } // namespace