@@ -39,9 +39,8 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
3939 read_object_spec_(std::move(read_object_spec)),
4040 options_(std::move(options)),
4141 streams_{std::move (stream)} {
42- std::unordered_map<std::int64_t , std::shared_ptr<ReadRange>> initial_ranges;
43- active_ranges_.push_back (std::move (initial_ranges));
44- }
42+ AddNewActiveRanges ();
43+ }
4544
4645ObjectDescriptorImpl::~ObjectDescriptorImpl () {
4746 for (auto const & stream : streams_) {
@@ -60,6 +59,10 @@ void ObjectDescriptorImpl::Cancel() {
6059 }
6160}
6261
62+ void ObjectDescriptorImpl::CancelStream (std::shared_ptr<OpenStream> stream) {
63+ stream->Cancel ();
64+ }
65+
6366absl::optional<google::storage::v2::Object> ObjectDescriptorImpl::metadata ()
6467 const {
6568 std::unique_lock<std::mutex> lk (mu_);
@@ -76,8 +79,7 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
7679 std::unique_lock<std::mutex> lk (mu_);
7780 active_stream_ = streams_.size ();
7881 streams_.push_back (std::move (stream));
79- std::unordered_map<std::int64_t , std::shared_ptr<ReadRange>> active_ranges;
80- active_ranges_.push_back (std::move (active_ranges));
82+ AddNewActiveRanges (lk);
8183 lk.unlock ();
8284 OnRead (std::move (stream_result->first_response ));
8385}
@@ -173,7 +175,7 @@ void ObjectDescriptorImpl::OnRead(
173175
174176void ObjectDescriptorImpl::CleanupDoneRanges (
175177 std::unique_lock<std::mutex> const &) {
176- auto & active_ranges = active_ranges_[active_stream_];
178+ auto & active_ranges = active_ranges_[active_stream_];
177179 for (auto i = active_ranges.begin (); i != active_ranges.end ();) {
178180 if (i->second ->IsDone ()) {
179181 i = active_ranges.erase (i);
@@ -204,6 +206,10 @@ void ObjectDescriptorImpl::OnFinish(Status const& status) {
204206 for (auto const & kv : copy) {
205207 kv.second ->OnFinish (status);
206208 }
209+ CancelStream (streams_[active_stream_]);
210+ streams_.erase (streams_.begin () + active_stream_);
211+ active_ranges_.erase (active_ranges_.begin () + active_stream_);
212+ active_stream_ = streams_.size ();
207213}
208214
209215void ObjectDescriptorImpl::Resume (google::rpc::Status const & proto_status) {
@@ -228,7 +234,9 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
228234void ObjectDescriptorImpl::OnResume (StatusOr<OpenStreamResult> result) {
229235 if (!result) return OnFinish (std::move (result).status ());
230236 std::unique_lock<std::mutex> lk (mu_);
231- streams_[0 ] = std::move (result->stream );
237+ active_stream_ = streams_.size ();
238+ streams_.push_back (std::move (result->stream ));
239+ AddNewActiveRanges (lk);
232240 // TODO(#15105) - this should be done without release the lock.
233241 Flush (std::move (lk));
234242 OnRead (std::move (result->first_response ));
0 commit comments