Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ public void sendClose() {
if (tmp != null) {
tmp.closeSend();
}
if (pendingReconciliation != null) {
pendingReconciliation.cancel(true);
pendingReconciliation = null;
}
retryContext.reset();
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
responseObserver.await();
return null;
} catch (Throwable t) {
if (stream != null) {
try {
stream.onError(io.grpc.Status.CANCELLED.withCause(t).asRuntimeException());
} catch (Exception ignored) {
}
}
Comment on lines +254 to +259

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Wrapping the caught Throwable t in io.grpc.Status.CANCELLED.withCause(t).asRuntimeException() obscures the original gRPC status code (such as UNAVAILABLE, PERMISSION_DENIED, etc.) that might be present in t. This can break retry policies or error handling downstream that rely on the specific status code of the failure.

Consider passing the original exception t directly to stream.onError(t) to preserve the original error type and status code.

Suggested change
if (stream != null) {
try {
stream.onError(io.grpc.Status.CANCELLED.withCause(t).asRuntimeException());
} catch (Exception ignored) {
}
}
if (stream != null) {
try {
stream.onError(t);
} catch (Exception ignored) {
}
}

stream = null;
first = true;
t.addSuppressed(new AsyncStorageTaskException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public boolean isOpen() {

@Override
public void close() throws IOException {
if (!open) {
return;
}
open = false;
ApiStreamObserver<WriteObjectRequest> openedStream = openedStream();
if (!finished) {
WriteObjectRequest message = finishMessage();
Expand All @@ -154,7 +158,6 @@ public void close() throws IOException {
throw e;
}
}
open = false;
responseObserver.await();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,18 @@ public void getResultAlwaysReturnsTheSameFuture() {
@Test
public void closingWithoutAppending_throwNoSuchElementException() {
Executor exec = MoreExecutors.newDirectExecutorService();
//noinspection resource
AsyncAppendingQueue<String> q = AsyncAppendingQueue.of(exec, 3, AsyncAppendingQueueTest::agg);

ApiFuture<String> result = q.getResult();
NoSuchElementException nse1 = assertThrows(NoSuchElementException.class, q::close);
NoSuchElementException nse2 =
assertThrows(
NoSuchElementException.class, () -> ApiExceptions.callAndTranslateApiException(result));

assertThat(nse1).hasMessageThat().contains("Never appended to");
assertThat(nse2).hasMessageThat().contains("Never appended to");
try (AsyncAppendingQueue<String> q =
AsyncAppendingQueue.of(exec, 3, AsyncAppendingQueueTest::agg)) {
ApiFuture<String> result = q.getResult();
NoSuchElementException nse1 = assertThrows(NoSuchElementException.class, q::close);
NoSuchElementException nse2 =
assertThrows(
NoSuchElementException.class,
() -> ApiExceptions.callAndTranslateApiException(result));

assertThat(nse1).hasMessageThat().contains("Never appended to");
assertThat(nse2).hasMessageThat().contains("Never appended to");
}
}

@SuppressWarnings("resource")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ void illegalStateExceptionIfWrittenLt0_slice_eqBuffer() {
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
IllegalStateException ise =
assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 4).asByteBuffer()));
ise.printStackTrace(System.out);
}

@Example
Expand All @@ -568,7 +567,6 @@ void illegalStateExceptionIfWrittenLt0_slice_gtBuffer() {
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
IllegalStateException ise =
assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 5).asByteBuffer()));
ise.printStackTrace(System.out);
}

@Example
Expand All @@ -587,7 +585,6 @@ void illegalStateExceptionIfWrittenLt0_slice_ltBuffer() {
c.write(all.slice(3, 3).asByteBuffer());
fail("should have errored in previous write call");
});
ise.printStackTrace(System.out);
}

@Example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,18 @@
import java.time.Duration;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class FakeHttpServer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeHttpServer.class);

private final URI endpoint;
private final Channel channel;
private final Runnable shutdown;
private final HttpStorageOptions httpStorageOptions;


