Skip to content

Commit 197cb44

Browse files
committed
active ranges for each stream
1 parent f07c9f5 commit 197cb44

File tree

3 files changed

+34
-25
lines changed

3 files changed

+34
-25
lines changed

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
3838
make_stream_(std::move(make_stream)),
3939
read_object_spec_(std::move(read_object_spec)),
4040
options_(std::move(options)),
41-
active_stream_(0),
42-
streams_{std::move(stream)} {}
41+
streams_{std::move(stream)} {
42+
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> initial_ranges;
43+
active_ranges_.push_back(std::move(initial_ranges));
44+
}
4345

4446
ObjectDescriptorImpl::~ObjectDescriptorImpl() {
4547
for (auto const& stream : streams_) {
@@ -74,9 +76,10 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
7476
std::unique_lock<std::mutex> lk(mu_);
7577
active_stream_ = streams_.size();
7678
streams_.push_back(std::move(stream));
79+
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges;
80+
active_ranges_.push_back(std::move(active_ranges));
7781
lk.unlock();
78-
79-
Start(std::move(stream_result->first_response));
82+
OnRead(std::move(stream_result->first_response));
8083
}
8184

8285
std::unique_ptr<storage_experimental::AsyncReaderConnection>
@@ -93,7 +96,7 @@ ObjectDescriptorImpl::Read(ReadParams p) {
9396

9497
std::unique_lock<std::mutex> lk(mu_);
9598
auto const id = ++read_id_generator_;
96-
active_ranges_.emplace(id, range);
99+
active_ranges_[active_stream_].emplace(id, range);
97100
auto& read_range = *next_request_.add_read_ranges();
98101
read_range.set_read_id(id);
99102
read_range.set_read_offset(p.start);
@@ -170,9 +173,10 @@ void ObjectDescriptorImpl::OnRead(
170173

171174
void ObjectDescriptorImpl::CleanupDoneRanges(
172175
std::unique_lock<std::mutex> const&) {
173-
for (auto i = active_ranges_.begin(); i != active_ranges_.end();) {
176+
auto &active_ranges = active_ranges_[active_stream_];
177+
for (auto i = active_ranges.begin(); i != active_ranges.end();) {
174178
if (i->second->IsDone()) {
175-
i = active_ranges_.erase(i);
179+
i = active_ranges.erase(i);
176180
} else {
177181
++i;
178182
}
@@ -209,7 +213,7 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
209213
ApplyRedirectErrors(read_object_spec_, proto_status);
210214
auto request = google::storage::v2::BidiReadObjectRequest{};
211215
*request.mutable_read_object_spec() = read_object_spec_;
212-
for (auto const& kv : active_ranges_) {
216+
for (auto const& kv : active_ranges_[active_stream_]) {
213217
auto range = kv.second->RangeForResume(kv.first);
214218
if (!range) continue;
215219
*request.add_read_ranges() = *std::move(range);

google/cloud/storage/internal/async/object_descriptor_impl.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class ObjectDescriptorImpl
7272
// This may seem expensive, but it is less bug-prone than iterating over
7373
// the map with the lock held.
7474
auto CopyActiveRanges(std::unique_lock<std::mutex> const&) const {
75-
return active_ranges_;
75+
return active_ranges_[active_stream_];
7676
}
7777

7878
auto CopyActiveRanges() const {
@@ -106,10 +106,10 @@ class ObjectDescriptorImpl
106106
bool write_pending_ = false;
107107
google::storage::v2::BidiReadObjectRequest next_request_;
108108

109-
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges_;
109+
std::vector<std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>>> active_ranges_;
110110
Options options_;
111-
std::int64_t active_stream_ = 0;
112-
std::vector<std::shared_ptr<OpenStream>> streams_ = {};
111+
std::size_t active_stream_ = 0;
112+
std::vector<std::shared_ptr<OpenStream>> streams_;
113113
};
114114

115115
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/tests/async_client_integration_test.cc

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -918,13 +918,24 @@ TEST_F(AsyncClientIntegrationTest, Open) {
918918

919919
auto constexpr kSize = 8 * 1024;
920920
auto constexpr kStride = 2 * kSize;
921+
auto constexpr kBlockCount = 4;
922+
auto const block = MakeRandomData(kSize);
921923

922-
auto os = client.WriteObject(bucket_name(), object_name);
923-
for (char c : {'0', '1', '2', '3', '4'}) {
924-
os << std::string(kStride, c);
924+
auto w =
925+
async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name)
926+
.get();
927+
ASSERT_STATUS_OK(w);
928+
AsyncWriter writer;
929+
AsyncToken token;
930+
std::tie(writer, token) = *std::move(w);
931+
for (int i = 0; i != kBlockCount; ++i) {
932+
auto p = writer.Write(std::move(token), WritePayload(block)).get();
933+
ASSERT_STATUS_OK(p);
934+
token = *std::move(p);
925935
}
926-
os.Close();
927-
ASSERT_STATUS_OK(os.metadata());
936+
937+
auto metadata = writer.Finalize(std::move(token)).get();
938+
ASSERT_STATUS_OK(metadata);
928939

929940
auto spec = google::storage::v2::BidiReadObjectSpec{};
930941
spec.set_bucket(BucketName(bucket_name()).FullName());
@@ -933,13 +944,7 @@ TEST_F(AsyncClientIntegrationTest, Open) {
933944
ASSERT_STATUS_OK(descriptor);
934945

935946
AsyncReader r0;
936-
AsyncReader r1;
937-
AsyncReader r2;
938947
AsyncToken t0;
939-
AsyncToken t1;
940-
AsyncToken t2;
941-
std::tie(r1, t1) = descriptor->Read(1 * kStride, kSize);
942-
std::tie(r2, t2) = descriptor->Read(1 * kStride, kSize);
943948
auto actual0 = std::string{};
944949
std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize);
945950
while (t0.valid()) {
@@ -952,9 +957,9 @@ TEST_F(AsyncClientIntegrationTest, Open) {
952957
t0 = std::move(t);
953958
}
954959

955-
EXPECT_EQ(actual0, std::string(kSize, '0'));
960+
EXPECT_EQ(actual0.size(), kSize);
956961
client.DeleteObject(bucket_name(), object_name,
957-
storage::Generation(os.metadata()->generation()));
962+
storage::Generation(metadata->generation()));
958963
}
959964

960965
} // namespace

0 commit comments

Comments
 (0)