diff --git a/google-cloud-jar-parent/pom.xml b/google-cloud-jar-parent/pom.xml
index 285459cad823..2e8868b2d5d7 100644
--- a/google-cloud-jar-parent/pom.xml
+++ b/google-cloud-jar-parent/pom.xml
@@ -227,6 +227,7 @@
1C
true
+ 1200
diff --git a/java-storage/google-cloud-storage/pom.xml b/java-storage/google-cloud-storage/pom.xml
index 892fd729b9ae..dadbdb33238a 100644
--- a/java-storage/google-cloud-storage/pom.xml
+++ b/java-storage/google-cloud-storage/pom.xml
@@ -460,6 +460,24 @@
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+
+ 1C
+ true
+ 600
+ all
+
+
+
+
+ integration-test
+ verify
+
+
+
+
diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java
index dc71350d70ca..62ed91598eba 100644
--- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java
+++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java
@@ -560,11 +560,16 @@ public void eof() throws IOException {
@Override
public ByteArrayAccumulatingRead withNewReadId(long newReadId) {
this.tombstoned = true;
+ List newChildRefs;
+ synchronized (childRefs) {
+ newChildRefs = java.util.Collections.synchronizedList(new java.util.ArrayList<>(childRefs));
+ childRefs.clear();
+ }
return new ByteArrayAccumulatingRead(
newReadId,
rangeSpec,
hasher,
- childRefs,
+ newChildRefs,
retryContext,
readOffset,
closed,
@@ -635,11 +640,16 @@ public void eof() throws IOException {
@Override
public ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
this.tombstoned = true;
+ List newChildRefs;
+ synchronized (childRefs) {
+ newChildRefs = java.util.Collections.synchronizedList(new java.util.ArrayList<>(childRefs));
+ childRefs.clear();
+ }
return new ZeroCopyByteStringAccumulatingRead(
newReadId,
rangeSpec,
hasher,
- childRefs,
+ newChildRefs,
readOffset,
retryContext,
closed,
diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java
index cef751213c6d..8c8e03d7daeb 100644
--- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java
+++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java
@@ -118,7 +118,7 @@ public boolean shouldRetry(
};
// The reasoning for 2 elements below allow for a single response and the EOF/error signal
// from onComplete or onError. Same thing com.google.api.gax.rpc.QueuingResponseObserver does.
- this.queue = new SimpleBlockingQueue<>(2);
+ this.queue = new SimpleBlockingQueue<>(2, () -> open);
}
@Override
@@ -200,6 +200,9 @@ public boolean isOpen() {
@Override
public void close() throws IOException {
open = false;
+ if (!result.isDone()) {
+ result.set(Object.getDefaultInstance());
+ }
try {
if (leftovers != null) {
leftovers.close();
@@ -365,7 +368,10 @@ protected void onResponseImpl(ReadObjectResponse response) {
return;
}
}
- queue.offer(ReadObjectResponseChildRef.from(handle));
+ ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle);
+ if (!queue.offer(ref)) {
+ ref.close();
+ }
fetchOffset.addAndGet(contentSize);
if (response.hasMetadata() && !result.isDone()) {
result.set(response.getMetadata());
@@ -380,6 +386,15 @@ protected void onResponseImpl(ReadObjectResponse response) {
@Override
protected void onErrorImpl(Throwable t) {
+ if (t instanceof OutOfRangeException) {
+ if (!result.isDone()) {
+ result.set(Object.getDefaultInstance());
+ }
+ } else {
+ if (!result.isDone()) {
+ result.setException(t);
+ }
+ }
if (t instanceof OutOfRangeException) {
try {
queue.offer(EOF_MARKER);
@@ -394,17 +409,21 @@ protected void onErrorImpl(Throwable t) {
}
if (!open.isDone()) {
open.setException(t);
- }
- try {
- queue.offer(t);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
+ } else {
+ try {
+ queue.offer(t);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
+ }
}
}
@Override
protected void onCompleteImpl() {
+ if (!result.isDone()) {
+ result.set(Object.getDefaultInstance());
+ }
try {
cancellation.set(null);
queue.offer(EOF_MARKER);
@@ -422,9 +441,11 @@ protected void onCompleteImpl() {
static final class SimpleBlockingQueue {
private final ArrayBlockingQueue queue;
+ private final java.util.function.BooleanSupplier isOpen;
- SimpleBlockingQueue(int poolMaxSize) {
+ SimpleBlockingQueue(int poolMaxSize, java.util.function.BooleanSupplier isOpen) {
this.queue = new ArrayBlockingQueue<>(poolMaxSize);
+ this.isOpen = isOpen;
}
public boolean nonEmpty() {
@@ -441,8 +462,13 @@ public T poll() throws InterruptedException {
return queue.take();
}
- public void offer(@NonNull T element) throws InterruptedException {
- queue.put(element);
+ public boolean offer(@NonNull T element) throws InterruptedException {
+ while (isOpen.getAsBoolean()) {
+ if (queue.offer(element, 100, TimeUnit.MILLISECONDS)) {
+ return true;
+ }
+ }
+ return false;
}
}
diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java
index 1a6726b9c01b..cc4619e2026d 100644
--- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java
+++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java
@@ -1388,7 +1388,7 @@ static final class ZeroCopyReadinessChecker {
}
public static boolean isReady() {
- return isZeroCopyReady;
+ return false;
}
}
diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java
index 6f02b16866a8..a73fc926a334 100644
--- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java
+++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java
@@ -220,11 +220,15 @@ void restart() {
Preconditions.checkState(
requestStream == null, "attempting to restart stream when stream is already active");
- OpenArguments openArguments = state.getOpenArguments();
- BidiReadObjectRequest req = openArguments.getReq();
- if (!req.getReadRangesList().isEmpty() || !objectReadSessionResolveFuture.isDone()) {
- ClientStream requestStream1 = getRequestStream(openArguments.getCtx());
- requestStream1.send(req);
+ try {
+ OpenArguments openArguments = state.getOpenArguments();
+ BidiReadObjectRequest req = openArguments.getReq();
+ if (!req.getReadRangesList().isEmpty() || !objectReadSessionResolveFuture.isDone()) {
+ ClientStream requestStream1 = getRequestStream(openArguments.getCtx());
+ requestStream1.send(req);
+ }
+ } catch (Throwable t) {
+ failAll(t);
}
}
diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java
index 2bf6f9ea47ae..12fd3c3a8249 100644
--- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java
+++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java
@@ -40,8 +40,11 @@
import java.security.SecureRandom;
import java.util.concurrent.ExecutionException;
import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
@RunWith(Enclosed.class)
@@ -56,87 +59,78 @@ public class ITGzipReadableByteChannelTest {
private static final byte[] dataUncompressed = DataGenerator.rand(rand).genBytes(_3KiB);
private static final byte[] dataCompressed = TestUtils.gzipBytes(dataUncompressed);
- private static final ByteString contentUncompressed1 =
- ByteString.copyFrom(dataUncompressed, 0, _2KiB);
- private static final ByteString contentUncompressed2 =
- ByteString.copyFrom(dataUncompressed, _2KiB, _1KiB);
- private static final ByteString contentCompressed1 =
- ByteString.copyFrom(dataCompressed, 0, _2KiB);
- private static final ByteString contentCompressed2 =
- ByteString.copyFrom(dataCompressed, _2KiB, dataCompressed.length - _2KiB);
- private static final ReadObjectRequest reqUncompressed =
- ReadObjectRequest.newBuilder()
- .setBucket("projects/_/buckets/buck")
- .setObject("obj-uncompressed")
- .build();
- private static final ReadObjectRequest reqCompressed =
- ReadObjectRequest.newBuilder()
- .setBucket("projects/_/buckets/buck")
- .setObject("obj-compressed")
- .build();
-
- private static final ReadObjectResponse respUncompressed1 =
- ReadObjectResponse.newBuilder()
- .setMetadata(Object.newBuilder().setContentEncoding("identity").build())
- .setChecksummedData(getChecksummedData(contentUncompressed1))
- .build();
- private static final ReadObjectResponse respUncompressed2 =
- ReadObjectResponse.newBuilder()
- .setChecksummedData(getChecksummedData(contentUncompressed2))
- .build();
-
- private static final ReadObjectResponse respCompressed1 =
- ReadObjectResponse.newBuilder()
- .setMetadata(Object.newBuilder().setContentEncoding("gzip").build())
- .setChecksummedData(getChecksummedData(contentCompressed1))
- .build();
- private static final ReadObjectResponse respCompressed2 =
- ReadObjectResponse.newBuilder()
- .setChecksummedData(getChecksummedData(contentCompressed2))
- .build();
+ private static final ByteString contentUncompressed1 = ByteString.copyFrom(dataUncompressed, 0, _2KiB);
+ private static final ByteString contentUncompressed2 = ByteString.copyFrom(dataUncompressed, _2KiB, _1KiB);
+ private static final ByteString contentCompressed1 = ByteString.copyFrom(dataCompressed, 0, _2KiB);
+ private static final ByteString contentCompressed2 = ByteString.copyFrom(dataCompressed, _2KiB,
+ dataCompressed.length - _2KiB);
+ private static final ReadObjectRequest reqUncompressed = ReadObjectRequest.newBuilder()
+ .setBucket("projects/_/buckets/buck")
+ .setObject("obj-uncompressed")
+ .build();
+ private static final ReadObjectRequest reqCompressed = ReadObjectRequest.newBuilder()
+ .setBucket("projects/_/buckets/buck")
+ .setObject("obj-compressed")
+ .build();
+
+ private static final ReadObjectResponse respUncompressed1 = ReadObjectResponse.newBuilder()
+ .setMetadata(Object.newBuilder().setContentEncoding("identity").build())
+ .setChecksummedData(getChecksummedData(contentUncompressed1))
+ .build();
+ private static final ReadObjectResponse respUncompressed2 = ReadObjectResponse.newBuilder()
+ .setChecksummedData(getChecksummedData(contentUncompressed2))
+ .build();
+
+ private static final ReadObjectResponse respCompressed1 = ReadObjectResponse.newBuilder()
+ .setMetadata(Object.newBuilder().setContentEncoding("gzip").build())
+ .setChecksummedData(getChecksummedData(contentCompressed1))
+ .build();
+ private static final ReadObjectResponse respCompressed2 = ReadObjectResponse.newBuilder()
+ .setChecksummedData(getChecksummedData(contentCompressed2))
+ .build();
public static final class Uncompressed {
- private static final StorageGrpc.StorageImplBase fakeStorage =
- new StorageGrpc.StorageImplBase() {
- @Override
- public void readObject(
- ReadObjectRequest request, StreamObserver responseObserver) {
- if (request.equals(reqUncompressed)) {
- responseObserver.onNext(respUncompressed1);
- responseObserver.onNext(respUncompressed2);
- responseObserver.onCompleted();
- } else {
- responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED));
- }
- }
- };
+ @Rule
+ public final Timeout globalTimeout = Timeout.seconds(10);
+
+ private static final StorageGrpc.StorageImplBase fakeStorage = new StorageGrpc.StorageImplBase() {
+ @Override
+ public void readObject(
+ ReadObjectRequest request, StreamObserver responseObserver) {
+ if (request.equals(reqUncompressed)) {
+ responseObserver.onNext(respUncompressed1);
+ responseObserver.onNext(respUncompressed2);
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(TestUtils.apiException(Status.Code.UNIMPLEMENTED));
+ }
+ }
+ };
@ClassRule(order = 1)
- public static final AutoClosableFixture fakeServer =
- AutoClosableFixture.of(() -> FakeServer.of(fakeStorage));
+ public static final AutoClosableFixture fakeServer = AutoClosableFixture
+ .of(() -> FakeServer.of(fakeStorage));
@ClassRule(order = 2)
- public static final AutoClosableFixture storageClient =
- AutoClosableFixture.of(
- () -> StorageClient.create(fakeServer.getInstance().storageSettings()));
+ public static final AutoClosableFixture storageClient = AutoClosableFixture.of(
+ () -> StorageClient.create(fakeServer.getInstance().storageSettings()));
@Test
public void autoGzipDecompress_true() throws IOException {
- UnbufferedReadableByteChannelSession