Skip to content

Commit 4e64a8b

Browse files
committed
feat: add default end-to-end checksumming for several upload methods via grpc transport
* Storage#blobWriteSession for BlobWriteSessionConfigs.getDefault() * Storage#blobWriteSession for BlobWriteSessionConfigs.bufferToTempDirThenUpload() * Storage#blobWriteSession for BlobWriteSessionConfigs.bufferToDiskThenUpload(*) * Storage#create(BlobInfo, InputStream) * Storage#createFrom(BlobInfo, InputStream) * Storage#createFrom(BlobInfo, Path) * Storage#writer
1 parent f9ce548 commit 4e64a8b

12 files changed

Lines changed: 134 additions & 80 deletions

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
163163
grpc.storageClient
164164
.writeObjectCallable()
165165
.withDefaultCallContext(grpcCallContext))
166-
.setHasher(Hasher.noop())
166+
.setHasher(opts.getHasher())
167167
.setByteStringStrategy(ByteStringStrategy.copy())
168168
.resumable()
169169
.withRetryConfig(

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ final class GapicWritableByteChannelSessionBuilder {
5050
GapicWritableByteChannelSessionBuilder(
5151
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write) {
5252
this.write = write;
53-
this.hasher = Hasher.noop();
53+
this.hasher = Hasher.defaultHasher();
5454
this.byteStringStrategy = ByteStringStrategy.copy();
5555
}
5656

5757
/**
5858
* Set the {@link Hasher} to apply to the bytes passing through the built session's channel.
5959
*
60-
* <p>Default: {@link Hasher#noop()}
60+
* <p>Default: {@link Hasher#defaultHasher()}
6161
*
6262
* @see Hasher#enabled()
6363
* @see Hasher#noop()
@@ -179,14 +179,17 @@ UnbufferedDirectUploadBuilder setRequest(WriteObjectRequest req) {
179179
}
180180

181181
UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
182+
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
182183
return new UnbufferedWriteSession<>(
183184
ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")),
184185
lift((WriteObjectRequest start, SettableApiFuture<WriteObjectResponse> resultFuture) ->
185186
new GapicUnbufferedDirectWritableByteChannel(
186187
resultFuture,
187-
getChunkSegmenter(),
188+
chunkSegmenter,
188189
write,
189-
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
190+
WriteCtx.of(
191+
WriteObjectRequestBuilderFactory.simple(start),
192+
chunkSegmenter.getHasher())))
190193
.andThen(StorageByteChannels.writable()::createSynchronized));
191194
}
192195
}
@@ -207,14 +210,17 @@ BufferedDirectUploadBuilder setRequest(WriteObjectRequest req) {
207210
}
208211

209212
BufferedWritableByteChannelSession<WriteObjectResponse> build() {
213+
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
210214
return new BufferedWriteSession<>(
211215
ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")),
212216
lift((WriteObjectRequest start, SettableApiFuture<WriteObjectResponse> resultFuture) ->
213217
new GapicUnbufferedDirectWritableByteChannel(
214218
resultFuture,
215-
getChunkSegmenter(),
219+
chunkSegmenter,
216220
write,
217-
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
221+
WriteCtx.of(
222+
WriteObjectRequestBuilderFactory.simple(start),
223+
chunkSegmenter.getHasher())))
218224
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
219225
.andThen(StorageByteChannels.writable()::createSynchronized));
220226
}
@@ -290,20 +296,24 @@ UnbufferedResumableUploadBuilder setStartAsync(ApiFuture<ResumableWrite> start)
290296

291297
UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
292298
RetrierWithAlg boundRetrier = retrier;
299+
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
293300
return new UnbufferedWriteSession<>(
294301
requireNonNull(start, "start must be non null"),
295302
lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
296303
if (fsyncEvery) {
297304
return new GapicUnbufferedChunkedResumableWritableByteChannel(
298305
result,
299-
getChunkSegmenter(),
306+
chunkSegmenter,
300307
write,
301-
new WriteCtx<>(start),
308+
WriteCtx.of(start, chunkSegmenter.getHasher()),
302309
boundRetrier,
303310
Retrying::newCallContext);
304311
} else {
305312
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
306-
result, getChunkSegmenter(), write, new WriteCtx<>(start));
313+
result,
314+
chunkSegmenter,
315+
write,
316+
WriteCtx.of(start, chunkSegmenter.getHasher()));
307317
}
308318
})
309319
.andThen(StorageByteChannels.writable()::createSynchronized));
@@ -330,20 +340,24 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture<ResumableWrite> start) {
330340
}
331341

332342
BufferedWritableByteChannelSession<WriteObjectResponse> build() {
343+
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
333344
return new BufferedWriteSession<>(
334345
requireNonNull(start, "start must be non null"),
335346
lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
336347
if (fsyncEvery) {
337348
return new GapicUnbufferedChunkedResumableWritableByteChannel(
338349
result,
339-
getChunkSegmenter(),
350+
chunkSegmenter,
340351
write,
341-
new WriteCtx<>(start),
352+
WriteCtx.of(start, chunkSegmenter.getHasher()),
342353
retrier,
343354
Retrying::newCallContext);
344355
} else {
345356
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
346-
result, getChunkSegmenter(), write, new WriteCtx<>(start));
357+
result,
358+
chunkSegmenter,
359+
write,
360+
WriteCtx.of(start, chunkSegmenter.getHasher()));
347361
}
348362
})
349363
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op
263263
GrpcCallContext grpcCallContext =
264264
optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
265265
WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults);
266-
Hasher hasher = Hasher.enabled();
266+
Hasher hasher = optsWithDefaults.getHasher();
267267
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
268268
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
269269
ResumableMedia.gapic()
@@ -324,7 +324,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
324324
write,
325325
storageClient.queryWriteStatusCallable(),
326326
rw,
327-
Hasher.noop()),
327+
opts.getHasher()),
328328
MoreExecutors.directExecutor());
329329
try {
330330
GrpcResumableSession got = session2.get();
@@ -365,7 +365,7 @@ public Blob createFrom(
365365
.write()
366366
.byteChannel(
367367
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
368-
.setHasher(Hasher.noop())
368+
.setHasher(opts.getHasher())
369369
.setByteStringStrategy(ByteStringStrategy.noCopy())
370370
.resumable()
371371
.withRetryConfig(retrier.withAlg(retryAlgorithmManager.idempotent()))
@@ -779,7 +779,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
779779
GrpcCallContext grpcCallContext =
780780
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
781781
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
782-
Hasher hasher = Hasher.noop();
782+
Hasher hasher = opts.getHasher();
783783
// in JSON, the starting of the resumable session happens before the invocation of write can
784784
// happen. Emulate the same thing here.
785785
// 1. create the future

google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,15 @@ void validateUnchecked(Crc32cValue<?> expected, ByteString byteString)
7676
@Nullable Crc32cLengthKnown nullSafeConcat(
7777
@Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2);
7878

79+
/**
80+
* The initial value to use for this hasher.
81+
*
82+
* <p>Not ideal, really we should always start with {@link Crc32cValue#zero()} but this saves us
83+
* from having to plumb the initial value along with the actual hasher to the constructor of the
84+
* WriteCtx when hashing is disabled because of user provided crc32c/md5 preconditions.
85+
*/
86+
@Nullable Crc32cLengthKnown initialValue();
87+
7988
static NoOpHasher noop() {
8089
return NoOpHasher.INSTANCE;
8190
}
@@ -118,6 +127,11 @@ public void validateUnchecked(Crc32cValue<?> expected, ByteString byteString) {}
118127
@Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) {
119128
return null;
120129
}
130+
131+
@Override
132+
public @Nullable Crc32cLengthKnown initialValue() {
133+
return null;
134+
}
121135
}
122136

