Skip to content

Commit 7b12d01

Browse files
committed
chore: fix failing tests
1 parent 286bcd6 commit 7b12d01

9 files changed

Lines changed: 224 additions & 50 deletions

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,13 @@ static Get createGetRequest(
300300
}
301301

302302
@SuppressWarnings("unchecked")
303-
private static <T> T cast(Object o) {
303+
static <T> T cast(Object o) {
304304
return (T) o;
305305
}
306306

307307
@Nullable
308308
@SuppressWarnings("unchecked")
309-
private static String getHeaderValue(@NonNull HttpHeaders headers, @NonNull String headerName) {
309+
static String getHeaderValue(@NonNull HttpHeaders headers, @NonNull String headerName) {
310310
Object o = headers.get(headerName);
311311
if (o == null) {
312312
return null;

google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,13 @@ static long copyUsingBuffer(ByteBuffer buf, ReadableByteChannel r, WritableByteC
194194
}
195195
return total;
196196
}
197+
198+
static ByteBuffer[] duplicate(ByteBuffer[] buffers, int offset, int length) {
199+
ByteBuffer[] returnValue = new ByteBuffer[length - offset];
200+
for (int i = offset; i < length; i++) {
201+
ByteBuffer buffer = buffers[i];
202+
returnValue[i] = buffer.duplicate();
203+
}
204+
return returnValue;
205+
}
197206
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -309,18 +309,28 @@ UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
309309
return new UnbufferedWriteSession<>(
310310
requireNonNull(start, "start must be non null"),
311311
lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
312+
UnbufferedWritableByteChannel channel;
312313
if (fsyncEvery) {
313-
return new GapicUnbufferedChunkedResumableWritableByteChannel(
314-
result,
315-
getChunkSegmenter(),
316-
write,
317-
new WriteCtx<>(start),
318-
boundRetrier,
319-
Retrying::newCallContext);
314+
channel =
315+
new GapicUnbufferedChunkedResumableWritableByteChannel(
316+
result,
317+
getChunkSegmenter(),
318+
write,
319+
new WriteCtx<>(start),
320+
boundRetrier,
321+
Retrying::newCallContext);
320322
} else {
321-
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
322-
result, getChunkSegmenter(), write, new WriteCtx<>(start));
323+
channel =
324+
new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
325+
result, getChunkSegmenter(), write, new WriteCtx<>(start));
323326
}
327+
return StorageByteChannels.writable()
328+
.validateUploadCrc32c(
329+
channel,
330+
ApiFutures.transform(
331+
result,
332+
WRITE_OBJECT_RESPONSE_CRC32C_VALUE,
333+
MoreExecutors.directExecutor()));
324334
})
325335
.andThen(StorageByteChannels.writable()::createSynchronized));
326336
}
@@ -349,18 +359,28 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
349359
return new BufferedWriteSession<>(
350360
requireNonNull(start, "start must be non null"),
351361
lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
362+
UnbufferedWritableByteChannel channel;
352363
if (fsyncEvery) {
353-
return new GapicUnbufferedChunkedResumableWritableByteChannel(
354-
result,
355-
getChunkSegmenter(),
356-
write,
357-
new WriteCtx<>(start),
358-
retrier,
359-
Retrying::newCallContext);
364+
channel =
365+
new GapicUnbufferedChunkedResumableWritableByteChannel(
366+
result,
367+
getChunkSegmenter(),
368+
write,
369+
new WriteCtx<>(start),
370+
retrier,
371+
Retrying::newCallContext);
360372
} else {
361-
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
362-
result, getChunkSegmenter(), write, new WriteCtx<>(start));
373+
channel =
374+
new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
375+
result, getChunkSegmenter(), write, new WriteCtx<>(start));
363376
}
377+
return StorageByteChannels.writable()
378+
.validateUploadCrc32c(
379+
channel,
380+
ApiFutures.transform(
381+
result,
382+
WRITE_OBJECT_RESPONSE_CRC32C_VALUE,
383+
MoreExecutors.directExecutor()));
364384
})
365385
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
366386
.andThen(StorageByteChannels.writable()::createSynchronized));

google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,36 @@
1818

1919
import static java.util.Objects.requireNonNull;
2020

21+
import com.google.api.core.ApiFunction;
2122
import com.google.api.core.ApiFuture;
2223
import com.google.api.core.ApiFutures;
2324
import com.google.api.core.SettableApiFuture;
2425
import com.google.api.services.storage.model.StorageObject;
2526
import com.google.cloud.storage.ChannelSession.BufferedWriteSession;
2627
import com.google.cloud.storage.ChannelSession.UnbufferedWriteSession;
28+
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2729
import com.google.cloud.storage.Retrying.RetrierWithAlg;
2830
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2931
import com.google.common.util.concurrent.MoreExecutors;
32+
import java.math.BigInteger;
3033
import java.nio.ByteBuffer;
3134
import java.util.function.BiFunction;
3235
import java.util.function.LongConsumer;
3336
import org.checkerframework.checker.nullness.qual.NonNull;
37+
import org.checkerframework.checker.nullness.qual.Nullable;
3438

