Skip to content

Commit be81407

Browse files
committed
feat: Add buffered and resumed writer connection logic
1 parent 1b9b3c1 commit be81407

4 files changed

Lines changed: 206 additions & 6 deletions

File tree

google/cloud/storage/internal/async/writer_connection_buffered.cc

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ class AsyncWriterConnectionBufferedState
281281
return tmp;
282282
}
283283

284-
void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size) {
284+
void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size,
285+
bool is_resume = false) {
285286
if (persisted_size < buffer_offset_) {
286287
auto id = UploadId(lk);
287288
return SetError(std::move(lk),
@@ -297,7 +298,14 @@ class AsyncWriterConnectionBufferedState
297298
}
298299
resend_buffer_.RemovePrefix(static_cast<std::size_t>(n));
299300
buffer_offset_ = persisted_size;
300-
write_offset_ -= static_cast<std::size_t>(n);
301+
if (is_resume) {
302+
// Since the buffer has been modified to start exactly at the point of the
303+
// resume, the next write on this new stream should start from the
304+
// beginning of this truncated buffer.
305+
write_offset_ = 0;
306+
} else {
307+
write_offset_ -= static_cast<std::size_t>(n);
308+
}
301309
// If the buffer is small enough, collect all the handlers to notify them.
302310
auto const handlers = ClearHandlersIfEmpty(lk);
303311
// SetFlushed will release the lock before returning.
@@ -382,7 +390,7 @@ class AsyncWriterConnectionBufferedState
382390
std::move(state)));
383391
}
384392
// Regular resume succeeded, object not finalized. Continue writing.
385-
OnQuery(std::move(lk), absl::get<std::int64_t>(state));
393+
OnQuery(std::move(lk), absl::get<std::int64_t>(state), /*is_resume=*/true);
386394
}
387395

388396
void SetFinalized(std::unique_lock<std::mutex> lk,

google/cloud/storage/internal/async/writer_connection_buffered_test.cc

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,67 @@ TEST(WriteConnectionBuffered, SetFinalizedIsIdempotent) {
12661266
next.first.set_value(true);
12671267
}
12681268

1269+
TEST(WriteConnectionBuffered, ResetWriteOffsetOnResume) {
1270+
AsyncSequencer<bool> sequencer;
1271+
auto mock = std::make_unique<MockAsyncWriterConnection>();
1272+
auto* mock_ptr = mock.get();
1273+
1274+
EXPECT_CALL(*mock_ptr, UploadId).WillRepeatedly(Return("test-upload-id"));
1275+
EXPECT_CALL(*mock_ptr, PersistedState)
1276+
.WillOnce(
1277+
Return(MakePersistedState(0))); // Initial state: 0 bytes persisted.
1278+
1279+
EXPECT_CALL(*mock_ptr, Write).WillOnce([&](auto) {
1280+
return sequencer.PushBack("Write").then([](auto f) {
1281+
if (!f.get()) return TransientError(); // This write will fail.
1282+
return Status{};
1283+
});
1284+
});
1285+
1286+
MockFactory mock_factory;
1287+
auto resumed_mock = std::make_unique<MockAsyncWriterConnection>();
1288+
auto* resumed_mock_ptr = resumed_mock.get();
1289+
1290+
EXPECT_CALL(mock_factory, Call).WillOnce([&]() {
1291+
return sequencer.PushBack("Resume").then([&](auto) {
1292+
// The resumed connection reports that 1024 bytes have been persisted.
1293+
EXPECT_CALL(*resumed_mock_ptr, PersistedState)
1294+
.WillRepeatedly(Return(MakePersistedState(1024)));
1295+
// We expect the next write on the resumed stream to send the remaining
1296+
// 1024 bytes. If the write offset was not reset to 0, this size would be
1297+
// incorrect.
1298+
EXPECT_CALL(*resumed_mock_ptr, Write).WillOnce([&](auto payload) {
1299+
EXPECT_EQ(payload.size(), 1024);
1300+
return sequencer.PushBack("ResumedWrite").then([](auto) {
1301+
return Status{};
1302+
});
1303+
});
1304+
return make_status_or(std::unique_ptr<storage::AsyncWriterConnection>(
1305+
std::move(resumed_mock)));
1306+
});
1307+
});
1308+
1309+
auto connection = MakeWriterConnectionBuffered(
1310+
mock_factory.AsStdFunction(), std::move(mock), TestOptions());
1311+
1312+
// Write a total of 2048 bytes.
1313+
auto write = connection->Write(TestPayload(2048));
1314+
1315+
auto next = sequencer.PopFrontWithName();
1316+
EXPECT_EQ(next.second, "Write");
1317+
next.first.set_value(false);
1318+
1319+
next = sequencer.PopFrontWithName();
1320+
EXPECT_EQ(next.second, "Resume");
1321+
next.first.set_value(true);
1322+
1323+
next = sequencer.PopFrontWithName();
1324+
EXPECT_EQ(next.second, "ResumedWrite");
1325+
next.first.set_value(true);
1326+
1327+
EXPECT_STATUS_OK(write.get());
1328+
}
1329+
12691330
} // namespace
12701331
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
12711332
} // namespace storage_internal

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,8 @@ class AsyncWriterConnectionResumedState
298298
return tmp;
299299
}
300300

