Skip to content

Commit a899f46

Browse files
authored
feat(storage): Add read chunkwise checksum validation for sync grpc read (googleapis#16107)
* feat(storage): Add chunk-wise checksum validation for synchronous gRPC reads * address the AI review comment * add unit tests
1 parent cfc10a2 commit a899f46

4 files changed

Lines changed: 111 additions & 11 deletions

File tree

google/cloud/storage/internal/grpc/object_read_source.cc

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,14 @@ namespace cloud {
2626
namespace storage_internal {
2727
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2828

29-
GrpcObjectReadSource::GrpcObjectReadSource(TimerSource timer_source,
30-
std::unique_ptr<StreamingRpc> stream)
31-
: timer_source_(std::move(timer_source)), stream_(std::move(stream)) {}
29+
// Constructs a read source from a timer factory, the streaming RPC, and a
// hash function. Ownership of `timer_source` and `stream` is moved into the
// corresponding members. If `hash_function` is null, `hash_function_` is
// initialized from storage::internal::CreateNullHashFunction() instead, so
// callers may pass an empty pointer.
GrpcObjectReadSource::GrpcObjectReadSource(
30+
TimerSource timer_source, std::unique_ptr<StreamingRpc> stream,
31+
std::shared_ptr<storage::internal::HashFunction> hash_function)
32+
: timer_source_(std::move(timer_source)),
33+
stream_(std::move(stream)),
34+
hash_function_(hash_function
35+
? std::move(hash_function)
36+
: storage::internal::CreateNullHashFunction()) {}
3237

3338
StatusOr<storage::internal::HttpResponse> GrpcObjectReadSource::Close() {
3439
if (stream_) stream_ = nullptr;
@@ -74,18 +79,33 @@ StatusOr<storage::internal::ReadSourceResult> GrpcObjectReadSource::Read(
7479
if (!status_.ok()) return status_;
7580
return result;
7681
}
77-
HandleResponse(result, buf, n, std::move(response));
82+
auto handle_status = HandleResponse(result, buf, n, std::move(response));
83+
if (!handle_status.ok()) {
84+
status_ = handle_status;
85+
stream_.reset();
86+
return status_;
87+
}
7888
}
7989

8090
return result;
8191
}
8292

83-
void GrpcObjectReadSource::HandleResponse(
93+
Status GrpcObjectReadSource::HandleResponse(
8494
storage::internal::ReadSourceResult& result, char* buf, std::size_t n,
8595
google::storage::v2::ReadObjectResponse response) {
96+
if (!offset_ && response.has_content_range()) {
97+
offset_ = response.content_range().start();
98+
}
8699
// The google.storage.v1.Storage documentation says this field can be
87100
// empty.
88101
if (response.has_checksummed_data()) {
102+
auto const& data = response.checksummed_data();
103+
auto status = hash_function_->Update(offset_.value_or(0), GetContent(data),
104+
data.crc32c());
105+
if (!status.ok()) return status;
106+
107+
offset_ = offset_.value_or(0) + GetContent(data).size();
108+
89109
auto const offset = result.bytes_received;
90110
result.bytes_received += buffer_.HandleResponse(
91111
buf + offset, n - offset,
@@ -115,6 +135,7 @@ void GrpcObjectReadSource::HandleResponse(
115135
result.storage_class.value_or(metadata.storage_class());
116136
result.size = result.size.value_or(metadata.size());
117137
}
138+
return {};
118139
}
119140

120141
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/grpc/object_read_source.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_GRPC_OBJECT_READ_SOURCE_H
1717

1818
#include "google/cloud/storage/internal/grpc/buffer_read_object_data.h"
19+
#include "google/cloud/storage/internal/hash_function.h"
1920
#include "google/cloud/storage/internal/object_read_source.h"
2021
#include "google/cloud/storage/version.h"
2122
#include "google/cloud/future.h"
2223
#include "google/cloud/internal/streaming_read_rpc.h"
2324
#include "absl/functional/function_ref.h"
25+
#include "absl/types/optional.h"
2426
#include "google/storage/v2/storage.pb.h"
2527
#include <functional>
2628
#include <string>
@@ -49,8 +51,10 @@ class GrpcObjectReadSource : public storage::internal::ObjectReadSource {
4951
// `false` if the timer was canceled, and with `true` if the timer triggered.
5052
using TimerSource = std::function<future<bool>()>;
5153

52-
explicit GrpcObjectReadSource(TimerSource timer_source,
53-
std::unique_ptr<StreamingRpc> stream);
54+
explicit GrpcObjectReadSource(
55+
TimerSource timer_source, std::unique_ptr<StreamingRpc> stream,
56+
std::shared_ptr<storage::internal::HashFunction> hash_function =
57+
storage::internal::CreateNullHashFunction());
5458

5559
~GrpcObjectReadSource() override = default;
5660

@@ -65,12 +69,14 @@ class GrpcObjectReadSource : public storage::internal::ObjectReadSource {
6569
std::size_t n) override;
6670

6771
private:
68-
void HandleResponse(storage::internal::ReadSourceResult& result, char* buf,
69-
std::size_t n,
70-
google::storage::v2::ReadObjectResponse response);
72+
Status HandleResponse(storage::internal::ReadSourceResult& result, char* buf,
73+
std::size_t n,
74+
google::storage::v2::ReadObjectResponse response);
7175

7276
TimerSource timer_source_;
7377
std::unique_ptr<StreamingRpc> stream_;
78+
std::shared_ptr<storage::internal::HashFunction> hash_function_;
79+
absl::optional<std::int64_t> offset_;
7480

7581
// In some cases the gRPC response may contain more data than the buffer
7682
// provided by the application. This buffer stores any excess results.

google/cloud/storage/internal/grpc/object_read_source_test.cc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "google/cloud/storage/hashing_options.h"
1717
#include "google/cloud/storage/internal/grpc/ctype_cord_workaround.h"
1818
#include "google/cloud/storage/internal/grpc/object_metadata_parser.h"
19+
#include "google/cloud/storage/internal/hash_function_impl.h"
1920
#include "google/cloud/storage/testing/mock_storage_stub.h"
2021
#include "google/cloud/testing_util/status_matchers.h"
2122
#include <google/protobuf/text_format.h>
@@ -350,6 +351,72 @@ TEST(GrpcObjectReadSource, StallTimeout) {
350351
EXPECT_THAT(response, StatusIs(StatusCode::kDeadlineExceeded));
351352
}
352353

354+
// Verifies that a per-message CRC32C that does not match the payload makes
// Read() fail, and that the same error is reported again by Close().
TEST(GrpcObjectReadSource, ChecksumMismatch) {
355+
auto mock = std::make_unique<MockObjectMediaStream>();
356+
std::string const contents = "0123456789";
357+
358+
::testing::InSequence sequence;
// The only mocked message carries `contents` with a hard-coded crc32c of 123.
359+
EXPECT_CALL(*mock, Read)
360+
.WillOnce([&contents](storage_proto::ReadObjectResponse* r) {
361+
SetContent(*r, contents);
362+
r->mutable_checksummed_data()->set_crc32c(123);  // Wrong CRC
363+
// NOTE(review): nullopt presumably means "message delivered, stream still
// open" -- confirm against the StreamingRpc::Read contract.
return absl::nullopt;
364+
});
365+
366+
// Wrap a null hash function in Crc32cMessageHashFunction, the same wrapper
// type used on the production path in stub.cc.
auto hash_function =
367+
std::make_shared<storage::internal::Crc32cMessageHashFunction>(
368+
storage::internal::CreateNullHashFunction());
369+
370+
GrpcObjectReadSource tested(MakeSimpleTimerSource(), std::move(mock),
371+
std::move(hash_function));
372+
std::vector<char> buffer(1024);
373+
auto response = tested.Read(buffer.data(), buffer.size());
374+
// Read() must report the mismatch as kInvalidArgument.
EXPECT_THAT(response, StatusIs(StatusCode::kInvalidArgument,
375+
HasSubstr("mismatched crc32c checksum")));
376+
// Close() is expected to return the same error status.
EXPECT_THAT(tested.Close(),
377+
StatusIs(StatusCode::kInvalidArgument,
378+
HasSubstr("mismatched crc32c checksum")));
379+
}
380+
381+
// Verifies that a read whose first message reports a non-zero
// content_range().start() succeeds when every message carries the CRC32C of
// its own payload, and that both payloads are counted in bytes_received.
TEST(GrpcObjectReadSource, ChecksumWithNonZeroOffset) {
382+
auto mock = std::make_unique<MockObjectMediaStream>();
383+
std::string const contents1 = "0123456789";
384+
std::string const contents2 = "abcdefghij";
385+
// Each expected checksum is computed from its own chunk.
auto const expected_crc32c_1 = storage::ComputeCrc32cChecksum(contents1);
386+
auto const expected_crc32c_2 = storage::ComputeCrc32cChecksum(contents2);
387+
388+
::testing::InSequence sequence;
// Three Read() calls are mocked: two data messages, then a final Status{}.
// NOTE(review): the final Status{} presumably marks a clean end of stream --
// confirm against the StreamingRpc::Read contract.
389+
EXPECT_CALL(*mock, Read)
390+
.WillOnce([&contents1,
391+
expected_crc32c_1](storage_proto::ReadObjectResponse* r) {
392+
SetContent(*r, contents1);
393+
// Only the first message sets a content range; it starts at byte 100.
r->mutable_content_range()->set_start(100);
394+
r->mutable_checksummed_data()->set_crc32c(
395+
storage_internal::Crc32cToProto(expected_crc32c_1).value());
396+
return absl::nullopt;
397+
})
398+
.WillOnce([&contents2,
399+
expected_crc32c_2](storage_proto::ReadObjectResponse* r) {
400+
// The second message has no content range, only data and its checksum.
SetContent(*r, contents2);
401+
r->mutable_checksummed_data()->set_crc32c(
402+
storage_internal::Crc32cToProto(expected_crc32c_2).value());
403+
return absl::nullopt;
404+
})
405+
.WillOnce(Return(Status{}));
406+
EXPECT_CALL(*mock, GetRequestMetadata).WillOnce(Return(RpcMetadata{}));
407+
408+
auto hash_function =
409+
std::make_shared<storage::internal::Crc32cMessageHashFunction>(
410+
storage::internal::CreateNullHashFunction());
411+
412+
GrpcObjectReadSource tested(MakeSimpleTimerSource(), std::move(mock),
413+
std::move(hash_function));
414+
// The 1024-byte buffer is larger than the two 10-byte payloads combined.
std::vector<char> buffer(1024);
415+
auto response = tested.Read(buffer.data(), buffer.size());
416+
ASSERT_STATUS_OK(response);
417+
EXPECT_EQ(contents1.size() + contents2.size(), response->bytes_received);
418+
}
419+
353420
} // namespace
354421
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
355422
} // namespace storage_internal

google/cloud/storage/internal/grpc/stub.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "google/cloud/storage/internal/grpc/sign_blob_request_parser.h"
3030
#include "google/cloud/storage/internal/grpc/split_write_object_data.h"
3131
#include "google/cloud/storage/internal/grpc/synthetic_self_link.h"
32+
#include "google/cloud/storage/internal/hash_function_impl.h"
3233
#include "google/cloud/storage/internal/storage_stub_factory.h"
3334
#include "google/cloud/storage/options.h"
3435
#include "google/cloud/internal/big_endian.h"
@@ -445,9 +446,14 @@ GrpcStub::ReadObject(rest_internal::RestContext& context,
445446
};
446447
}
447448

449+
auto hash_function =
450+
std::make_shared<storage::internal::Crc32cMessageHashFunction>(
451+
storage::internal::CreateHashFunction(request));
452+
448453
return std::unique_ptr<storage::internal::ObjectReadSource>(
449454
std::make_unique<GrpcObjectReadSource>(std::move(timer_source),
450-
std::move(stream)));
455+
std::move(stream),
456+
std::move(hash_function)));
451457
}
452458

453459
StatusOr<storage::internal::ListObjectsResponse> GrpcStub::ListObjects(

0 commit comments

Comments
 (0)