Skip to content

Commit e58c325

Browse files
committed
Create a Struct Stream to handle multi streams. Each stream has a resume_policy too
1 parent 31c8cf0 commit e58c325

File tree

5 files changed

+44
-45
lines changed

5 files changed

+44
-45
lines changed

google/cloud/storage/async/resume_policy.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy {
2525
public:
2626
explicit LimitedErrorCountResumePolicyImpl(int maximum_resumes)
2727
: maximum_resumes_(maximum_resumes) {}
28-
~LimitedErrorCountResumePolicyImpl() override = default;
28+
29+
std::unique_ptr<ResumePolicy> clone() const override {
30+
return std::make_unique<LimitedErrorCountResumePolicyImpl>(*this);
31+
}
2932

3033
void OnStartSuccess() override {
3134
// For this policy we are only interested in the number of failures.
@@ -46,7 +49,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy {
4649
class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy {
4750
public:
4851
StopOnConsecutiveErrorsResumePolicyImpl() = default;
49-
~StopOnConsecutiveErrorsResumePolicyImpl() override = default;
52+
53+
std::unique_ptr<ResumePolicy> clone() const override {
54+
return std::make_unique<StopOnConsecutiveErrorsResumePolicyImpl>(*this);
55+
}
5056

5157
void OnStartSuccess() override { next_action_ = kContinue; }
5258
Action OnFinish(Status const&) override {
@@ -59,8 +65,6 @@ class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy {
5965

6066
} // namespace
6167

62-
ResumePolicy::~ResumePolicy() = default;
63-
6468
ResumePolicyFactory LimitedErrorCountResumePolicy(int maximum_resumes) {
6569
return [maximum_resumes]() -> std::unique_ptr<ResumePolicy> {
6670
return std::make_unique<LimitedErrorCountResumePolicyImpl>(maximum_resumes);

google/cloud/storage/async/resume_policy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ class ResumePolicy {
3232
public:
3333
enum Action { kStop, kContinue };
3434

35-
virtual ~ResumePolicy() = 0;
35+
virtual ~ResumePolicy() = default;
36+
37+
virtual std::unique_ptr<ResumePolicy> clone() const = 0;
3638

3739
/**
3840
* Notifies the policy about successful connections.

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,17 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
3434
OpenStreamFactory make_stream,
3535
google::storage::v2::BidiReadObjectSpec read_object_spec,
3636
std::shared_ptr<OpenStream> stream, Options options)
37-
: resume_policy_(std::move(resume_policy)),
37+
: resume_policy_prototype_(std::move(resume_policy)),
3838
make_stream_(std::move(make_stream)),
3939
read_object_spec_(std::move(read_object_spec)),
40-
options_(std::move(options)),
41-
streams_{std::move(stream)} {
42-
AddNewActiveRanges();
40+
options_(std::move(options)) {
41+
streams_.push_back(
42+
Stream{std::move(stream), {}, resume_policy_prototype_->clone()});
4343
}
4444

4545
ObjectDescriptorImpl::~ObjectDescriptorImpl() {
4646
for (auto const& stream : streams_) {
47-
stream->Cancel();
47+
stream.stream->Cancel();
4848
}
4949
}
5050

@@ -55,7 +55,7 @@ void ObjectDescriptorImpl::Start(
5555

5656
void ObjectDescriptorImpl::Cancel() {
5757
for (auto const& stream : streams_) {
58-
stream->Cancel();
58+
stream.stream->Cancel();
5959
}
6060
}
6161

@@ -75,12 +75,11 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
7575

7676
*request.mutable_read_object_spec() = read_object_spec_;
7777
auto stream_result = make_stream_(std::move(request)).get();
78-
auto stream = std::move(stream_result->stream);
7978

8079
std::unique_lock<std::mutex> lk(mu_);
81-
active_stream_ = streams_.size();
82-
streams_.push_back(std::move(stream));
83-
AddNewActiveRanges(lk);
80+
streams_.push_back(Stream{std::move(stream_result->stream),
81+
{},
82+
resume_policy_prototype_->clone()});
8483
lk.unlock();
8584
OnRead(std::move(stream_result->first_response));
8685
}
@@ -99,7 +98,7 @@ ObjectDescriptorImpl::Read(ReadParams p) {
9998

10099
std::unique_lock<std::mutex> lk(mu_);
101100
auto const id = ++read_id_generator_;
102-
active_ranges_[active_stream_].emplace(id, range);
101+
streams_.back().active_ranges.emplace(id, range);
103102
auto& read_range = *next_request_.add_read_ranges();
104103
read_range.set_read_id(id);
105104
read_range.set_read_offset(p.start);
@@ -176,7 +175,8 @@ void ObjectDescriptorImpl::OnRead(
176175

177176
void ObjectDescriptorImpl::CleanupDoneRanges(
178177
std::unique_lock<std::mutex> const&) {
179-
auto& active_ranges = active_ranges_[active_stream_];
178+
if (streams_.empty()) return;
179+
auto& active_ranges = streams_.back().active_ranges;
180180
for (auto i = active_ranges.begin(); i != active_ranges.end();) {
181181
if (i->second->IsDone()) {
182182
i = active_ranges.erase(i);
@@ -207,10 +207,6 @@ void ObjectDescriptorImpl::OnFinish(Status const& status) {
207207
for (auto const& kv : copy) {
208208
kv.second->OnFinish(status);
209209
}
210-
CancelStream(streams_[active_stream_]);
211-
streams_.erase(streams_.begin() + active_stream_);
212-
active_ranges_.erase(active_ranges_.begin() + active_stream_);
213-
active_stream_ = streams_.size();
214210
}
215211

216212
void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
@@ -220,7 +216,7 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
220216
ApplyRedirectErrors(read_object_spec_, proto_status);
221217
auto request = google::storage::v2::BidiReadObjectRequest{};
222218
*request.mutable_read_object_spec() = read_object_spec_;
223-
for (auto const& kv : active_ranges_[active_stream_]) {
219+
for (auto const& kv : streams_.back().active_ranges) {
224220
auto range = kv.second->RangeForResume(kv.first);
225221
if (!range) continue;
226222
*request.add_read_ranges() = *std::move(range);
@@ -235,9 +231,9 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
235231
void ObjectDescriptorImpl::OnResume(StatusOr<OpenStreamResult> result) {
236232
if (!result) return OnFinish(std::move(result).status());
237233
std::unique_lock<std::mutex> lk(mu_);
238-
active_stream_ = streams_.size();
239-
streams_.push_back(std::move(result->stream));
240-
AddNewActiveRanges(lk);
234+
streams_.push_back(Stream{std::move(result->stream),
235+
{},
236+
resume_policy_prototype_->clone()});
241237
// TODO(#15105) - this should be done without release the lock.
242238
Flush(std::move(lk));
243239
OnRead(std::move(result->first_response));
@@ -257,9 +253,8 @@ bool ObjectDescriptorImpl::IsResumable(
257253
CleanupDoneRanges(std::unique_lock<std::mutex>(mu_));
258254
return true;
259255
}
260-
261-
return resume_policy_->OnFinish(status) ==
262-
storage_experimental::ResumePolicy::kContinue;
256+
return streams_.back().resume_policy->OnFinish(status) ==
257+
storage_experimental::ResumePolicy::kContinue;
263258
}
264259

265260
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/object_descriptor_impl.h

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3838
class ObjectDescriptorImpl
3939
: public storage_experimental::ObjectDescriptorConnection,
4040
public std::enable_shared_from_this<ObjectDescriptorImpl> {
41+
private:
42+
struct Stream {
43+
std::shared_ptr<OpenStream> stream;
44+
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges;
45+
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy;
46+
};
47+
4148
public:
4249
ObjectDescriptorImpl(
4350
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy,
@@ -72,27 +79,18 @@ class ObjectDescriptorImpl
7279
// This may seem expensive, but it is less bug-prone than iterating over
7380
// the map with the lock held.
7481
auto CopyActiveRanges(std::unique_lock<std::mutex> const&) const {
75-
return active_ranges_[active_stream_];
82+
return streams_.back().active_ranges;
7683
}
7784

7885
auto CopyActiveRanges() const {
7986
return CopyActiveRanges(std::unique_lock<std::mutex>(mu_));
8087
}
8188

82-
auto AddNewActiveRanges(std::unique_lock<std::mutex> const&) {
83-
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges;
84-
return active_ranges_.push_back(std::move(active_ranges));
85-
}
86-
87-
auto AddNewActiveRanges() {
88-
return AddNewActiveRanges(std::unique_lock<std::mutex>(mu_));
89-
}
90-
9189
auto CurrentStream(std::unique_lock<std::mutex>) const {
92-
return streams_[active_stream_];
90+
return streams_.back().stream;
9391
}
9492

95-
static void CancelStream(std::shared_ptr<OpenStream> const& stream);
93+
void CancelStream(std::shared_ptr<OpenStream> const& stream);
9694
void Flush(std::unique_lock<std::mutex> lk);
9795
void OnWrite(bool ok);
9896
void DoRead(std::unique_lock<std::mutex>);
@@ -106,7 +104,7 @@ class ObjectDescriptorImpl
106104
bool IsResumable(Status const& status,
107105
google::rpc::Status const& proto_status);
108106

109-
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_;
107+
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_prototype_;
110108
OpenStreamFactory make_stream_;
111109

112110
mutable std::mutex mu_;
@@ -116,11 +114,8 @@ class ObjectDescriptorImpl
116114
bool write_pending_ = false;
117115
google::storage::v2::BidiReadObjectRequest next_request_;
118116

119-
std::vector<std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>>>
120-
active_ranges_;
121117
Options options_;
122-
std::size_t active_stream_ = 0;
123-
std::vector<std::shared_ptr<OpenStream>> streams_;
118+
std::vector<Stream> streams_;
124119
};
125120

126121
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/testing/mock_resume_policy.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ namespace testing {
2626
class MockResumePolicy : public storage_experimental::ResumePolicy {
2727
public:
2828
~MockResumePolicy() override = default;
29+
30+
MOCK_METHOD(std::unique_ptr<storage_experimental::ResumePolicy>, clone, (),
31+
(const, override));
2932
MOCK_METHOD(void, OnStartSuccess, (), (override));
3033
MOCK_METHOD(ResumePolicy::Action, OnFinish, (Status const&), (override));
3134
};

0 commit comments

Comments
 (0)