From 68427647e8bc3707b5c2fa1c8d39454b83ec8f93 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 20 May 2025 07:00:53 +0000 Subject: [PATCH 01/16] feat(ACv2): support to handle large ranges --- .../cloud/storage/async/object_descriptor.cc | 9 ++++- .../async/object_descriptor_connection.h | 3 ++ .../object_descriptor_connection_tracing.cc | 6 ++++ .../internal/async/object_descriptor_impl.cc | 34 ++++++++++++++++--- .../internal/async/object_descriptor_impl.h | 8 ++++- .../async/object_descriptor_reader_tracing.cc | 2 +- .../mock_async_object_descriptor_connection.h | 1 + 7 files changed, 55 insertions(+), 8 deletions(-) diff --git a/google/cloud/storage/async/object_descriptor.cc b/google/cloud/storage/async/object_descriptor.cc index 71c92eb6c526a..116ee46317b83 100644 --- a/google/cloud/storage/async/object_descriptor.cc +++ b/google/cloud/storage/async/object_descriptor.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/async/object_descriptor.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/internal/make_status.h" namespace google { @@ -26,7 +27,13 @@ absl::optional ObjectDescriptor::metadata() const { std::pair ObjectDescriptor::Read(std::int64_t offset, std::int64_t limit) { - auto reader = impl_->Read({offset, limit}); + auto max_range = + impl_->options().get(); + std::unique_ptr reader; + if (limit > max_range) { + impl_->MakeSubsequentStream(); + } + reader = impl_->Read({offset, limit}); auto token = storage_internal::MakeAsyncToken(reader.get()); return {AsyncReader(std::move(reader)), std::move(token)}; } diff --git a/google/cloud/storage/async/object_descriptor_connection.h b/google/cloud/storage/async/object_descriptor_connection.h index 834be752c22c8..28ea546da2242 100644 --- a/google/cloud/storage/async/object_descriptor_connection.h +++ b/google/cloud/storage/async/object_descriptor_connection.h @@ -15,6 +15,7 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_DESCRIPTOR_CONNECTION_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_DESCRIPTOR_CONNECTION_H +#include "google/cloud/storage/async/reader.h" #include "google/cloud/storage/async/reader_connection.h" #include "google/cloud/options.h" #include "google/cloud/version.h" @@ -59,6 +60,8 @@ class ObjectDescriptorConnection { * Starts a new range read in the current descriptor. */ virtual std::unique_ptr Read(ReadParams p) = 0; + + virtual void MakeSubsequentStream() = 0; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc index 78595dfda6bca..65a68d44ea71f 100644 --- a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc +++ b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc @@ -13,6 +13,8 @@ // limitations under the License. #include "google/cloud/storage/internal/async/object_descriptor_connection_tracing.h" +#include "google/cloud/storage/async/object_descriptor.h" +#include "google/cloud/storage/async/reader.h" #include "google/cloud/storage/async/reader_connection.h" #include "google/cloud/storage/internal/async/reader_connection_tracing.h" #include "google/cloud/internal/opentelemetry.h" @@ -62,6 +64,10 @@ class AsyncObjectDescriptorConnectionTracing return MakeTracingReaderConnection(span_, std::move(result)); } + void MakeSubsequentStream() override { + return impl_->MakeSubsequentStream(); + }; + private: opentelemetry::nostd::shared_ptr span_; std::shared_ptr impl_; diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 3df8a3c848ebe..9cb2ae03119f7 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -37,17 +37,26 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( : resume_policy_(std::move(resume_policy)), make_stream_(std::move(make_stream)), read_object_spec_(std::move(read_object_spec)), - stream_(std::move(stream)), - options_(std::move(options)) {} + options_(std::move(options)), + active_stream_(0), + streams_{std::move(stream)} {} -ObjectDescriptorImpl::~ObjectDescriptorImpl() { stream_->Cancel(); } +ObjectDescriptorImpl::~ObjectDescriptorImpl() { + for (auto stream : streams_) { + stream->Cancel(); + } +} void ObjectDescriptorImpl::Start( google::storage::v2::BidiReadObjectResponse first_response) { OnRead(std::move(first_response)); } -void ObjectDescriptorImpl::Cancel() { stream_->Cancel(); } +void ObjectDescriptorImpl::Cancel() { + for (auto stream : streams_) { + stream->Cancel(); + } +} absl::optional ObjectDescriptorImpl::metadata() const { @@ -55,6 +64,21 @@ absl::optional ObjectDescriptorImpl::metadata() return metadata_; } +void ObjectDescriptorImpl::MakeSubsequentStream() { + auto request = google::storage::v2::BidiReadObjectRequest{}; + + *request.mutable_read_object_spec() = read_object_spec_; + auto stream_result = make_stream_(std::move(request)).get(); + auto stream = std::move(stream_result->stream); + + std::unique_lock lk(mu_); + active_stream_ = streams_.size(); + streams_.push_back(std::move(stream)); + lk.unlock(); + + Start(std::move(stream_result->first_response)); +} + std::unique_ptr ObjectDescriptorImpl::Read(ReadParams p) { std::shared_ptr hash_function = @@ -200,7 +224,7 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { void ObjectDescriptorImpl::OnResume(StatusOr result) { if (!result) return OnFinish(std::move(result).status()); std::unique_lock lk(mu_); - stream_ = std::move(result->stream); + streams_[0] = std::move(result->stream); // TODO(#15105) - this should be done without release the lock. Flush(std::move(lk)); OnRead(std::move(result->first_response)); diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 55d6d292c7485..41df6ab1fc66b 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -62,6 +62,8 @@ class ObjectDescriptorImpl std::unique_ptr Read( ReadParams p) override; + void MakeSubsequentStream() override; + private: std::weak_ptr WeakFromThis() { return shared_from_this(); @@ -77,7 +79,9 @@ class ObjectDescriptorImpl return CopyActiveRanges(std::unique_lock(mu_)); } - auto CurrentStream(std::unique_lock) const { return stream_; } + auto CurrentStream(std::unique_lock) const { + return streams_[active_stream_]; + } void Flush(std::unique_lock lk); void OnWrite(bool ok); @@ -105,6 +109,8 @@ class ObjectDescriptorImpl std::unordered_map> active_ranges_; Options options_; + std::vector> streams_ = {}; + int active_stream_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc b/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc index 39f70fd93011d..3754120c02ed9 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc @@ -41,7 +41,7 @@ class ObjectDescriptorReaderTracing : public ObjectDescriptorReader { ~ObjectDescriptorReaderTracing() override = default; future Read() override { - auto span = internal::MakeSpan("storage::AsyncConnection::ReadObjectRange"); + auto span = internal::MakeSpan("storage::AsyncConnection::ReadRange"); internal::OTelScope scope(span); return ObjectDescriptorReader::Read().then( [span = std::move(span), diff --git a/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h b/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h index 49b645120634c..90e6efda7b9f3 100644 --- a/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h +++ b/google/cloud/storage/mocks/mock_async_object_descriptor_connection.h @@ -31,6 +31,7 @@ class MockAsyncObjectDescriptorConnection (const, override)); MOCK_METHOD(std::unique_ptr, Read, (ReadParams), (override)); + MOCK_METHOD(void, MakeSubsequentStream, (), (override)); MOCK_METHOD(Options, options, (), (const, override)); }; From 17880a463a8c175b25810134366196f7e5e5609d Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 20 May 2025 07:08:34 +0000 Subject: [PATCH 02/16] remove unnecessary changes --- google/cloud/storage/async/object_descriptor.cc | 3 +-- google/cloud/storage/async/object_descriptor_connection.h | 1 - .../internal/async/object_descriptor_connection_tracing.cc | 2 -- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/google/cloud/storage/async/object_descriptor.cc b/google/cloud/storage/async/object_descriptor.cc index 116ee46317b83..29b77eedcb30f 100644 --- a/google/cloud/storage/async/object_descriptor.cc +++ b/google/cloud/storage/async/object_descriptor.cc @@ -29,11 +29,10 @@ std::pair ObjectDescriptor::Read(std::int64_t offset, std::int64_t limit) { auto max_range = impl_->options().get(); - std::unique_ptr reader; if (limit > max_range) { impl_->MakeSubsequentStream(); } - reader = impl_->Read({offset, limit}); + auto reader = impl_->Read({offset, limit}); auto token = storage_internal::MakeAsyncToken(reader.get()); return {AsyncReader(std::move(reader)), std::move(token)}; } diff --git a/google/cloud/storage/async/object_descriptor_connection.h b/google/cloud/storage/async/object_descriptor_connection.h index 28ea546da2242..b1a602fad426d 100644 --- a/google/cloud/storage/async/object_descriptor_connection.h +++ b/google/cloud/storage/async/object_descriptor_connection.h @@ -15,7 +15,6 @@ #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_DESCRIPTOR_CONNECTION_H #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_DESCRIPTOR_CONNECTION_H -#include "google/cloud/storage/async/reader.h" #include "google/cloud/storage/async/reader_connection.h" #include "google/cloud/options.h" #include "google/cloud/version.h" diff --git a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc index 65a68d44ea71f..379eb343bdc8d 100644 --- a/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc +++ b/google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc @@ -13,8 +13,6 @@ // limitations under the License. #include "google/cloud/storage/internal/async/object_descriptor_connection_tracing.h" -#include "google/cloud/storage/async/object_descriptor.h" -#include "google/cloud/storage/async/reader.h" #include "google/cloud/storage/async/reader_connection.h" #include "google/cloud/storage/internal/async/reader_connection_tracing.h" #include "google/cloud/internal/opentelemetry.h" From 055ec379184b6c15936567a1500724aadc093a06 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 3 Jun 2025 09:01:04 +0000 Subject: [PATCH 03/16] Fix ci failure --- .../internal/async/object_descriptor_reader_tracing_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc index b02e7e1783536..c82015c8bf8e6 100644 --- a/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_reader_tracing_test.cc @@ -62,7 +62,7 @@ TEST(ObjectDescriptorReaderTracing, Read) { auto spans = span_catcher->GetSpans(); EXPECT_THAT( spans, ElementsAre( - AllOf(SpanNamed("storage::AsyncConnection::ReadObjectRange"), + AllOf(SpanNamed("storage::AsyncConnection::ReadRange"), SpanHasEvents(AllOf( EventNamed("gl-cpp.read-range"), SpanEventAttributesAre( @@ -84,7 +84,7 @@ TEST(ObjectDescriptorReaderTracing, ReadError) { EXPECT_THAT( spans, ElementsAre(AllOf( - SpanNamed("storage::AsyncConnection::ReadObjectRange"), + SpanNamed("storage::AsyncConnection::ReadRange"), SpanHasAttributes( OTelAttribute("gl-cpp.status_code", "NOT_FOUND")), SpanHasEvents(AllOf(EventNamed("gl-cpp.read-range"), From bd8653d32b92ad52f22fa8a2181a7fd3eff80bd2 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Tue, 3 Jun 2025 12:00:23 +0000 Subject: [PATCH 04/16] Ci failure fix --- google/cloud/storage/async/object_descriptor.cc | 2 +- google/cloud/storage/internal/async/object_descriptor_impl.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/async/object_descriptor.cc b/google/cloud/storage/async/object_descriptor.cc index 29b77eedcb30f..90877d414f9f6 100644 --- a/google/cloud/storage/async/object_descriptor.cc +++ b/google/cloud/storage/async/object_descriptor.cc @@ -27,7 +27,7 @@ absl::optional ObjectDescriptor::metadata() const { std::pair ObjectDescriptor::Read(std::int64_t offset, std::int64_t limit) { - auto max_range = + std::int64_t max_range = impl_->options().get(); if (limit > max_range) { impl_->MakeSubsequentStream(); diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 41df6ab1fc66b..ea667ba353804 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -109,8 +109,8 @@ class ObjectDescriptorImpl std::unordered_map> active_ranges_; Options options_; + std::int64_t active_stream_ = 0; std::vector> streams_ = {}; - int active_stream_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END From 9cb458be3012401c17c671e171caccaf1d7d572e Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Thu, 12 Jun 2025 10:30:59 +0000 Subject: [PATCH 05/16] ci failure fix --- google/cloud/storage/async/options.h | 2 +- google/cloud/storage/internal/async/object_descriptor_impl.cc | 4 ++-- google/cloud/storage/internal/async/object_descriptor_impl.h | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/async/options.h b/google/cloud/storage/async/options.h index 31f63dc083ced..acadb8f29a46d 100644 --- a/google/cloud/storage/async/options.h +++ b/google/cloud/storage/async/options.h @@ -83,7 +83,7 @@ struct UseMD5ValueOption { * @endcode */ struct MaximumRangeSizeOption { - using Type = std::uint64_t; + using Type = std::int64_t; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 9cb2ae03119f7..a80242cc7fddc 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -42,7 +42,7 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( streams_{std::move(stream)} {} ObjectDescriptorImpl::~ObjectDescriptorImpl() { - for (auto stream : streams_) { + for (auto const& stream : streams_) { stream->Cancel(); } } @@ -53,7 +53,7 @@ void ObjectDescriptorImpl::Start( } void ObjectDescriptorImpl::Cancel() { - for (auto stream : streams_) { + for (auto const& stream : streams_) { stream->Cancel(); } } diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index ea667ba353804..aa22cb9ce7059 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -101,7 +101,6 @@ class ObjectDescriptorImpl mutable std::mutex mu_; google::storage::v2::BidiReadObjectSpec read_object_spec_; - std::shared_ptr stream_; absl::optional metadata_; std::int64_t read_id_generator_ = 0; bool write_pending_ = false; From 93e8309f77d992efd4570218ebfd54265651bf0b Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Sun, 6 Jul 2025 05:56:35 +0000 Subject: [PATCH 06/16] active ranges for each stream --- .../internal/async/object_descriptor_impl.cc | 20 +++++++----- .../internal/async/object_descriptor_impl.h | 8 ++--- .../tests/async_client_integration_test.cc | 31 +++++++++++-------- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index a80242cc7fddc..6dd4e78b75440 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -38,8 +38,10 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( make_stream_(std::move(make_stream)), read_object_spec_(std::move(read_object_spec)), options_(std::move(options)), - active_stream_(0), - streams_{std::move(stream)} {} + streams_{std::move(stream)} { + std::unordered_map> initial_ranges; + active_ranges_.push_back(std::move(initial_ranges)); + } ObjectDescriptorImpl::~ObjectDescriptorImpl() { for (auto const& stream : streams_) { @@ -74,9 +76,10 @@ void ObjectDescriptorImpl::MakeSubsequentStream() { std::unique_lock lk(mu_); active_stream_ = streams_.size(); streams_.push_back(std::move(stream)); + std::unordered_map> active_ranges; + active_ranges_.push_back(std::move(active_ranges)); lk.unlock(); - - Start(std::move(stream_result->first_response)); + OnRead(std::move(stream_result->first_response)); } std::unique_ptr @@ -93,7 +96,7 @@ ObjectDescriptorImpl::Read(ReadParams p) { std::unique_lock lk(mu_); auto const id = ++read_id_generator_; - active_ranges_.emplace(id, range); + active_ranges_[active_stream_].emplace(id, range); auto& read_range = *next_request_.add_read_ranges(); read_range.set_read_id(id); read_range.set_read_offset(p.start); @@ -170,9 +173,10 @@ void ObjectDescriptorImpl::OnRead( void ObjectDescriptorImpl::CleanupDoneRanges( std::unique_lock const&) { - for (auto i = active_ranges_.begin(); i != active_ranges_.end();) { + auto &active_ranges = active_ranges_[active_stream_]; + for (auto i = active_ranges.begin(); i != active_ranges.end();) { if (i->second->IsDone()) { - i = active_ranges_.erase(i); + i = active_ranges.erase(i); } else { ++i; } @@ -209,7 +213,7 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { ApplyRedirectErrors(read_object_spec_, proto_status); auto request = google::storage::v2::BidiReadObjectRequest{}; *request.mutable_read_object_spec() = read_object_spec_; - for (auto const& kv : active_ranges_) { + for (auto const& kv : active_ranges_[active_stream_]) { auto range = kv.second->RangeForResume(kv.first); if (!range) continue; *request.add_read_ranges() = *std::move(range); diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index aa22cb9ce7059..a4bed85dade9f 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -72,7 +72,7 @@ class ObjectDescriptorImpl // This may seem expensive, but it is less bug-prone than iterating over // the map with the lock held. auto CopyActiveRanges(std::unique_lock const&) const { - return active_ranges_; + return active_ranges_[active_stream_]; } auto CopyActiveRanges() const { @@ -106,10 +106,10 @@ class ObjectDescriptorImpl bool write_pending_ = false; google::storage::v2::BidiReadObjectRequest next_request_; - std::unordered_map> active_ranges_; + std::vector>> active_ranges_; Options options_; - std::int64_t active_stream_ = 0; - std::vector> streams_ = {}; + std::size_t active_stream_ = 0; + std::vector> streams_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 7a7516d1e3633..e91d8314c538a 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -918,13 +918,24 @@ TEST_F(AsyncClientIntegrationTest, Open) { auto constexpr kSize = 8 * 1024; auto constexpr kStride = 2 * kSize; + auto constexpr kBlockCount = 4; + auto const block = MakeRandomData(kSize); - auto os = client.WriteObject(bucket_name(), object_name); - for (char c : {'0', '1', '2', '3', '4'}) { - os << std::string(kStride, c); + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); } - os.Close(); - ASSERT_STATUS_OK(os.metadata()); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); auto spec = google::storage::v2::BidiReadObjectSpec{}; spec.set_bucket(BucketName(bucket_name()).FullName()); @@ -933,13 +944,7 @@ TEST_F(AsyncClientIntegrationTest, Open) { ASSERT_STATUS_OK(descriptor); AsyncReader r0; - AsyncReader r1; - AsyncReader r2; AsyncToken t0; - AsyncToken t1; - AsyncToken t2; - std::tie(r1, t1) = descriptor->Read(1 * kStride, kSize); - std::tie(r2, t2) = descriptor->Read(1 * kStride, kSize); auto actual0 = std::string{}; std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); while (t0.valid()) { @@ -952,9 +957,9 @@ TEST_F(AsyncClientIntegrationTest, Open) { t0 = std::move(t); } - EXPECT_EQ(actual0, std::string(kSize, '0')); + EXPECT_EQ(actual0.size(), kSize); client.DeleteObject(bucket_name(), object_name, - storage::Generation(os.metadata()->generation())); + storage::Generation(metadata->generation())); } } // namespace From 3f09a59d90eb8cee0515c9d0c3a952558ab5c0eb Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Sun, 6 Jul 2025 07:44:09 +0000 Subject: [PATCH 07/16] checkers-pr fix, handle resume and cancellation --- .../internal/async/object_descriptor_impl.cc | 22 +++++++++++++------ .../internal/async/object_descriptor_impl.h | 13 ++++++++++- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 6dd4e78b75440..ab25e67692a14 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -39,9 +39,8 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( read_object_spec_(std::move(read_object_spec)), options_(std::move(options)), streams_{std::move(stream)} { - std::unordered_map> initial_ranges; - active_ranges_.push_back(std::move(initial_ranges)); - } + AddNewActiveRanges(); +} ObjectDescriptorImpl::~ObjectDescriptorImpl() { for (auto const& stream : streams_) { @@ -60,6 +59,10 @@ void ObjectDescriptorImpl::Cancel() { } } +void ObjectDescriptorImpl::CancelStream(std::shared_ptr stream) { + stream->Cancel(); +} + absl::optional ObjectDescriptorImpl::metadata() const { std::unique_lock lk(mu_); @@ -76,8 +79,7 @@ void ObjectDescriptorImpl::MakeSubsequentStream() { std::unique_lock lk(mu_); active_stream_ = streams_.size(); streams_.push_back(std::move(stream)); - std::unordered_map> active_ranges; - active_ranges_.push_back(std::move(active_ranges)); + AddNewActiveRanges(lk); lk.unlock(); OnRead(std::move(stream_result->first_response)); } @@ -173,7 +175,7 @@ void ObjectDescriptorImpl::OnRead( void ObjectDescriptorImpl::CleanupDoneRanges( std::unique_lock const&) { - auto &active_ranges = active_ranges_[active_stream_]; + auto& active_ranges = active_ranges_[active_stream_]; for (auto i = active_ranges.begin(); i != active_ranges.end();) { if (i->second->IsDone()) { i = active_ranges.erase(i); @@ -204,6 +206,10 @@ void ObjectDescriptorImpl::OnFinish(Status const& status) { for (auto const& kv : copy) { kv.second->OnFinish(status); } + CancelStream(streams_[active_stream_]); + streams_.erase(streams_.begin() + active_stream_); + active_ranges_.erase(active_ranges_.begin() + active_stream_); + active_stream_ = streams_.size(); } void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { @@ -228,7 +234,9 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { void ObjectDescriptorImpl::OnResume(StatusOr result) { if (!result) return OnFinish(std::move(result).status()); std::unique_lock lk(mu_); - streams_[0] = std::move(result->stream); + active_stream_ = streams_.size(); + streams_.push_back(std::move(result->stream)); + AddNewActiveRanges(lk); // TODO(#15105) - this should be done without release the lock. Flush(std::move(lk)); OnRead(std::move(result->first_response)); diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index a4bed85dade9f..869df1cf65a87 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -79,10 +79,20 @@ class ObjectDescriptorImpl return CopyActiveRanges(std::unique_lock(mu_)); } + auto AddNewActiveRanges(std::unique_lock const&) { + std::unordered_map> active_ranges; + return active_ranges_.push_back(std::move(active_ranges)); + } + + auto AddNewActiveRanges() { + return AddNewActiveRanges(std::unique_lock(mu_)); + } + auto CurrentStream(std::unique_lock) const { return streams_[active_stream_]; } + void CancelStream(std::shared_ptr stream); void Flush(std::unique_lock lk); void OnWrite(bool ok); void DoRead(std::unique_lock); @@ -106,7 +116,8 @@ class ObjectDescriptorImpl bool write_pending_ = false; google::storage::v2::BidiReadObjectRequest next_request_; - std::vector>> active_ranges_; + std::vector>> + active_ranges_; Options options_; std::size_t active_stream_ = 0; std::vector> streams_; From 2c3cc2498c44086ddb9ed408eafce3406f954ac2 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Mon, 7 Jul 2025 13:28:41 +0000 Subject: [PATCH 08/16] CI failure fix --- google/cloud/storage/internal/async/object_descriptor_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 869df1cf65a87..72c6978d2cb5f 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -92,7 +92,7 @@ class ObjectDescriptorImpl return streams_[active_stream_]; } - void CancelStream(std::shared_ptr stream); + static void CancelStream(std::shared_ptr stream); void Flush(std::unique_lock lk); void OnWrite(bool ok); void DoRead(std::unique_lock); From 395ea800c8c248a2112cb5e8538b6564448c12a4 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Mon, 7 Jul 2025 14:27:45 +0000 Subject: [PATCH 09/16] CI fix --- google/cloud/storage/internal/async/object_descriptor_impl.cc | 3 ++- google/cloud/storage/internal/async/object_descriptor_impl.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index ab25e67692a14..4e4e5e3eb47c8 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -59,7 +59,8 @@ void ObjectDescriptorImpl::Cancel() { } } -void ObjectDescriptorImpl::CancelStream(std::shared_ptr stream) { +void ObjectDescriptorImpl::CancelStream( + std::shared_ptr const& stream) { stream->Cancel(); } diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 72c6978d2cb5f..81bf5f9d0fcd7 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -92,7 +92,7 @@ class ObjectDescriptorImpl return streams_[active_stream_]; } - static void CancelStream(std::shared_ptr stream); + static void CancelStream(std::shared_ptr const& stream); void Flush(std::unique_lock lk); void OnWrite(bool ok); void DoRead(std::unique_lock); From e82cc018a2bc54a7e4370ebc519ac09a25225da0 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Wed, 9 Jul 2025 14:04:31 +0000 Subject: [PATCH 10/16] Tests for subsequent stream --- .../storage/async/object_descriptor_test.cc | 43 +++++ google/cloud/storage/async/resume_policy.cc | 2 +- .../internal/async/object_descriptor_impl.cc | 1 - .../async/object_descriptor_impl_test.cc | 177 ++++++++++++++++++ .../tests/async_client_integration_test.cc | 56 ++++++ 5 files changed, 277 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/async/object_descriptor_test.cc b/google/cloud/storage/async/object_descriptor_test.cc index afd43f31e4c70..0851439821e7c 100644 --- a/google/cloud/storage/async/object_descriptor_test.cc +++ b/google/cloud/storage/async/object_descriptor_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/async/object_descriptor.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/mocks/mock_async_object_descriptor_connection.h" #include "google/cloud/storage/mocks/mock_async_reader_connection.h" #include "google/cloud/testing_util/status_matchers.h" @@ -147,6 +148,48 @@ TEST(ObjectDescriptor, ReadLast) { EXPECT_FALSE(token.valid()); } +TEST(ObjectDescriptor, ReadExceedsMaxRange) { + auto mock = std::make_shared(); + auto constexpr kMaxRange = 1024; + EXPECT_CALL(*mock, options) + .WillRepeatedly( + Return(Options{}.set( + kMaxRange))); + + EXPECT_CALL(*mock, MakeSubsequentStream).Times(1); + + EXPECT_CALL(*mock, Read) + .WillOnce([&](ReadParams p) -> std::unique_ptr { + EXPECT_EQ(p.start, 100); + EXPECT_EQ(p.length, kMaxRange + 1); + auto reader = std::make_unique(); + EXPECT_CALL(*reader, Read) + .WillOnce([] { + return make_ready_future( + ReadResponse(ReadPayload(std::string("some data")))); + }) + .WillOnce([] { return make_ready_future(ReadResponse(Status{})); }); + return reader; + }); + + auto tested = ObjectDescriptor(mock); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = tested.Read(100, kMaxRange + 1); + ASSERT_TRUE(token.valid()); + + auto r1 = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(r1); + ReadPayload payload; + std::tie(payload, token) = *std::move(r1); + EXPECT_THAT(payload.contents(), ElementsAre("some data")); + + auto r2 = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(r2); + std::tie(payload, token) = *std::move(r2); + EXPECT_FALSE(token.valid()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_experimental diff --git a/google/cloud/storage/async/resume_policy.cc b/google/cloud/storage/async/resume_policy.cc index 07224a9cab5e8..d499db090ff28 100644 --- a/google/cloud/storage/async/resume_policy.cc +++ b/google/cloud/storage/async/resume_policy.cc @@ -35,7 +35,7 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy { } Action OnFinish(Status const& s) override { if (!s.ok()) ++error_count_; - return error_count_ > maximum_resumes_ ? kStop : kContinue; + return error_count_ >= maximum_resumes_ ? kStop : kContinue; } private: diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 4e4e5e3eb47c8..c5a551142e61a 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -248,7 +248,6 @@ bool ObjectDescriptorImpl::IsResumable( for (auto const& any : proto_status.details()) { auto error = google::storage::v2::BidiReadObjectError{}; if (!any.UnpackTo(&error)) continue; - auto ranges = CopyActiveRanges(); for (auto const& range : CopyActiveRanges()) { for (auto const& range_error : error.read_range_errors()) { if (range.first != range_error.read_id()) continue; diff --git a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc index 31da8c814417d..8c7ed7f8b436b 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc @@ -1190,6 +1190,183 @@ TEST(ObjectDescriptorImpl, RecoverFromPartialFailure) { EXPECT_THAT(s3r1.get(), VariantWith(PermanentError())); } +/// @test Verify that we can create a subsequent stream and read from it. +TEST(ObjectDescriptorImpl, ReadWithSubsequentStream) { + // Setup + auto constexpr kResponse0 = R"pb( + metadata { + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + generation: 42 + } + read_handle { handle: "handle-12345" } + )pb"; + auto constexpr kRequest1 = R"pb( + read_ranges { read_id: 1 read_offset: 100 read_length: 100 } + )pb"; + auto constexpr kResponse1 = R"pb( + object_data_ranges { + range_end: true + read_range { read_id: 1 read_offset: 100 } + checksummed_data { content: "payload-for-stream-1" } + } + )pb"; + auto constexpr kRequest2 = R"pb( + read_ranges { read_id: 2 read_offset: 200 read_length: 200 } + )pb"; + auto constexpr kResponse2 = R"pb( + object_data_ranges { + range_end: true + read_range { read_id: 2 read_offset: 200 } + checksummed_data { content: "payload-for-stream-2" } + } + )pb"; + + AsyncSequencer sequencer; + + // First stream setup + auto stream1 = std::make_unique(); + EXPECT_CALL(*stream1, Write) + .WillOnce([&](Request const& request, grpc::WriteOptions) { + auto expected = Request{}; + EXPECT_TRUE(TextFormat::ParseFromString(kRequest1, &expected)); + EXPECT_THAT(request, IsProtoEqual(expected)); + return sequencer.PushBack("Write[1]").then([](auto f) { + return f.get(); + }); + }); + EXPECT_CALL(*stream1, Read) + .WillOnce([&]() { + return sequencer.PushBack("Read[1]").then([&](auto) { + auto response = Response{}; + EXPECT_TRUE(TextFormat::ParseFromString(kResponse1, &response)); + return absl::make_optional(response); + }); + }) + .WillOnce([&]() { + return sequencer.PushBack("Read[1.eos]").then([&](auto) { + return absl::optional{}; + }); + }); + EXPECT_CALL(*stream1, Finish).WillOnce([&]() { + return sequencer.PushBack("Finish[1]").then([](auto) { return Status{}; }); + }); + EXPECT_CALL(*stream1, Cancel).Times(1); + + // Second stream setup + auto stream2 = std::make_unique(); + EXPECT_CALL(*stream2, Write) + .WillOnce([&](Request const& request, grpc::WriteOptions) { + auto expected = Request{}; + EXPECT_TRUE(TextFormat::ParseFromString(kRequest2, &expected)); + EXPECT_THAT(request, IsProtoEqual(expected)); + return sequencer.PushBack("Write[2]").then([](auto f) { + return f.get(); + }); + }); + EXPECT_CALL(*stream2, Read) + .WillOnce([&]() { + return sequencer.PushBack("Read[2]").then([&](auto) { + auto response = Response{}; + EXPECT_TRUE(TextFormat::ParseFromString(kResponse2, &response)); + return absl::make_optional(response); + }); + }) + .WillOnce([&]() { + return sequencer.PushBack("Read[2.eos]").then([](auto) { + return absl::optional{}; + }); + }); + EXPECT_CALL(*stream2, Finish).WillOnce([&]() { + return sequencer.PushBack("Finish[2]").then([](auto) { return Status{}; }); + }); + EXPECT_CALL(*stream2, Cancel).Times(1); + + // Mock factory for subsequent streams + MockFactory factory; + EXPECT_CALL(factory, Call).WillOnce([&](Request const&) { + auto stream_result = OpenStreamResult{ + std::make_shared(std::move(stream2)), Response{}}; + return make_ready_future(make_status_or(std::move(stream_result))); + }); + + // Create the ObjectDescriptorImpl + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream1))); + + auto response0 = Response{}; + EXPECT_TRUE(TextFormat::ParseFromString(kResponse0, &response0)); + tested->Start(std::move(response0)); + + auto read1 = sequencer.PopFrontWithName(); + EXPECT_EQ(read1.second, "Read[1]"); + // Start a read on the first stream + auto reader1 = tested->Read({100, 100}); + auto future1 = reader1->Read(); + // The implementation starts a read loop eagerly after Start(), and then + // the call to tested->Read() schedules a write. + auto write1 = sequencer.PopFrontWithName(); + EXPECT_EQ(write1.second, "Write[1]"); + write1.first.set_value(true); + + // Now we can satisfy the read. This will deliver the data to the reader. + read1.first.set_value(true); + + EXPECT_THAT(future1.get(), + VariantWith(ResultOf( + "contents are", + [](storage_experimental::ReadPayload const& p) { + return p.contents(); + }, + ElementsAre(absl::string_view{"payload-for-stream-1"})))); + + EXPECT_THAT(reader1->Read().get(), VariantWith(IsOk())); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read[1.eos]"); + next.first.set_value(true); + + // The first stream should be finishing now. + auto finish1 = sequencer.PopFrontWithName(); + EXPECT_EQ(finish1.second, "Finish[1]"); + finish1.first.set_value(true); + + // Create and switch to a new stream + tested->MakeSubsequentStream(); + + auto read2 = sequencer.PopFrontWithName(); + EXPECT_EQ(read2.second, "Read[2]"); + // Start a read on the second stream + auto reader2 = tested->Read({200, 200}); + auto future2 = reader2->Read(); + + auto write2 = sequencer.PopFrontWithName(); + EXPECT_EQ(write2.second, "Write[2]"); + write2.first.set_value(true); + + read2.first.set_value(true); + + EXPECT_THAT(future2.get(), + VariantWith(ResultOf( + "contents are", + [](storage_experimental::ReadPayload const& p) { + return p.contents(); + }, + ElementsAre(absl::string_view{"payload-for-stream-2"})))); + + EXPECT_THAT(reader2->Read().get(), VariantWith(IsOk())); + + auto read2_eos = sequencer.PopFrontWithName(); + EXPECT_EQ(read2_eos.second, "Read[2.eos]"); + read2_eos.first.set_value(true); + + auto finish2 = sequencer.PopFrontWithName(); + EXPECT_EQ(finish2.second, "Finish[2]"); + finish2.first.set_value(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index e91d8314c538a..9fc98936868af 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,6 +17,7 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" @@ -962,6 +963,61 @@ TEST_F(AsyncClientIntegrationTest, Open) { storage::Generation(metadata->generation())); } +TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient( + TestOptions().set(1024)); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 2048; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_experimental From d146af829814b1f66809d6661d2b808569ef87c2 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Wed, 9 Jul 2025 14:41:38 +0000 Subject: [PATCH 11/16] Remove the resume_policy change --- google/cloud/storage/async/resume_policy.cc | 2 +- .../storage/internal/async/object_descriptor_impl_test.cc | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/async/resume_policy.cc b/google/cloud/storage/async/resume_policy.cc index d499db090ff28..07224a9cab5e8 100644 --- a/google/cloud/storage/async/resume_policy.cc +++ b/google/cloud/storage/async/resume_policy.cc @@ -35,7 +35,7 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy { } Action OnFinish(Status const& s) override { if (!s.ok()) ++error_count_; - return error_count_ >= maximum_resumes_ ? kStop : kContinue; + return error_count_ > maximum_resumes_ ? kStop : kContinue; } private: diff --git a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc index 8c7ed7f8b436b..c55a442152582 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc @@ -1249,7 +1249,9 @@ TEST(ObjectDescriptorImpl, ReadWithSubsequentStream) { }); }); EXPECT_CALL(*stream1, Finish).WillOnce([&]() { - return sequencer.PushBack("Finish[1]").then([](auto) { return Status{}; }); + return sequencer.PushBack("Finish[1]").then([](auto) { + return PermanentError(); + }); }); EXPECT_CALL(*stream1, Cancel).Times(1); From ba7c75d74eeb485514217537365db6e8e3822ca4 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Thu, 24 Jul 2025 17:57:01 +0000 Subject: [PATCH 12/16] Create a Struct Stream to handle multi streams. Each stream has a resume_policy too --- google/cloud/storage/async/resume_policy.cc | 12 ++++-- google/cloud/storage/async/resume_policy.h | 4 +- .../internal/async/object_descriptor_impl.cc | 41 ++++++++----------- .../internal/async/object_descriptor_impl.h | 29 ++++++------- .../storage/testing/mock_resume_policy.h | 3 ++ 5 files changed, 44 insertions(+), 45 deletions(-) diff --git a/google/cloud/storage/async/resume_policy.cc b/google/cloud/storage/async/resume_policy.cc index 07224a9cab5e8..24e086b965038 100644 --- a/google/cloud/storage/async/resume_policy.cc +++ b/google/cloud/storage/async/resume_policy.cc @@ -25,7 +25,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy { public: explicit LimitedErrorCountResumePolicyImpl(int maximum_resumes) : maximum_resumes_(maximum_resumes) {} - ~LimitedErrorCountResumePolicyImpl() override = default; + + std::unique_ptr clone() const override { + return std::make_unique(*this); + } void OnStartSuccess() override { // For this policy we are only interested in the number of failures. @@ -46,7 +49,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy { class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy { public: StopOnConsecutiveErrorsResumePolicyImpl() = default; - ~StopOnConsecutiveErrorsResumePolicyImpl() override = default; + + std::unique_ptr clone() const override { + return std::make_unique(*this); + } void OnStartSuccess() override { next_action_ = kContinue; } Action OnFinish(Status const&) override { @@ -59,8 +65,6 @@ class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy { } // namespace -ResumePolicy::~ResumePolicy() = default; - ResumePolicyFactory LimitedErrorCountResumePolicy(int maximum_resumes) { return [maximum_resumes]() -> std::unique_ptr { return std::make_unique(maximum_resumes); diff --git a/google/cloud/storage/async/resume_policy.h b/google/cloud/storage/async/resume_policy.h index 8b6a40e37fc07..cd2f5a4c87296 100644 --- a/google/cloud/storage/async/resume_policy.h +++ b/google/cloud/storage/async/resume_policy.h @@ -32,7 +32,9 @@ class ResumePolicy { public: enum Action { kStop, kContinue }; - virtual ~ResumePolicy() = 0; + virtual ~ResumePolicy() = default; + + virtual std::unique_ptr clone() const = 0; /** * Notifies the policy about successful connections. diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index c5a551142e61a..cefc546248378 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -34,17 +34,17 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( OpenStreamFactory make_stream, google::storage::v2::BidiReadObjectSpec read_object_spec, std::shared_ptr stream, Options options) - : resume_policy_(std::move(resume_policy)), + : resume_policy_prototype_(std::move(resume_policy)), make_stream_(std::move(make_stream)), read_object_spec_(std::move(read_object_spec)), - options_(std::move(options)), - streams_{std::move(stream)} { - AddNewActiveRanges(); + options_(std::move(options)) { + streams_.push_back( + Stream{std::move(stream), {}, resume_policy_prototype_->clone()}); } ObjectDescriptorImpl::~ObjectDescriptorImpl() { for (auto const& stream : streams_) { - stream->Cancel(); + stream.stream->Cancel(); } } @@ -55,7 +55,7 @@ void ObjectDescriptorImpl::Start( void ObjectDescriptorImpl::Cancel() { for (auto const& stream : streams_) { - stream->Cancel(); + stream.stream->Cancel(); } } @@ -75,12 +75,11 @@ void ObjectDescriptorImpl::MakeSubsequentStream() { *request.mutable_read_object_spec() = read_object_spec_; auto stream_result = make_stream_(std::move(request)).get(); - auto stream = std::move(stream_result->stream); std::unique_lock lk(mu_); - active_stream_ = streams_.size(); - streams_.push_back(std::move(stream)); - AddNewActiveRanges(lk); + streams_.push_back(Stream{std::move(stream_result->stream), + {}, + resume_policy_prototype_->clone()}); lk.unlock(); OnRead(std::move(stream_result->first_response)); } @@ -99,7 +98,7 @@ ObjectDescriptorImpl::Read(ReadParams p) { std::unique_lock lk(mu_); auto const id = ++read_id_generator_; - active_ranges_[active_stream_].emplace(id, range); + streams_.back().active_ranges.emplace(id, range); auto& read_range = *next_request_.add_read_ranges(); read_range.set_read_id(id); read_range.set_read_offset(p.start); @@ -176,7 +175,8 @@ void ObjectDescriptorImpl::OnRead( void ObjectDescriptorImpl::CleanupDoneRanges( std::unique_lock const&) { - auto& active_ranges = active_ranges_[active_stream_]; + if (streams_.empty()) return; + auto& active_ranges = streams_.back().active_ranges; for (auto i = active_ranges.begin(); i != active_ranges.end();) { if (i->second->IsDone()) { i = active_ranges.erase(i); @@ -207,10 +207,6 @@ void ObjectDescriptorImpl::OnFinish(Status const& status) { for (auto const& kv : copy) { kv.second->OnFinish(status); } - CancelStream(streams_[active_stream_]); - streams_.erase(streams_.begin() + active_stream_); - active_ranges_.erase(active_ranges_.begin() + active_stream_); - active_stream_ = streams_.size(); } void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { @@ -220,7 +216,7 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { ApplyRedirectErrors(read_object_spec_, proto_status); auto request = google::storage::v2::BidiReadObjectRequest{}; *request.mutable_read_object_spec() = read_object_spec_; - for (auto const& kv : active_ranges_[active_stream_]) { + for (auto const& kv : streams_.back().active_ranges) { auto range = kv.second->RangeForResume(kv.first); if (!range) continue; *request.add_read_ranges() = *std::move(range); @@ -235,9 +231,9 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { void ObjectDescriptorImpl::OnResume(StatusOr result) { if (!result) return OnFinish(std::move(result).status()); std::unique_lock lk(mu_); - active_stream_ = streams_.size(); - streams_.push_back(std::move(result->stream)); - AddNewActiveRanges(lk); + streams_.push_back(Stream{std::move(result->stream), + {}, + resume_policy_prototype_->clone()}); // TODO(#15105) - this should be done without release the lock. Flush(std::move(lk)); OnRead(std::move(result->first_response)); @@ -257,9 +253,8 @@ bool ObjectDescriptorImpl::IsResumable( CleanupDoneRanges(std::unique_lock(mu_)); return true; } - - return resume_policy_->OnFinish(status) == - storage_experimental::ResumePolicy::kContinue; + return streams_.back().resume_policy->OnFinish(status) == + storage_experimental::ResumePolicy::kContinue; } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 81bf5f9d0fcd7..916c93799ff9f 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -38,6 +38,13 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN class ObjectDescriptorImpl : public storage_experimental::ObjectDescriptorConnection, public std::enable_shared_from_this { + private: + struct Stream { + std::shared_ptr stream; + std::unordered_map> active_ranges; + std::unique_ptr resume_policy; + }; + public: ObjectDescriptorImpl( std::unique_ptr resume_policy, @@ -72,27 +79,18 @@ class ObjectDescriptorImpl // This may seem expensive, but it is less bug-prone than iterating over // the map with the lock held. auto CopyActiveRanges(std::unique_lock const&) const { - return active_ranges_[active_stream_]; + return streams_.back().active_ranges; } auto CopyActiveRanges() const { return CopyActiveRanges(std::unique_lock(mu_)); } - auto AddNewActiveRanges(std::unique_lock const&) { - std::unordered_map> active_ranges; - return active_ranges_.push_back(std::move(active_ranges)); - } - - auto AddNewActiveRanges() { - return AddNewActiveRanges(std::unique_lock(mu_)); - } - auto CurrentStream(std::unique_lock) const { - return streams_[active_stream_]; + return streams_.back().stream; } - static void CancelStream(std::shared_ptr const& stream); + void CancelStream(std::shared_ptr const& stream); void Flush(std::unique_lock lk); void OnWrite(bool ok); void DoRead(std::unique_lock); @@ -106,7 +104,7 @@ class ObjectDescriptorImpl bool IsResumable(Status const& status, google::rpc::Status const& proto_status); - std::unique_ptr resume_policy_; + std::unique_ptr resume_policy_prototype_; OpenStreamFactory make_stream_; mutable std::mutex mu_; @@ -116,11 +114,8 @@ class ObjectDescriptorImpl bool write_pending_ = false; google::storage::v2::BidiReadObjectRequest next_request_; - std::vector>> - active_ranges_; Options options_; - std::size_t active_stream_ = 0; - std::vector> streams_; + std::vector streams_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/testing/mock_resume_policy.h b/google/cloud/storage/testing/mock_resume_policy.h index b90f60fab4f64..0183f26185ea2 100644 --- a/google/cloud/storage/testing/mock_resume_policy.h +++ b/google/cloud/storage/testing/mock_resume_policy.h @@ -26,6 +26,9 @@ namespace testing { class MockResumePolicy : public storage_experimental::ResumePolicy { public: ~MockResumePolicy() override = default; + + MOCK_METHOD(std::unique_ptr, clone, (), + (const, override)); MOCK_METHOD(void, OnStartSuccess, (), (override)); MOCK_METHOD(ResumePolicy::Action, OnFinish, (Status const&), (override)); }; From 47682ffb6c4bf90d191bdf59a87982b06860739b Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Thu, 24 Jul 2025 19:22:07 +0000 Subject: [PATCH 13/16] Fix object_descriptor_impl_test --- .../async/object_descriptor_impl_test.cc | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc index c55a442152582..595dd34231b6d 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc @@ -1286,7 +1286,10 @@ TEST(ObjectDescriptorImpl, ReadWithSubsequentStream) { // Mock factory for subsequent streams MockFactory factory; - EXPECT_CALL(factory, Call).WillOnce([&](Request const&) { + EXPECT_CALL(factory, Call).WillOnce([&](Request const& request) { + EXPECT_TRUE(request.read_object_spec().has_read_handle()); + EXPECT_EQ(request.read_object_spec().read_handle().handle(), + "handle-12345"); auto stream_result = OpenStreamResult{ std::make_shared(std::move(stream2)), Response{}}; return make_ready_future(make_status_or(std::move(stream_result))); @@ -1330,16 +1333,18 @@ TEST(ObjectDescriptorImpl, ReadWithSubsequentStream) { EXPECT_EQ(next.second, "Read[1.eos]"); next.first.set_value(true); - // The first stream should be finishing now. + // Create and switch to a new stream. This happens before the first + // stream is finished. + tested->MakeSubsequentStream(); + + // The events are interleaved. Based on the log, Finish[1] comes first. auto finish1 = sequencer.PopFrontWithName(); EXPECT_EQ(finish1.second, "Finish[1]"); - finish1.first.set_value(true); - - // Create and switch to a new stream - tested->MakeSubsequentStream(); auto read2 = sequencer.PopFrontWithName(); EXPECT_EQ(read2.second, "Read[2]"); + finish1.first.set_value(true); + // Start a read on the second stream auto reader2 = tested->Read({200, 200}); auto future2 = reader2->Read(); From 629762ec2d11f9bc94cb36dad092b9941c38940d Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Thu, 24 Jul 2025 20:11:22 +0000 Subject: [PATCH 14/16] Fix ci failures --- .../internal/async/object_descriptor_impl.cc | 17 +++++------------ .../internal/async/object_descriptor_impl.h | 1 - 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index cefc546248378..6e3298ee6776d 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -59,11 +59,6 @@ void ObjectDescriptorImpl::Cancel() { } } -void ObjectDescriptorImpl::CancelStream( - std::shared_ptr const& stream) { - stream->Cancel(); -} - absl::optional ObjectDescriptorImpl::metadata() const { std::unique_lock lk(mu_); @@ -77,9 +72,8 @@ void ObjectDescriptorImpl::MakeSubsequentStream() { auto stream_result = make_stream_(std::move(request)).get(); std::unique_lock lk(mu_); - streams_.push_back(Stream{std::move(stream_result->stream), - {}, - resume_policy_prototype_->clone()}); + streams_.push_back(Stream{ + std::move(stream_result->stream), {}, resume_policy_prototype_->clone()}); lk.unlock(); OnRead(std::move(stream_result->first_response)); } @@ -231,9 +225,8 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { void ObjectDescriptorImpl::OnResume(StatusOr result) { if (!result) return OnFinish(std::move(result).status()); std::unique_lock lk(mu_); - streams_.push_back(Stream{std::move(result->stream), - {}, - resume_policy_prototype_->clone()}); + streams_.push_back( + Stream{std::move(result->stream), {}, resume_policy_prototype_->clone()}); // TODO(#15105) - this should be done without release the lock. Flush(std::move(lk)); OnRead(std::move(result->first_response)); @@ -254,7 +247,7 @@ bool ObjectDescriptorImpl::IsResumable( return true; } return streams_.back().resume_policy->OnFinish(status) == - storage_experimental::ResumePolicy::kContinue; + storage_experimental::ResumePolicy::kContinue; } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 916c93799ff9f..3afe0c79a3100 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -90,7 +90,6 @@ class ObjectDescriptorImpl return streams_.back().stream; } - void CancelStream(std::shared_ptr const& stream); void Flush(std::unique_lock lk); void OnWrite(bool ok); void DoRead(std::unique_lock); From 342d6fdbb3502aa34e073086132fa383a738e10b Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Fri, 25 Jul 2025 05:43:38 +0000 Subject: [PATCH 15/16] write_pending for each stream --- .../storage/internal/async/object_descriptor_impl.cc | 10 ++++++---- .../storage/internal/async/object_descriptor_impl.h | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index 6e3298ee6776d..fe9ab152425b2 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -108,8 +108,10 @@ ObjectDescriptorImpl::Read(ReadParams p) { } void ObjectDescriptorImpl::Flush(std::unique_lock lk) { - if (write_pending_ || next_request_.read_ranges().empty()) return; - write_pending_ = true; + if (streams_.back().write_pending || next_request_.read_ranges().empty()) { + return; + } + streams_.back().write_pending = true; google::storage::v2::BidiReadObjectRequest request; request.Swap(&next_request_); @@ -125,7 +127,7 @@ void ObjectDescriptorImpl::Flush(std::unique_lock lk) { void ObjectDescriptorImpl::OnWrite(bool ok) { std::unique_lock lk(mu_); if (!ok) return DoFinish(std::move(lk)); - write_pending_ = false; + streams_.back().write_pending = false; Flush(std::move(lk)); } @@ -215,7 +217,7 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { if (!range) continue; *request.add_read_ranges() = *std::move(range); } - write_pending_ = true; + streams_.back().write_pending = true; lk.unlock(); make_stream_(std::move(request)).then([w = WeakFromThis()](auto f) { if (auto self = w.lock()) self->OnResume(f.get()); diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index 3afe0c79a3100..f4c467de5c42e 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -43,6 +43,7 @@ class ObjectDescriptorImpl std::shared_ptr stream; std::unordered_map> active_ranges; std::unique_ptr resume_policy; + bool write_pending = false; }; public: @@ -110,7 +111,6 @@ class ObjectDescriptorImpl google::storage::v2::BidiReadObjectSpec read_object_spec_; absl::optional metadata_; std::int64_t read_id_generator_ = 0; - bool write_pending_ = false; google::storage::v2::BidiReadObjectRequest next_request_; Options options_; From de3a43601b559e034f0aa27467b0e3827d3318d0 Mon Sep 17 00:00:00 2001 From: bajajnehaa Date: Fri, 8 Aug 2025 11:06:42 +0000 Subject: [PATCH 16/16] Address review comments --- google/cloud/storage/internal/async/object_descriptor_impl.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index fe9ab152425b2..cebca0a2bbfc5 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -248,6 +248,7 @@ bool ObjectDescriptorImpl::IsResumable( CleanupDoneRanges(std::unique_lock(mu_)); return true; } + std::unique_lock lk(mu_); return streams_.back().resume_policy->OnFinish(status) == storage_experimental::ResumePolicy::kContinue; }