Skip to content

Commit 16b6833

Browse files
authored
feat(storage): Handle large read ranges in bidi read. (#15152)
1 parent 0a01e51 commit 16b6833

15 files changed

+392
-45
lines changed

google/cloud/storage/async/object_descriptor.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/async/object_descriptor.h"
16+
#include "google/cloud/storage/async/options.h"
1617
#include "google/cloud/internal/make_status.h"
1718

1819
namespace google {
@@ -26,6 +27,11 @@ absl::optional<google::storage::v2::Object> ObjectDescriptor::metadata() const {
2627

2728
std::pair<AsyncReader, AsyncToken> ObjectDescriptor::Read(std::int64_t offset,
2829
std::int64_t limit) {
30+
std::int64_t max_range =
31+
impl_->options().get<storage_experimental::MaximumRangeSizeOption>();
32+
if (limit > max_range) {
33+
impl_->MakeSubsequentStream();
34+
}
2935
auto reader = impl_->Read({offset, limit});
3036
auto token = storage_internal::MakeAsyncToken(reader.get());
3137
return {AsyncReader(std::move(reader)), std::move(token)};

google/cloud/storage/async/object_descriptor_connection.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class ObjectDescriptorConnection {
5959
* Starts a new range read in the current descriptor.
6060
*/
6161
virtual std::unique_ptr<AsyncReaderConnection> Read(ReadParams p) = 0;
62+
63+
virtual void MakeSubsequentStream() = 0;
6264
};
6365

6466
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/async/object_descriptor_test.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/async/object_descriptor.h"
16+
#include "google/cloud/storage/async/options.h"
1617
#include "google/cloud/storage/mocks/mock_async_object_descriptor_connection.h"
1718
#include "google/cloud/storage/mocks/mock_async_reader_connection.h"
1819
#include "google/cloud/testing_util/status_matchers.h"
@@ -147,6 +148,48 @@ TEST(ObjectDescriptor, ReadLast) {
147148
EXPECT_FALSE(token.valid());
148149
}
149150

151+
TEST(ObjectDescriptor, ReadExceedsMaxRange) {
152+
auto mock = std::make_shared<MockAsyncObjectDescriptorConnection>();
153+
auto constexpr kMaxRange = 1024;
154+
EXPECT_CALL(*mock, options)
155+
.WillRepeatedly(
156+
Return(Options{}.set<storage_experimental::MaximumRangeSizeOption>(
157+
kMaxRange)));
158+
159+
EXPECT_CALL(*mock, MakeSubsequentStream).Times(1);
160+
161+
EXPECT_CALL(*mock, Read)
162+
.WillOnce([&](ReadParams p) -> std::unique_ptr<AsyncReaderConnection> {
163+
EXPECT_EQ(p.start, 100);
164+
EXPECT_EQ(p.length, kMaxRange + 1);
165+
auto reader = std::make_unique<MockAsyncReaderConnection>();
166+
EXPECT_CALL(*reader, Read)
167+
.WillOnce([] {
168+
return make_ready_future(
169+
ReadResponse(ReadPayload(std::string("some data"))));
170+
})
171+
.WillOnce([] { return make_ready_future(ReadResponse(Status{})); });
172+
return reader;
173+
});
174+
175+
auto tested = ObjectDescriptor(mock);
176+
AsyncReader reader;
177+
AsyncToken token;
178+
std::tie(reader, token) = tested.Read(100, kMaxRange + 1);
179+
ASSERT_TRUE(token.valid());
180+
181+
auto r1 = reader.Read(std::move(token)).get();
182+
ASSERT_STATUS_OK(r1);
183+
ReadPayload payload;
184+
std::tie(payload, token) = *std::move(r1);
185+
EXPECT_THAT(payload.contents(), ElementsAre("some data"));
186+
187+
auto r2 = reader.Read(std::move(token)).get();
188+
ASSERT_STATUS_OK(r2);
189+
std::tie(payload, token) = *std::move(r2);
190+
EXPECT_FALSE(token.valid());
191+
}
192+
150193
} // namespace
151194
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
152195
} // namespace storage_experimental

google/cloud/storage/async/options.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ struct UseMD5ValueOption {
8383
* @endcode
8484
*/
8585
struct MaximumRangeSizeOption {
86-
using Type = std::uint64_t;
86+
using Type = std::int64_t;
8787
};
8888

8989
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/async/resume_policy.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy {
2525
public:
2626
explicit LimitedErrorCountResumePolicyImpl(int maximum_resumes)
2727
: maximum_resumes_(maximum_resumes) {}
28-
~LimitedErrorCountResumePolicyImpl() override = default;
28+
29+
std::unique_ptr<ResumePolicy> clone() const override {
30+
return std::make_unique<LimitedErrorCountResumePolicyImpl>(*this);
31+
}
2932

3033
void OnStartSuccess() override {
3134
// For this policy we are only interested in the number of failures.
@@ -46,7 +49,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy {
4649
class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy {
4750
public:
4851
StopOnConsecutiveErrorsResumePolicyImpl() = default;
49-
~StopOnConsecutiveErrorsResumePolicyImpl() override = default;
52+
53+
std::unique_ptr<ResumePolicy> clone() const override {
54+
return std::make_unique<StopOnConsecutiveErrorsResumePolicyImpl>(*this);
55+
}
5056

5157
void OnStartSuccess() override { next_action_ = kContinue; }
5258
Action OnFinish(Status const&) override {
@@ -59,8 +65,6 @@ class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy {
5965

6066
} // namespace
6167

62-
ResumePolicy::~ResumePolicy() = default;
63-
6468
ResumePolicyFactory LimitedErrorCountResumePolicy(int maximum_resumes) {
6569
return [maximum_resumes]() -> std::unique_ptr<ResumePolicy> {
6670
return std::make_unique<LimitedErrorCountResumePolicyImpl>(maximum_resumes);

google/cloud/storage/async/resume_policy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ class ResumePolicy {
3232
public:
3333
enum Action { kStop, kContinue };
3434

35-
virtual ~ResumePolicy() = 0;
35+
virtual ~ResumePolicy() = default;
36+
37+
virtual std::unique_ptr<ResumePolicy> clone() const = 0;
3638

3739
/**
3840
* Notifies the policy about successful connections.

google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ class AsyncObjectDescriptorConnectionTracing
6262
return MakeTracingReaderConnection(span_, std::move(result));
6363
}
6464

65+
void MakeSubsequentStream() override {
66+
return impl_->MakeSubsequentStream();
67+
};
68+
6569
private:
6670
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
6771
std::shared_ptr<storage_experimental::ObjectDescriptorConnection> impl_;

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,27 +34,50 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
3434
OpenStreamFactory make_stream,
3535
google::storage::v2::BidiReadObjectSpec read_object_spec,
3636
std::shared_ptr<OpenStream> stream, Options options)
37-
: resume_policy_(std::move(resume_policy)),
37+
: resume_policy_prototype_(std::move(resume_policy)),
3838
make_stream_(std::move(make_stream)),
3939
read_object_spec_(std::move(read_object_spec)),
40-
stream_(std::move(stream)),
41-
options_(std::move(options)) {}
40+
options_(std::move(options)) {
41+
streams_.push_back(
42+
Stream{std::move(stream), {}, resume_policy_prototype_->clone()});
43+
}
4244

43-
ObjectDescriptorImpl::~ObjectDescriptorImpl() { stream_->Cancel(); }
45+
ObjectDescriptorImpl::~ObjectDescriptorImpl() {
46+
for (auto const& stream : streams_) {
47+
stream.stream->Cancel();
48+
}
49+
}
4450

4551
void ObjectDescriptorImpl::Start(
4652
google::storage::v2::BidiReadObjectResponse first_response) {
4753
OnRead(std::move(first_response));
4854
}
4955

50-
void ObjectDescriptorImpl::Cancel() { stream_->Cancel(); }
56+
void ObjectDescriptorImpl::Cancel() {
57+
for (auto const& stream : streams_) {
58+
stream.stream->Cancel();
59+
}
60+
}
5161

5262
absl::optional<google::storage::v2::Object> ObjectDescriptorImpl::metadata()
5363
const {
5464
std::unique_lock<std::mutex> lk(mu_);
5565
return metadata_;
5666
}
5767

68+
void ObjectDescriptorImpl::MakeSubsequentStream() {
69+
auto request = google::storage::v2::BidiReadObjectRequest{};
70+
71+
*request.mutable_read_object_spec() = read_object_spec_;
72+
auto stream_result = make_stream_(std::move(request)).get();
73+
74+
std::unique_lock<std::mutex> lk(mu_);
75+
streams_.push_back(Stream{
76+
std::move(stream_result->stream), {}, resume_policy_prototype_->clone()});
77+
lk.unlock();
78+
OnRead(std::move(stream_result->first_response));
79+
}
80+
5881
std::unique_ptr<storage_experimental::AsyncReaderConnection>
5982
ObjectDescriptorImpl::Read(ReadParams p) {
6083
std::shared_ptr<storage::internal::HashFunction> hash_function =
@@ -69,7 +92,7 @@ ObjectDescriptorImpl::Read(ReadParams p) {
6992

7093
std::unique_lock<std::mutex> lk(mu_);
7194
auto const id = ++read_id_generator_;
72-
active_ranges_.emplace(id, range);
95+
streams_.back().active_ranges.emplace(id, range);
7396
auto& read_range = *next_request_.add_read_ranges();
7497
read_range.set_read_id(id);
7598
read_range.set_read_offset(p.start);
@@ -85,8 +108,10 @@ ObjectDescriptorImpl::Read(ReadParams p) {
85108
}
86109

87110
void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk) {
88-
if (write_pending_ || next_request_.read_ranges().empty()) return;
89-
write_pending_ = true;
111+
if (streams_.back().write_pending || next_request_.read_ranges().empty()) {
112+
return;
113+
}
114+
streams_.back().write_pending = true;
90115
google::storage::v2::BidiReadObjectRequest request;
91116
request.Swap(&next_request_);
92117

@@ -102,7 +127,7 @@ void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk) {
102127
void ObjectDescriptorImpl::OnWrite(bool ok) {
103128
std::unique_lock<std::mutex> lk(mu_);
104129
if (!ok) return DoFinish(std::move(lk));
105-
write_pending_ = false;
130+
streams_.back().write_pending = false;
106131
Flush(std::move(lk));
107132
}
108133

@@ -146,9 +171,11 @@ void ObjectDescriptorImpl::OnRead(
146171

147172
void ObjectDescriptorImpl::CleanupDoneRanges(
148173
std::unique_lock<std::mutex> const&) {
149-
for (auto i = active_ranges_.begin(); i != active_ranges_.end();) {
174+
if (streams_.empty()) return;
175+
auto& active_ranges = streams_.back().active_ranges;
176+
for (auto i = active_ranges.begin(); i != active_ranges.end();) {
150177
if (i->second->IsDone()) {
151-
i = active_ranges_.erase(i);
178+
i = active_ranges.erase(i);
152179
} else {
153180
++i;
154181
}
@@ -185,12 +212,12 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
185212
ApplyRedirectErrors(read_object_spec_, proto_status);
186213
auto request = google::storage::v2::BidiReadObjectRequest{};
187214
*request.mutable_read_object_spec() = read_object_spec_;
188-
for (auto const& kv : active_ranges_) {
215+
for (auto const& kv : streams_.back().active_ranges) {
189216
auto range = kv.second->RangeForResume(kv.first);
190217
if (!range) continue;
191218
*request.add_read_ranges() = *std::move(range);
192219
}
193-
write_pending_ = true;
220+
streams_.back().write_pending = true;
194221
lk.unlock();
195222
make_stream_(std::move(request)).then([w = WeakFromThis()](auto f) {
196223
if (auto self = w.lock()) self->OnResume(f.get());
@@ -200,7 +227,8 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
200227
void ObjectDescriptorImpl::OnResume(StatusOr<OpenStreamResult> result) {
201228
if (!result) return OnFinish(std::move(result).status());
202229
std::unique_lock<std::mutex> lk(mu_);
203-
stream_ = std::move(result->stream);
230+
streams_.push_back(
231+
Stream{std::move(result->stream), {}, resume_policy_prototype_->clone()});
204232
// TODO(#15105) - this should be done without release the lock.
205233
Flush(std::move(lk));
206234
OnRead(std::move(result->first_response));
@@ -211,7 +239,6 @@ bool ObjectDescriptorImpl::IsResumable(
211239
for (auto const& any : proto_status.details()) {
212240
auto error = google::storage::v2::BidiReadObjectError{};
213241
if (!any.UnpackTo(&error)) continue;
214-
auto ranges = CopyActiveRanges();
215242
for (auto const& range : CopyActiveRanges()) {
216243
for (auto const& range_error : error.read_range_errors()) {
217244
if (range.first != range_error.read_id()) continue;
@@ -221,8 +248,8 @@ bool ObjectDescriptorImpl::IsResumable(
221248
CleanupDoneRanges(std::unique_lock<std::mutex>(mu_));
222249
return true;
223250
}
224-
225-
return resume_policy_->OnFinish(status) ==
251+
std::unique_lock<std::mutex> lk(mu_);
252+
return streams_.back().resume_policy->OnFinish(status) ==
226253
storage_experimental::ResumePolicy::kContinue;
227254
}
228255

google/cloud/storage/internal/async/object_descriptor_impl.h

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3838
class ObjectDescriptorImpl
3939
: public storage_experimental::ObjectDescriptorConnection,
4040
public std::enable_shared_from_this<ObjectDescriptorImpl> {
41+
private:
42+
struct Stream {
43+
std::shared_ptr<OpenStream> stream;
44+
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges;
45+
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy;
46+
bool write_pending = false;
47+
};
48+
4149
public:
4250
ObjectDescriptorImpl(
4351
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy,
@@ -62,6 +70,8 @@ class ObjectDescriptorImpl
6270
std::unique_ptr<storage_experimental::AsyncReaderConnection> Read(
6371
ReadParams p) override;
6472

73+
void MakeSubsequentStream() override;
74+
6575
private:
6676
std::weak_ptr<ObjectDescriptorImpl> WeakFromThis() {
6777
return shared_from_this();
@@ -70,14 +80,16 @@ class ObjectDescriptorImpl
7080
// This may seem expensive, but it is less bug-prone than iterating over
7181
// the map with the lock held.
7282
auto CopyActiveRanges(std::unique_lock<std::mutex> const&) const {
73-
return active_ranges_;
83+
return streams_.back().active_ranges;
7484
}
7585

7686
auto CopyActiveRanges() const {
7787
return CopyActiveRanges(std::unique_lock<std::mutex>(mu_));
7888
}
7989

80-
auto CurrentStream(std::unique_lock<std::mutex>) const { return stream_; }
90+
auto CurrentStream(std::unique_lock<std::mutex>) const {
91+
return streams_.back().stream;
92+
}
8193

8294
void Flush(std::unique_lock<std::mutex> lk);
8395
void OnWrite(bool ok);
@@ -92,19 +104,17 @@ class ObjectDescriptorImpl
92104
bool IsResumable(Status const& status,
93105
google::rpc::Status const& proto_status);
94106

95-
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_;
107+
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_prototype_;
96108
OpenStreamFactory make_stream_;
97109

98110
mutable std::mutex mu_;
99111
google::storage::v2::BidiReadObjectSpec read_object_spec_;
100-
std::shared_ptr<OpenStream> stream_;
101112
absl::optional<google::storage::v2::Object> metadata_;
102113
std::int64_t read_id_generator_ = 0;
103-
bool write_pending_ = false;
104114
google::storage::v2::BidiReadObjectRequest next_request_;
105115

106-
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges_;
107116
Options options_;
117+
std::vector<Stream> streams_;
108118
};
109119

110120
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)