Skip to content

Commit 5a88176

Browse files
feat(storage): Expose Flush() in AsyncWriter (googleapis#15555)
1 parent 3b96c4b commit 5a88176

4 files changed

Lines changed: 209 additions & 1 deletion

File tree

google/cloud/storage/async/writer.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,28 @@ future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
8686
return Finalize(std::move(token), WritePayload{});
8787
}
8888

89-
future<Status> AsyncWriter::Close() {
89+
future<Status> AsyncWriter::Flush() {
90+
if (!impl_) {
91+
return make_ready_future(
92+
internal::CancelledError("closed stream", GCP_ERROR_INFO()));
93+
}
94+
9095
return impl_->Flush(WritePayload{}).then([impl = impl_](auto f) {
9196
return f.get();
9297
});
9398
}
9499

100+
future<Status> AsyncWriter::Close() {
101+
if (!impl_) {
102+
return make_ready_future(
103+
internal::CancelledError("closed stream", GCP_ERROR_INFO()));
104+
}
105+
106+
return impl_->Flush(WritePayload{}).then([impl = std::move(impl_)](auto f) {
107+
return f.get();
108+
});
109+
}
110+
95111
RpcMetadata AsyncWriter::GetRequestMetadata() const {
96112
return impl_->GetRequestMetadata();
97113
}

google/cloud/storage/async/writer.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,25 @@ class AsyncWriter {
120120
future<StatusOr<google::storage::v2::Object>> Finalize(AsyncToken token,
121121
WritePayload payload);
122122

123+
/**
124+
* Flush any buffered data to the service.
125+
*
126+
* For buffered uploads, this forces any data in the buffer to be sent to the
127+
* service. The returned future is satisfied when the service acknowledges
128+
* the flush. Note that the service may not have persisted the data, it may
129+
* only be in ephemeral storage. To query the amount of persisted data use
130+
* `PersistedState()` after the flush completes.
131+
*
132+
* @note This is not a terminal operation. The `AsyncWriter` can be used for
133+
* further `Write()` or `Finalize()` operations.
134+
*/
135+
future<Status> Flush();
136+
123137
/**
124138
* Close the upload by flushing the remaining data in buffer.
139+
*
140+
* @warning This is a terminal operation. The `AsyncWriter` object is not
141+
* usable after this call.
125142
*/
126143
future<Status> Close();
127144

google/cloud/storage/async/writer_test.cc

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,140 @@ TEST(AsyncWriterTest, FinalizeEmpty) {
108108
EXPECT_STATUS_OK(actual);
109109
}
110110

111+
TEST(AsyncWriterTest, Flush) {
112+
auto mock = std::make_unique<MockAsyncWriterConnection>();
113+
::testing::InSequence sequence;
114+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
115+
return make_ready_future(Status{});
116+
});
117+
118+
AsyncWriter writer(std::move(mock));
119+
auto const actual = writer.Flush().get();
120+
EXPECT_STATUS_OK(actual);
121+
}
122+
123+
TEST(AsyncWriterTest, ErrorOnFlush) {
124+
auto mock = std::make_unique<MockAsyncWriterConnection>();
125+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
126+
return make_ready_future(PermanentError());
127+
});
128+
129+
AsyncWriter writer(std::move(mock));
130+
auto const actual = writer.Flush().get();
131+
EXPECT_THAT(actual, StatusIs(PermanentError().code()));
132+
}
133+
134+
TEST(AsyncWriterTest, FlushOnDefaultConstructed) {
135+
AsyncWriter writer;
136+
auto const actual = writer.Flush().get();
137+
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled, "closed stream"));
138+
}
139+
140+
TEST(AsyncWriterTest, FlushThenWrite) {
141+
auto mock = std::make_unique<MockAsyncWriterConnection>();
142+
auto* mock_ptr = mock.get();
143+
::testing::InSequence sequence;
144+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
145+
return make_ready_future(Status{});
146+
});
147+
EXPECT_CALL(*mock, Write).WillOnce([](auto const&) {
148+
return make_ready_future(Status{});
149+
});
150+
151+
AsyncWriter writer(std::move(mock));
152+
ASSERT_STATUS_OK(writer.Flush().get());
153+
154+
auto token = storage_internal::MakeAsyncToken(mock_ptr);
155+
auto const actual = writer.Write(std::move(token), {}).get();
156+
EXPECT_STATUS_OK(actual);
157+
}
158+
159+
TEST(AsyncWriterTest, MultipleFlushesAreQueuedAndSequential) {
160+
auto mock = std::make_unique<MockAsyncWriterConnection>();
161+
::testing::InSequence sequence;
162+
// Expect three flushes, each with empty payload, and simulate delayed
163+
// completion.
164+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
165+
// Simulate async completion for first flush
166+
return make_ready_future(Status{});
167+
});
168+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
169+
// Simulate async completion for second flush
170+
return make_ready_future(Status{});
171+
});
172+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
173+
// Simulate async completion for third flush
174+
return make_ready_future(Status{});
175+
});
176+
177+
AsyncWriter writer(std::move(mock));
178+
// Issue three flushes in quick succession
179+
auto f1 = writer.Flush();
180+
auto f2 = writer.Flush();
181+
auto f3 = writer.Flush();
182+
183+
// All futures should be satisfied in order
184+
EXPECT_STATUS_OK(f1.get());
185+
EXPECT_STATUS_OK(f2.get());
186+
EXPECT_STATUS_OK(f3.get());
187+
}
188+
189+
TEST(AsyncWriterTest, Close) {
190+
auto mock = std::make_unique<MockAsyncWriterConnection>();
191+
::testing::InSequence sequence;
192+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
193+
return make_ready_future(Status{});
194+
});
195+
196+
AsyncWriter writer(std::move(mock));
197+
auto const actual = writer.Close().get();
198+
EXPECT_STATUS_OK(actual);
199+
}
200+
201+
TEST(AsyncWriterTest, CloseOnDefaultConstructed) {
202+
AsyncWriter writer;
203+
auto const actual = writer.Close().get();
204+
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled, "closed stream"));
205+
}
206+
207+
TEST(AsyncWriterTest, CloseOnMovedWriter) {
208+
auto mock = std::make_unique<MockAsyncWriterConnection>();
209+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
210+
return make_ready_future(Status{});
211+
});
212+
AsyncWriter writer(std::move(mock));
213+
AsyncWriter moved(std::move(writer));
214+
auto const actual = moved.Close().get();
215+
EXPECT_STATUS_OK(actual);
216+
}
217+
218+
TEST(AsyncWriterTest, ErrorOnWriteAfterClose) {
219+
auto mock = std::make_unique<MockAsyncWriterConnection>();
220+
auto* mock_ptr = mock.get();
221+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
222+
return make_ready_future(Status{});
223+
});
224+
225+
AsyncWriter writer(std::move(mock));
226+
ASSERT_STATUS_OK(writer.Close().get());
227+
228+
// Create a token for the (now invalid) writer.
229+
auto token = storage_internal::MakeAsyncToken(mock_ptr);
230+
auto const actual = writer.Write(std::move(token), {}).get();
231+
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled));
232+
}
233+
234+
TEST(AsyncWriterTest, ErrorOnClose) {
235+
auto mock = std::make_unique<MockAsyncWriterConnection>();
236+
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
237+
return make_ready_future(PermanentError());
238+
});
239+
240+
AsyncWriter writer(std::move(mock));
241+
auto const actual = writer.Close().get();
242+
EXPECT_THAT(actual, StatusIs(PermanentError().code()));
243+
}
244+
111245
TEST(AsyncWriterTest, ErrorDuringWrite) {
112246
auto mock = std::make_unique<MockAsyncWriterConnection>();
113247
EXPECT_CALL(*mock, Write).WillOnce([] {

google/cloud/storage/tests/async_client_integration_test.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,47 @@ TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) {
905905
IsProtoEqual(*metadata)));
906906
}
907907

