Skip to content

Commit 3545d0a

Browse files
authored
feat(storage): implement multi stream manager for async downloads (#15858)
1 parent 752f0ed commit 3545d0a

13 files changed

+1256
-304
lines changed

google/cloud/storage/async/object_descriptor.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,11 @@ absl::optional<google::storage::v2::Object> ObjectDescriptor::metadata() const {
2727

2828
std::pair<AsyncReader, AsyncToken> ObjectDescriptor::Read(std::int64_t offset,
2929
std::int64_t limit) {
30-
// TODO(15340): This change is causing performance regression. We need to
31-
// revisit it after benchmarking our code.
32-
33-
// std::int64_t max_range =
34-
// impl_->options().get<storage_experimental::MaximumRangeSizeOption>();
35-
// if (limit > max_range) {
36-
// impl_->MakeSubsequentStream();
37-
// }
30+
std::int64_t max_range =
31+
impl_->options().get<storage_experimental::MaximumRangeSizeOption>();
32+
if (limit > max_range) {
33+
impl_->MakeSubsequentStream();
34+
}
3835
auto reader = impl_->Read({offset, limit});
3936
auto token = storage_internal::MakeAsyncToken(reader.get());
4037
return {AsyncReader(std::move(reader)), std::move(token)};

google/cloud/storage/async/object_descriptor_test.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ TEST(ObjectDescriptor, ReadLast) {
149149
}
150150

151151
TEST(ObjectDescriptor, ReadExceedsMaxRange) {
152-
GTEST_SKIP();
153152
auto mock = std::make_shared<MockAsyncObjectDescriptorConnection>();
154153
auto constexpr kMaxRange = 1024;
155154
EXPECT_CALL(*mock, options)

google/cloud/storage/google_cloud_cpp_storage_grpc.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ google_cloud_cpp_storage_grpc_hdrs = [
4343
"internal/async/default_options.h",
4444
"internal/async/handle_redirect_error.h",
4545
"internal/async/insert_object.h",
46+
"internal/async/multi_stream_manager.h",
4647
"internal/async/object_descriptor_connection_tracing.h",
4748
"internal/async/object_descriptor_impl.h",
4849
"internal/async/object_descriptor_reader.h",
@@ -120,6 +121,7 @@ google_cloud_cpp_storage_grpc_srcs = [
120121
"internal/async/default_options.cc",
121122
"internal/async/handle_redirect_error.cc",
122123
"internal/async/insert_object.cc",
124+
"internal/async/multi_stream_manager.cc",
123125
"internal/async/object_descriptor_connection_tracing.cc",
124126
"internal/async/object_descriptor_impl.cc",
125127
"internal/async/object_descriptor_reader.cc",

google/cloud/storage/google_cloud_cpp_storage_grpc.cmake

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ add_library(
110110
internal/async/handle_redirect_error.h
111111
internal/async/insert_object.cc
112112
internal/async/insert_object.h
113+
internal/async/multi_stream_manager.cc
114+
internal/async/multi_stream_manager.h
113115
internal/async/object_descriptor_connection_tracing.cc
114116
internal/async/object_descriptor_connection_tracing.h
115117
internal/async/object_descriptor_impl.cc
@@ -442,6 +444,7 @@ set(storage_client_grpc_unit_tests
442444
internal/async/default_options_test.cc
443445
internal/async/handle_redirect_error_test.cc
444446
internal/async/insert_object_test.cc
447+
internal/async/multi_stream_manager_test.cc
445448
internal/async/object_descriptor_connection_tracing_test.cc
446449
internal/async/object_descriptor_impl_test.cc
447450
internal/async/object_descriptor_reader_test.cc

google/cloud/storage/internal/async/connection_impl_open_test.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ using ::google::cloud::testing_util::IsProtoEqual;
4444
using ::google::cloud::testing_util::MockCompletionQueueImpl;
4545
using ::google::cloud::testing_util::StatusIs;
4646
using ::google::protobuf::TextFormat;
47+
using ::testing::InvokeWithoutArgs;
48+
using ::testing::NiceMock;
4749
using ::testing::NotNull;
4850
using ::testing::Optional;
4951

@@ -183,10 +185,29 @@ TEST(AsyncConnectionImplTest, OpenSimple) {
183185
[](auto) { return Status{}; });
184186
});
185187

188+
return std::unique_ptr<BidiReadStream>(std::move(stream));
189+
})
190+
.WillRepeatedly([](CompletionQueue const&,
191+
std::shared_ptr<grpc::ClientContext> const&,
192+
google::cloud::internal::ImmutableOptions const&) {
193+
auto stream = std::make_unique<NiceMock<MockStream>>();
194+
ON_CALL(*stream, Start).WillByDefault(InvokeWithoutArgs([] {
195+
return make_ready_future(false);
196+
}));
197+
ON_CALL(*stream, Finish).WillByDefault(InvokeWithoutArgs([] {
198+
return make_ready_future(Status{});
199+
}));
200+
ON_CALL(*stream, Cancel).WillByDefault([] {});
186201
return std::unique_ptr<BidiReadStream>(std::move(stream));
187202
});
188203

189204
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
205+
EXPECT_CALL(*mock_cq, MakeRelativeTimer)
206+
.WillRepeatedly([](std::chrono::nanoseconds) {
207+
return make_ready_future(
208+
StatusOr<std::chrono::system_clock::time_point>(
209+
std::chrono::system_clock::now()));
210+
});
190211
auto connection = std::make_shared<AsyncConnectionImpl>(
191212
CompletionQueue(mock_cq), std::shared_ptr<GrpcChannelRefresh>(), mock,
192213
TestOptions());
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/storage/internal/async/multi_stream_manager.h"
16+
#include "google/cloud/storage/internal/async/object_descriptor_impl.h"
17+
18+
namespace google {
19+
namespace cloud {
20+
namespace storage_internal {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
23+
// Explicit instantiation for ObjectDescriptorImpl usage.
24+
template class MultiStreamManager<ReadStream, ReadRange>;
25+
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
27+
} // namespace storage_internal
28+
} // namespace cloud
29+
} // namespace google
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H
17+
18+
#include "google/cloud/status.h"
19+
#include "google/cloud/version.h"
20+
#include <cstdint>
21+
#include <functional>
22+
#include <list>
23+
#include <memory>
24+
#include <unordered_map>
25+
26+
namespace google {
27+
namespace cloud {
28+
namespace storage_internal {
29+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
30+
31+
// Defines the interface contract that any stream type (e.g., ReadStream,
32+
// WriteStream) managed by MultiStreamManager must implement. This explicit base
33+
// class ensures we have a clear, enforceable interface for operations like
34+
// CancelAll().
35+
class StreamBase {
36+
public:
37+
virtual ~StreamBase() = default;
38+
virtual void Cancel() = 0;
39+
};
40+
41+
// Manages a collection of streams.
42+
//
43+
// This class implements the "Subsequent Stream" logic where idle streams
44+
// are moved to the front of the queue for reuse.
45+
//
46+
// THREAD SAFETY:
47+
// This class is NOT thread-safe. The owner (e.g. ObjectDescriptorImpl
48+
// or AsyncWriterImpl etc) must serialize access, typically by holding
49+
// an external mutex while calling these methods.
50+
//
51+
// EXAMPLE USAGE:
52+
// class MyOwner {
53+
// std::mutex mu_;
54+
// MultiStreamManager<MyStream, MyRange> manager_;
55+
//
56+
// void StartRead() {
57+
// std::unique_lock<std::mutex> lk(mu_);
58+
// auto it = manager_.GetLeastBusyStream();
59+
// }
60+
// };
61+
template <typename StreamT, typename RangeT>
62+
class MultiStreamManager {
63+
public:
64+
struct Stream {
65+
std::shared_ptr<StreamT> stream;
66+
std::unordered_map<std::int64_t, std::shared_ptr<RangeT>> active_ranges;
67+
};
68+
69+
using StreamIterator = typename std::list<Stream>::iterator;
70+
using ConstStreamIterator = typename std::list<Stream>::const_iterator;
71+
using StreamFactory = std::function<std::shared_ptr<StreamT>()>;
72+
73+
// Constructor creates the first stream using the factory immediately.
74+
explicit MultiStreamManager(StreamFactory stream_factory)
75+
: stream_factory_(std::move(stream_factory)) {
76+
streams_.emplace_back(Stream{stream_factory_(), {}});
77+
}
78+
79+
// Constructor accepts an already-created initial stream.
80+
// This is required by ObjectDescriptorImpl which receives an OpenStream.
81+
MultiStreamManager(StreamFactory stream_factory,
82+
std::shared_ptr<StreamT> initial_stream)
83+
: stream_factory_(std::move(stream_factory)) {
84+
streams_.emplace_back(Stream{std::move(initial_stream), {}});
85+
}
86+
87+
StreamIterator GetFirstStream() {
88+
if (streams_.empty()) return streams_.end();
89+
return streams_.begin();
90+
}
91+
92+
StreamIterator GetLeastBusyStream() {
93+
if (streams_.empty()) return streams_.end();
94+
auto least_busy_stream_it = streams_.begin();
95+
// Track min_ranges to avoid calling .size() repeatedly if possible,
96+
// though for std::unordered_map .size() is O(1).
97+
std::size_t min_ranges = least_busy_stream_it->active_ranges.size();
98+
if (min_ranges == 0) return least_busy_stream_it;
99+
100+
// Start checking from the second element
101+
for (auto it = std::next(streams_.begin()); it != streams_.end(); ++it) {
102+
// Strict less-than ensures stability (preferring older streams if tied)
103+
auto size = it->active_ranges.size();
104+
if (size < min_ranges) {
105+
least_busy_stream_it = it;
106+
min_ranges = size;
107+
if (min_ranges == 0) return least_busy_stream_it;
108+
}
109+
}
110+
return least_busy_stream_it;
111+
}
112+
113+
StreamIterator AddStream(std::shared_ptr<StreamT> stream) {
114+
streams_.emplace_front(Stream{std::move(stream), {}});
115+
return streams_.begin();
116+
}
117+
118+
void CancelAll() {
119+
for (auto& s : streams_) {
120+
if (s.stream) s.stream->Cancel();
121+
}
122+
}
123+
124+
void RemoveStreamAndNotifyRanges(StreamIterator it, Status const& status) {
125+
auto ranges = std::move(it->active_ranges);
126+
streams_.erase(it);
127+
for (auto const& kv : ranges) {
128+
kv.second->OnFinish(status);
129+
}
130+
}
131+
132+
void MoveActiveRanges(StreamIterator from, StreamIterator to) {
133+
to->active_ranges = std::move(from->active_ranges);
134+
}
135+
136+
void CleanupDoneRanges(StreamIterator it) {
137+
auto& active_ranges = it->active_ranges;
138+
for (auto i = active_ranges.begin(); i != active_ranges.end();) {
139+
if (i->second->IsDone()) {
140+
i = active_ranges.erase(i);
141+
} else {
142+
++i;
143+
}
144+
}
145+
}
146+
147+
template <typename Pred>
148+
bool ReuseIdleStreamToFront(Pred pred) {
149+
for (auto it = streams_.begin(); it != streams_.end(); ++it) {
150+
if (!pred(*it)) continue;
151+
152+
// If the idle stream is already at the front, we don't
153+
// need to move it. Otherwise splice to the front in O(1).
154+
if (it != streams_.begin()) {
155+
streams_.splice(streams_.begin(), streams_, it);
156+
}
157+
return true;
158+
}
159+
return false;
160+
}
161+
162+
bool Empty() const { return streams_.empty(); }
163+
ConstStreamIterator End() const { return streams_.end(); }
164+
std::size_t Size() const { return streams_.size(); }
165+
166+
private:
167+
std::list<Stream> streams_;
168+
StreamFactory stream_factory_;
169+
};
170+
171+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
172+
} // namespace storage_internal
173+
} // namespace cloud
174+
} // namespace google
175+
176+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H

0 commit comments

Comments
 (0)