@@ -94,9 +94,16 @@ AsyncWriterConnectionImpl::~AsyncWriterConnectionImpl() {
9494 // (2) calls to `Write()`, `Finalize()`, and `Query()` must have completed
9595 // by the time the destructor is called
9696 Finish ();
97+
98+ // We use a local copy of `impl` moved under lock to avoid
99+ // data races with concurrent calls to `Finish()` from callbacks.
100+ std::unique_lock<std::mutex> lk (mu_);
101+ auto impl = std::move (impl_);
102+ lk.unlock ();
103+
97104 // When `impl_->Finish()` is satisfied then `finished_` is satisfied too.
98105 // This extends the lifetime of `impl_` until it is safe to delete.
99- finished_.then ([impl = std::move (impl_ )](auto ) mutable {
106+ finished_.then ([impl = std::move (impl )](auto ) mutable {
100107 // Break the ownership cycle between the completion queue and this callback.
101108 impl.reset ();
102109 });
@@ -213,7 +220,11 @@ AsyncWriterConnectionImpl::OnFinalUpload(std::size_t upload_size,
213220 .then (transform);
214221 }
215222 offset_ += upload_size;
216- return impl_->Read ()
223+
224+ std::unique_lock<std::mutex> lk (mu_);
225+ auto impl = impl_;
226+ lk.unlock ();
227+ return impl->Read ()
217228 .then ([this ](auto f) { return OnQuery (f.get ()); })
218229 .then ([this ](auto g) -> StatusOr<google::storage::v2::Object> {
219230 auto status = g.get ();
@@ -256,11 +267,15 @@ future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::OnQuery(
256267}
257268
258269future<Status> AsyncWriterConnectionImpl::Finish () {
270+ std::unique_lock<std::mutex> lk (mu_);
259271 if (std::exchange (finish_called_, true )) {
260272 return make_ready_future (
261273 internal::CancelledError (" already finished" , GCP_ERROR_INFO ()));
262274 }
263- return impl_->Finish ().then ([p = std::move (on_finish_)](auto f) mutable {
275+ auto impl = impl_;
276+ lk.unlock ();
277+
278+ return impl->Finish ().then ([p = std::move (on_finish_)](auto f) mutable {
264279 p.set_value ();
265280 return f.get ();
266281 });
0 commit comments