3539
final class HttpWritableByteChannelSessionBuilder {
3640

3741
private static final int DEFAULT_BUFFER_CAPACITY = ByteSizeConstants._16MiB;
42+
public static final ApiFunction<StorageObject, Crc32cLengthKnown> STORAGE_OBJECT_TO_CRC32C_VALUE =
43+
storageObject -> {
44+
@Nullable BigInteger size = storageObject.getSize();
45+
long length = 0;
46+
if (size != null) {
47+
length = size.longValueExact();
48+
}
49+
return Crc32cValue.of(Utils.crc32cCodec.decode(storageObject.getCrc32c()), length);
50+
};
3851
@NonNull private final HttpClientContext httpClientContext;
3952

4053
HttpWritableByteChannelSessionBuilder(@NonNull HttpClientContext httpClientContext) {
@@ -126,10 +139,7 @@ BufferedResumableUploadBuilder buffered(BufferHandle bufferHandle) {
126139
httpClientContext, boundRetrier, start, resultFuture, committedBytesCallback),
127140
ApiFutures.transform(
128141
resultFuture,
129-
storageObject ->
130-
Crc32cValue.of(
131-
Utils.crc32cCodec.decode(storageObject.getCrc32c()),
132-
storageObject.getSize().longValueExact()),
142+
STORAGE_OBJECT_TO_CRC32C_VALUE,
133143
MoreExecutors.directExecutor()));
134144
}
135145

