Skip to content

Commit e908d88

Browse files
authored
fix(storage): add Close() support to AsyncWriterConnectionBuffered (#16145)
1 parent fd73bee commit e908d88

2 files changed

Lines changed: 348 additions & 28 deletions

File tree

google/cloud/storage/internal/async/writer_connection_buffered.cc

Lines changed: 131 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class AsyncWriterConnectionBufferedState
7272
buffer_size_hwm_(buffer_size_hwm),
7373
impl_(std::move(impl)) {
7474
finalized_future_ = finalized_.get_future();
75+
closed_future_ = closed_.get_future();
7576
auto state = impl_->PersistedState();
7677
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
7778
SetFinalized(std::unique_lock<std::mutex>(mu_),
@@ -121,6 +122,19 @@ class AsyncWriterConnectionBufferedState
121122
return std::move(finalized_future_);
122123
}
123124

125+
future<Status> Close(storage::WritePayload const& p) {
126+
std::unique_lock<std::mutex> lk(mu_);
127+
if (close_ || closed_promise_completed_) {
128+
return make_ready_future(internal::FailedPreconditionError(
129+
"Close() already called", GCP_ERROR_INFO()));
130+
}
131+
resend_buffer_.Append(WritePayloadImpl::GetImpl(p));
132+
close_ = true;
133+
// Force flush to drain the buffer first.
134+
HandleNewData(std::move(lk), true);
135+
return std::move(closed_future_);
136+
}
137+
124138
future<Status> Flush(storage::WritePayload const& p) {
125139
std::unique_lock<std::mutex> lk(mu_);
126140
// Create a new promise for this flush operation.
@@ -207,6 +221,10 @@ class AsyncWriterConnectionBufferedState
207221
// FinalizeStep will set the finalizing_ flag.
208222
return FinalizeStep(std::move(lk));
209223
}
224+
if (close_ && !closing_) {
225+
// CloseStep will set the closing_ flag.
226+
return CloseStep(std::move(lk));
227+
}
210228
// If not finalizing, check if an empty flush is needed.
211229
if (flush_) {
212230
// Pass empty payload to FlushStep
@@ -239,6 +257,22 @@ class AsyncWriterConnectionBufferedState
239257
SetFinalized(std::unique_lock<std::mutex>(mu_), std::move(result));
240258
}
241259

260+
void CloseStep(std::unique_lock<std::mutex> lk) {
261+
if (closing_) return;
262+
closing_ = true;
263+
auto impl = Impl(lk);
264+
lk.unlock();
265+
(void)impl->Close(storage::WritePayload{})
266+
.then([w = WeakFromThis()](auto f) {
267+
if (auto self = w.lock()) return self->OnClose(f.get());
268+
});
269+
}
270+
271+
void OnClose(Status const& result) {
272+
if (!result.ok()) return Resume(std::move(result));
273+
SetClosed(std::unique_lock<std::mutex>(mu_), std::move(result));
274+
}
275+
242276
void FlushStep(std::unique_lock<std::mutex> lk, absl::Cord payload) {
243277
auto impl = Impl(lk);
244278
lk.unlock();
@@ -343,40 +377,41 @@ class AsyncWriterConnectionBufferedState
343377
}
344378

345379
void Resume(Status const& s) {
346-
// Capture the finalization state *before* starting the async resume.
380+
// Capture the finalization and close state *before* starting the async
381+
// resume.
347382
bool was_finalizing;
383+
bool was_closing;
348384
{
349385
std::unique_lock<std::mutex> lk(mu_);
350386
was_finalizing = finalizing_;
387+
was_closing = closing_;
351388
if (!s.ok() && cancelled_) {
352389
return SetError(std::move(lk), std::move(s));
353390
}
354391
}
355-
// Pass the original status `s` and `was_finalizing` to the callback.
356-
factory_().then([s, was_finalizing, w = WeakFromThis()](auto f) {
357-
if (auto self = w.lock())
358-
return self->OnResume(s, was_finalizing, f.get());
359-
});
392+
// Pass the original status `s`, `was_finalizing`, and `was_closing` to the
393+
// callback.
394+
factory_().then(
395+
[s, was_finalizing, was_closing, w = WeakFromThis()](auto f) {
396+
if (auto self = w.lock())
397+
return self->OnResume(s, was_finalizing, was_closing, f.get());
398+
});
360399
}
361400

362401
void OnResume(
363-
Status const& original_status, bool was_finalizing,
402+
Status const& original_status, bool was_finalizing, bool was_closing,
364403
StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> impl) {
365404
std::unique_lock<std::mutex> lk(mu_);
366405

367-
// Resume was *not* triggered by finalization failure.
406+
// Resume was *not* triggered by finalization or close failure.
368407
if (!impl) return SetError(std::move(lk), std::move(impl).status());
369408
// On successful resume, immediately update the active connection.
370409
impl_ = std::move(*impl);
371410

411+
auto state = impl_->PersistedState();
372412
if (was_finalizing) {
373413
// If resuming due to a finalization error, we *must* complete the
374414
// finalized_ promise now, based on the resume attempt's outcome.
375-
// The resume attempt itself failed. Use that error.
376-
if (!impl) return SetError(std::move(lk), std::move(impl).status());
377-
378-
// Resume attempt succeeded, check the persisted state.
379-
auto state = impl_->PersistedState();
380415
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
381416
// Resume found the object is finalized. Success.
382417
return SetFinalized(
@@ -391,7 +426,21 @@ class AsyncWriterConnectionBufferedState
391426
return SetError(std::move(lk), std::move(original_status));
392427
}
393428

394-
auto state = impl_->PersistedState();
429+
if (was_closing) {
430+
// If resuming due to a close error, we must complete the
431+
// closed_ promise now, based on the resume attempt's outcome.
432+
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
433+
// Resume found the object is finalized (which implies closed). Success.
434+
return SetClosed(std::move(lk), Status{});
435+
}
436+
// Resume succeeded, but the object is still not finalized/closed.
437+
// This means the original close attempt failed permanently.
438+
// Use the original status that triggered the resume. Reset closing_
439+
// before setting the error, as the attempt is now over.
440+
closing_ = false;
441+
return SetError(std::move(lk), std::move(original_status));
442+
}
443+
395444
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
396445
// Found finalized object (maybe finalized concurrently or resumed).
397446
return SetFinalized(std::move(lk), absl::get<google::storage::v2::Object>(
@@ -432,6 +481,29 @@ class AsyncWriterConnectionBufferedState
432481
p.set_value(std::move(object)); // Set value on the moved promise
433482
}
434483

484+
void SetClosed(std::unique_lock<std::mutex> lk, Status const& status) {
485+
resend_buffer_.Clear();
486+
writing_ = false;
487+
close_ = false;
488+
closing_ = false;
489+
flush_ = false;
490+
// Check if the promise has already been completed.
491+
if (closed_promise_completed_) {
492+
return;
493+
}
494+
// Mark the promise as completed before moving it.
495+
closed_promise_completed_ = true;
496+
auto handlers = ClearHandlers(lk);
497+
// Also clear any pending flush promises on success.
498+
auto pending_flushes = std::move(pending_flush_promises_);
499+
auto p = std::move(closed_); // Move the member promise.
500+
lk.unlock();
501+
// Notify handlers and pending flushes after releasing the lock.
502+
for (auto& h : handlers) h->Execute(status);
503+
for (auto& pf : pending_flushes) pf.set_value(status);
504+
p.set_value(status); // Set value on the moved promise.
505+
}
506+
435507
void SetFlushed(std::unique_lock<std::mutex> lk, Status const& result) {
436508
if (!result.ok()) return SetError(std::move(lk), std::move(result));
437509
flush_ = false; // Reset flush flag; WriteLoop may set it again.
@@ -465,27 +537,32 @@ class AsyncWriterConnectionBufferedState
465537
writing_ = false;
466538
finalize_ = false;
467539
finalizing_ = false; // Reset finalizing flag
540+
close_ = false;
541+
closing_ = false; // Reset closing flag
468542
flush_ = false;
469543

470544
// Always clear handlers and pending flushes on error.
471545
auto handlers = ClearHandlers(lk);
472546
auto pending_flushes = std::move(pending_flush_promises_);
473547

474548
// Check if the finalized promise has already been completed.
475-
if (finalized_promise_completed_) {
476-
// Finalized promise already set, just notify handlers and pending
477-
// flushes.
478-
lk.unlock(); // Release lock before notifying
479-
for (auto& h : handlers) h->Execute(status);
480-
for (auto& pf : pending_flushes) pf.set_value(status);
481-
return;
549+
bool complete_finalized = false;
550+
promise<StatusOr<google::storage::v2::Object>> finalized_to_complete;
551+
if (!finalized_promise_completed_) {
552+
finalized_promise_completed_ = true;
553+
finalized_to_complete = std::move(finalized_);
554+
complete_finalized = true;
555+
}
556+
557+
// Check if the closed promise has already been completed.
558+
bool complete_closed = false;
559+
promise<Status> closed_to_complete;
560+
if (!closed_promise_completed_) {
561+
closed_promise_completed_ = true;
562+
closed_to_complete = std::move(closed_);
563+
complete_closed = true;
482564
}
483565

484-
// Mark the finalized promise as completed *before* moving it under the
485-
// lock.
486-
finalized_promise_completed_ = true;
487-
// Move the finalized promise.
488-
auto p = std::move(finalized_);
489566
lk.unlock(); // Release lock before notifying
490567

491568
// Notify handlers first.
@@ -494,8 +571,13 @@ class AsyncWriterConnectionBufferedState
494571
for (auto& pf : pending_flushes) {
495572
pf.set_value(status);
496573
}
497-
// Set error on the moved finalized promise *once*.
498-
p.set_value(status);
574+
// Set error on the moved promises *once*.
575+
if (complete_finalized) {
576+
finalized_to_complete.set_value(status);
577+
}
578+
if (complete_closed) {
579+
closed_to_complete.set_value(status);
580+
}
499581
}
500582

501583
std::shared_ptr<storage::AsyncWriterConnection> Impl(
@@ -540,6 +622,14 @@ class AsyncWriterConnectionBufferedState
540622
// finalized_.
541623
future<StatusOr<google::storage::v2::Object>> finalized_future_;
542624

625+
// The result of calling `Close()`. Note that only one such call is ever
626+
// made.
627+
promise<Status> closed_;
628+
629+
// Retrieve the future in the constructor, as some operations reset
630+
// closed_.
631+
future<Status> closed_future_;
632+
543633
// Queue of promises for outstanding Flush() calls.
544634
std::deque<promise<Status>> pending_flush_promises_;
545635

@@ -557,6 +647,15 @@ class AsyncWriterConnectionBufferedState
557647
// If true, all the data to finalize an upload is in `resend_buffer_`.
558648
bool finalize_ = false;
559649

650+
// If true, all the data to close an upload is in `resend_buffer_`.
651+
bool close_ = false;
652+
653+
// True if CloseStep has been initiated. Prevents re-entry.
654+
bool closing_ = false;
655+
656+
// Tracks if the final promise (`closed_`) has been completed.
657+
bool closed_promise_completed_ = false;
658+
560659
// If true, all data should be uploaded with `Flush()`.
561660
bool flush_ = false;
562661

@@ -659,6 +758,10 @@ class AsyncWriterConnectionBuffered : public storage::AsyncWriterConnection {
659758
return state_->Flush(std::move(p));
660759
}
661760

761+
future<Status> Close(storage::WritePayload p) override {
762+
return state_->Close(std::move(p));
763+
}
764+
662765
future<StatusOr<std::int64_t>> Query() override { return state_->Query(); }
663766

664767
RpcMetadata GetRequestMetadata() override {

0 commit comments

Comments
 (0)