Skip to content

Commit 4d91636

Browse files
authored
fix(storage): use server-reported size and crc on stream reconnect to avoid duplicate writes (googleapis#16115)
* fix(storage): C++ SDK might replay more writes than necessary on write stream reconnects * fix the format * resolve AI review comments * update checksum as well * fix the format
1 parent 77109ef commit 4d91636

2 files changed

Lines changed: 258 additions & 9 deletions

File tree

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -441,16 +441,36 @@ class AsyncWriterConnectionResumedState
441441
return SetError(std::move(lk), std::move(res).status());
442442
}
443443
// Regular resume attempt succeeded. Check state.
444-
auto state = impl_->PersistedState();
445-
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
446-
// Found finalized object (maybe finalized concurrently or resumed).
447-
return SetFinalized(std::move(lk), absl::get<google::storage::v2::Object>(
448-
std::move(state)));
444+
std::int64_t persisted_offset = 0;
445+
absl::optional<google::storage::v2::ObjectChecksums> checksums;
446+
447+
if (res->first_response.has_resource()) {
448+
if (!res->first_response.has_write_handle()) {
449+
// Found finalized object (maybe finalized concurrently or resumed).
450+
return SetFinalized(std::move(lk),
451+
std::move(*res->first_response.mutable_resource()));
452+
}
453+
auto const& resource = res->first_response.resource();
454+
persisted_offset = resource.size();
455+
if (resource.has_checksums()) {
456+
checksums = resource.checksums();
457+
}
458+
} else if (res->first_response.has_persisted_size()) {
459+
persisted_offset = res->first_response.persisted_size();
460+
if (res->first_response.has_persisted_data_checksums()) {
461+
checksums = res->first_response.persisted_data_checksums();
462+
}
463+
} else {
464+
auto state = impl_->PersistedState();
465+
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
466+
// Found finalized object (maybe finalized concurrently or resumed).
467+
return SetFinalized(
468+
std::move(lk),
469+
absl::get<google::storage::v2::Object>(std::move(state)));
470+
}
471+
persisted_offset = absl::get<std::int64_t>(state);
472+
checksums = impl_->PersistedChecksums();
449473
}
450-
// Regular resume succeeded, object not finalized. Continue writing.
451-
auto persisted_offset = absl::get<std::int64_t>(state);
452-
453-
auto checksums = impl_->PersistedChecksums();
454474

455475
auto hash = hash_function_;
456476
if (checksums && checksums->has_crc32c()) {

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,235 @@ TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) {
733733
EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
734734
}
735735

