Skip to content

Commit 915cef5

Browse files
committed
feat(storage): implement full object checksum validation for appendable uploads
1 parent 106f7a8 commit 915cef5

14 files changed

Lines changed: 429 additions & 22 deletions

google/cloud/storage/async/writer.cc

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -70,21 +70,22 @@ future<StatusOr<AsyncToken>> AsyncWriter::Write(AsyncToken token,
7070
}
7171

7272
future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
73-
AsyncToken token, WritePayload payload) {
73+
AsyncToken token, WritePayload payload,
74+
absl::optional<Crc32cChecksumValue> const& expected_checksum) {
7475
if (!impl_) return StreamError<google::storage::v2::Object>(GCP_ERROR_INFO());
7576
auto t = storage_internal::MakeAsyncToken(impl_.get());
7677
if (token != t) {
7778
return TokenError<google::storage::v2::Object>(GCP_ERROR_INFO());
7879
}
7980

80-
return impl_->Finalize(std::move(payload)).then([impl = impl_](auto f) {
81-
return f.get();
82-
});
81+
return impl_->Finalize(std::move(payload), expected_checksum)
82+
.then([impl = impl_](auto f) { return f.get(); });
8383
}
8484

8585
future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
86-
AsyncToken token) {
87-
return Finalize(std::move(token), WritePayload{});
86+
AsyncToken token,
87+
absl::optional<Crc32cChecksumValue> const& expected_checksum) {
88+
return Finalize(std::move(token), WritePayload{}, expected_checksum);
8889
}
8990

9091
future<Status> AsyncWriter::Flush() {

google/cloud/storage/async/writer.h

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/storage/async/token.h"
1919
#include "google/cloud/storage/async/write_payload.h"
2020
#include "google/cloud/storage/async/writer_connection.h"
21+
#include "google/cloud/storage/hashing_options.h"
2122
#include "google/cloud/future.h"
2223
#include "google/cloud/status_or.h"
2324
#include "google/cloud/version.h"
@@ -115,13 +116,18 @@ class AsyncWriter {
115116
future<StatusOr<AsyncToken>> Write(AsyncToken token, WritePayload payload);
116117

117118
/// Finalize the upload with the existing data.
118-
future<StatusOr<google::storage::v2::Object>> Finalize(AsyncToken token);
119+
future<StatusOr<google::storage::v2::Object>> Finalize(
120+
AsyncToken token,
121+
absl::optional<Crc32cChecksumValue> const& expected_checksum =
122+
absl::nullopt);
119123

120124
/**
121125
* Upload @p payload and then finalize the upload.
122126
*/
123-
future<StatusOr<google::storage::v2::Object>> Finalize(AsyncToken token,
124-
WritePayload payload);
127+
future<StatusOr<google::storage::v2::Object>> Finalize(
128+
AsyncToken token, WritePayload payload,
129+
absl::optional<Crc32cChecksumValue> const& expected_checksum =
130+
absl::nullopt);
125131

126132
/**
127133
* Flush any buffered data to the service.

google/cloud/storage/async/writer_connection.h

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_ASYNC_WRITER_CONNECTION_H
1717

1818
#include "google/cloud/storage/async/write_payload.h"
19+
#include "google/cloud/storage/hashing_options.h"
1920
#include "google/cloud/storage/object_metadata.h"
2021
#include "google/cloud/future.h"
2122
#include "google/cloud/rpc_metadata.h"
@@ -112,6 +113,11 @@ class AsyncWriterConnection {
112113
/// Finalizes an upload.
113114
virtual future<StatusOr<google::storage::v2::Object>> Finalize(
114115
WritePayload) = 0;
116+
virtual future<StatusOr<google::storage::v2::Object>> Finalize(
117+
WritePayload p,
118+
absl::optional<Crc32cChecksumValue> const& /*expected_checksum*/) {
119+
return Finalize(std::move(p));
120+
}
115121

116122
/// Uploads some data to the service and flushes the value.
117123
virtual future<Status> Flush(WritePayload payload) = 0;

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -137,7 +137,16 @@ StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> MakeAppendableWriter(
137137
current, request, std::move(rpc->stream), hash, resource, false);
138138
} else {
139139
persisted_size = rpc->first_response.persisted_size();
140-
hash = CreateHashFunction(*current);
140+
if (current->get<storage::EnableCrc32cValidationOption>() &&
141+
rpc->first_response.has_persisted_data_checksums() &&
142+
rpc->first_response.persisted_data_checksums().has_crc32c()) {
143+
hash = std::make_shared<
144+
::google::cloud::storage::internal::Crc32cHashFunction>(
145+
rpc->first_response.persisted_data_checksums().crc32c(),
146+
persisted_size);
147+
} else {
148+
hash = CreateHashFunction(*current);
149+
}
141150
auto checksums = rpc->first_response.has_persisted_data_checksums()
142151
? absl::make_optional(
143152
rpc->first_response.persisted_data_checksums())

google/cloud/storage/internal/async/writer_connection_buffered.cc

Lines changed: 27 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -106,6 +106,11 @@ class AsyncWriterConnectionBufferedState
106106
return Impl(std::unique_lock<std::mutex>(mu_))->PersistedState();
107107
}
108108

