Skip to content

Commit d7559f6

Browse files
authored
fix(storage): Implement clean half-close stream teardown for appendable uploads (googleapis#16112)
1 parent 3c80d3f commit d7559f6

9 files changed

Lines changed: 118 additions & 6 deletions

File tree

google/cloud/storage/async/writer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ future<Status> AsyncWriter::Close() {
104104
"closed stream", GCP_ERROR_INFO()));
105105
}
106106

107-
return impl_->Flush(WritePayload{}).then([impl = std::move(impl_)](auto f) {
107+
return impl_->Close(WritePayload{}).then([impl = std::move(impl_)](auto f) {
108108
return f.get();
109109
});
110110
}

google/cloud/storage/async/writer_connection.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,12 @@ class AsyncWriterConnection {
116116
/// Uploads some data to the service and flushes the value.
117117
virtual future<Status> Flush(WritePayload payload) = 0;
118118

119+
/// Cleanly flushes pending data and piggybacks a stream half-close
120+
/// (WritesDone).
121+
virtual future<Status> Close(WritePayload payload) {
122+
return Flush(std::move(payload));
123+
}
124+
119125
/// Wait for the result of a `Flush()` call.
120126
virtual future<StatusOr<std::int64_t>> Query() = 0;
121127

google/cloud/storage/async/writer_test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ TEST(AsyncWriterTest, MultipleFlushesAreQueuedAndSequential) {
188188
TEST(AsyncWriterTest, Close) {
189189
auto mock = std::make_unique<MockAsyncWriterConnection>();
190190
::testing::InSequence sequence;
191-
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
191+
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
192192
return make_ready_future(Status{});
193193
});
194194

@@ -205,7 +205,7 @@ TEST(AsyncWriterTest, CloseOnDefaultConstructed) {
205205

206206
TEST(AsyncWriterTest, CloseOnMovedWriter) {
207207
auto mock = std::make_unique<MockAsyncWriterConnection>();
208-
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
208+
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
209209
return make_ready_future(Status{});
210210
});
211211
AsyncWriter writer(std::move(mock));
@@ -217,7 +217,7 @@ TEST(AsyncWriterTest, CloseOnMovedWriter) {
217217
TEST(AsyncWriterTest, ErrorOnWriteAfterClose) {
218218
auto mock = std::make_unique<MockAsyncWriterConnection>();
219219
auto* mock_ptr = mock.get();
220-
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
220+
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
221221
return make_ready_future(Status{});
222222
});
223223

@@ -232,7 +232,7 @@ TEST(AsyncWriterTest, ErrorOnWriteAfterClose) {
232232

233233
TEST(AsyncWriterTest, ErrorOnClose) {
234234
auto mock = std::make_unique<MockAsyncWriterConnection>();
235-
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
235+
EXPECT_CALL(*mock, Close(WritePayloadContents(IsEmpty()))).WillOnce([] {
236236
return make_ready_future(PermanentError());
237237
});
238238

google/cloud/storage/internal/async/partial_upload.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ void PartialUpload::Write() {
6060
} else if (action_ == LastMessageAction::kFlush) {
6161
request_.set_flush(true);
6262
request_.set_state_lookup(true);
63+
} else if (action_ == LastMessageAction::kFlushAndClose) {
64+
request_.set_flush(true);
65+
request_.set_state_lookup(true);
66+
wopt.set_last_message();
6367
}
6468
}
6569
(void)rpc_->Write(request_, std::move(wopt))

google/cloud/storage/internal/async/partial_upload.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,13 @@ class PartialUpload : public std::enable_shared_from_this<PartialUpload> {
9090
google::storage::v2::BidiWriteObjectRequest,
9191
google::storage::v2::BidiWriteObjectResponse>;
9292

93-
enum LastMessageAction { kNone, kFlush, kFinalize, kFinalizeWithChecksum };
93+
enum LastMessageAction {
94+
kNone,
95+
kFlush,
96+
kFlushAndClose,
97+
kFinalize,
98+
kFinalizeWithChecksum
99+
};
94100

95101
static std::shared_ptr<PartialUpload> Call(
96102
std::shared_ptr<StreamingWriteRpc> rpc,

google/cloud/storage/internal/async/writer_connection_impl.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,19 @@ future<Status> AsyncWriterConnectionImpl::Flush(storage::WritePayload payload) {
176176
});
177177
}
178178

179+
future<Status> AsyncWriterConnectionImpl::Close(storage::WritePayload payload) {
180+
auto write = MakeRequest();
181+
auto p = WritePayloadImpl::GetImpl(payload);
182+
auto size = p.size();
183+
auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write),
184+
std::move(p), PartialUpload::kFlushAndClose);
185+
186+
return coro->Start().then([coro, size, this](auto f) mutable {
187+
coro.reset(); // breaks the cycle between the completion queue and coro
188+
return OnClose(size, f.get());
189+
});
190+
}
191+
179192
future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::Query() {
180193
return impl_->Read().then([this](auto f) { return OnQuery(f.get()); });
181194
}
@@ -215,6 +228,19 @@ future<Status> AsyncWriterConnectionImpl::OnPartialUpload(
215228
return make_ready_future(Status{});
216229
}
217230

231+
future<Status> AsyncWriterConnectionImpl::OnClose(std::size_t upload_size,
232+
StatusOr<bool> success) {
233+
if (!success) {
234+
return Finish().then(HandleFinishAfterError(std::move(success).status()));
235+
}
236+
if (!*success) {
237+
return Finish().then(
238+
HandleFinishAfterError("Expected Finish() error after non-ok Write()"));
239+
}
240+
offset_ += upload_size;
241+
return Finish().then([](auto f) { return f.get(); });
242+
}
243+
218244
future<StatusOr<google::storage::v2::Object>>
219245
AsyncWriterConnectionImpl::OnFinalUpload(std::size_t upload_size,
220246
StatusOr<bool> success) {

google/cloud/storage/internal/async/writer_connection_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection {
7171
future<StatusOr<google::storage::v2::Object>> Finalize(
7272
storage::WritePayload) override;
7373
future<Status> Flush(storage::WritePayload payload) override;
74+
future<Status> Close(storage::WritePayload payload) override;
7475
future<StatusOr<std::int64_t>> Query() override;
7576
RpcMetadata GetRequestMetadata() override;
7677

@@ -91,6 +92,7 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection {
9192

9293
future<Status> OnPartialUpload(std::size_t upload_size,
9394
StatusOr<bool> success);
95+
future<Status> OnClose(std::size_t upload_size, StatusOr<bool> success);
9496
future<StatusOr<google::storage::v2::Object>> OnFinalUpload(
9597
std::size_t upload_size, StatusOr<bool> success);
9698
future<StatusOr<std::int64_t>> OnQuery(

google/cloud/storage/internal/async/writer_connection_impl_test.cc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,73 @@ TEST(AsyncWriterConnectionTest, QueryUpdatesHandle) {
837837
EXPECT_EQ(seen_handles[0], "queried-handle");
838838
}
839839

840+
TEST(AsyncWriterConnectionTest, CloseEmpty) {
841+
AsyncSequencer<bool> sequencer;
842+
auto mock = std::make_unique<MockStream>();
843+
EXPECT_CALL(*mock, Cancel).Times(1);
844+
EXPECT_CALL(*mock, Write)
845+
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
846+
EXPECT_TRUE(request.flush());
847+
EXPECT_TRUE(request.state_lookup());
848+
EXPECT_TRUE(wopt.is_last_message());
849+
EXPECT_EQ(request.common_object_request_params().encryption_algorithm(),
850+
"test-only-algo");
851+
return sequencer.PushBack("Write");
852+
});
853+
EXPECT_CALL(*mock, Finish).WillOnce([&] {
854+
return sequencer.PushBack("Finish").then([](auto f) {
855+
if (f.get()) return Status{};
856+
return PermanentError();
857+
});
858+
});
859+
auto hash = std::make_shared<MockHashFunction>();
860+
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(1);
861+
EXPECT_CALL(*hash, Finish).Times(0);
862+
863+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
864+
TestOptions(), MakeRequest(), std::move(mock), hash, 1024);
865+
auto close = tested->Close(WritePayload{});
866+
auto next = sequencer.PopFrontWithName();
867+
ASSERT_THAT(next.second, "Write");
868+
next.first.set_value(true);
869+
870+
next = sequencer.PopFrontWithName();
871+
ASSERT_THAT(next.second, "Finish");
872+
next.first.set_value(true);
873+
874+
EXPECT_THAT(close.get(), IsOk());
875+
tested = {};
876+
}
877+
878+
TEST(AsyncWriterConnectionTest, CloseError) {
879+
AsyncSequencer<bool> sequencer;
880+
auto mock = std::make_unique<MockStream>();
881+
EXPECT_CALL(*mock, Cancel).Times(1);
882+
EXPECT_CALL(*mock, Write).WillOnce([&](Request const&, grpc::WriteOptions) {
883+
return sequencer.PushBack("Write");
884+
});
885+
EXPECT_CALL(*mock, Finish).WillOnce([&] {
886+
return sequencer.PushBack("Finish").then([](auto f) {
887+
if (f.get()) return Status{};
888+
return PermanentError();
889+
});
890+
});
891+
auto hash = std::make_shared<MockHashFunction>();
892+
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(1);
893+
EXPECT_CALL(*hash, Finish).Times(0);
894+
895+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
896+
TestOptions(), MakeRequest(), std::move(mock), hash, 1024);
897+
auto response = tested->Close(WritePayload{});
898+
auto next = sequencer.PopFrontWithName();
899+
ASSERT_THAT(next.second, "Write");
900+
next.first.set_value(false); // Detect an error on Write()
901+
next = sequencer.PopFrontWithName();
902+
ASSERT_THAT(next.second, "Finish");
903+
next.first.set_value(false); // Return error from Finish()
904+
EXPECT_THAT(response.get(), StatusIs(PermanentError().code()));
905+
}
906+
840907
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
841908
} // namespace storage_internal
842909
} // namespace cloud

google/cloud/storage/mocks/mock_async_writer_connection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class MockAsyncWriterConnection : public storage::AsyncWriterConnection {
3737
MOCK_METHOD(future<StatusOr<google::storage::v2::Object>>, Finalize,
3838
(storage::WritePayload), (override));
3939
MOCK_METHOD(future<Status>, Flush, (storage::WritePayload), (override));
40+
MOCK_METHOD(future<Status>, Close, (storage::WritePayload), (override));
4041
MOCK_METHOD(future<StatusOr<std::int64_t>>, Query, (), (override));
4142
MOCK_METHOD(RpcMetadata, GetRequestMetadata, (), (override));
4243
};

0 commit comments

Comments
 (0)