736+
// Scenario: the initial connection reports a persisted state of 0, its Flush
// fails with a transient error, and the factory then returns a new stream
// whose first response has `persisted_size` set to 2048. The test expects the
// first Write on the new stream to carry `write_offset == 2048`, and the
// overall `Write()` future to complete with `kOk`.
TEST(WriterConnectionResumed, ResumeUsesSizeFromFirstResponse) {
737+
// Every asynchronous step below parks on this sequencer; the test body
// releases them one at a time and checks the order by name.
AsyncSequencer<bool> sequencer;
738+
auto mock = std::make_unique<MockAsyncWriterConnection>();
739+
// Keep a raw pointer so expectations can still be set after `mock` is moved
// into the connection under test.
auto* mock_ptr = mock.get();
740+
741+
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
742+
google::storage::v2::BidiWriteObjectResponse first_response;
743+
first_response.mutable_write_handle()->set_handle("initial-handle");
744+
745+
auto mock_hash =
746+
std::make_shared<google::cloud::storage::testing::MockHashFunction>();
747+
// The hash function is not under test here: accept any number of calls to
// this `Update` overload and report success.
EXPECT_CALL(*mock_hash, Update(::testing::An<std::int64_t>(),
748+
::testing::An<absl::Cord const&>(),
749+
::testing::An<std::uint32_t>()))
750+
.WillRepeatedly(Return(Status()));
751+
752+
// The original connection reports 0 as its persisted state, which differs
// from the 2048 later reported by the factory's first response.
EXPECT_CALL(*mock_ptr, PersistedState)
753+
.WillOnce(Return(MakePersistedState(0)));
754+
755+
auto const payload = TestPayload(2048);
756+
757+
// The single Flush on the original connection resolves to OK when the
// sequencer is released with `true`, and to a transient error otherwise.
EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) {
758+
return sequencer.PushBack("Flush").then([](auto f) {
759+
if (f.get()) return Status{};
760+
return TransientError();
761+
});
762+
});
763+
764+
MockFactory mock_factory;
765+
auto mock_stream =
766+
std::make_unique<google::cloud::mocks::MockAsyncStreamingReadWriteRpc<
767+
google::storage::v2::BidiWriteObjectRequest,
768+
google::storage::v2::BidiWriteObjectResponse>>();
769+
auto* mock_stream_ptr = mock_stream.get();
770+
771+
// The factory is expected to be called exactly once. It hands back the mock
// stream together with a first response that has a new write handle and
// `persisted_size == 2048`, but no resource.
EXPECT_CALL(mock_factory, Call(_))
772+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) {
773+
WriteObject::WriteResult result;
774+
result.stream = std::move(mock_stream);
775+
result.first_response.mutable_write_handle()->set_handle("new-handle");
776+
result.first_response.set_persisted_size(2048);
777+
return sequencer.PushBack("Factory").then(
778+
[r = std::move(result)](auto) mutable {
779+
return StatusOr<WriteObject::WriteResult>(std::move(r));
780+
});
781+
});
782+
783+
// Exactly two writes are expected on the new stream, in this order:
//  1. a request whose `write_offset` is 2048 (the key assertion of the test);
//  2. a request with empty content and `flush` set.
EXPECT_CALL(*mock_stream_ptr, Write(_, _))
784+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
785+
grpc::WriteOptions) {
786+
EXPECT_EQ(request.write_offset(), 2048);
787+
return sequencer.PushBack("StateLookupWrite").then([](auto) {
788+
return true;
789+
});
790+
})
791+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
792+
grpc::WriteOptions) {
793+
EXPECT_TRUE(GetContent(request.checksummed_data()).empty());
794+
EXPECT_TRUE(request.flush());
795+
return sequencer.PushBack("GhostWrite").then([](auto) { return true; });
796+
});
797+
798+
// Both reads on the new stream return the same response, reporting 2048
// persisted bytes. `read_response` is captured by value in each lambda.
google::storage::v2::BidiWriteObjectResponse read_response;
799+
read_response.set_persisted_size(2048);
800+
EXPECT_CALL(*mock_stream_ptr, Read)
801+
.WillOnce([&, read_response]() {
802+
return sequencer.PushBack("StreamRead1").then([read_response](auto) {
803+
return absl::make_optional(read_response);
804+
});
805+
})
806+
.WillOnce([&, read_response]() {
807+
return sequencer.PushBack("StreamRead2").then([read_response](auto) {
808+
return absl::make_optional(read_response);
809+
});
810+
});
811+
812+
// Finish is expected once and completes immediately with OK; Cancel may be
// called any number of times.
EXPECT_CALL(*mock_stream_ptr, Finish)
813+
.WillOnce(Return(make_ready_future(Status{})));
814+
EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return());
815+
816+
auto connection = MakeWriterConnectionResumed(
817+
mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash,
818+
first_response, Options{});
819+
820+
auto write = connection->Write(payload);
821+
822+
// Drive the sequence. Releasing "Flush" with `false` makes the mocked Flush
// return a transient error (see the Flush expectation above).
auto next = sequencer.PopFrontWithName();
823+
EXPECT_EQ(next.second, "Flush");
824+
next.first.set_value(false);
825+
826+
// The factory must be the next operation after the failed Flush.
next = sequencer.PopFrontWithName();
827+
EXPECT_EQ(next.second, "Factory");
828+
next.first.set_value(true);
829+
830+
// First write on the new stream; its offset is checked inside the mock.
next = sequencer.PopFrontWithName();
831+
EXPECT_EQ(next.second, "StateLookupWrite");
832+
next.first.set_value(true);
833+
834+
next = sequencer.PopFrontWithName();
835+
EXPECT_EQ(next.second, "StreamRead1");
836+
next.first.set_value(true);
837+
838+
// Second write on the new stream; emptiness and the flush flag are checked
// inside the mock.
next = sequencer.PopFrontWithName();
839+
EXPECT_EQ(next.second, "GhostWrite");
840+
next.first.set_value(true);
841+
842+
next = sequencer.PopFrontWithName();
843+
EXPECT_EQ(next.second, "StreamRead2");
844+
next.first.set_value(true);
845+
846+
// With every step released, the user-visible Write() must report success.
EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
847+
}
848+
849+
// Scenario: same flow as a resume driven by the factory's first response
// (original connection reports persisted state 0, Flush fails transiently,
// the factory returns a stream whose first response has
// `persisted_size == 2048`), with the addition that the first response also
// carries `persisted_data_checksums` with crc32c 12345.
//
// NOTE(review): the only expectation on `mock_hash` accepts any `Update` call,
// and no other assertion in this test reads the crc32c value. As written, the
// test checks the write offset and overall success but does not observe
// whether the checksum from the first response is actually consumed —
// consider adding an expectation that pins it.
TEST(WriterConnectionResumed, ResumeUsesChecksumsFromFirstResponse) {
850+
// Every asynchronous step below parks on this sequencer; the test body
// releases them one at a time and checks the order by name.
AsyncSequencer<bool> sequencer;
851+
auto mock = std::make_unique<MockAsyncWriterConnection>();
852+
// Keep a raw pointer so expectations can still be set after `mock` is moved
// into the connection under test.
auto* mock_ptr = mock.get();
853+
854+
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
855+
google::storage::v2::BidiWriteObjectResponse first_response;
856+
first_response.mutable_write_handle()->set_handle("initial-handle");
857+
858+
auto mock_hash =
859+
std::make_shared<google::cloud::storage::testing::MockHashFunction>();
860+
// Accept any number of calls to this `Update` overload and report success.
EXPECT_CALL(*mock_hash, Update(::testing::An<std::int64_t>(),
861+
::testing::An<absl::Cord const&>(),
862+
::testing::An<std::uint32_t>()))
863+
.WillRepeatedly(Return(Status()));
864+
865+
// The original connection reports 0 as its persisted state, which differs
// from the 2048 later reported by the factory's first response.
EXPECT_CALL(*mock_ptr, PersistedState)
866+
.WillOnce(Return(MakePersistedState(0)));
867+
868+
auto const payload = TestPayload(2048);
869+
870+
// The single Flush on the original connection resolves to OK when the
// sequencer is released with `true`, and to a transient error otherwise.
EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) {
871+
return sequencer.PushBack("Flush").then([](auto f) {
872+
if (f.get()) return Status{};
873+
return TransientError();
874+
});
875+
});
876+
877+
MockFactory mock_factory;
878+
auto mock_stream =
879+
std::make_unique<google::cloud::mocks::MockAsyncStreamingReadWriteRpc<
880+
google::storage::v2::BidiWriteObjectRequest,
881+
google::storage::v2::BidiWriteObjectResponse>>();
882+
auto* mock_stream_ptr = mock_stream.get();
883+
884+
// The factory is expected to be called exactly once. Its first response has
// a new write handle, `persisted_size == 2048`, and a crc32c of 12345 in
// `persisted_data_checksums`; no resource is set.
EXPECT_CALL(mock_factory, Call(_))
885+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) {
886+
WriteObject::WriteResult result;
887+
result.stream = std::move(mock_stream);
888+
result.first_response.mutable_write_handle()->set_handle("new-handle");
889+
result.first_response.set_persisted_size(2048);
890+
// Set checksums in response!
891+
result.first_response.mutable_persisted_data_checksums()->set_crc32c(
892+
12345);
893+
return sequencer.PushBack("Factory").then(
894+
[r = std::move(result)](auto) mutable {
895+
return StatusOr<WriteObject::WriteResult>(std::move(r));
896+
});
897+
});
898+
899+
// Exactly two writes are expected on the new stream, in this order:
//  1. a request whose `write_offset` is 2048;
//  2. a request with empty content and `flush` set.
EXPECT_CALL(*mock_stream_ptr, Write(_, _))
900+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
901+
grpc::WriteOptions) {
902+
EXPECT_EQ(request.write_offset(), 2048);
903+
return sequencer.PushBack("StateLookupWrite").then([](auto) {
904+
return true;
905+
});
906+
})
907+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
908+
grpc::WriteOptions) {
909+
EXPECT_TRUE(GetContent(request.checksummed_data()).empty());
910+
EXPECT_TRUE(request.flush());
911+
return sequencer.PushBack("GhostWrite").then([](auto) { return true; });
912+
});
913+
914+
// Both reads on the new stream return the same response, reporting 2048
// persisted bytes. `read_response` is captured by value in each lambda.
google::storage::v2::BidiWriteObjectResponse read_response;
915+
read_response.set_persisted_size(2048);
916+
EXPECT_CALL(*mock_stream_ptr, Read)
917+
.WillOnce([&, read_response]() {
918+
return sequencer.PushBack("StreamRead1").then([read_response](auto) {
919+
return absl::make_optional(read_response);
920+
});
921+
})
922+
.WillOnce([&, read_response]() {
923+
return sequencer.PushBack("StreamRead2").then([read_response](auto) {
924+
return absl::make_optional(read_response);
925+
});
926+
});
927+
928+
// Finish is expected once and completes immediately with OK; Cancel may be
// called any number of times.
EXPECT_CALL(*mock_stream_ptr, Finish)
929+
.WillOnce(Return(make_ready_future(Status{})));
930+
EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return());
931+
932+
auto connection = MakeWriterConnectionResumed(
933+
mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash,
934+
first_response, Options{});
935+
936+
auto write = connection->Write(payload);
937+
938+
// Drive the sequence. Releasing "Flush" with `false` makes the mocked Flush
// return a transient error (see the Flush expectation above).
auto next = sequencer.PopFrontWithName();
939+
EXPECT_EQ(next.second, "Flush");
940+
next.first.set_value(false);
941+
942+
// The factory must be the next operation after the failed Flush.
next = sequencer.PopFrontWithName();
943+
EXPECT_EQ(next.second, "Factory");
944+
next.first.set_value(true);
945+
946+
// First write on the new stream; its offset is checked inside the mock.
next = sequencer.PopFrontWithName();
947+
EXPECT_EQ(next.second, "StateLookupWrite");
948+
next.first.set_value(true);
949+
950+
next = sequencer.PopFrontWithName();
951+
EXPECT_EQ(next.second, "StreamRead1");
952+
next.first.set_value(true);
953+
954+
// Second write on the new stream; emptiness and the flush flag are checked
// inside the mock.
next = sequencer.PopFrontWithName();
955+
EXPECT_EQ(next.second, "GhostWrite");
956+
next.first.set_value(true);
957+
958+
next = sequencer.PopFrontWithName();
959+
EXPECT_EQ(next.second, "StreamRead2");
960+
next.first.set_value(true);
961+
962+
// With every step released, the user-visible Write() must report success.
EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
963+
}
964+
736965
} // namespace
737966
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
738967
} // namespace storage_internal

0 commit comments

Comments
 (0)