diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java index c07e51f536..7c9451f82e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java @@ -149,7 +149,7 @@ private GatheringByteChannel openSync() throws IOException { return sync; } - private WriteObjectRequest processSegment(ChunkSegment segment) { + private WriteObjectRequest processSegment(ChunkSegment segment, boolean updateCumulativeCrc32c) { WriteObjectRequest.Builder builder = writeCtx.newRequestBuilder(); if (!first) { builder.clearUploadId().clearWriteObjectSpec().clearObjectChecksums(); @@ -162,9 +162,11 @@ private WriteObjectRequest processSegment(ChunkSegment segment) { int contentSize = b.size(); // update ctx state that tracks overall progress - writeCtx - .getCumulativeCrc32c() - .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + if (updateCumulativeCrc32c) { + writeCtx + .getCumulativeCrc32c() + .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + } // resolve current offset and set next long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); @@ -202,6 +204,7 @@ private WriteObjectRequest.Builder finishMessage(WriteObjectRequest.Builder b) { return b; } + @SuppressWarnings("ConstantValue") private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) { AtomicBoolean recover = new AtomicBoolean(false); retrier.run( @@ -211,9 +214,16 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) { sync.close(); } boolean shouldRecover = recover.getAndSet(true); + // each ChunkSegment will always have its checksum computed, but if a retry happens, and + // we need to rewind and build a new ChunkSegment, we don't want to add it to the + // cumulativeCrc32c value because that will make it appear as the bytes are duplicated. + // If we send "ABCD", get an error and find only "AB" to have been persisted, we don't + // want to add "CD" to the cumulative crc32c as that would be equivalent to "ABCDCD". + boolean updateCumulativeCrc32c = !shouldRecover; if (!shouldRecover) { for (ChunkSegment segment : segments) { - WriteObjectRequest writeObjectRequest = processSegment(segment); + WriteObjectRequest writeObjectRequest = + processSegment(segment, updateCumulativeCrc32c); stream.onNext(writeObjectRequest); } @@ -247,17 +257,22 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) { first = true; writeCtx.getTotalSentBytes().set(persistedSize); writeCtx.getConfirmedBytes().set(persistedSize); - writeCtx.getCumulativeCrc32c().set(null); // todo: can we rewind checksum? + // intentionally do not modify the cumulativeCrc32c value + // this will stay in the state in sync with what has been written to disk + // when we recover, checksum the individual message but not the cumulative try (SeekableByteChannel reader = rf.reader()) { reader.position(persistedSize); ByteBuffer buf = copyBuffer.get(); + // clear before read, in case an error was thrown before + buf.clear(); while (Buffers.fillFrom(buf, reader) != -1) { buf.flip(); while (buf.hasRemaining()) { ChunkSegment[] recoverySegments = chunkSegmenter.segmentBuffer(buf); for (ChunkSegment segment : recoverySegments) { - WriteObjectRequest writeObjectRequest = processSegment(segment); + WriteObjectRequest writeObjectRequest = + processSegment(segment, updateCumulativeCrc32c); stream.onNext(writeObjectRequest); } } @@ -280,7 +295,8 @@ private void doUpload(boolean closing, ChunkSegment[] segments, long goalSize) { } } long newWritten = writeCtx.getTotalSentBytes().get(); - Preconditions.checkState(newWritten == goalSize, "%s == %s", newWritten, goalSize); + Preconditions.checkState( + newWritten == goalSize, "newWritten == goalSize (%s == %s)", newWritten, goalSize); return null; }, Decoder.identity()); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java index 4ed8f25ede..fd4f0fd58e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITSyncAndUploadUnbufferedWritableByteChannelPropertyTest.java @@ -16,6 +16,7 @@ package com.google.cloud.storage; +import static com.google.cloud.storage.TestUtils.apiException; import static com.google.cloud.storage.TestUtils.assertAll; import static com.google.cloud.storage.TestUtils.defaultRetryingDeps; import static com.google.cloud.storage.TestUtils.xxd; @@ -31,17 +32,23 @@ import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.UnavailableException; import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; +import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Retrying.DefaultRetrier; import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.Alg; import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.RequestStream; import com.google.cloud.storage.SyncAndUploadUnbufferedWritableByteChannel.ResponseStream; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.cloud.storage.it.ChecksummedTestContent; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Ints; import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import com.google.protobuf.TextFormat; import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.QueryWriteStatusRequest; import com.google.storage.v2.QueryWriteStatusResponse; import com.google.storage.v2.StartResumableWriteRequest; @@ -50,10 +57,12 @@ import com.google.storage.v2.StorageGrpc.StorageImplBase; import com.google.storage.v2.WriteObjectRequest; import com.google.storage.v2.WriteObjectResponse; +import com.google.storage.v2.WriteObjectSpec; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -76,6 +85,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import net.jqwik.api.Arbitraries; @@ -372,6 +382,204 @@ void testUploads(@ForAll("scenario") Scenario s) throws Exception { } } + @Example + void multipleRetriesAgainstFakeServer() throws Exception { + ChecksummedTestContent content = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(17)); + + String uploadId = UUID.randomUUID().toString(); + StartResumableWriteRequest reqStart = + StartResumableWriteRequest.newBuilder() + .setWriteObjectSpec( + WriteObjectSpec.newBuilder() + .setResource( + Object.newBuilder().setBucket("projects/_/buckets/b").setName("o").build()) + .build()) + .build(); + StartResumableWriteResponse resStart = + StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build(); + QueryWriteStatusRequest reqQuery = + QueryWriteStatusRequest.newBuilder().setUploadId(uploadId).build(); + QueryWriteStatusResponse resQuery = + QueryWriteStatusResponse.newBuilder().setPersistedSize(8).build(); + WriteObjectRequest reqWrite0 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(0) + .setChecksummedData(content.slice(0, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite2 = + WriteObjectRequest.newBuilder() + .setWriteOffset(2) + .setChecksummedData(content.slice(2, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite4 = + WriteObjectRequest.newBuilder() + .setWriteOffset(4) + .setChecksummedData(content.slice(4, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite6 = + WriteObjectRequest.newBuilder() + .setWriteOffset(6) + .setChecksummedData(content.slice(6, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite8 = + WriteObjectRequest.newBuilder() + .setWriteOffset(8) + .setChecksummedData(content.slice(8, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite8WithUploadId = reqWrite8.toBuilder().setUploadId(uploadId).build(); + WriteObjectRequest reqWrite10 = + WriteObjectRequest.newBuilder() + .setWriteOffset(10) + .setChecksummedData(content.slice(10, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite12 = + WriteObjectRequest.newBuilder() + .setWriteOffset(12) + .setChecksummedData(content.slice(12, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite14 = + WriteObjectRequest.newBuilder() + .setWriteOffset(14) + .setChecksummedData(content.slice(14, 2).asChecksummedData()) + .build(); + WriteObjectRequest reqWrite16 = + WriteObjectRequest.newBuilder() + .setWriteOffset(16) + .setChecksummedData(content.slice(16, 1).asChecksummedData()) + .build(); + WriteObjectRequest reqFinish = + WriteObjectRequest.newBuilder() + .setFinishWrite(true) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(content.getCrc32c()).build()) + .mergeFrom(reqWrite16) + .build(); + WriteObjectResponse resFinish = + WriteObjectResponse.newBuilder() + .setResource( + reqStart.getWriteObjectSpec().getResource().toBuilder() + .setGeneration(1) + .setSize(17) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(content.getCrc32c()) + .setMd5Hash(content.getMd5Bytes()) + .build()) + .build()) + .build(); + ImmutableSet allReqWrite = + ImmutableSet.of( + reqWrite0, + reqWrite2, + reqWrite4, + reqWrite6, + reqWrite8, + reqWrite10, + reqWrite12, + reqWrite14, + reqWrite16); + + AtomicInteger retryCount = new AtomicInteger(0); + StorageImplBase service = + new StorageImplBase() { + @Override + public void startResumableWrite( + StartResumableWriteRequest req, StreamObserver respond) { + if (req.equals(reqStart)) { + respond.onNext(resStart); + respond.onCompleted(); + } else { + unexpected(respond, req); + } + } + + @Override + public void queryWriteStatus( + QueryWriteStatusRequest req, StreamObserver respond) { + if (req.equals(reqQuery)) { + respond.onNext(resQuery); + respond.onCompleted(); + } else { + unexpected(respond, req); + } + } + + @Override + public StreamObserver writeObject( + StreamObserver respond) { + return new StreamObserver() { + @Override + public void onNext(WriteObjectRequest value) { + if (value.equals(reqFinish)) { + respond.onNext(resFinish); + respond.onCompleted(); + } else if (value.equals(reqWrite10)) { + int i = retryCount.get(); + if (i < 2) { + respond.onError(apiException(Code.UNAVAILABLE, "{Unavailable}")); + } + } else if (value.equals(reqWrite8WithUploadId)) { + retryCount.incrementAndGet(); + } else if (allReqWrite.contains(value)) { + // do nothing + } else { + unexpected(respond, value); + } + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }; + } + + private void unexpected(StreamObserver respond, Message msg) { + respond.onError( + apiException( + Code.UNIMPLEMENTED, + "Unexpected request { " + TextFormat.printer().shortDebugString(msg) + " }")); + } + }; + try (FakeServer fakeServer = FakeServer.of(service); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + + BlobInfo info = BlobInfo.newBuilder("b", "o").build(); + SettableApiFuture resultFuture = SettableApiFuture.create(); + BufferHandle recoverBufferHandle = BufferHandle.allocate(2); + SyncAndUploadUnbufferedWritableByteChannel syncAndUpload = + new SyncAndUploadUnbufferedWritableByteChannel( + storage.storageClient.writeObjectCallable(), + storage.storageClient.queryWriteStatusCallable(), + resultFuture, + new ChunkSegmenter(Hasher.enabled(), ByteStringStrategy.copy(), 2, 2), + new DefaultRetrier(UnaryOperator.identity(), storage.getOptions()), + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler(), + new WriteCtx<>( + new ResumableWrite( + reqStart, + resStart, + id -> reqWrite0.toBuilder().clearWriteObjectSpec().setUploadId(id).build())), + recoveryFileManager.newRecoveryFile(info), + recoverBufferHandle); + try (BufferedWritableByteChannel w = + StorageByteChannels.writable() + .createSynchronized( + new DefaultBufferedWritableByteChannel(recoverBufferHandle, syncAndUpload))) { + w.write(ByteBuffer.wrap(content.getBytes())); + } + + Decoder decoder = + Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource); + BlobInfo actual = decoder.decode(resultFuture.get(3, TimeUnit.SECONDS)); + assertThat(actual.getSize()).isEqualTo(content.getBytes().length); + assertThat(actual.getCrc32c()).isEqualTo(content.getCrc32cBase64()); + } + } + static List dataFrames(long length, int segmentLength) { // todo: rethink this Random rand = new Random(length); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ChecksummedTestContent.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ChecksummedTestContent.java index 0f40cd930f..aa87e64780 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ChecksummedTestContent.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ChecksummedTestContent.java @@ -96,6 +96,10 @@ public ChecksummedData asChecksummedData() { .build(); } + public ChecksummedTestContent slice(int begin, int length) { + return of(bytes, begin, Math.min(length, bytes.length - begin)); + } + @Override public String toString() { return MoreObjects.toStringHelper(this)