Skip to content

Commit d56f271

Browse files
committed
feat(storage): Add read chunkwise checksum validation for sync grpc read
1 parent f0bb606 commit d56f271

4 files changed

Lines changed: 61 additions & 11 deletions

File tree

google/cloud/storage/internal/grpc/object_read_source.cc

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ namespace cloud {
2626
namespace storage_internal {
2727
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2828

29-
GrpcObjectReadSource::GrpcObjectReadSource(TimerSource timer_source,
30-
std::unique_ptr<StreamingRpc> stream)
31-
: timer_source_(std::move(timer_source)), stream_(std::move(stream)) {}
29+
GrpcObjectReadSource::GrpcObjectReadSource(
30+
TimerSource timer_source, std::unique_ptr<StreamingRpc> stream,
31+
std::shared_ptr<storage::internal::HashFunction> hash_function)
32+
: timer_source_(std::move(timer_source)),
33+
stream_(std::move(stream)),
34+
hash_function_(std::move(hash_function)) {}
3235

3336
StatusOr<storage::internal::HttpResponse> GrpcObjectReadSource::Close() {
3437
if (stream_) stream_ = nullptr;
@@ -74,18 +77,30 @@ StatusOr<storage::internal::ReadSourceResult> GrpcObjectReadSource::Read(
7477
if (!status_.ok()) return status_;
7578
return result;
7679
}
77-
HandleResponse(result, buf, n, std::move(response));
80+
auto handle_status = HandleResponse(result, buf, n, std::move(response));
81+
if (!handle_status.ok()) return handle_status;
7882
}
7983

8084
return result;
8185
}
8286

83-
void GrpcObjectReadSource::HandleResponse(
87+
Status GrpcObjectReadSource::HandleResponse(
8488
storage::internal::ReadSourceResult& result, char* buf, std::size_t n,
8589
google::storage::v2::ReadObjectResponse response) {
90+
if (!offset_ && response.has_content_range()) {
91+
offset_ = response.content_range().start();
92+
}
8693
// The google.storage.v1.Storage documentation says this field can be
8794
// empty.
8895
if (response.has_checksummed_data()) {
96+
auto const& data = response.checksummed_data();
97+
auto status = hash_function_->Update(offset_.value_or(0),
98+
GetContent(data),
99+
data.crc32c());
100+
if (!status.ok()) return status;
101+
102+
offset_ = offset_.value_or(0) + GetContent(data).size();
103+
89104
auto const offset = result.bytes_received;
90105
result.bytes_received += buffer_.HandleResponse(
91106
buf + offset, n - offset,
@@ -115,6 +130,7 @@ void GrpcObjectReadSource::HandleResponse(
115130
result.storage_class.value_or(metadata.storage_class());
116131
result.size = result.size.value_or(metadata.size());
117132
}
133+
return {};
118134
}
119135

120136
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/grpc/object_read_source.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
#include "google/cloud/future.h"
2222
#include "google/cloud/internal/streaming_read_rpc.h"
2323
#include "absl/functional/function_ref.h"
24+
#include "google/cloud/storage/internal/hash_function.h"
2425
#include "google/storage/v2/storage.pb.h"
26+
#include "absl/types/optional.h"
2527
#include <functional>
2628
#include <string>
2729

@@ -49,8 +51,10 @@ class GrpcObjectReadSource : public storage::internal::ObjectReadSource {
4951
// `false` if the timer was canceled, and with `true` if the timer triggered.
5052
using TimerSource = std::function<future<bool>()>;
5153

52-
explicit GrpcObjectReadSource(TimerSource timer_source,
53-
std::unique_ptr<StreamingRpc> stream);
54+
explicit GrpcObjectReadSource(
55+
TimerSource timer_source, std::unique_ptr<StreamingRpc> stream,
56+
std::shared_ptr<storage::internal::HashFunction> hash_function =
57+
storage::internal::CreateNullHashFunction());
5458

5559
~GrpcObjectReadSource() override = default;
5660

@@ -65,12 +69,14 @@ class GrpcObjectReadSource : public storage::internal::ObjectReadSource {
6569
std::size_t n) override;
6670

6771
private:
68-
void HandleResponse(storage::internal::ReadSourceResult& result, char* buf,
69-
std::size_t n,
70-
google::storage::v2::ReadObjectResponse response);
72+
Status HandleResponse(storage::internal::ReadSourceResult& result, char* buf,
73+
std::size_t n,
74+
google::storage::v2::ReadObjectResponse response);
7175

7276
TimerSource timer_source_;
7377
std::unique_ptr<StreamingRpc> stream_;
78+
std::shared_ptr<storage::internal::HashFunction> hash_function_;
79+
absl::optional<std::int64_t> offset_;
7480

7581
// In some cases the gRPC response may contain more data than the buffer
7682
// provided by the application. This buffer stores any excess results.

google/cloud/storage/internal/grpc/object_read_source_test.cc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/storage/internal/grpc/object_metadata_parser.h"
1919
#include "google/cloud/storage/testing/mock_storage_stub.h"
2020
#include "google/cloud/testing_util/status_matchers.h"
21+
#include "google/cloud/storage/internal/hash_function_impl.h"
2122
#include <google/protobuf/text_format.h>
2223
#include <google/protobuf/util/message_differencer.h>
2324
#include <gmock/gmock.h>
@@ -350,6 +351,27 @@ TEST(GrpcObjectReadSource, StallTimeout) {
350351
EXPECT_THAT(response, StatusIs(StatusCode::kDeadlineExceeded));
351352
}
352353

354+
TEST(GrpcObjectReadSource, ChecksumMismatch) {
355+
auto mock = std::make_unique<MockObjectMediaStream>();
356+
std::string const contents = "0123456789";
357+
358+
::testing::InSequence sequence;
359+
EXPECT_CALL(*mock, Read)
360+
.WillOnce([&contents](storage_proto::ReadObjectResponse* r) {
361+
SetContent(*r, contents);
362+
r->mutable_checksummed_data()->set_crc32c(123); // Wrong CRC
363+
return absl::nullopt;
364+
});
365+
366+
auto hash_function = std::make_shared<storage::internal::Crc32cMessageHashFunction>(
367+
storage::internal::CreateNullHashFunction());
368+
369+
GrpcObjectReadSource tested(MakeSimpleTimerSource(), std::move(mock), std::move(hash_function));
370+
std::vector<char> buffer(1024);
371+
auto response = tested.Read(buffer.data(), buffer.size());
372+
EXPECT_THAT(response, StatusIs(StatusCode::kInvalidArgument, HasSubstr("mismatched crc32c checksum")));
373+
}
374+
353375
} // namespace
354376
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
355377
} // namespace storage_internal

google/cloud/storage/internal/grpc/stub.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "google/cloud/storage/internal/grpc/synthetic_self_link.h"
3232
#include "google/cloud/storage/internal/storage_stub_factory.h"
3333
#include "google/cloud/storage/options.h"
34+
#include "google/cloud/storage/internal/hash_function_impl.h"
3435
#include "google/cloud/internal/big_endian.h"
3536
#include "google/cloud/internal/invoke_result.h"
3637
#include "google/cloud/internal/make_status.h"
@@ -445,9 +446,14 @@ GrpcStub::ReadObject(rest_internal::RestContext& context,
445446
};
446447
}
447448

449+
auto hash_function =
450+
std::make_shared<storage::internal::Crc32cMessageHashFunction>(
451+
storage::internal::CreateHashFunction(request));
452+
448453
return std::unique_ptr<storage::internal::ObjectReadSource>(
449454
std::make_unique<GrpcObjectReadSource>(std::move(timer_source),
450-
std::move(stream)));
455+
std::move(stream),
456+
std::move(hash_function)));
451457
}
452458

453459
StatusOr<storage::internal::ListObjectsResponse> GrpcStub::ListObjects(

0 commit comments

Comments
 (0)