Skip to content

Commit 57d5c6e

Browse files
committed
feat(ACv2): support to handle large ranges
1 parent 3bb14d8 commit 57d5c6e

File tree

7 files changed

+55
-8
lines changed

7 files changed

+55
-8
lines changed

google/cloud/storage/async/object_descriptor.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/async/object_descriptor.h"
16+
#include "google/cloud/storage/async/options.h"
1617
#include "google/cloud/internal/make_status.h"
1718

1819
namespace google {
@@ -26,7 +27,13 @@ absl::optional<google::storage::v2::Object> ObjectDescriptor::metadata() const {
2627

2728
std::pair<AsyncReader, AsyncToken> ObjectDescriptor::Read(std::int64_t offset,
2829
std::int64_t limit) {
29-
auto reader = impl_->Read({offset, limit});
30+
auto max_range =
31+
impl_->options().get<storage_experimental::MaximumRangeSizeOption>();
32+
std::unique_ptr<storage_experimental::AsyncReaderConnection> reader;
33+
if (limit > max_range) {
34+
impl_->MakeSubsequentStream();
35+
}
36+
reader = impl_->Read({offset, limit});
3037
auto token = storage_internal::MakeAsyncToken(reader.get());
3138
return {AsyncReader(std::move(reader)), std::move(token)};
3239
}

google/cloud/storage/async/object_descriptor_connection.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_DESCRIPTOR_CONNECTION_H
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_OBJECT_DESCRIPTOR_CONNECTION_H
1717

18+
#include "google/cloud/storage/async/reader.h"
1819
#include "google/cloud/storage/async/reader_connection.h"
1920
#include "google/cloud/options.h"
2021
#include "google/cloud/version.h"
@@ -59,6 +60,8 @@ class ObjectDescriptorConnection {
5960
* Starts a new range read in the current descriptor.
6061
*/
6162
virtual std::unique_ptr<AsyncReaderConnection> Read(ReadParams p) = 0;
63+
64+
virtual void MakeSubsequentStream() = 0;
6265
};
6366

6467
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/internal/async/object_descriptor_connection_tracing.h"
16+
#include "google/cloud/storage/async/object_descriptor.h"
17+
#include "google/cloud/storage/async/reader.h"
1618
#include "google/cloud/storage/async/reader_connection.h"
1719
#include "google/cloud/internal/opentelemetry.h"
1820
#include "google/cloud/version.h"
@@ -61,6 +63,10 @@ class AsyncObjectDescriptorConnectionTracing
6163
return result;
6264
}
6365

66+
void MakeSubsequentStream() override {
67+
return impl_->MakeSubsequentStream();
68+
};
69+
6470
private:
6571
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
6672
std::shared_ptr<storage_experimental::ObjectDescriptorConnection> impl_;

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,48 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
3737
: resume_policy_(std::move(resume_policy)),
3838
make_stream_(std::move(make_stream)),
3939
read_object_spec_(std::move(read_object_spec)),
40-
stream_(std::move(stream)),
41-
options_(std::move(options)) {}
40+
options_(std::move(options)),
41+
active_stream_(0),
42+
streams_{std::move(stream)} {}
4243

43-
ObjectDescriptorImpl::~ObjectDescriptorImpl() { stream_->Cancel(); }
44+
ObjectDescriptorImpl::~ObjectDescriptorImpl() {
45+
for (auto stream : streams_) {
46+
stream->Cancel();
47+
}
48+
}
4449

4550
void ObjectDescriptorImpl::Start(
4651
google::storage::v2::BidiReadObjectResponse first_response) {
4752
OnRead(std::move(first_response));
4853
}
4954

50-
void ObjectDescriptorImpl::Cancel() { stream_->Cancel(); }
55+
void ObjectDescriptorImpl::Cancel() {
56+
for (auto stream : streams_) {
57+
stream->Cancel();
58+
}
59+
}
5160

5261
absl::optional<google::storage::v2::Object> ObjectDescriptorImpl::metadata()
5362
const {
5463
std::unique_lock<std::mutex> lk(mu_);
5564
return metadata_;
5665
}
5766

67+
void ObjectDescriptorImpl::MakeSubsequentStream() {
68+
auto request = google::storage::v2::BidiReadObjectRequest{};
69+
70+
*request.mutable_read_object_spec() = read_object_spec_;
71+
auto stream_result = make_stream_(std::move(request)).get();
72+
auto stream = std::move(stream_result->stream);
73+
74+
std::unique_lock<std::mutex> lk(mu_);
75+
active_stream_ = streams_.size();
76+
streams_.push_back(std::move(stream));
77+
lk.unlock();
78+
79+
Start(std::move(stream_result->first_response));
80+
}
81+
5882
std::unique_ptr<storage_experimental::AsyncReaderConnection>
5983
ObjectDescriptorImpl::Read(ReadParams p) {
6084
std::shared_ptr<storage::internal::HashFunction> hash_function =
@@ -200,7 +224,7 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
200224
void ObjectDescriptorImpl::OnResume(StatusOr<OpenStreamResult> result) {
201225
if (!result) return OnFinish(std::move(result).status());
202226
std::unique_lock<std::mutex> lk(mu_);
203-
stream_ = std::move(result->stream);
227+
streams_[0] = std::move(result->stream);
204228
// TODO(#15105) - this should be done without release the lock.
205229
Flush(std::move(lk));
206230
OnRead(std::move(result->first_response));

google/cloud/storage/internal/async/object_descriptor_impl.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class ObjectDescriptorImpl
6262
std::unique_ptr<storage_experimental::AsyncReaderConnection> Read(
6363
ReadParams p) override;
6464

65+
void MakeSubsequentStream() override;
66+
6567
private:
6668
std::weak_ptr<ObjectDescriptorImpl> WeakFromThis() {
6769
return shared_from_this();
@@ -77,7 +79,9 @@ class ObjectDescriptorImpl
7779
return CopyActiveRanges(std::unique_lock<std::mutex>(mu_));
7880
}
7981

80-
auto CurrentStream(std::unique_lock<std::mutex>) const { return stream_; }
82+
auto CurrentStream(std::unique_lock<std::mutex>) const {
83+
return streams_[active_stream_];
84+
}
8185

8286
void Flush(std::unique_lock<std::mutex> lk);
8387
void OnWrite(bool ok);
@@ -105,6 +109,8 @@ class ObjectDescriptorImpl
105109

106110
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges_;
107111
Options options_;
112+
std::vector<std::shared_ptr<OpenStream>> streams_ = {};
113+
int active_stream_;
108114
};
109115

110116
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/object_descriptor_reader_tracing.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class ObjectDescriptorReaderTracing : public ObjectDescriptorReader {
4141
~ObjectDescriptorReaderTracing() override = default;
4242

4343
future<ObjectDescriptorReader::ReadResponse> Read() override {
44-
auto span = internal::MakeSpan("storage::AsyncConnection::ReadObjectRange");
44+
auto span = internal::MakeSpan("storage::AsyncConnection::ReadRange");
4545
internal::OTelScope scope(span);
4646
return ObjectDescriptorReader::Read().then(
4747
[span = std::move(span),

google/cloud/storage/mocks/mock_async_object_descriptor_connection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class MockAsyncObjectDescriptorConnection
3131
(const, override));
3232
MOCK_METHOD(std::unique_ptr<storage_experimental::AsyncReaderConnection>,
3333
Read, (ReadParams), (override));
34+
MOCK_METHOD(void, MakeSubsequentStream, (), (override));
3435
MOCK_METHOD(Options, options, (), (const, override));
3536
};
3637

0 commit comments

Comments
 (0)