123137
@Immutable
@@ -185,6 +199,11 @@ public Crc32cLengthKnown nullSafeConcat(
185199
return r1.concat(r2);
186200
}
187201
}
202+
203+
@Override
204+
public @NonNull Crc32cLengthKnown initialValue() {
205+
return Crc32cValue.zero();
206+
}
188207
}
189208

190209
final class ChecksumMismatchException extends IOException {

google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
192192
grpcStorage.startResumableWrite(
193193
grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts), opts);
194194
ApiFuture<WriteCtx<ResumableWrite>> start =
195-
ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor());
195+
ApiFutures.transform(
196+
f, s -> WriteCtx.of(s, opts.getHasher()), MoreExecutors.directExecutor());
196197

197198
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write =
198199
grpcStorage.storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);

google-cloud-storage/src/main/java/com/google/cloud/storage/WriteCtx.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.storage.v2.WriteObjectRequest;
2424
import java.util.concurrent.atomic.AtomicLong;
2525
import java.util.concurrent.atomic.AtomicReference;
26+
import org.checkerframework.checker.nullness.qual.NonNull;
2627
import org.checkerframework.checker.nullness.qual.Nullable;
2728

2829
final class WriteCtx<RequestFactoryT extends WriteObjectRequestBuilderFactory> {
@@ -33,22 +34,18 @@ final class WriteCtx<RequestFactoryT extends WriteObjectRequestBuilderFactory> {
3334
private final AtomicLong confirmedBytes;
3435
private final AtomicReference<@Nullable Crc32cLengthKnown> cumulativeCrc32c;
3536

36-
WriteCtx(RequestFactoryT requestFactory) {
37-
this(requestFactory, null);
38-
}
39-
40-
/**
41-
* TODO: Remove initialValue and replace with Crc32cValue.zero() once all uploads have been
42-
* updated to do e2e checksumming by default.
43-
*/
44-
@Deprecated
45-
WriteCtx(RequestFactoryT requestFactory, @Nullable Crc32cLengthKnown initialValue) {
37+
private WriteCtx(RequestFactoryT requestFactory, @Nullable Crc32cLengthKnown initialValue) {
4638
this.requestFactory = requestFactory;
4739
this.totalSentBytes = new AtomicLong(0);
4840
this.confirmedBytes = new AtomicLong(0);
4941
this.cumulativeCrc32c = new AtomicReference<>(initialValue);
5042
}
5143

44+
static <RFT extends WriteObjectRequestBuilderFactory> WriteCtx<RFT> of(
45+
RFT rft, @NonNull Hasher hasher) {
46+
return new WriteCtx<>(rft, hasher.initialValue());
47+
}
48+
5249
public RequestFactoryT getRequestFactory() {
5350
return requestFactory;
5451
}

0 commit comments

Comments
 (0)