Skip to content

Commit 6a3a627

Browse files
authored
feat(storage): Add full object checksum validation for appendable uploads (googleapis#16110)
* fastbyte full object checksum * add fastbyte checksum * revert async client integration test local change * resolve ai comments * fix the format * resolve fix formats * fix the format * fix the format * fix the format * resolve the retry edge case * fix the format * fix the format * fix the format * resolve the review comments * fix the format
1 parent a899f46 commit 6a3a627

7 files changed

Lines changed: 365 additions & 32 deletions

File tree

google/cloud/storage/async/writer_connection.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ class AsyncWriterConnection {
125125
/// Returns the latest write handle, if any.
126126
virtual absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
127127
const = 0;
128+
129+
/// Returns the latest persisted data checksums, if any.
130+
virtual absl::optional<google::storage::v2::ObjectChecksums>
131+
PersistedChecksums() const {
132+
return absl::nullopt;
133+
}
128134
};
129135

130136
/**

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,47 @@ std::unique_ptr<storage::internal::HashFunction> CreateHashFunction(
110110
return storage::internal::CreateNullHashFunction();
111111
}
112112

113+
StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> MakeAppendableWriter(
114+
google::cloud::internal::ImmutableOptions const& current,
115+
google::storage::v2::BidiWriteObjectRequest request,
116+
std::int64_t persisted_size,
117+
std::function<future<StatusOr<WriteObject::WriteResult>>(
118+
google::storage::v2::BidiWriteObjectRequest)>
119+
factory,
120+
StatusOr<WriteObject::WriteResult> rpc) {
121+
if (!rpc) return std::move(rpc).status();
122+
123+
std::shared_ptr<storage::internal::HashFunction> hash;
124+
std::unique_ptr<AsyncWriterConnectionImpl> impl;
125+
126+
if (rpc->first_response.has_resource()) {
127+
auto const& resource = rpc->first_response.resource();
128+
if (current->get<storage::EnableCrc32cValidationOption>() &&
129+
resource.has_checksums() && resource.checksums().has_crc32c()) {
130+
hash = std::make_shared<
131+
::google::cloud::storage::internal::Crc32cHashFunction>(
132+
resource.checksums().crc32c(), resource.size());
133+
} else {
134+
hash = CreateHashFunction(*current);
135+
}
136+
impl = std::make_unique<AsyncWriterConnectionImpl>(
137+
current, request, std::move(rpc->stream), hash, resource, false);
138+
} else {
139+
persisted_size = rpc->first_response.persisted_size();
140+
hash = CreateHashFunction(*current);
141+
auto checksums = rpc->first_response.has_persisted_data_checksums()
142+
? absl::make_optional(
143+
rpc->first_response.persisted_data_checksums())
144+
: absl::nullopt;
145+
impl = std::make_unique<AsyncWriterConnectionImpl>(
146+
current, request, std::move(rpc->stream), hash, persisted_size, false,
147+
checksums);
148+
}
149+
return MakeWriterConnectionResumed(std::move(factory), std::move(impl),
150+
std::move(request), std::move(hash),
151+
rpc->first_response, *current);
152+
}
153+
113154
std::unique_ptr<storage::internal::HashValidator> CreateHashValidator(
114155
google::storage::v2::ReadObjectRequest const& request,
115156
Options const& options) {
@@ -315,8 +356,6 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
315356
auto current = internal::MakeImmutableOptions(std::move(p.options));
316357
auto request = p.request;
317358
std::int64_t persisted_size = 0;
318-
std::shared_ptr<storage::internal::HashFunction> hash_function =
319-
CreateHashFunction(*current);
320359
auto retry =
321360
std::shared_ptr<storage::AsyncRetryPolicy>(retry_policy(*current));
322361
auto backoff =
@@ -404,24 +443,10 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
404443
auto pending = factory(std::move(request));
405444
return pending.then(
406445
[current, request = std::move(p.request), persisted_size,
407-
hash = std::move(hash_function), fa = std::move(factory)](auto f) mutable
446+
fa = std::move(factory)](auto f) mutable
408447
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
409-
auto rpc = f.get();
410-
if (!rpc) return std::move(rpc).status();
411-
std::unique_ptr<AsyncWriterConnectionImpl> impl;
412-
if (rpc->first_response.has_resource()) {
413-
impl = std::make_unique<AsyncWriterConnectionImpl>(
414-
current, request, std::move(rpc->stream), hash,
415-
rpc->first_response.resource(), false);
416-
} else {
417-
persisted_size = rpc->first_response.persisted_size();
418-
impl = std::make_unique<AsyncWriterConnectionImpl>(
419-
current, request, std::move(rpc->stream), hash, persisted_size,
420-
false);
421-
}
422-
return MakeWriterConnectionResumed(std::move(fa), std::move(impl),
423-
std::move(request), std::move(hash),
424-
rpc->first_response, *current);
448+
return MakeAppendableWriter(current, std::move(request), persisted_size,
449+
std::move(fa), f.get());
425450
});
426451
}
427452

google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include "google/cloud/storage/async/options.h"
1516
#include "google/cloud/storage/async/retry_policy.h"
1617
#include "google/cloud/storage/async/writer_connection.h"
1718
#include "google/cloud/storage/internal/async/connection_impl.h"
1819
#include "google/cloud/storage/internal/async/default_options.h"
20+
#include "google/cloud/storage/internal/async/write_object.h"
21+
#include "google/cloud/storage/internal/async/writer_connection_impl.h"
22+
#include "google/cloud/storage/internal/crc32c.h"
1923
#include "google/cloud/storage/testing/canonical_errors.h"
2024
#include "google/cloud/storage/testing/mock_storage_stub.h"
2125
#include "google/cloud/common_options.h"
@@ -627,6 +631,257 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) {
627631
next.first.set_value(true);
628632
}
629633

634+
TEST_F(AsyncConnectionImplAppendableTest,
635+
StartAppendableObjectUploadWithChecksum) {
636+
auto constexpr kRequestText = R"pb(
637+
write_object_spec {
638+
resource {
639+
bucket: "projects/_/buckets/test-bucket"
640+
name: "test-object"
641+
content_type: "text/plain"
642+
}
643+
}
644+
)pb";
645+
AsyncSequencer<bool> sequencer;
646+
auto mock = std::make_shared<storage::testing::MockStorageStub>();
647+
648+
google::storage::v2::Object initial_resource;
649+
initial_resource.set_bucket("projects/_/buckets/test-bucket");
650+
initial_resource.set_name("test-object");
651+
initial_resource.set_size(1024);
652+
initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC
653+
654+
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
655+
EXPECT_CALL(*stream, Start).WillOnce([&] {
656+
return sequencer.PushBack("Start");
657+
});
658+
659+
EXPECT_CALL(*stream, Read)
660+
.WillOnce([&, initial_resource] {
661+
return sequencer.PushBack("Read(Takeover)")
662+
.then([initial_resource](auto) {
663+
auto response = google::storage::v2::BidiWriteObjectResponse{};
664+
*response.mutable_resource() = initial_resource;
665+
return absl::make_optional(std::move(response));
666+
});
667+
})
668+
.WillOnce([&, initial_resource] {
669+
return sequencer.PushBack("Read(FinalObject)")
670+
.then([initial_resource](auto) {
671+
auto response = google::storage::v2::BidiWriteObjectResponse{};
672+
*response.mutable_resource() = initial_resource;
673+
response.mutable_resource()->set_size(
674+
initial_resource.size() + 9); // "some data" size is 9
675+
return absl::make_optional(std::move(response));
676+
});
677+
});
678+
679+
EXPECT_CALL(*stream, Cancel).Times(1);
680+
EXPECT_CALL(*stream, Finish).WillOnce([&] {
681+
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
682+
});
683+
684+
EXPECT_CALL(*stream, Write)
685+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
686+
grpc::WriteOptions wopt) {
687+
EXPECT_TRUE(request.state_lookup());
688+
EXPECT_FALSE(wopt.is_last_message());
689+
return sequencer.PushBack("Write(StateLookup)");
690+
})
691+
.WillOnce(
692+
[&](google::storage::v2::BidiWriteObjectRequest const& /*request*/,
693+
grpc::WriteOptions wopt) {
694+
EXPECT_FALSE(wopt.is_last_message());
695+
return sequencer.PushBack("Write(data)");
696+
})
697+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
698+
grpc::WriteOptions wopt) {
699+
EXPECT_TRUE(request.finish_write());
700+
EXPECT_TRUE(wopt.is_last_message());
701+
// Here we expect full checksums to be set because we had the resource
702+
// in takeover.
703+
EXPECT_TRUE(request.has_object_checksums());
704+
auto expected_crc =
705+
google::cloud::storage_internal::ExtendCrc32c(12345, "some data");
706+
EXPECT_EQ(request.object_checksums().crc32c(), expected_crc);
707+
return sequencer.PushBack("Write(Finalize)");
708+
});
709+
710+
EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] {
711+
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
712+
});
713+
714+
internal::AutomaticallyCreatedBackgroundThreads pool(1);
715+
// Enable CRC32C validation in options
716+
auto options = TestOptions().set<storage::EnableCrc32cValidationOption>(true);
717+
auto connection = MakeTestConnection(pool.cq(), mock, options);
718+
719+
auto request = google::storage::v2::BidiWriteObjectRequest{};
720+
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
721+
request.mutable_write_object_spec()->set_appendable(true);
722+
723+
auto pending = connection->StartAppendableObjectUpload(
724+
{std::move(request), connection->options()});
725+
726+
auto next = sequencer.PopFrontWithName();
727+
EXPECT_EQ(next.second, "Start");
728+
next.first.set_value(true);
729+
730+
next = sequencer.PopFrontWithName();
731+
EXPECT_EQ(next.second, "Write(StateLookup)");
732+
next.first.set_value(true);
733+
734+
next = sequencer.PopFrontWithName();
735+
EXPECT_EQ(next.second, "Read(Takeover)");
736+
next.first.set_value(true);
737+
738+
auto r = pending.get();
739+
ASSERT_STATUS_OK(r);
740+
auto writer = *std::move(r);
741+
742+
// Write some data.
743+
auto w1 = writer->Write(storage::WritePayload("some data"));
744+
next = sequencer.PopFrontWithName();
745+
EXPECT_EQ(next.second, "Write(data)");
746+
next.first.set_value(true);
747+
EXPECT_STATUS_OK(w1.get());
748+
749+
// Finalize the upload.
750+
auto w2 = writer->Finalize({});
751+
next = sequencer.PopFrontWithName();
752+
EXPECT_EQ(next.second, "Write(Finalize)");
753+
next.first.set_value(true);
754+
next = sequencer.PopFrontWithName();
755+
EXPECT_EQ(next.second, "Read(FinalObject)");
756+
next.first.set_value(true);
757+
758+
auto response = w2.get();
759+
ASSERT_STATUS_OK(response);
760+
761+
writer.reset();
762+
next = sequencer.PopFrontWithName();
763+
EXPECT_EQ(next.second, "Finish");
764+
next.first.set_value(true);
765+
}
766+
767+
TEST_F(AsyncConnectionImplAppendableTest,
768+
ResumeAppendableObjectUploadWithChecksum) {
769+
auto constexpr kRequestText = R"pb(
770+
append_object_spec { object: "test-object" }
771+
)pb";
772+
AsyncSequencer<bool> sequencer;
773+
auto mock = std::make_shared<storage::testing::MockStorageStub>();
774+
775+
constexpr std::int64_t kPersistedSize = 16384;
776+
constexpr std::uint32_t kPersistedCrc = 12345;
777+
778+
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
779+
EXPECT_CALL(*stream, Start).WillOnce([&] {
780+
return sequencer.PushBack("Start");
781+
});
782+
783+
EXPECT_CALL(*stream, Read)
784+
.WillOnce([&] {
785+
return sequencer.PushBack("Read(PersistedSize)").then([](auto) {
786+
auto response = google::storage::v2::BidiWriteObjectResponse{};
787+
response.set_persisted_size(kPersistedSize);
788+
response.mutable_persisted_data_checksums()->set_crc32c(
789+
kPersistedCrc);
790+
return absl::make_optional(std::move(response));
791+
});
792+
})
793+
.WillOnce([&] {
794+
return sequencer.PushBack("Read(FinalObject)").then([](auto) {
795+
auto response = google::storage::v2::BidiWriteObjectResponse{};
796+
auto object = google::storage::v2::Object{};
797+
object.set_bucket("projects/_/buckets/test-bucket");
798+
object.set_name("test-object");
799+
object.set_size(kPersistedSize + 9);
800+
*response.mutable_resource() = std::move(object);
801+
return absl::make_optional(std::move(response));
802+
});
803+
});
804+
805+
EXPECT_CALL(*stream, Cancel).Times(1);
806+
EXPECT_CALL(*stream, Finish).WillOnce([&] {
807+
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
808+
});
809+
810+
EXPECT_CALL(*stream, Write)
811+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
812+
grpc::WriteOptions wopt) {
813+
EXPECT_TRUE(request.state_lookup());
814+
EXPECT_FALSE(wopt.is_last_message());
815+
return sequencer.PushBack("Write(StateLookup)");
816+
})
817+
.WillOnce(
818+
[&](google::storage::v2::BidiWriteObjectRequest const& /*request*/,
819+
grpc::WriteOptions wopt) {
820+
EXPECT_FALSE(wopt.is_last_message());
821+
return sequencer.PushBack("Write(data)");
822+
})
823+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
824+
grpc::WriteOptions wopt) {
825+
EXPECT_TRUE(request.finish_write());
826+
EXPECT_TRUE(wopt.is_last_message());
827+
EXPECT_TRUE(request.has_object_checksums());
828+
EXPECT_EQ(request.object_checksums().crc32c(), 2901820631);
829+
return sequencer.PushBack("Write(Finalize)");
830+
});
831+
832+
EXPECT_CALL(*mock, AsyncBidiWriteObject)
833+
.WillOnce([&](auto const&, auto, auto) {
834+
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
835+
});
836+
837+
internal::AutomaticallyCreatedBackgroundThreads pool(1);
838+
auto options = TestOptions().set<storage::EnableCrc32cValidationOption>(true);
839+
auto connection = MakeTestConnection(pool.cq(), mock, options);
840+
841+
auto request = google::storage::v2::BidiWriteObjectRequest{};
842+
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
843+
auto pending = connection->ResumeAppendableObjectUpload(
844+
{std::move(request), connection->options()});
845+
846+
auto next = sequencer.PopFrontWithName();
847+
EXPECT_EQ(next.second, "Start");
848+
next.first.set_value(true);
849+
850+
next = sequencer.PopFrontWithName();
851+
EXPECT_EQ(next.second, "Write(StateLookup)");
852+
next.first.set_value(true);
853+
854+
next = sequencer.PopFrontWithName();
855+
EXPECT_EQ(next.second, "Read(PersistedSize)");
856+
next.first.set_value(true);
857+
858+
auto r = pending.get();
859+
ASSERT_STATUS_OK(r);
860+
auto writer = *std::move(r);
861+
862+
auto w1 = writer->Write(storage::WritePayload("some data"));
863+
next = sequencer.PopFrontWithName();
864+
EXPECT_EQ(next.second, "Write(data)");
865+
next.first.set_value(true);
866+
EXPECT_STATUS_OK(w1.get());
867+
868+
auto w2 = writer->Finalize({});
869+
next = sequencer.PopFrontWithName();
870+
EXPECT_EQ(next.second, "Write(Finalize)");
871+
next.first.set_value(true);
872+
next = sequencer.PopFrontWithName();
873+
EXPECT_EQ(next.second, "Read(FinalObject)");
874+
next.first.set_value(true);
875+
876+
auto response = w2.get();
877+
ASSERT_STATUS_OK(response);
878+
879+
writer.reset();
880+
next = sequencer.PopFrontWithName();
881+
EXPECT_EQ(next.second, "Finish");
882+
next.first.set_value(true);
883+
}
884+
630885
} // namespace
631886
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
632887
} // namespace storage_internal

0 commit comments

Comments
 (0)