Skip to content

Commit 286bcd6

Browse files
committed
feat: add post-upload crc32c validation for most uploads
* Resumable Uploads (JSON & gRPC) * BlobWriteSessionConfigs.getDefault() (JSON & gRPC) * BlobWriteSessionConfigs.bufferToTempDirThenUpload() (JSON & gRPC) * BlobWriteSessionConfigs.bufferToDiskThenUpload() (JSON & gRPC) * BlobWriteSessionConfigs.journaling() (gRPC). BlobWriteSessionConfigs.parallelCompositeUpload() already performs crc32c checksum validation. Storage.create(BlobInfo, byte[]) already performs crc32c checksum validation. BlobAppendableUpload already performs crc32c checksum validation. BlobWriteSessionConfigs.bidiWrite() will be addressed in the future.
1 parent e6007d5 commit 286bcd6

10 files changed

Lines changed: 399 additions & 19 deletions

google-cloud-storage/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@
6363
<groupId>com.google.api</groupId>
6464
<artifactId>gax</artifactId>
6565
</dependency>
66+
<dependency>
67+
<groupId>com.google.api</groupId>
68+
<artifactId>gax-httpjson</artifactId>
69+
</dependency>
6670
<dependency>
6771
<groupId>com.google.api</groupId>
6872
<artifactId>gax-grpc</artifactId>

