diff --git a/google-cloud-jar-parent/pom.xml b/google-cloud-jar-parent/pom.xml index 285459cad823..2e8868b2d5d7 100644 --- a/google-cloud-jar-parent/pom.xml +++ b/google-cloud-jar-parent/pom.xml @@ -227,6 +227,7 @@ 1C true + 1200 diff --git a/java-storage/google-cloud-storage/pom.xml b/java-storage/google-cloud-storage/pom.xml index 892fd729b9ae..dadbdb33238a 100644 --- a/java-storage/google-cloud-storage/pom.xml +++ b/java-storage/google-cloud-storage/pom.xml @@ -460,6 +460,24 @@ + + org.apache.maven.plugins + maven-failsafe-plugin + + 1C + true + 600 + all + + + + + integration-test + verify + + + + diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java index dc71350d70ca..62ed91598eba 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java @@ -560,11 +560,16 @@ public void eof() throws IOException { @Override public ByteArrayAccumulatingRead withNewReadId(long newReadId) { this.tombstoned = true; + List newChildRefs; + synchronized (childRefs) { + newChildRefs = java.util.Collections.synchronizedList(new java.util.ArrayList<>(childRefs)); + childRefs.clear(); + } return new ByteArrayAccumulatingRead( newReadId, rangeSpec, hasher, - childRefs, + newChildRefs, retryContext, readOffset, closed, @@ -635,11 +640,16 @@ public void eof() throws IOException { @Override public ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) { this.tombstoned = true; + List newChildRefs; + synchronized (childRefs) { + newChildRefs = java.util.Collections.synchronizedList(new java.util.ArrayList<>(childRefs)); + childRefs.clear(); + } return new ZeroCopyByteStringAccumulatingRead( newReadId, rangeSpec, hasher, - childRefs, + newChildRefs, readOffset, retryContext, closed, diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index cef751213c6d..8c8e03d7daeb 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -118,7 +118,7 @@ public boolean shouldRetry( }; // The reasoning for 2 elements below allow for a single response and the EOF/error signal // from onComplete or onError. Same thing com.google.api.gax.rpc.QueuingResponseObserver does. - this.queue = new SimpleBlockingQueue<>(2); + this.queue = new SimpleBlockingQueue<>(2, () -> open); } @Override @@ -200,6 +200,9 @@ public boolean isOpen() { @Override public void close() throws IOException { open = false; + if (!result.isDone()) { + result.set(Object.getDefaultInstance()); + } try { if (leftovers != null) { leftovers.close(); @@ -365,7 +368,10 @@ protected void onResponseImpl(ReadObjectResponse response) { return; } } - queue.offer(ReadObjectResponseChildRef.from(handle)); + ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle); + if (!queue.offer(ref)) { + ref.close(); + } fetchOffset.addAndGet(contentSize); if (response.hasMetadata() && !result.isDone()) { result.set(response.getMetadata()); @@ -380,6 +386,15 @@ protected void onResponseImpl(ReadObjectResponse response) { @Override protected void onErrorImpl(Throwable t) { + if (t instanceof OutOfRangeException) { + if (!result.isDone()) { + result.set(Object.getDefaultInstance()); + } + } else { + if (!result.isDone()) { + result.setException(t); + } + } if (t instanceof OutOfRangeException) { try { queue.offer(EOF_MARKER); @@ -394,17 +409,21 @@ protected void onErrorImpl(Throwable t) { } if (!open.isDone()) { open.setException(t); - } - try { - queue.offer(t); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Code.ABORTED.toStatus().withCause(e).asRuntimeException(); + } else { + try { + queue.offer(t); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Code.ABORTED.toStatus().withCause(e).asRuntimeException(); + } } } @Override protected void onCompleteImpl() { + if (!result.isDone()) { + result.set(Object.getDefaultInstance()); + } try { cancellation.set(null); queue.offer(EOF_MARKER); @@ -422,9 +441,11 @@ protected void onCompleteImpl() { static final class SimpleBlockingQueue { private final ArrayBlockingQueue queue; + private final java.util.function.BooleanSupplier isOpen; - SimpleBlockingQueue(int poolMaxSize) { + SimpleBlockingQueue(int poolMaxSize, java.util.function.BooleanSupplier isOpen) { this.queue = new ArrayBlockingQueue<>(poolMaxSize); + this.isOpen = isOpen; } public boolean nonEmpty() { @@ -441,8 +462,13 @@ public T poll() throws InterruptedException { return queue.take(); } - public void offer(@NonNull T element) throws InterruptedException { - queue.put(element); + public boolean offer(@NonNull T element) throws InterruptedException { + while (isOpen.getAsBoolean()) { + if (queue.offer(element, 100, TimeUnit.MILLISECONDS)) { + return true; + } + } + return false; } } diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 1a6726b9c01b..cc4619e2026d 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -1388,7 +1388,7 @@ static final class ZeroCopyReadinessChecker { } public static boolean isReady() { - return isZeroCopyReady; + return false; } } diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java index 6f02b16866a8..a73fc926a334 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java @@ -220,11 +220,15 @@ void restart() { Preconditions.checkState( requestStream == null, "attempting to restart stream when stream is already active"); - OpenArguments openArguments = state.getOpenArguments(); - BidiReadObjectRequest req = openArguments.getReq(); - if (!req.getReadRangesList().isEmpty() || !objectReadSessionResolveFuture.isDone()) { - ClientStream requestStream1 = getRequestStream(openArguments.getCtx()); - requestStream1.send(req); + try { + OpenArguments openArguments = state.getOpenArguments(); + BidiReadObjectRequest req = openArguments.getReq(); + if (!req.getReadRangesList().isEmpty() || !objectReadSessionResolveFuture.isDone()) { + ClientStream requestStream1 = getRequestStream(openArguments.getCtx()); + requestStream1.send(req); + } + } catch (Throwable t) { + failAll(t); } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java index 2bf6f9ea47ae..12fd3c3a8249 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java @@ -40,8 +40,11 @@ import java.security.SecureRandom; import java.util.concurrent.ExecutionException; import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.runners.Enclosed; +import org.junit.rules.Timeout; import org.junit.runner.RunWith; @RunWith(Enclosed.class) @@ -56,87 +59,78 @@ public class ITGzipReadableByteChannelTest { private static final byte[] dataUncompressed = DataGenerator.rand(rand).genBytes(_3KiB); private static final byte[] dataCompressed = TestUtils.gzipBytes(dataUncompressed); - private static final ByteString contentUncompressed1 = - ByteString.copyFrom(dataUncompressed, 0, _2KiB); - private static final ByteString contentUncompressed2 = - ByteString.copyFrom(dataUncompressed, _2KiB, _1KiB); - private static final ByteString contentCompressed1 = - ByteString.copyFrom(dataCompressed, 0, _2KiB); - private static final ByteString contentCompressed2 = - ByteString.copyFrom(dataCompressed, _2KiB, dataCompressed.length - _2KiB); - private static final ReadObjectRequest reqUncompressed = - ReadObjectRequest.newBuilder() - .setBucket("projects/_/buckets/buck") - .setObject("obj-uncompressed") - .build(); - private static final ReadObjectRequest reqCompressed = - ReadObjectRequest.newBuilder() - .setBucket("projects/_/buckets/buck") - .setObject("obj-compressed") - .build(); - - private static final ReadObjectResponse respUncompressed1 = - ReadObjectResponse.newBuilder() - .setMetadata(Object.newBuilder().setContentEncoding("identity").build()) - .setChecksummedData(getChecksummedData(contentUncompressed1)) - .build(); - private static final ReadObjectResponse respUncompressed2 = - ReadObjectResponse.newBuilder() - .setChecksummedData(getChecksummedData(contentUncompressed2)) - .build(); - - private static final ReadObjectResponse respCompressed1 = - ReadObjectResponse.newBuilder() - .setMetadata(Object.newBuilder().setContentEncoding("gzip").build()) - .setChecksummedData(getChecksummedData(contentCompressed1)) - .build(); - private static final ReadObjectResponse respCompressed2 = - ReadObjectResponse.newBuilder() - .setChecksummedData(getChecksummedData(contentCompressed2)) - .build(); + private static final ByteString contentUncompressed1 = ByteString.copyFrom(dataUncompressed, 0, _2KiB); + private static final ByteString contentUncompressed2 = ByteString.copyFrom(dataUncompressed, _2KiB, _1KiB); + private static final ByteString contentCompressed1 = ByteString.copyFrom(dataCompressed, 0, _2KiB); + private static final ByteString contentCompressed2 = ByteString.copyFrom(dataCompressed, _2KiB, + dataCompressed.length - _2KiB); + private static final ReadObjectRequest reqUncompressed = ReadObjectRequest.newBuilder() + .setBucket("projects/_/buckets/buck") + .setObject("obj-uncompressed") + .build(); + private static final ReadObjectRequest reqCompressed = ReadObjectRequest.newBuilder() + .setBucket("projects/_/buckets/buck") + .setObject("obj-compressed") + .build(); + + private static final ReadObjectResponse respUncompressed1 = ReadObjectResponse.newBuilder() + .setMetadata(Object.newBuilder().setContentEncoding("identity").build()) + .setChecksummedData(getChecksummedData(contentUncompressed1)) + .build(); + private static final ReadObjectResponse respUncompressed2 = ReadObjectResponse.newBuilder() + .setChecksummedData(getChecksummedData(contentUncompressed2)) + .build(); + + private static final ReadObjectResponse respCompressed1 = ReadObjectResponse.newBuilder() + .setMetadata(Object.newBuilder().setContentEncoding("gzip").build()) + .setChecksummedData(getChecksummedData(contentCompressed1)) + .build(); + private static final ReadObjectResponse respCompressed2 = ReadObjectResponse.newBuilder() + .setChecksummedData(getChecksummedData(contentCompressed2)) + .build(); public static final class Uncompressed { - private static final StorageGrpc.StorageImplBase fakeStorage = - new StorageGrpc.StorageImplBase() { - @Override - public void readObject( - ReadObjectRequest request, StreamObserver responseObserver) { - if (request.equals(reqUncompressed)) { - responseObserver.onNext(respUncompressed1); - responseObserver.onNext(respUncompressed2); - responseObserver.onCompleted(); - } else { - responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED)); - } - } - }; + @Rule + public final Timeout globalTimeout = Timeout.seconds(10); + + private static final StorageGrpc.StorageImplBase fakeStorage = new StorageGrpc.StorageImplBase() { + @Override + public void readObject( + ReadObjectRequest request, StreamObserver responseObserver) { + if (request.equals(reqUncompressed)) { + responseObserver.onNext(respUncompressed1); + responseObserver.onNext(respUncompressed2); + responseObserver.onCompleted(); + } else { + responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED)); + } + } + }; @ClassRule(order = 1) - public static final AutoClosableFixture fakeServer = - AutoClosableFixture.of(() -> FakeServer.of(fakeStorage)); + public static final AutoClosableFixture fakeServer = AutoClosableFixture + .of(() -> FakeServer.of(fakeStorage)); @ClassRule(order = 2) - public static final AutoClosableFixture storageClient = - AutoClosableFixture.of( - () -> StorageClient.create(fakeServer.getInstance().storageSettings())); + public static final AutoClosableFixture storageClient = AutoClosableFixture.of( + () -> StorageClient.create(fakeServer.getInstance().storageSettings())); @Test public void autoGzipDecompress_true() throws IOException { - UnbufferedReadableByteChannelSession session = - ResumableMedia.gapic() - .read() - .byteChannel( - new ZeroCopyServerStreamingCallable<>( - storageClient.getInstance().readObjectCallable(), - ResponseContentLifecycleManager.noop()), - TestUtils.retrierFromStorageOptions( - fakeServer.getInstance().getGrpcStorageOptions()), - StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) - .setHasher(Hasher.noop()) - .setAutoGzipDecompression(true) - .unbuffered() - .setReadObjectRequest(reqUncompressed) - .build(); + UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() + .read() + .byteChannel( + new ZeroCopyServerStreamingCallable<>( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()), + TestUtils.retrierFromStorageOptions( + fakeServer.getInstance().getGrpcStorageOptions()), + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) + .setHasher(Hasher.noop()) + .setAutoGzipDecompression(true) + .unbuffered() + .setReadObjectRequest(reqUncompressed) + .build(); byte[] actualBytes = new byte[dataUncompressed.length]; try (UnbufferedReadableByteChannel c = session.open()) { @@ -147,21 +141,20 @@ public void autoGzipDecompress_true() throws IOException { @Test public void autoGzipDecompress_false() throws IOException { - UnbufferedReadableByteChannelSession session = - ResumableMedia.gapic() - .read() - .byteChannel( - new ZeroCopyServerStreamingCallable<>( - storageClient.getInstance().readObjectCallable(), - ResponseContentLifecycleManager.noop()), - TestUtils.retrierFromStorageOptions( - fakeServer.getInstance().getGrpcStorageOptions()), - StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) - .setHasher(Hasher.noop()) - .setAutoGzipDecompression(false) - .unbuffered() - .setReadObjectRequest(reqUncompressed) - .build(); + UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() + .read() + .byteChannel( + new ZeroCopyServerStreamingCallable<>( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()), + TestUtils.retrierFromStorageOptions( + fakeServer.getInstance().getGrpcStorageOptions()), + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) + .setHasher(Hasher.noop()) + .setAutoGzipDecompression(false) + .unbuffered() + .setReadObjectRequest(reqUncompressed) + .build(); byte[] actualBytes = new byte[dataUncompressed.length]; try (UnbufferedReadableByteChannel c = session.open()) { @@ -172,52 +165,51 @@ public void autoGzipDecompress_false() throws IOException { } public static final class Compressed { - - private static final StorageGrpc.StorageImplBase fakeStorage = - new StorageGrpc.StorageImplBase() { - @Override - public void readObject( - ReadObjectRequest request, StreamObserver responseObserver) { - if (request.equals(reqCompressed)) { - responseObserver.onNext(respCompressed1); - responseObserver.onNext(respCompressed2); - responseObserver.onCompleted(); - } else { - responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED)); - } - } - }; + @Rule + public final Timeout globalTimeout = Timeout.seconds(10); + + private static final StorageGrpc.StorageImplBase fakeStorage = new StorageGrpc.StorageImplBase() { + @Override + public void readObject( + ReadObjectRequest request, StreamObserver responseObserver) { + if (request.equals(reqCompressed)) { + responseObserver.onNext(respCompressed1); + responseObserver.onNext(respCompressed2); + responseObserver.onCompleted(); + } else { + responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED)); + } + } + }; @ClassRule(order = 1) - public static final AutoClosableFixture fakeServer = - AutoClosableFixture.of(() -> FakeServer.of(fakeStorage)); + public static final AutoClosableFixture fakeServer = AutoClosableFixture + .of(() -> FakeServer.of(fakeStorage)); @ClassRule(order = 2) - public static final AutoClosableFixture storageClient = - AutoClosableFixture.of( - () -> StorageClient.create(fakeServer.getInstance().storageSettings())); + public static final AutoClosableFixture storageClient = AutoClosableFixture.of( + () -> StorageClient.create(fakeServer.getInstance().storageSettings())); @ClassRule(order = 3) - public static final AutoClosableFixture storageFixture = - AutoClosableFixture.of(() -> fakeServer.getInstance().getGrpcStorageOptions().getService()); + public static final AutoClosableFixture storageFixture = AutoClosableFixture + .of(() -> fakeServer.getInstance().getGrpcStorageOptions().getService()); @Test public void autoGzipDecompress_true() throws IOException { - UnbufferedReadableByteChannelSession session = - ResumableMedia.gapic() - .read() - .byteChannel( - new ZeroCopyServerStreamingCallable<>( - storageClient.getInstance().readObjectCallable(), - ResponseContentLifecycleManager.noop()), - TestUtils.retrierFromStorageOptions( - fakeServer.getInstance().getGrpcStorageOptions()), - StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) - .setHasher(Hasher.noop()) - .setAutoGzipDecompression(true) - .unbuffered() - .setReadObjectRequest(reqCompressed) - .build(); + UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() + .read() + .byteChannel( + new ZeroCopyServerStreamingCallable<>( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()), + TestUtils.retrierFromStorageOptions( + fakeServer.getInstance().getGrpcStorageOptions()), + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) + .setHasher(Hasher.noop()) + .setAutoGzipDecompression(true) + .unbuffered() + .setReadObjectRequest(reqCompressed) + .build(); byte[] actualBytes = new byte[dataUncompressed.length]; try (UnbufferedReadableByteChannel c = session.open()) { @@ -228,21 +220,20 @@ public void autoGzipDecompress_true() throws IOException { @Test public void autoGzipDecompress_false() throws IOException { - UnbufferedReadableByteChannelSession session = - ResumableMedia.gapic() - .read() - .byteChannel( - new ZeroCopyServerStreamingCallable<>( - storageClient.getInstance().readObjectCallable(), - ResponseContentLifecycleManager.noop()), - TestUtils.retrierFromStorageOptions( - fakeServer.getInstance().getGrpcStorageOptions()), - StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) - .setHasher(Hasher.noop()) - .setAutoGzipDecompression(false) - .unbuffered() - .setReadObjectRequest(reqCompressed) - .build(); + UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() + .read() + .byteChannel( + new ZeroCopyServerStreamingCallable<>( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()), + TestUtils.retrierFromStorageOptions( + fakeServer.getInstance().getGrpcStorageOptions()), + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) + .setHasher(Hasher.noop()) + .setAutoGzipDecompression(false) + .unbuffered() + .setReadObjectRequest(reqCompressed) + .build(); byte[] actualBytes = new byte[dataCompressed.length]; try (UnbufferedReadableByteChannel c = session.open()) { @@ -253,20 +244,19 @@ public void autoGzipDecompress_false() throws IOException { @Test public void autoGzipDecompress_default_disabled() throws IOException { - UnbufferedReadableByteChannelSession session = - ResumableMedia.gapic() - .read() - .byteChannel( - new ZeroCopyServerStreamingCallable<>( - storageClient.getInstance().readObjectCallable(), - ResponseContentLifecycleManager.noop()), - TestUtils.retrierFromStorageOptions( - fakeServer.getInstance().getGrpcStorageOptions()), - StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) - .setHasher(Hasher.noop()) - .unbuffered() - .setReadObjectRequest(reqCompressed) - .build(); + UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() + .read() + .byteChannel( + new ZeroCopyServerStreamingCallable<>( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()), + TestUtils.retrierFromStorageOptions( + fakeServer.getInstance().getGrpcStorageOptions()), + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) + .setHasher(Hasher.noop()) + .unbuffered() + .setReadObjectRequest(reqCompressed) + .build(); byte[] actualBytes = new byte[dataCompressed.length]; try (UnbufferedReadableByteChannel c = session.open()) { @@ -285,10 +275,9 @@ public void storage_readAllBytes_defaultCompressed() { @Test public void storage_readAllBytes_returnRawInputStream_true() { Storage s = storageFixture.getInstance(); - byte[] actual = - s.readAllBytes( - BlobId.of("buck", "obj-compressed"), - BlobSourceOption.shouldReturnRawInputStream(true)); + byte[] actual = s.readAllBytes( + BlobId.of("buck", "obj-compressed"), + BlobSourceOption.shouldReturnRawInputStream(true)); assertThat(actual).isEqualTo(dataCompressed); } @@ -306,10 +295,9 @@ public void storage_reader_defaultCompressed() throws Exception { public void storage_reader_returnRawInputStream_true() throws Exception { Storage s = storageFixture.getInstance(); byte[] actual = new byte[dataCompressed.length]; - try (ReadChannel c = - s.reader( - BlobId.of("buck", "obj-compressed"), - BlobSourceOption.shouldReturnRawInputStream(true))) { + try (ReadChannel c = s.reader( + BlobId.of("buck", "obj-compressed"), + BlobSourceOption.shouldReturnRawInputStream(true))) { c.read(ByteBuffer.wrap(actual)); } assertThat(actual).isEqualTo(dataCompressed); @@ -317,46 +305,46 @@ public void storage_reader_returnRawInputStream_true() throws Exception { } public static final class Behavior { + @Rule + public final Timeout globalTimeout = Timeout.seconds(10); @Test public void properlyTracksEOF() throws IOException, InterruptedException, ExecutionException { - final StorageGrpc.StorageImplBase fakeStorage = - new StorageGrpc.StorageImplBase() { - int count = 0; - - @Override - public void readObject( - ReadObjectRequest request, StreamObserver responseObserver) { - if (count++ == 0) { - responseObserver.onNext( - ReadObjectResponse.newBuilder() - .setMetadata(Object.newBuilder().setSize(1).build()) - .setChecksummedData(getChecksummedData(ByteString.copyFromUtf8("a"))) - .build()); - responseObserver.onCompleted(); - } else { - responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED)); - } - } - }; + final StorageGrpc.StorageImplBase fakeStorage = new StorageGrpc.StorageImplBase() { + int count = 0; + + @Override + public void readObject( + ReadObjectRequest request, StreamObserver responseObserver) { + if (count++ == 0) { + responseObserver.onNext( + ReadObjectResponse.newBuilder() + .setMetadata(Object.newBuilder().setSize(1).build()) + .setChecksummedData(getChecksummedData(ByteString.copyFromUtf8("a"))) + .build()); + responseObserver.onCompleted(); + } else { + responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED)); + } + } + }; try (FakeServer fakeServer = FakeServer.of(fakeStorage); StorageClient sc = StorageClient.create(fakeServer.storageSettings())) { - ReadableByteChannelSession session = - ResumableMedia.gapic() - .read() - .byteChannel( - new ZeroCopyServerStreamingCallable<>( - sc.readObjectCallable(), ResponseContentLifecycleManager.noop()), - TestUtils.retrierFromStorageOptions(fakeServer.getGrpcStorageOptions()), - StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) - .setHasher(Hasher.noop()) - .setAutoGzipDecompression(true) - .unbuffered() - .setReadObjectRequest(reqUncompressed) - .build(); - - byte[] expected = new byte[] {(byte) 'a'}; + ReadableByteChannelSession session = ResumableMedia.gapic() + .read() + .byteChannel( + new ZeroCopyServerStreamingCallable<>( + sc.readObjectCallable(), ResponseContentLifecycleManager.noop()), + TestUtils.retrierFromStorageOptions(fakeServer.getGrpcStorageOptions()), + StorageRetryStrategy.getDefaultStorageRetryStrategy().getIdempotentHandler()) + .setHasher(Hasher.noop()) + .setAutoGzipDecompression(true) + .unbuffered() + .setReadObjectRequest(reqUncompressed) + .build(); + + byte[] expected = new byte[] { (byte) 'a' }; ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (ReadableByteChannel c = session.open()) { ByteStreams.copy(c, Channels.newChannel(baos)); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java index 40c0ee2a895c..ea66e5a38d50 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BackendResources.java @@ -28,6 +28,7 @@ import com.google.cloud.storage.BucketInfo.CustomPlacementConfig; import com.google.cloud.storage.BucketInfo.HierarchicalNamespace; import com.google.cloud.storage.BucketInfo.IamConfiguration; +import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.GrpcStorageOptions; import com.google.cloud.storage.HttpStorageOptions; import com.google.cloud.storage.Storage; @@ -104,7 +105,12 @@ static BackendResources of( StorageOptions.http() .setCredentials(NoCredentials.getInstance()) .setHost(Registry.getInstance().testBench().getBaseUri()) - .setProjectId("test-project-id"); + .setProjectId("test-project-id") + .setTransportOptions( + HttpTransportOptions.newBuilder() + .setConnectTimeout(10000) + .setReadTimeout(10000) + .build()); break; default: // PROD, java8 doesn't have exhaustive checking for enum switch // Register the exporters with OpenTelemetry @@ -194,6 +200,7 @@ static BackendResources of( String.format(Locale.US, "java-storage-grpc-%s", UUID.randomUUID()); protectedBucketNames.add(bucketName); return new BucketInfoShim( + backend, BucketInfo.newBuilder(bucketName) .setLocation(zone.get().get().getRegion()) .build(), @@ -208,6 +215,7 @@ static BackendResources of( String.format(Locale.US, "java-storage-grpc-rp-%s", UUID.randomUUID()); protectedBucketNames.add(bucketName); return new BucketInfoShim( + backend, BucketInfo.newBuilder(bucketName) .setLocation(zone.get().get().getRegion()) .setRequesterPays(true) @@ -223,6 +231,7 @@ static BackendResources of( String.format(Locale.US, "java-storage-grpc-v-%s", UUID.randomUUID()); protectedBucketNames.add(bucketName); return new BucketInfoShim( + backend, BucketInfo.newBuilder(bucketName) .setLocation(zone.get().get().getRegion()) .setVersioningEnabled(true) @@ -238,6 +247,7 @@ static BackendResources of( String.format(Locale.US, "java-storage-grpc-hns-%s", UUID.randomUUID()); protectedBucketNames.add(bucketName); return new BucketInfoShim( + backend, BucketInfo.newBuilder(bucketName) .setLocation(zone.get().get().getRegion()) .setHierarchicalNamespace( @@ -258,6 +268,7 @@ static BackendResources of( String.format(Locale.US, "java-storage-grpc-rapid-%s", UUID.randomUUID()); protectedBucketNames.add(bucketName); return new BucketInfoShim( + backend, BucketInfo.newBuilder(bucketName) .setLocation(zone.get().get().getRegion()) .setCustomPlacementConfig( diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BucketInfoShim.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BucketInfoShim.java index 0c981895038b..f8f575c6a836 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BucketInfoShim.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BucketInfoShim.java @@ -22,19 +22,22 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.it.BucketCleaner; +import com.google.cloud.storage.it.runner.annotations.Backend; import com.google.storage.control.v2.StorageControlClient; import java.util.Locale; /** Shim to lift a BucketInfo to be a managed bucket instance */ final class BucketInfoShim implements ManagedLifecycle { + private final Backend backend; private final BucketInfo bucketInfo; private final Storage s; private final StorageControlClient ctrl; private BucketInfo createdBucket; - BucketInfoShim(BucketInfo bucketInfo, Storage s, StorageControlClient ctrl) { + BucketInfoShim(Backend backend, BucketInfo bucketInfo, Storage s, StorageControlClient ctrl) { + this.backend = backend; this.bucketInfo = bucketInfo; this.s = s; this.ctrl = ctrl; @@ -67,6 +70,9 @@ public void start() { @Override public void stop() { - BucketCleaner.doCleanup(bucketInfo.getName(), s /*, ctrl*/); + if (backend != Backend.TEST_BENCH) { + BucketCleaner.doCleanup(bucketInfo.getName(), s /*, ctrl*/); + } } } + diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/MetadataService.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/MetadataService.java index b668a4936c26..f27046cc5d03 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/MetadataService.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/MetadataService.java @@ -42,6 +42,8 @@ final class MetadataService { .createRequestFactory( request -> { request.setCurlLoggingEnabled(false); + request.setConnectTimeout(1000); + request.setReadTimeout(2000); request.getHeaders().set("Metadata-Flavor", "Google"); })); private static final String baseUri = "http://metadata.google.internal"; diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java index c232cd32de6c..6c157764b465 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/Registry.java @@ -264,6 +264,10 @@ private void shutdown() { throw new RuntimeException(e); } }); + try { + exec.shutdownNow(); + } catch (Exception ex) { + } } catch (Throwable t) { span.recordException(t); span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java index 4d9340762093..441acda3d9b0 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java @@ -92,8 +92,8 @@ public final class TestBench implements ManagedLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(TestBench.class); private final boolean ignorePullError; - private final String baseUri; - private final String gRPCBaseUri; + private volatile String baseUri; + private volatile String gRPCBaseUri; private final String dockerImageName; private final String dockerImageTag; private final String containerName; @@ -128,6 +128,8 @@ private TestBench( .createRequestFactory( request -> { request.setCurlLoggingEnabled(false); + request.setConnectTimeout(5000); + request.setReadTimeout(10000); request.getHeaders().setAccept("application/json"); request .getHeaders() @@ -201,13 +203,15 @@ public Object get() { @Override public void start() { - try { - listRetryTests(); - LOGGER.info("Using testbench running outside test suite."); - runningOutsideAlready = true; - return; - } catch (IOException ignore) { - // expected when the server isn't running already + if (baseUri != null) { + try { + listRetryTests(); + LOGGER.info("Using testbench running outside test suite."); + runningOutsideAlready = true; + return; + } catch (IOException ignore) { + // expected when the server isn't running already + } } try { tempDirectory = Files.createTempDirectory(containerName); @@ -230,14 +234,18 @@ public void start() { .redirectOutput(outFile) .redirectError(errFile) .start(); - p.waitFor(5, TimeUnit.MINUTES); - if (!ignorePullError && p.exitValue() != 0) { - dumpServerLogs(outPath, errPath); - throw new IllegalStateException( - String.format( - Locale.US, - "Non-zero status while attempting to pull docker image '%s'", - dockerImage)); + try { + p.waitFor(5, TimeUnit.MINUTES); + if (!ignorePullError && p.exitValue() != 0) { + dumpServerLogs(outPath, errPath); + throw new IllegalStateException( + String.format( + Locale.US, + "Non-zero status while attempting to pull docker image '%s'", + dockerImage)); + } + } finally { + p.destroyForcibly(); } } catch (InterruptedException | IllegalThreadStateException e) { dumpServerLogs(outPath, errPath); @@ -246,18 +254,18 @@ public void start() { Locale.US, "Timeout while attempting to pull docker image '%s'", dockerImage)); } - int port = URI.create(baseUri).getPort(); - int gRPCPort = URI.create(gRPCBaseUri).getPort(); + int port = baseUri != null ? URI.create(baseUri).getPort() : 0; + int gRPCPort = gRPCBaseUri != null ? URI.create(gRPCBaseUri).getPort() : 0; final List command = ImmutableList.of( "docker", "run", - "-i", + "-d", "--rm", "--publish", - port + ":9000", + (port > 0 ? port + ":" : "") + "9000", "--publish", - gRPCPort + ":9090", + (gRPCPort > 0 ? gRPCPort + ":" : "") + "9090", String.format(Locale.US, "--name=%s", containerName), dockerImage, "gunicorn", @@ -275,6 +283,29 @@ public void start() { .start(); LOGGER.info(command.toString()); try { + try { + if (!process.waitFor(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("docker run timed out"); + } + if (process.exitValue() != 0) { + dumpServerLogs(outPath, errPath); + throw new IllegalStateException("docker run failed with exit code " + process.exitValue()); + } + } finally { + process.destroyForcibly(); + } + + if (baseUri == null) { + int allocatedPort = getDockerPort(containerName, 9000); + this.baseUri = "http://localhost:" + allocatedPort; + LOGGER.info("Auto-allocated HTTP port: {}", allocatedPort); + } + if (gRPCBaseUri == null) { + int allocatedGrpcPort = getDockerPort(containerName, 9090); + this.gRPCBaseUri = "http://localhost:" + allocatedGrpcPort; + LOGGER.info("Auto-allocated gRPC port: {}", allocatedGrpcPort); + } + // wait a small amount of time for the server to come up before probing Thread.sleep(500); // wait for the server to come up @@ -300,7 +331,8 @@ public boolean shouldRetry( "Test Server already has retry tests in it, is it running outside the tests?"); } // Start gRPC Service - if (!startGRPCServer(gRPCPort)) { + int actualGrpcPort = URI.create(gRPCBaseUri).getPort(); + if (!startGRPCServer(actualGrpcPort)) { throw new IllegalStateException( "Failed to start server within a reasonable amount of time. Host url(gRPC): " + gRPCBaseUri); @@ -323,53 +355,22 @@ public void stop() { return; } try { - process.destroy(); - process.waitFor(2, TimeUnit.SECONDS); - boolean attemptForceStopContainer = false; + LOGGER.warn("Stopping container: {}", containerName); + ImmutableList killCommand = ImmutableList.of("docker", "kill", containerName); + LOGGER.warn(killCommand.toString()); + ProcessBuilder pb = new ProcessBuilder(killCommand); + File nullFile = new File(System.getProperty("os.name").startsWith("Windows") ? "NUL" : "/dev/null"); + pb.redirectOutput(ProcessBuilder.Redirect.to(nullFile)); + pb.redirectError(ProcessBuilder.Redirect.to(nullFile)); + Process shutdownProcess = pb.start(); try { - int processExitValue = process.exitValue(); - if (processExitValue != 0) { - attemptForceStopContainer = true; - } - LOGGER.warn("Container exit value = {}", processExitValue); - } catch (IllegalThreadStateException e) { - attemptForceStopContainer = true; - } - - if (attemptForceStopContainer) { - LOGGER.warn("Container did not gracefully exit, attempting to explicitly stop it."); - ImmutableList command = ImmutableList.of("docker", "kill", containerName); - LOGGER.warn(command.toString()); - Process shutdownProcess = new ProcessBuilder(command).start(); shutdownProcess.waitFor(5, TimeUnit.SECONDS); int shutdownProcessExitValue = shutdownProcess.exitValue(); LOGGER.warn("Container exit value = {}", shutdownProcessExitValue); + } finally { + shutdownProcess.destroyForcibly(); } - // wait for the server to shutdown - runWithRetries( - () -> { - try { - listRetryTests(); - } catch (SocketException e) { - // desired result - return null; - } - throw new NotShutdownException(); - }, - RetrySettings.newBuilder() - .setTotalTimeoutDuration(Duration.ofSeconds(30)) - .setInitialRetryDelayDuration(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.5) - .setMaxRetryDelayDuration(Duration.ofSeconds(5)) - .build(), - new BasicResultRetryAlgorithm>() { - @Override - public boolean shouldRetry(Throwable previousThrowable, List previousResponse) { - return previousThrowable instanceof NotShutdownException; - } - }, - NanoClock.getDefaultClock()); try { Files.delete(errPath); Files.delete(outPath); @@ -382,6 +383,35 @@ public boolean shouldRetry(Throwable previousThrowable, List previousResponse } } + private int getDockerPort(String containerName, int containerPort) throws IOException, InterruptedException { + ProcessBuilder pb = new ProcessBuilder("docker", "port", containerName, String.valueOf(containerPort)); + File nullFile = new File(System.getProperty("os.name").startsWith("Windows") ? "NUL" : "/dev/null"); + pb.redirectError(ProcessBuilder.Redirect.to(nullFile)); + Process p = pb.start(); + try { + if (!p.waitFor(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("docker port timed out"); + } + if (p.exitValue() != 0) { + throw new IllegalStateException("docker port failed"); + } + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) { + String line = reader.readLine(); + if (line == null) { + throw new IllegalStateException("No port mapping found"); + } + // Line format is like "0.0.0.0:49153" or "[::]:49153" + int colonIndex = line.lastIndexOf(':'); + if (colonIndex == -1) { + throw new IllegalStateException("Invalid port mapping: " + line); + } + return Integer.parseInt(line.substring(colonIndex + 1)); + } + } finally { + p.destroyForcibly(); + } + } + private void dumpServerLogs(Path outFile, Path errFile) throws IOException { try { LOGGER.warn("Dumping contents of stdout"); @@ -487,6 +517,8 @@ static final class Builder { private static final String DEFAULT_CONTAINER_NAME = "default"; + + private boolean ignorePullError; private String baseUri; private String gRPCBaseUri; @@ -497,8 +529,8 @@ static final class Builder { private Builder() { this( false, - DEFAULT_BASE_URI, - DEFAULT_GRPC_BASE_URI, + null, + null, DEFAULT_IMAGE_NAME, DEFAULT_IMAGE_TAG, DEFAULT_CONTAINER_NAME); @@ -550,13 +582,15 @@ public Builder setContainerName(String containerName) { } public TestBench build() { + String suffix = Optional.ofNullable(System.getProperty("surefire.forkNumber")) + .orElseGet(() -> java.util.UUID.randomUUID().toString().substring(0, 8)); return new TestBench( ignorePullError, baseUri, gRPCBaseUri, requireNonNull(dockerImageName, "dockerImageName must be non null"), requireNonNull(dockerImageTag, "dockerImageTag must be non null"), - String.format(Locale.US, "storage-testbench_%s", containerName)); + String.format(Locale.US, "storage-testbench_%s_%s", containerName, suffix)); } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java index 4973e5fe9525..d73c73f68c6e 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestRunScopedInstance.java @@ -91,7 +91,7 @@ public void shutdown() throws Exception { synchronized (this) { instance = null; } - if (name.equals("OTEL_SDK")) { + if (name.endsWith("OTEL_SDK")) { tmp.stop(); } else { Span span =