Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google-cloud-jar-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@
<configuration>
<forkCount>1C</forkCount>
<reuseForks>true</reuseForks>
<forkedProcessTimeoutInSeconds>1200</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
<plugin>
Expand Down
18 changes: 18 additions & 0 deletions java-storage/google-cloud-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,24 @@
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>1C</forkCount>
<reuseForks>true</reuseForks>
<forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
<enableProcessChecker>all</enableProcessChecker>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,16 @@ public void eof() throws IOException {
@Override
public ByteArrayAccumulatingRead withNewReadId(long newReadId) {
this.tombstoned = true;
List<ChildRef> newChildRefs;
synchronized (childRefs) {
newChildRefs = java.util.Collections.synchronizedList(new java.util.ArrayList<>(childRefs));
childRefs.clear();
}
return new ByteArrayAccumulatingRead(
newReadId,
rangeSpec,
hasher,
childRefs,
newChildRefs,
retryContext,
readOffset,
closed,
Expand Down Expand Up @@ -635,11 +640,16 @@ public void eof() throws IOException {
@Override
public ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
this.tombstoned = true;
List<ChildRef> newChildRefs;
synchronized (childRefs) {
newChildRefs = java.util.Collections.synchronizedList(new java.util.ArrayList<>(childRefs));
childRefs.clear();
}
return new ZeroCopyByteStringAccumulatingRead(
newReadId,
rangeSpec,
hasher,
childRefs,
newChildRefs,
readOffset,
retryContext,
closed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public boolean shouldRetry(
};
// The reasoning for 2 elements below allow for a single response and the EOF/error signal
// from onComplete or onError. Same thing com.google.api.gax.rpc.QueuingResponseObserver does.
this.queue = new SimpleBlockingQueue<>(2);
this.queue = new SimpleBlockingQueue<>(2, () -> open);
}

@Override
Expand Down Expand Up @@ -200,6 +200,9 @@ public boolean isOpen() {
@Override
public void close() throws IOException {
open = false;
if (!result.isDone()) {
result.set(Object.getDefaultInstance());
}
try {
if (leftovers != null) {
leftovers.close();
Expand Down Expand Up @@ -365,7 +368,10 @@ protected void onResponseImpl(ReadObjectResponse response) {
return;
}
}
queue.offer(ReadObjectResponseChildRef.from(handle));
ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle);
if (!queue.offer(ref)) {
ref.close();
}
fetchOffset.addAndGet(contentSize);
if (response.hasMetadata() && !result.isDone()) {
result.set(response.getMetadata());
Expand All @@ -380,6 +386,15 @@ protected void onResponseImpl(ReadObjectResponse response) {

@Override
protected void onErrorImpl(Throwable t) {
if (t instanceof OutOfRangeException) {
if (!result.isDone()) {
result.set(Object.getDefaultInstance());
}
} else {
if (!result.isDone()) {
result.setException(t);
}
}
if (t instanceof OutOfRangeException) {
try {
queue.offer(EOF_MARKER);
Expand All @@ -394,17 +409,21 @@ protected void onErrorImpl(Throwable t) {
}
if (!open.isDone()) {
open.setException(t);
}
try {
queue.offer(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
} else {
try {
queue.offer(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
}
}
}

@Override
protected void onCompleteImpl() {
if (!result.isDone()) {
result.set(Object.getDefaultInstance());
}
try {
cancellation.set(null);
queue.offer(EOF_MARKER);
Expand All @@ -422,9 +441,11 @@ protected void onCompleteImpl() {
static final class SimpleBlockingQueue<T> {

private final ArrayBlockingQueue<T> queue;
private final java.util.function.BooleanSupplier isOpen;

SimpleBlockingQueue(int poolMaxSize) {
SimpleBlockingQueue(int poolMaxSize, java.util.function.BooleanSupplier isOpen) {
this.queue = new ArrayBlockingQueue<>(poolMaxSize);
this.isOpen = isOpen;
}

public boolean nonEmpty() {
Expand All @@ -441,8 +462,13 @@ public T poll() throws InterruptedException {
return queue.take();
}

public void offer(@NonNull T element) throws InterruptedException {
queue.put(element);
public boolean offer(@NonNull T element) throws InterruptedException {
while (isOpen.getAsBoolean()) {
if (queue.offer(element, 100, TimeUnit.MILLISECONDS)) {
return true;
}
}
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,7 @@ static final class ZeroCopyReadinessChecker {
}

public static boolean isReady() {
return isZeroCopyReady;
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,15 @@ void restart() {
Preconditions.checkState(
requestStream == null, "attempting to restart stream when stream is already active");

OpenArguments openArguments = state.getOpenArguments();
BidiReadObjectRequest req = openArguments.getReq();
if (!req.getReadRangesList().isEmpty() || !objectReadSessionResolveFuture.isDone()) {
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream(openArguments.getCtx());
requestStream1.send(req);
try {
OpenArguments openArguments = state.getOpenArguments();
BidiReadObjectRequest req = openArguments.getReq();
if (!req.getReadRangesList().isEmpty() || !objectReadSessionResolveFuture.isDone()) {
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream(openArguments.getCtx());
requestStream1.send(req);
}
} catch (Throwable t) {
failAll(t);
}
}

Expand Down
Loading
Loading