google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void rewindTo(long offset) {
148148
response.ignore();
149149
actualSize = new BigInteger(storedContentLength, 10);
150150
success = true;
151-
storageObject = null;
151+
storageObject = jsonResumableWrite.storageObjectFromResponseHeaders(response);
152152
} else {
153153
response.ignore();
154154
StorageException se =

google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,14 @@ final class JsonResumableSessionQueryTask
7878
// when a signed url is used, the finalize response is empty
7979
response.ignore();
8080
actualSize = new BigInteger(storedContentLength, 10);
81-
storageObject = null;
81+
storageObject = jsonResumableWrite.storageObjectFromResponseHeaders(response);
8282
} else {
8383
response.ignore();
8484
throw ResumableSessionFailureScenario.SCENARIO_0_1.toStorageException(
8585
uploadId, response, null, () -> null);
8686
}
8787
if (actualSize != null) {
88-
if (storageObject != null) {
89-
return ResumableOperationResult.complete(storageObject, actualSize.longValue());
90-
} else {
91-
return ResumableOperationResult.incremental(actualSize.longValue());
92-
}
88+
return ResumableOperationResult.complete(storageObject, actualSize.longValue());
9389
} else {
9490
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(
9591
uploadId,

google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.getHeaderValue;
20+
import static com.google.cloud.storage.Utils.ifNonNull;
1921
import static com.google.common.base.Preconditions.checkArgument;
2022

23+
import com.google.api.client.http.HttpHeaders;
24+
import com.google.api.client.http.HttpResponse;
2125
import com.google.api.services.storage.model.StorageObject;
2226
import com.google.cloud.storage.spi.v1.StorageRpc;
2327
import com.google.common.base.MoreObjects;
@@ -29,6 +33,8 @@
2933
import java.io.ObjectOutputStream;
3034
import java.io.Serializable;
3135
import java.io.StringReader;
36+
import java.math.BigInteger;
37+
import java.util.List;
3238
import java.util.Map;
3339
import java.util.Objects;
3440
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -120,6 +126,32 @@ public String toString() {
120126
.toString();
121127
}
122128

129+
StorageObject storageObjectFromResponseHeaders(HttpResponse resp) {
130+
StorageObject r = new StorageObject();
131+
if (object != null) {
132+
r.putAll(object);
133+
}
134+
HttpHeaders h = resp.getHeaders();
135+
ifNonNull(getHeaderValue(h, "x-goog-generation"), Long::parseLong, r::setGeneration);
136+
ifNonNull(getHeaderValue(h, "x-goog-metageneration"), Long::parseLong, r::setMetageneration);
137+
ifNonNull(getHeaderValue(h, "x-goog-stored-content-length"), BigInteger::new, r::setSize);
138+
ifNonNull(getHeaderValue(h, "x-goog-stored-content-encoding"), r::setContentEncoding);
139+
ifNonNull(
140+
h.get("x-goog-hash"),
141+
ApiaryUnbufferedReadableByteChannel::cast,
142+
(List<String> x) -> {
143+
for (String s : x) {
144+
if (s.startsWith("crc32c=")) {
145+
r.setCrc32c(s.substring(7));
146+
} else if (s.startsWith("md5=")) {
147+
r.setMd5Hash(s.substring(5));
148+
}
149+
}
150+
});
151+
152+
return r;
153+
}
154+
123155
private String getObjectJson() {
124156
if (objectJson == null) {
125157
synchronized (this) {

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.nio.channels.SeekableByteChannel;
3434
import java.util.concurrent.ExecutionException;
3535
import java.util.concurrent.locks.ReentrantLock;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
3638

3739
final class StorageByteChannels {
3840

@@ -78,7 +80,7 @@ public BufferedWritableByteChannel createSynchronized(BufferedWritableByteChanne
7880

7981
public UnbufferedWritableByteChannel validateUploadCrc32c(
8082
UnbufferedWritableByteChannel delegate, ApiFuture<Crc32cLengthKnown> crc32cGetter) {
81-
return new ChecksumValidatingBufferedWritableByteChannel(delegate, crc32cGetter);
83+
return new ChecksumValidatingUnbufferedWritableByteChannel(delegate, crc32cGetter);
8284
}
8385

8486
public UnbufferedWritableByteChannel createSynchronized(
@@ -88,15 +90,17 @@ public UnbufferedWritableByteChannel createSynchronized(
8890
}
8991

9092
@SuppressWarnings("UnstableApiUsage")
91-
private static final class ChecksumValidatingBufferedWritableByteChannel
93+
private static final class ChecksumValidatingUnbufferedWritableByteChannel
9294
implements UnbufferedWritableByteChannel {
95+
private static final Logger LOGGER =
96+
LoggerFactory.getLogger(ChecksumValidatingUnbufferedWritableByteChannel.class);
9397
private final UnbufferedWritableByteChannel delegate;
9498
private final ApiFuture<Crc32cLengthKnown> crc32cGetter;
9599

96100
private final Hasher cumulativeCrc32c;
97101
private long totalLength;
98102

99-
private ChecksumValidatingBufferedWritableByteChannel(
103+
private ChecksumValidatingUnbufferedWritableByteChannel(
100104
UnbufferedWritableByteChannel delegate, ApiFuture<Crc32cLengthKnown> crc32cGetter) {
101105
this.delegate = delegate;
102106
this.crc32cGetter = crc32cGetter;
@@ -106,8 +110,10 @@ private ChecksumValidatingBufferedWritableByteChannel(
106110

107111
@Override
108112
public int write(ByteBuffer src) throws IOException {
109-
hash(src);
110-
return delegate.write(src);
113+
ByteBuffer dup = src.duplicate();
114+
int written = delegate.write(src);
115+
hash(dup, written);
116+
return written;
111117
}
112118

113119
@Override
@@ -117,16 +123,19 @@ public long write(ByteBuffer[] srcs) throws IOException {
117123

118124
@Override
119125
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
120-
hash(srcs, offset, length);
121-
return delegate.write(srcs, offset, length);
126+
ByteBuffer[] dups = Buffers.duplicate(srcs, offset, length);
127+
long written = delegate.write(srcs, offset, length);
128+
hash(dups, offset, length, written);
129+
return written;
122130
}
123131

124132
@Override
125133
public int writeAndClose(ByteBuffer src) throws IOException {
126-
hash(src);
127-
int i = delegate.writeAndClose(src);
134+
ByteBuffer dup = src.duplicate();
135+
int written = delegate.writeAndClose(src);
136+
hash(dup, written);
128137
internalClose();
129-
return i;
138+
return written;
130139
}
131140

132141
@Override
@@ -136,10 +145,11 @@ public long writeAndClose(ByteBuffer[] srcs) throws IOException {
136145

137146
@Override
138147
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
139-
hash(srcs, offset, length);
140-
long l = delegate.writeAndClose(srcs, offset, length);
148+
ByteBuffer[] dups = Buffers.duplicate(srcs, offset, length);
149+
long written = delegate.writeAndClose(srcs, offset, length);
150+
hash(dups, offset, length, written);
141151
internalClose();
142-
return l;
152+
return written;
143153
}
144154

145155
@Override
@@ -155,22 +165,32 @@ public void close() throws IOException {
155165
}
156166
}
157167

158-
private void hash(ByteBuffer src) {
159-
ByteBuffer buffer = src.duplicate();
160-
totalLength += buffer.remaining();
168+
private long hash(ByteBuffer src, long written) {
169+
ByteBuffer buffer = src.slice();
170+
int remaining = buffer.remaining();
171+
int consumed = remaining;
172+
if (written < remaining) {
173+
int intExact = Math.toIntExact(written);
174+
buffer.limit(intExact);
175+
consumed = intExact;
176+
}
177+
totalLength += remaining;
161178
cumulativeCrc32c.putBytes(buffer);
179+
src.position(src.position() + consumed);
180+
return consumed;
162181
}
163182

164-
private void hash(ByteBuffer[] srcs, int offset, int length) {
183+
private void hash(ByteBuffer[] srcs, int offset, int length, long written) {
165184
for (int i = offset; i < length; i++) {
166-
hash(srcs[i]);
185+
written -= hash(srcs[i], written);
167186
}
168187
}
169188

170189
private void internalClose() throws IOException {
171190
try {
172191
Crc32cLengthKnown actual = crc32cGetter.get();
173192
Crc32cLengthKnown expected = Crc32cValue.of(cumulativeCrc32c.hash().asInt(), totalLength);
193+
LOGGER.debug("expected = {}, actual = {}", expected, actual);
174194
if (!expected.eqValue(actual)) {
175195
throw new ClientDetectedDataLossException(actual, expected);
176196
}

0 commit comments

Comments
 (0)