301-
void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size) {
301+
void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size,
302+
bool is_resume = false) {
302303
auto handle = impl_->WriteHandle();
303304
if (handle) {
304305
latest_write_handle_ = *std::move(handle);
@@ -317,7 +318,14 @@ class AsyncWriterConnectionResumedState
317318
}
318319
resend_buffer_.RemovePrefix(static_cast<std::size_t>(n));
319320
buffer_offset_ = persisted_size;
320-
write_offset_ -= static_cast<std::size_t>(n);
321+
if (is_resume) {
322+
// Since the buffer has been modified to start exactly at the point of the
323+
// resume, the next write on this new stream should start from the
324+
// beginning of this truncated buffer.
325+
write_offset_ = 0;
326+
} else {
327+
write_offset_ -= static_cast<std::size_t>(n);
328+
}
321329
// If the buffer is small enough, collect all the handlers to notify them.
322330
auto const handlers = ClearHandlersIfEmpty(lk);
323331
state_ = State::kIdle;
@@ -436,7 +444,7 @@ class AsyncWriterConnectionResumedState
436444
options_, initial_request_, std::move(res->stream), hash_function_,
437445
persisted_offset, false);
438446
// OnQuery will restart the WriteLoop if necessary.
439-
OnQuery(std::move(lk), persisted_offset);
447+
OnQuery(std::move(lk), persisted_offset, /*is_resume=*/true);
440448
}
441449