908+
TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) {
909+
if (!UsingEmulator()) GTEST_SKIP();
910+
auto async = AsyncClient(TestOptions());
911+
auto client = MakeIntegrationTestClient(true, TestOptions());
912+
auto object_name = MakeRandomObjectName();
913+
// Create a small block to send over and over.
914+
auto constexpr kBlockSize = static_cast<std::int64_t>(256 * 1024);
915+
auto const block = MakeRandomData(kBlockSize);
916+
917+
auto create =
918+
client.CreateBucket(bucket_name(), storage::BucketMetadata{}
919+
.set_location("us-west4")
920+
.set_storage_class("RAPID"));
921+
if (!create && create.status().code() != StatusCode::kAlreadyExists) {
922+
GTEST_FAIL() << "cannot create bucket: " << create.status();
923+
}
924+
auto w =
925+
async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name)
926+
.get();
927+
ASSERT_STATUS_OK(w);
928+
AsyncWriter writer;
929+
AsyncToken token;
930+
std::tie(writer, token) = *std::move(w);
931+
932+
auto p = writer.Write(std::move(token), WritePayload(block)).get();
933+
ASSERT_STATUS_OK(p);
934+
token = *std::move(p);
935+
936+
// Explicitly flush the data.
937+
auto flush_status = writer.Flush().get();
938+
EXPECT_STATUS_OK(flush_status);
939+
940+
auto metadata = writer.Finalize(std::move(token)).get();
941+
ASSERT_STATUS_OK(metadata);
942+
ScheduleForDelete(*metadata);
943+
944+
EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName());
945+
EXPECT_EQ(metadata->name(), object_name);
946+
EXPECT_EQ(metadata->size(), kBlockSize);
947+
}
948+
908949
TEST_F(AsyncClientIntegrationTest, Open) {
909950
if (!UsingEmulator()) GTEST_SKIP();
910951
auto async = AsyncClient(TestOptions());

0 commit comments

Comments
 (0)