@@ -319,9 +319,9 @@ class AsyncWriterConnectionResumedState
319319
320320 void OnClose (Status result) {
321321 if (!result.ok ()) return Resume (std::move (result));
322- auto checksums = impl_->PersistedChecksums ();
322+ std::unique_lock<std::mutex> lk (mu_);
323+ auto checksums = Impl (lk)->PersistedChecksums ();
323324 if (checksums && checksums->has_crc32c ()) {
324- std::unique_lock<std::mutex> lk (mu_);
325325 auto it = crc32c_history_.find (buffer_offset_);
326326 if (it != crc32c_history_.end () && it->second != checksums->crc32c ()) {
327327 SetError (std::move (lk), google::cloud::internal::DataLossError (
@@ -330,7 +330,7 @@ class AsyncWriterConnectionResumedState
330330 return ;
331331 }
332332 }
333- SetClosed (std::unique_lock<std::mutex>(mu_ ), std::move (result));
333+ SetClosed (std::move (lk ), std::move (result));
334334 }
335335
336336 void FlushStep (std::unique_lock<std::mutex> lk, absl::Cord payload) {
@@ -368,9 +368,9 @@ class AsyncWriterConnectionResumedState
368368 return ;
369369 }
370370 auto const persisted_size = *query_res;
371- auto checksums = self->impl_ ->PersistedChecksums ();
371+ std::unique_lock<std::mutex> lk (self->mu_ );
372+ auto checksums = self->Impl (lk)->PersistedChecksums ();
372373 if (checksums && checksums->has_crc32c ()) {
373- std::unique_lock<std::mutex> lk (self->mu_ );
374374 auto it = self->crc32c_history_ .find (persisted_size);
375375 if (it != self->crc32c_history_ .end () &&
376376 it->second != checksums->crc32c ()) {
@@ -381,6 +381,7 @@ class AsyncWriterConnectionResumedState
381381 return ;
382382 }
383383 }
384+ lk.unlock ();
384385 self->OnQuery (std::move (query_res));
385386 self->SetFlushed (std::unique_lock<std::mutex>(self->mu_ ),
386387 std::move (result));
@@ -406,7 +407,7 @@ class AsyncWriterConnectionResumedState
406407 }
407408
408409 void OnQuery (std::unique_lock<std::mutex> lk, std::int64_t persisted_size) {
409- auto handle = impl_ ->WriteHandle ();
410+ auto handle = Impl (lk) ->WriteHandle ();
410411 if (handle) {
411412 latest_write_handle_ = *std::move (handle);
412413 }
@@ -580,14 +581,14 @@ class AsyncWriterConnectionResumedState
580581 checksums = first_res.persisted_data_checksums ();
581582 }
582583 } else {
583- auto state = impl_ ->PersistedState ();
584+ auto state = Impl (lk) ->PersistedState ();
584585 if (absl::holds_alternative<google::storage::v2::Object>(state)) {
585586 finalized = true ;
586587 finalized_object =
587588 absl::get<google::storage::v2::Object>(std::move (state));
588589 } else {
589590 persisted_offset = absl::get<std::int64_t >(state);
590- checksums = impl_ ->PersistedChecksums ();
591+ checksums = Impl (lk) ->PersistedChecksums ();
591592 }
592593 }
593594
0 commit comments