109+
absl::optional<google::storage::v2::ObjectChecksums> PersistedChecksums()
110+
const {
111+
return Impl(std::unique_lock<std::mutex>(mu_))->PersistedChecksums();
112+
}
113+
109114
future<Status> Write(storage::WritePayload const& p) {
110115
std::unique_lock<std::mutex> lk(mu_);
111116
resend_buffer_.Append(WritePayloadImpl::GetImpl(p));
@@ -114,7 +119,15 @@ class AsyncWriterConnectionBufferedState
114119

115120
future<StatusOr<google::storage::v2::Object>> Finalize(
116121
storage::WritePayload const& p) {
122+
return Finalize(p, absl::nullopt);
123+
}
124+
future<StatusOr<google::storage::v2::Object>> Finalize(
125+
storage::WritePayload const& p,
126+
absl::optional<storage::Crc32cChecksumValue> const& expected_checksum) {
117127
std::unique_lock<std::mutex> lk(mu_);
128+
if (expected_checksum.has_value()) {
129+
expected_checksum_ = expected_checksum;
130+
}
118131
resend_buffer_.Append(WritePayloadImpl::GetImpl(p));
119132
finalize_ = true;
120133
HandleNewData(std::move(lk), true);
@@ -246,7 +259,7 @@ class AsyncWriterConnectionBufferedState
246259
auto impl = Impl(lk);
247260
lk.unlock();
248261
// Finalize with an empty payload.
249-
(void)impl->Finalize(storage::WritePayload{})
262+
(void)impl->Finalize(storage::WritePayload{}, expected_checksum_)
250263
.then([w = WeakFromThis()](auto f) {
251264
if (auto self = w.lock()) return self->OnFinalize(f.get());
252265
});
@@ -636,6 +649,7 @@ class AsyncWriterConnectionBufferedState
636649
// Retrieve the future in the constructor, as some operations reset
637650
// finalized_.
638651
future<StatusOr<google::storage::v2::Object>> finalized_future_;
652+
absl::optional<storage::Crc32cChecksumValue> expected_checksum_;
639653

640654
// The result of calling `Close()`. Note that only one such call is ever
641655
// made.
@@ -763,13 +777,24 @@ class AsyncWriterConnectionBuffered : public storage::AsyncWriterConnection {
763777
return state_->PersistedState();
764778
}
765779

780+
absl::optional<google::storage::v2::ObjectChecksums> PersistedChecksums()
781+
const override {
782+
return state_->PersistedChecksums();
783+
}
784+
766785
future<Status> Write(storage::WritePayload p) override {
767786
return state_->Write(std::move(p));
768787
}
769788

770789
future<StatusOr<google::storage::v2::Object>> Finalize(
771790
storage::WritePayload p) override {
772-
return state_->Finalize(std::move(p));
791+
return Finalize(std::move(p), absl::nullopt);
792+
}
793+
future<StatusOr<google::storage::v2::Object>> Finalize(
794+
storage::WritePayload p,
795+
absl::optional<storage::Crc32cChecksumValue> const& expected_checksum)
796+
override {
797+
return state_->Finalize(std::move(p), expected_checksum);
773798
}
774799

775800
future<Status> Flush(storage::WritePayload p) override {

google/cloud/storage/internal/async/writer_connection_impl.cc

Lines changed: 31 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -140,22 +140,48 @@ future<Status> AsyncWriterConnectionImpl::Write(storage::WritePayload payload) {
140140
}
141141

142142
future<StatusOr<google::storage::v2::Object>>
143-
AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) {
143+
AsyncWriterConnectionImpl::Finalize(
144+
storage::WritePayload payload,
145+
absl::optional<storage::Crc32cChecksumValue> const& expected_checksum) {
144146
auto write = MakeRequest();
145147
write.set_finish_write(true);
146148

147149
auto p = WritePayloadImpl::GetImpl(payload);
148150
auto size = p.size();
151+
152+
if (p.empty() && expected_checksum.has_value()) {
153+
auto const actual = hash_function_->Finish().crc32c;
154+
if (!actual.empty() && expected_checksum->value() != actual) {
155+
return make_ready_future(StatusOr<google::storage::v2::Object>(
156+
google::cloud::internal::DataLossError(
157+
"client checksum mismatch: expected " +
158+
expected_checksum->value() + " got " + actual,
159+
GCP_ERROR_INFO())));
160+
}
161+
}
162+
149163
auto action = request_.has_append_object_spec() ||
150164
request_.write_object_spec().appendable()
151165
? PartialUpload::kFinalize
152166
: PartialUpload::kFinalizeWithChecksum;
153167
auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write),
154168
std::move(p), std::move(action));
155-
return coro->Start().then([coro, size, this](auto f) mutable {
156-
coro.reset(); // breaks the cycle between the completion queue and coro
157-
return OnFinalUpload(size, f.get());
158-
});
169+
return coro->Start().then(
170+
[coro, size, expected_checksum, this](auto f) mutable {
171+
coro.reset(); // breaks the cycle between the completion queue and coro
172+
auto res = f.get();
173+
if (res.ok() && *res && expected_checksum.has_value()) {
174+
auto const actual = hash_function_->Finish().crc32c;
175+
if (!actual.empty() && expected_checksum->value() != actual) {
176+
return make_ready_future(StatusOr<google::storage::v2::Object>(
177+
google::cloud::internal::DataLossError(
178+
"client checksum mismatch: expected " +
179+
expected_checksum->value() + " got " + actual,
180+
GCP_ERROR_INFO())));
181+
}
182+
}
183+
return OnFinalUpload(size, std::move(res));
184+
});
159185
}
160186

161187
future<Status> AsyncWriterConnectionImpl::Flush(storage::WritePayload payload) {

google/cloud/storage/internal/async/writer_connection_impl.h

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,13 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection {
6969

7070
future<Status> Write(storage::WritePayload payload) override;
7171
future<StatusOr<google::storage::v2::Object>> Finalize(
72-
storage::WritePayload) override;
72+
storage::WritePayload payload) override {
73+
return Finalize(std::move(payload), absl::nullopt);
74+
}
75+
future<StatusOr<google::storage::v2::Object>> Finalize(
76+
storage::WritePayload payload,
77+
absl::optional<storage::Crc32cChecksumValue> const& expected_checksum)
78+
override;
7379
future<Status> Flush(storage::WritePayload payload) override;
7480
future<Status> Close(storage::WritePayload payload) override;
7581
future<StatusOr<std::int64_t>> Query() override;

google/cloud/storage/internal/async/writer_connection_impl_test.cc

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -904,6 +904,68 @@ TEST(AsyncWriterConnectionTest, CloseError) {
904904
EXPECT_THAT(response.get(), StatusIs(PermanentError().code()));
905905
}
906906

907+
TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchImmediate) {
908+
auto mock = std::make_unique<MockStream>();
909+
EXPECT_CALL(*mock, Cancel).Times(1);
910+
EXPECT_CALL(*mock, Finish).WillOnce([] {
911+
return make_ready_future(Status{});
912+
});
913+
auto hash = std::make_shared<MockHashFunction>();
914+
EXPECT_CALL(*hash, Finish).WillOnce(Return(storage::internal::HashValues{"ImIEBA==", ""}));
915+
916+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
917+
TestOptions(), MakeRequest(), std::move(mock), hash, 1024);
918+
auto response = tested->Finalize(WritePayload{}, storage::Crc32cChecksumValue("AAAAAA=="));
919+
EXPECT_THAT(response.get(), StatusIs(StatusCode::kDataLoss));
920+
}
921+
922+
TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMatchImmediate) {
923+
auto mock = std::make_unique<MockStream>();
924+
EXPECT_CALL(*mock, Cancel).Times(1);
925+
EXPECT_CALL(*mock, Finish).WillOnce([] {
926+
return make_ready_future(Status{});
927+
});
928+
EXPECT_CALL(*mock, Write).WillOnce([](Request const&, grpc::WriteOptions) {
929+
return make_ready_future(true);
930+
});
931+
EXPECT_CALL(*mock, Read).WillOnce([] {
932+
return make_ready_future(absl::make_optional(MakeTestResponse()));
933+
});
934+
auto hash = std::make_shared<MockHashFunction>();
935+
EXPECT_CALL(*hash, Finish).WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""}));
936+
937+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
938+
TestOptions(), MakeRequest(), std::move(mock), hash, 1024);
939+
auto response = tested->Finalize(WritePayload{}, storage::Crc32cChecksumValue("ImIEBA=="));
940+
EXPECT_THAT(response.get(), IsOk());
941+
}
942+
943+
TEST(AsyncWriterConnectionTest, FinalizeExpectedChecksumMismatchOnComplete) {
944+
AsyncSequencer<bool> sequencer;
945+
auto mock = std::make_unique<MockStream>();
946+
EXPECT_CALL(*mock, Cancel).Times(1);
947+
EXPECT_CALL(*mock, Finish).WillOnce([] {
948+
return make_ready_future(Status{});
949+
});
950+
EXPECT_CALL(*mock, Write)
951+
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
952+
EXPECT_TRUE(request.finish_write());
953+
EXPECT_TRUE(wopt.is_last_message());
954+
return sequencer.PushBack("Write");
955+
});
956+
auto hash = std::make_shared<MockHashFunction>();
957+
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(1);
958+
EXPECT_CALL(*hash, Finish).WillRepeatedly(Return(storage::internal::HashValues{"ImIEBA==", ""}));
959+
960+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
961+
TestOptions(), MakeRequest(), std::move(mock), hash, 1024);
962+
auto response = tested->Finalize(WritePayload(std::string(128, 'A')), storage::Crc32cChecksumValue("AAAAAA=="));
963+
auto next = sequencer.PopFrontWithName();
964+
ASSERT_THAT(next.second, "Write");
965+
next.first.set_value(true);
966+
EXPECT_THAT(response.get(), StatusIs(StatusCode::kDataLoss));
967+
}
968+
907969
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
908970
} // namespace storage_internal
909971
} // namespace cloud

0 commit comments

Comments
 (0)