google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
import com.google.api.core.BetaApi;
2222
import com.google.api.core.InternalApi;
2323
import com.google.api.core.SettableApiFuture;
24+
import com.google.cloud.BaseServiceException;
25+
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2426
import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory;
2527
import com.google.cloud.storage.Storage.BlobWriteOption;
2628
import com.google.cloud.storage.TransportCompatibility.Transport;
2729
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
2830
import com.google.cloud.storage.UnifiedOpts.Opts;
2931
import com.google.common.annotations.VisibleForTesting;
3032
import com.google.common.collect.ImmutableList;
33+
import com.google.common.hash.Hasher;
34+
import com.google.common.hash.Hashing;
3135
import com.google.common.util.concurrent.MoreExecutors;
3236
import java.io.IOException;
3337
import java.io.ObjectInputStream;
@@ -215,12 +219,19 @@ private final class Flusher implements WritableByteChannel {
215219

216220
private final WritableByteChannel delegate;
217221

222+
private final Hasher cumulativeCrc32c;
223+
private long totalLength;
224+
218225
/**
 * Creates a flusher that forwards writes to {@code delegate} while keeping a running crc32c
 * and byte count of everything passed to {@link #write(ByteBuffer)}.
 *
 * @param delegate channel that receives the bytes
 */
private Flusher(WritableByteChannel delegate) {
  // Start with an empty running checksum and a zero byte count.
  this.totalLength = 0;
  this.cumulativeCrc32c = Hashing.crc32c().newHasher();
  this.delegate = delegate;
}
221230

222231
@Override
223232
public int write(ByteBuffer src) throws IOException {
233+
totalLength += src.remaining();
234+
cumulativeCrc32c.putBytes(src.duplicate());
224235
return delegate.write(src);
225236
}
226237

@@ -232,6 +243,7 @@ public boolean isOpen() {
232243
@Override
233244
public void close() throws IOException {
234245
delegate.close();
246+
Crc32cLengthKnown expected = Crc32cValue.of(cumulativeCrc32c.hash().asInt(), totalLength);
235247
try (RecoveryFile rf = Factory.WriteToFileThenUpload.this.rf) {
236248
Path path = rf.getPath();
237249
long size = Files.size(path);
@@ -241,11 +253,20 @@ public void close() throws IOException {
241253
size,
242254
() -> {
243255
BlobInfo blob = storage.internalCreateFrom(path, info, opts);
256+
Crc32cLengthKnown actual =
257+
Crc32cValue.of(Utils.crc32cCodec.decode(blob.getCrc32c()), blob.getSize());
258+
if (!expected.eqValue(actual)) {
259+
throw new ClientDetectedDataLossException(actual, expected);
260+
}
244261
result.set(blob);
245262
});
246263
} catch (StorageException | IOException e) {
247264
result.setException(e);
248265
throw e;
266+
} catch (ClientDetectedDataLossException e) {
267+
BaseServiceException coalesce = StorageException.coalesce(e);
268+
result.setException(coalesce);
269+
throw coalesce;
249270
}
250271
}
251272
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.gax.httpjson.HttpJsonStatusCode;
20+
import com.google.api.gax.rpc.DataLossException;
21+
import com.google.api.gax.rpc.StatusCode.Code;
22+
import java.util.Locale;
23+
24+
final class ClientDetectedDataLossException extends DataLossException {
25+
26+
public ClientDetectedDataLossException(Crc32cValue<?> actual, Crc32cValue<?> expected) {
27+
super(
28+
String.format(Locale.US, "actual: %s, expected: %s", actual, expected),
29+
null,
30+
HttpJsonStatusCode.of(Code.DATA_LOSS),
31+
false);
32+
}
33+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static java.util.Objects.requireNonNull;
2020

21+
import com.google.api.core.ApiFunction;
2122
import com.google.api.core.ApiFuture;
2223
import com.google.api.core.ApiFutures;
2324
import com.google.api.core.InternalApi;
@@ -27,10 +28,13 @@
2728
import com.google.api.gax.rpc.UnaryCallable;
2829
import com.google.cloud.storage.ChannelSession.BufferedWriteSession;
2930
import com.google.cloud.storage.ChannelSession.UnbufferedWriteSession;
31+
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
3032
import com.google.cloud.storage.Retrying.Retrier;
3133
import com.google.cloud.storage.Retrying.RetrierWithAlg;
3234
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
3335
import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory;
36+
import com.google.common.util.concurrent.MoreExecutors;
37+
import com.google.storage.v2.Object;
3438
import com.google.storage.v2.QueryWriteStatusRequest;
3539
import com.google.storage.v2.QueryWriteStatusResponse;
3640
import com.google.storage.v2.ServiceConstants.Values;
@@ -43,6 +47,12 @@
4347
final class GapicWritableByteChannelSessionBuilder {
4448

4549
private static final int DEFAULT_BUFFER_CAPACITY = ByteSizeConstants._16MiB;
50+
private static final ApiFunction<WriteObjectResponse, Crc32cLengthKnown>
51+
WRITE_OBJECT_RESPONSE_CRC32C_VALUE =
52+
writeObjectResponse -> {
53+
Object resource = writeObjectResponse.getResource();
54+
return Crc32cValue.of(resource.getChecksums().getCrc32C(), resource.getSize());
55+
};
4656
private final ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write;
4757
private Hasher hasher;
4858
private ByteStringStrategy byteStringStrategy;
@@ -210,11 +220,17 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
210220
return new BufferedWriteSession<>(
211221
ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")),
212222
lift((WriteObjectRequest start, SettableApiFuture<WriteObjectResponse> resultFuture) ->
213-
new GapicUnbufferedDirectWritableByteChannel(
214-
resultFuture,
215-
getChunkSegmenter(),
216-
write,
217-
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
223+
StorageByteChannels.writable()
224+
.validateUploadCrc32c(
225+
new GapicUnbufferedDirectWritableByteChannel(
226+
resultFuture,
227+
getChunkSegmenter(),
228+
write,
229+
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))),
230+
ApiFutures.transform(
231+
resultFuture,
232+
WRITE_OBJECT_RESPONSE_CRC32C_VALUE,
233+
MoreExecutors.directExecutor())))
218234
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
219235
.andThen(StorageByteChannels.writable()::createSynchronized));
220236
}
@@ -436,16 +452,23 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
436452
ByteStringStrategy boundStrategy = byteStringStrategy;
437453
Hasher boundHasher = hasher;
438454
return (writeCtx, resultFuture) ->
439-
new SyncAndUploadUnbufferedWritableByteChannel(
440-
write,
441-
query,
442-
resultFuture,
443-
new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
444-
boundRetrier,
445-
alg,
446-
writeCtx,
447-
recoveryFile,
448-
recoveryBuffer);
455+
StorageByteChannels.writable()
456+
.validateUploadCrc32c(
457+
new SyncAndUploadUnbufferedWritableByteChannel(
458+
write,
459+
query,
460+
resultFuture,
461+
new ChunkSegmenter(
462+
boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
463+
boundRetrier,
464+
alg,
465+
writeCtx,
466+
recoveryFile,
467+
recoveryBuffer),
468+
ApiFutures.transform(
469+
resultFuture,
470+
WRITE_OBJECT_RESPONSE_CRC32C_VALUE,
471+
MoreExecutors.directExecutor()));
449472
}
450473
}
451474
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GzipReadableByteChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
9191
delegate = source;
9292
}
9393
} catch (InterruptedException | ExecutionException e) {
94+
Thread.currentThread().interrupt();
9495
throw new IOException(e);
9596
}
9697
} else if (leftovers != null && leftovers.hasRemaining()) {

google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import static java.util.Objects.requireNonNull;
2020

2121
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
2223
import com.google.api.core.SettableApiFuture;
2324
import com.google.api.services.storage.model.StorageObject;
2425
import com.google.cloud.storage.ChannelSession.BufferedWriteSession;
2526
import com.google.cloud.storage.ChannelSession.UnbufferedWriteSession;
2627
import com.google.cloud.storage.Retrying.RetrierWithAlg;
2728
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
29+
import com.google.common.util.concurrent.MoreExecutors;
2830
import java.nio.ByteBuffer;
2931
import java.util.function.BiFunction;
3032
import java.util.function.LongConsumer;
@@ -118,8 +120,17 @@ BufferedResumableUploadBuilder buffered(BufferHandle bufferHandle) {
118120
// fields.
119121
RetrierWithAlg boundRetrier = retrier;
120122
return (start, resultFuture) ->
121-
new ApiaryUnbufferedWritableByteChannel(
122-
httpClientContext, boundRetrier, start, resultFuture, committedBytesCallback);
123+
StorageByteChannels.writable()
124+
.validateUploadCrc32c(
125+
new ApiaryUnbufferedWritableByteChannel(
126+
httpClientContext, boundRetrier, start, resultFuture, committedBytesCallback),
127+
ApiFutures.transform(
128+
resultFuture,
129+
storageObject ->
130+
Crc32cValue.of(
131+
Utils.crc32cCodec.decode(storageObject.getCrc32c()),
132+
storageObject.getSize().longValueExact()),
133+
MoreExecutors.directExecutor()));
123134
}
124135

125136
final class UnbufferedResumableUploadBuilder {

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,22 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import com.google.api.core.ApiFuture;
1920
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
2021
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
22+
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2123
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
2224
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
25+
import com.google.common.hash.Hasher;
26+
import com.google.common.hash.Hashing;
2327
import java.io.IOException;
28+
import java.io.InterruptedIOException;
2429
import java.nio.ByteBuffer;
2530
import java.nio.channels.ClosedChannelException;
2631
import java.nio.channels.ReadableByteChannel;
2732
import java.nio.channels.ScatteringByteChannel;
2833
import java.nio.channels.SeekableByteChannel;
34+
import java.util.concurrent.ExecutionException;
2935
import java.util.concurrent.locks.ReentrantLock;
3036

3137
final class StorageByteChannels {
@@ -70,12 +76,115 @@ public BufferedWritableByteChannel createSynchronized(BufferedWritableByteChanne
7076
return new SynchronizedBufferedWritableByteChannel(delegate);
7177
}
7278

79+
public UnbufferedWritableByteChannel validateUploadCrc32c(
80+
UnbufferedWritableByteChannel delegate, ApiFuture<Crc32cLengthKnown> crc32cGetter) {
81+
return new ChecksumValidatingBufferedWritableByteChannel(delegate, crc32cGetter);
82+
}
83+
7384
public UnbufferedWritableByteChannel createSynchronized(
7485
UnbufferedWritableByteChannel delegate) {
7586
return new SynchronizedUnbufferedWritableByteChannel(delegate);
7687
}
7788
}
7889

90+
@SuppressWarnings("UnstableApiUsage")
91+
private static final class ChecksumValidatingBufferedWritableByteChannel
92+
implements UnbufferedWritableByteChannel {
93+
private final UnbufferedWritableByteChannel delegate;
94+
private final ApiFuture<Crc32cLengthKnown> crc32cGetter;
95+
96+
private final Hasher cumulativeCrc32c;
97+
private long totalLength;
98+
99+
private ChecksumValidatingBufferedWritableByteChannel(
100+
UnbufferedWritableByteChannel delegate, ApiFuture<Crc32cLengthKnown> crc32cGetter) {
101+
this.delegate = delegate;
102+
this.crc32cGetter = crc32cGetter;
103+
this.cumulativeCrc32c = Hashing.crc32c().newHasher();
104+
this.totalLength = 0;
105+
}
106+
107+
@Override
108+
public int write(ByteBuffer src) throws IOException {
109+
hash(src);
110+
return delegate.write(src);
111+
}
112+
113+
@Override
114+
public long write(ByteBuffer[] srcs) throws IOException {
115+
return write(srcs, 0, srcs.length);
116+
}
117+
118+
@Override
119+
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
120+
hash(srcs, offset, length);
121+
return delegate.write(srcs, offset, length);
122+
}
123+
124+
@Override
125+
public int writeAndClose(ByteBuffer src) throws IOException {
126+
hash(src);
127+
int i = delegate.writeAndClose(src);
128+
internalClose();
129+
return i;
130+
}
131+
132+
@Override
133+
public long writeAndClose(ByteBuffer[] srcs) throws IOException {
134+
return writeAndClose(srcs, 0, srcs.length);
135+
}
136+
137+
@Override
138+
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
139+
hash(srcs, offset, length);
140+
long l = delegate.writeAndClose(srcs, offset, length);
141+
internalClose();
142+
return l;
143+
}
144+
145+
@Override
146+
public boolean isOpen() {
147+
return delegate.isOpen();
148+
}
149+
150+
@Override
151+
public void close() throws IOException {
152+
if (delegate.isOpen()) {
153+
delegate.close();
154+
internalClose();
155+
}
156+
}
157+
158+
private void hash(ByteBuffer src) {
159+
ByteBuffer buffer = src.duplicate();
160+
totalLength += buffer.remaining();
161+
cumulativeCrc32c.putBytes(buffer);
162+
}
163+
164+
private void hash(ByteBuffer[] srcs, int offset, int length) {
165+
for (int i = offset; i < length; i++) {
166+
hash(srcs[i]);
167+
}
168+
}
169+
170+
private void internalClose() throws IOException {
171+
try {
172+
Crc32cLengthKnown actual = crc32cGetter.get();
173+
Crc32cLengthKnown expected = Crc32cValue.of(cumulativeCrc32c.hash().asInt(), totalLength);
174+
if (!expected.eqValue(actual)) {
175+
throw new ClientDetectedDataLossException(actual, expected);
176+
}
177+
} catch (InterruptedException e) {
178+
Thread.currentThread().interrupt();
179+
InterruptedIOException interruptedIOException = new InterruptedIOException();
180+
interruptedIOException.initCause(e);
181+
throw interruptedIOException;
182+
} catch (ExecutionException e) {
183+
throw new IOException(e.getCause());
184+
}
185+
}
186+
}
187+
79188
private static final class SynchronizedBufferedReadableByteChannel
80189
implements BufferedReadableByteChannel {
81190

0 commit comments

Comments
 (0)