private FakeHttpServer(
URI endpoint, Channel channel, Runnable shutdown, HttpStorageOptions httpStorageOptions) {
this.endpoint = endpoint;
Expand Down Expand Up @@ -195,7 +200,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws E

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
LOGGER.warn("Exception caught in pipeline", cause);
ctx.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ public void bidiWriteObjectRedirectedError_maxAttempts() throws Exception {
assertThrows(
IOException.class,
() -> {
AppendableUploadWriteableByteChannel channel = b.open();
ByteBuffer wrap = ByteBuffer.wrap(content.getBytes());
Buffers.emptyTo(wrap, channel);
channel.close();
try (AppendableUploadWriteableByteChannel channel = b.open()) {
ByteBuffer wrap = ByteBuffer.wrap(content.getBytes());
Buffers.emptyTo(wrap, channel);
}
});

assertAll(
Expand Down Expand Up @@ -466,7 +466,7 @@ public void testFlushMultipleSegments_failsHalfway_partialFlush() throws Excepti
GrpcStorageImpl storage =
(GrpcStorageImpl) fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) {
SettableApiFuture<BidiWriteObjectResponse> done = SettableApiFuture.create();
BidiAppendableUnbufferedWritableByteChannel channel =
try (BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(
new BidiUploadStreamingStream(
BidiUploadState.appendableNew(
Expand All @@ -490,11 +490,11 @@ public void testFlushMultipleSegments_failsHalfway_partialFlush() throws Excepti
storage.storageDataClient.retryContextProvider.create()),
smallSegmenter,
3,
0);
ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
channel.nextWriteShouldFinalize();
channel.close();
0)) {
ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
channel.nextWriteShouldFinalize();
}
assertThat(done.get(777, TimeUnit.MILLISECONDS).getResource().getSize()).isEqualTo(10);

assertThat(map.get(req1)).isEqualTo(1);
Expand Down Expand Up @@ -616,7 +616,7 @@ public void testFlushMultipleSegmentsTwice_firstSucceeds_secondFailsHalfway_part
GrpcStorageImpl storage =
(GrpcStorageImpl) fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) {
SettableApiFuture<BidiWriteObjectResponse> done = SettableApiFuture.create();
BidiAppendableUnbufferedWritableByteChannel channel =
try (BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(
new BidiUploadStreamingStream(
BidiUploadState.appendableNew(
Expand All @@ -640,13 +640,13 @@ public void testFlushMultipleSegmentsTwice_firstSucceeds_secondFailsHalfway_part
storage.storageDataClient.retryContextProvider.create()),
smallSegmenter,
3,
0);
ChecksummedTestContent content1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content1.getBytes()), channel);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content2.getBytes()), channel);
channel.nextWriteShouldFinalize();
channel.close();
0)) {
ChecksummedTestContent content1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content1.getBytes()), channel);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content2.getBytes()), channel);
channel.nextWriteShouldFinalize();
}
assertThat(done.get(777, TimeUnit.MILLISECONDS).getResource().getSize()).isEqualTo(20);

assertThat(map.get(reconnect)).isEqualTo(1);
Expand Down Expand Up @@ -792,12 +792,12 @@ public void testFlushMultipleSegments_200ResponsePartialFlushHalfway() throws Ex
storage.storageClient.bidiWriteObjectCallable(),
3,
storage.storageDataClient.retryContextProvider.create());
BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 3, 0);
ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
channel.nextWriteShouldFinalize();
channel.close();
try (BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 3, 0)) {
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
channel.nextWriteShouldFinalize();
}
assertThat(stream.getResultFuture().get(777, TimeUnit.MILLISECONDS).getResource().getSize())
.isEqualTo(10);

Expand Down Expand Up @@ -1116,11 +1116,11 @@ private static void runTestFlushMultipleSegments(FakeStorage fake) throws Except
storage.storageClient.bidiWriteObjectCallable(),
3,
storage.storageDataClient.retryContextProvider.create());
BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 32, 0);
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
channel.nextWriteShouldFinalize();
channel.close();
try (BidiAppendableUnbufferedWritableByteChannel channel =
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 32, 0)) {
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
channel.nextWriteShouldFinalize();
}
BidiWriteObjectResponse response = stream.getResultFuture().get(777, TimeUnit.MILLISECONDS);
assertThat(response.getResource().getSize()).isEqualTo(10);
assertThat(response.getResource().getChecksums().getCrc32C()).isEqualTo(content.getCrc32c());
Expand Down
Loading
Loading