|
17 | 17 | #include "google/cloud/storage/internal/async/handle_redirect_error.h" |
18 | 18 | #include "google/cloud/storage/internal/async/multi_stream_manager.h" |
19 | 19 | #include "google/cloud/storage/internal/async/object_descriptor_reader_tracing.h" |
| 20 | +#include "google/cloud/storage/internal/grpc/object_metadata_parser.h" |
20 | 21 | #include "google/cloud/storage/internal/hash_function.h" |
21 | 22 | #include "google/cloud/storage/internal/hash_function_impl.h" |
| 23 | +#include "google/cloud/storage/internal/hash_validator.h" |
| 24 | +#include "google/cloud/storage/internal/hash_validator_impl.h" |
| 25 | +#include "google/cloud/storage/internal/hash_values.h" |
22 | 26 | #include "google/cloud/grpc_error_delegate.h" |
23 | 27 | #include "google/cloud/internal/opentelemetry.h" |
24 | 28 | #include "google/rpc/status.pb.h" |
@@ -149,15 +153,20 @@ void ObjectDescriptorImpl::MakeSubsequentStream() { |
149 | 153 |
|
150 | 154 | std::unique_ptr<storage::AsyncReaderConnection> ObjectDescriptorImpl::Read( |
151 | 155 | ReadParams p) { |
152 | | - std::shared_ptr<storage::internal::HashFunction> hash_function = |
153 | | - std::shared_ptr<storage::internal::HashFunction>( |
154 | | - storage::internal::CreateNullHashFunction()); |
155 | | - if (options_.has<storage::EnableCrc32cValidationOption>()) { |
156 | | - hash_function = |
157 | | - std::make_shared<storage::internal::Crc32cMessageHashFunction>( |
158 | | - storage::internal::CreateNullHashFunction()); |
159 | | - } |
160 | | - auto range = std::make_shared<ReadRange>(p.start, p.length, hash_function); |
| 156 | + // Full-object checksum validation (both CRC32C and MD5) is only supported for |
| 157 | + // full-object reads (starting at offset 0 and reading the entire object). |
| 158 | + // |
| 159 | + // Note that MD5 validation is not supported for partial/ranged reads because |
| 160 | + // GCS does not compute or send chunk-level MD5 checksums (unlike CRC32C, |
| 161 | + // which is validated per-chunk on the gRPC layer). |
| 162 | + bool is_full_read = (p.start == 0 && metadata_.has_value() && |
| 163 | + (p.length == 0 || p.length >= metadata_->size())); |
| 164 | + |
| 165 | + auto hash_function = CreateHashFunction(is_full_read); |
| 166 | + auto hash_validator = CreateHashValidator(is_full_read); |
| 167 | + |
| 168 | + auto range = std::make_shared<ReadRange>(p.start, p.length, hash_function, |
| 169 | + std::move(hash_validator)); |
161 | 170 |
|
162 | 171 | std::unique_lock<std::mutex> lk(mu_); |
163 | 172 | if (stream_manager_->Empty()) { |
@@ -188,6 +197,80 @@ std::unique_ptr<storage::AsyncReaderConnection> ObjectDescriptorImpl::Read( |
188 | 197 | return MakeTracingObjectDescriptorReader(std::move(range)); |
189 | 198 | } |
190 | 199 |
|
| 200 | +std::shared_ptr<storage::internal::HashFunction> |
| 201 | +ObjectDescriptorImpl::CreateHashFunction(bool is_full_read) const { |
| 202 | + auto const enable_crc32c = |
| 203 | + options_.get<storage::EnableCrc32cValidationOption>(); |
| 204 | + auto const enable_md5 = options_.get<storage::EnableMD5ValidationOption>(); |
| 205 | + |
| 206 | + if (enable_crc32c) { |
| 207 | + std::unique_ptr<storage::internal::HashFunction> child; |
| 208 | + if (is_full_read) { |
| 209 | + if (enable_md5) { |
| 210 | + child = std::make_unique<storage::internal::CompositeFunction>( |
| 211 | + std::make_unique<storage::internal::Crc32cHashFunction>(), |
| 212 | + storage::internal::MD5HashFunction::Create()); |
| 213 | + } else { |
| 214 | + child = std::make_unique<storage::internal::Crc32cHashFunction>(); |
| 215 | + } |
| 216 | + } else { |
| 217 | + child = storage::internal::CreateNullHashFunction(); |
| 218 | + } |
| 219 | + return std::make_shared<storage::internal::Crc32cMessageHashFunction>( |
| 220 | + std::move(child)); |
| 221 | + } |
| 222 | + if (enable_md5 && is_full_read) { |
| 223 | + return std::shared_ptr<storage::internal::HashFunction>( |
| 224 | + storage::internal::MD5HashFunction::Create()); |
| 225 | + } |
| 226 | + return std::shared_ptr<storage::internal::HashFunction>( |
| 227 | + storage::internal::CreateNullHashFunction()); |
| 228 | +} |
| 229 | + |
| 230 | +std::unique_ptr<storage::internal::HashValidator> |
| 231 | +ObjectDescriptorImpl::CreateHashValidator(bool is_full_read) const { |
| 232 | + if (!is_full_read) { |
| 233 | + return storage::internal::CreateNullHashValidator(); |
| 234 | + } |
| 235 | + |
| 236 | + auto const enable_crc32c = |
| 237 | + options_.get<storage::EnableCrc32cValidationOption>(); |
| 238 | + auto const enable_md5 = options_.get<storage::EnableMD5ValidationOption>(); |
| 239 | + |
| 240 | + std::unique_ptr<storage::internal::HashValidator> hash_validator; |
| 241 | + if (enable_crc32c && enable_md5) { |
| 242 | + hash_validator = std::make_unique<storage::internal::CompositeValidator>( |
| 243 | + std::make_unique<storage::internal::Crc32cHashValidator>(), |
| 244 | + std::make_unique<storage::internal::MD5HashValidator>()); |
| 245 | + } else if (enable_crc32c) { |
| 246 | + hash_validator = std::make_unique<storage::internal::Crc32cHashValidator>(); |
| 247 | + } else if (enable_md5) { |
| 248 | + hash_validator = std::make_unique<storage::internal::MD5HashValidator>(); |
| 249 | + } else { |
| 250 | + return storage::internal::CreateNullHashValidator(); |
| 251 | + } |
| 252 | + |
| 253 | + // Process the expected hashes from metadata |
| 254 | + storage::internal::HashValues hashes; |
| 255 | + if (metadata_->has_checksums()) { |
| 256 | + auto const& checksums = metadata_->checksums(); |
| 257 | + if (checksums.has_crc32c()) { |
| 258 | + hashes = |
| 259 | + Merge(std::move(hashes), |
| 260 | + storage::internal::HashValues{ |
| 261 | + storage_internal::Crc32cFromProto(checksums.crc32c()), {}}); |
| 262 | + } |
| 263 | + if (!checksums.md5_hash().empty()) { |
| 264 | + hashes = |
| 265 | + Merge(std::move(hashes), |
| 266 | + storage::internal::HashValues{ |
| 267 | + {}, storage_internal::MD5FromProto(checksums.md5_hash())}); |
| 268 | + } |
| 269 | + } |
| 270 | + hash_validator->ProcessHashValues(hashes); |
| 271 | + return hash_validator; |
| 272 | +} |
| 273 | + |
191 | 274 | void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk, |
192 | 275 | StreamIterator it) { |
193 | 276 | if (it->stream->write_pending || |
|
0 commit comments