442450
void SetFinalized(std::unique_lock<std::mutex> lk,

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/internal/async/writer_connection_resumed.h"
16+
#include "google/cloud/mocks/mock_async_streaming_read_write_rpc.h"
1617
#include "google/cloud/storage/async/connection.h"
1718
#include "google/cloud/storage/mocks/mock_async_writer_connection.h"
1819
#include "google/cloud/storage/testing/canonical_errors.h"
@@ -615,6 +616,128 @@ TEST(WriterConnectionResumed, OnQueryUpdatesWriteHandle) {
615616
EXPECT_EQ(current_handle->handle(), "updated-handle");
616617
}
617618

619+
TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) {
620+
AsyncSequencer<bool> sequencer;
621+
auto mock = std::make_unique<MockAsyncWriterConnection>();
622+
auto* mock_ptr = mock.get();
623+
624+
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
625+
google::storage::v2::BidiWriteObjectResponse first_response;
626+
first_response.mutable_write_handle()->set_handle("initial-handle");
627+
628+
auto mock_hash =
629+
std::make_shared<google::cloud::storage::testing::MockHashFunction>();
630+
EXPECT_CALL(*mock_hash, Update(::testing::An<std::int64_t>(),
631+
::testing::An<absl::Cord const&>(),
632+
::testing::An<std::uint32_t>()))
633+
.WillRepeatedly(Return(Status()));
634+
635+
EXPECT_CALL(*mock_ptr, PersistedState)
636+
.WillOnce(
637+
Return(MakePersistedState(0))) // Initial state: 0 bytes persisted.
638+
.WillOnce(Return(
639+
MakePersistedState(1024))); // Resumed state: 1024 bytes persisted.
640+
641+
EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) {
642+
return sequencer.PushBack("Flush").then([](auto f) {
643+
if (f.get()) return Status{};
644+
return TransientError(); // Return a transient error to trigger resume.
645+
});
646+
});
647+
648+
MockFactory mock_factory;
649+
auto mock_stream =
650+
std::make_unique<google::cloud::mocks::MockAsyncStreamingReadWriteRpc<
651+
google::storage::v2::BidiWriteObjectRequest,
652+
google::storage::v2::BidiWriteObjectResponse>>();
653+
auto* mock_stream_ptr = mock_stream.get();
654+
655+
// The mock factory is called when the connection resumes.
656+
EXPECT_CALL(mock_factory, Call(_))
657+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest) {
658+
WriteObject::WriteResult result;
659+
result.stream = std::move(mock_stream);
660+
result.first_response.mutable_write_handle()->set_handle("new-handle");
661+
return sequencer.PushBack("Factory").then(
662+
[r = std::move(result)](auto) mutable {
663+
return StatusOr<WriteObject::WriteResult>(std::move(r));
664+
});
665+
});
666+
667+
// After resuming, the connection should write the remaining payload.
668+
EXPECT_CALL(*mock_stream_ptr, Write(_, _))
669+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
670+
grpc::WriteOptions) {
671+
// We expect the next write on the resumed stream to send the remaining
672+
// 1024 bytes. If the write offset was not reset to 0, this size would
673+
// be incorrect.
674+
EXPECT_EQ(request.checksummed_data().content().size(), 1024);
675+
return sequencer.PushBack("StreamWrite").then([](auto) {
676+
return true;
677+
});
678+
})
679+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
680+
grpc::WriteOptions) {
681+
// Expect a final "ghost" write to flush.
682+
EXPECT_TRUE(request.checksummed_data().content().empty());
683+
EXPECT_TRUE(request.flush());
684+
return sequencer.PushBack("GhostWrite").then([](auto) { return true; });
685+
});
686+
687+
google::storage::v2::BidiWriteObjectResponse read_response1;
688+
read_response1.set_persisted_size(2048);
689+
google::storage::v2::BidiWriteObjectResponse read_response2;
690+
read_response2.set_persisted_size(2048);
691+
EXPECT_CALL(*mock_stream_ptr, Read)
692+
.WillOnce([&, read_response1]() {
693+
return sequencer.PushBack("StreamRead1").then([read_response1](auto) {
694+
return absl::make_optional(read_response1);
695+
});
696+
})
697+
.WillOnce([&, read_response2]() {
698+
return sequencer.PushBack("StreamRead2").then([read_response2](auto) {
699+
return absl::make_optional(read_response2);
700+
});
701+
});
702+
703+
EXPECT_CALL(*mock_stream_ptr, Finish)
704+
.WillOnce(Return(make_ready_future(Status{})));
705+
EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return());
706+
707+
auto connection = MakeWriterConnectionResumed(
708+
mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash,
709+
first_response, Options{});
710+
711+
// Write a total of 2048 bytes.
712+
auto write = connection->Write(TestPayload(2048));
713+
714+
auto next = sequencer.PopFrontWithName();
715+
EXPECT_EQ(next.second, "Flush");
716+
next.first.set_value(false);
717+
718+
next = sequencer.PopFrontWithName();
719+
EXPECT_EQ(next.second, "Factory");
720+
next.first.set_value(true);
721+
722+
next = sequencer.PopFrontWithName();
723+
EXPECT_EQ(next.second, "StreamWrite");
724+
next.first.set_value(true);
725+
726+
next = sequencer.PopFrontWithName();
727+
EXPECT_EQ(next.second, "StreamRead1");
728+
next.first.set_value(true);
729+
730+
next = sequencer.PopFrontWithName();
731+
EXPECT_EQ(next.second, "GhostWrite");
732+
next.first.set_value(true);
733+
734+
next = sequencer.PopFrontWithName();
735+
EXPECT_EQ(next.second, "StreamRead2");
736+
next.first.set_value(true);
737+
738+
EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
739+
}
740+
618741
} // namespace
619742
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
620743
} // namespace storage_internal

0 commit comments

Comments
 (0)