Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-11244fc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fixed a connection leak in the CRT HTTP client that occurred when aborting a response stream before fully consuming it (e.g., calling `abort()` on a `GetObject` `ResponseInputStream`)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
failResponseHandlerAndFuture(failure);
responseHandlerHelper.closeStream();
responseHandlerHelper.closeConnection();
return;
}
responseHandlerHelper.incrementWindow(bodyBytesIn.length);
Expand All @@ -118,20 +118,19 @@ private void onSuccessfulResponseComplete() {
responsePublisher.complete().whenComplete((result, failure) -> {
if (failure != null) {
failResponseHandlerAndFuture(failure);
responseHandlerHelper.closeStream();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this because it's a no-op because releaseConnection is invoked at line 125 anyway

return;
}
completionFuture.complete(null);
});
responseHandlerHelper.closeStream();
responseHandlerHelper.releaseConnection();
}

private void onFailedResponseComplete(HttpException error) {
log.debug(() -> "HTTP response encountered an error.", error);
Throwable toThrow = wrapWithIoExceptionIfRetryable(error);
responsePublisher.error(toThrow);
failResponseHandlerAndFuture(toThrow);
responseHandlerHelper.closeStream();
responseHandlerHelper.closeConnection();
}

private void failResponseHandlerAndFuture(Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

// Propagate cancellation
requestCompletionFuture.exceptionally(t -> {
responseHandlerHelper.closeStream();
responseHandlerHelper.closeConnection();
return null;
});
}
Expand All @@ -76,7 +76,7 @@
if (inputStreamSubscriber == null) {
inputStreamSubscriber =
AbortableInputStreamSubscriber.builder()
.doAfterClose(() -> responseHandlerHelper.closeStream())
.doAfterClose(() -> responseHandlerHelper.closeConnection())

Check warning on line 79 in http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this lambda with method reference 'responseHandlerHelper::closeConnection'.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ2dsUt6qQhEccuQ1aai&open=AZ2dsUt6qQhEccuQ1aai&pullRequest=6876
Comment thread
alextwoods marked this conversation as resolved.
.build();
simplePublisher.subscribe(inputStreamSubscriber);
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
Expand All @@ -97,7 +97,7 @@
log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future",
failure);
requestCompletionFuture.completeExceptionally(failure);
responseHandlerHelper.closeStream();
responseHandlerHelper.closeConnection();
return;
}
responseHandlerHelper.incrementWindow(bodyBytesIn.length);
Expand All @@ -120,7 +120,7 @@
Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
simplePublisher.error(toThrow);
requestCompletionFuture.completeExceptionally(toThrow);
responseHandlerHelper.closeStream();
responseHandlerHelper.closeConnection();
}

private void onSuccessfulResponseComplete() {
Expand All @@ -130,6 +130,6 @@
requestCompletionFuture.complete(responseBuilder.build());
// requestCompletionFuture has been completed at this point, no need to notify the future
simplePublisher.complete();
responseHandlerHelper.closeStream();
responseHandlerHelper.releaseConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,31 @@ public void incrementWindow(int windowSize) {
}
}

public void closeStream() {
/**
* Release the connection back to the pool so that it may be reused. This should be called when the request
* completes successfully and the response has been fully consumed.
*/
public void releaseConnection() {
synchronized (streamLock) {
if (!streamClosed && stream != null) {
streamClosed = true;
stream.close();
}
}
}

/**
* Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the
* connection pool. This should be called on error paths or when the stream is aborted before the response is
* fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract.
*/
public void closeConnection() {
synchronized (streamLock) {
if (!streamClosed && stream != null) {
streamClosed = true;
stream.cancel();
stream.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -90,14 +91,16 @@ void nonServerError_shouldCloseStream(int statusCode) {
}

@Test
void failedToGetResponse_shouldCloseStream() {
void failedToGetResponse_shouldCancelAndCloseStream() {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);

responseHandler.onResponseComplete(httpStream, 1);
assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class);
verify(httpStream).close();
InOrder inOrder = Mockito.inOrder(httpStream);
inOrder.verify(httpStream).cancel();
inOrder.verify(httpStream).close();
}

@Test
Expand All @@ -115,7 +118,7 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {
}

@Test
void publisherWritesFutureFails_shouldCloseStream() {
void publisherWritesFutureFails_shouldCancelAndCloseStream() {
SimplePublisher<ByteBuffer> simplePublisher = Mockito.mock(SimplePublisher.class);
CompletableFuture<Void> future = new CompletableFuture<>();
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
Expand All @@ -137,7 +140,9 @@ void publisherWritesFutureFails_shouldCloseStream() {
// we don't verify here because it behaves differently in async and sync
}

verify(httpStream).close();
InOrder inOrder = Mockito.inOrder(httpStream);
inOrder.verify(httpStream).cancel();
inOrder.verify(httpStream).close();
verify(httpStream, never()).incrementWindow(anyInt());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void onResponseComplete_publisherCancelled_closesStream() {
crtResponseHandler.onResponseBody(httpStream, "{}".getBytes(StandardCharsets.UTF_8));

crtResponseHandler.onResponseComplete(httpStream, 0);
assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining(
assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasStackTraceContaining(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to make the build pass in Java 25. In Java 25, the error message is just "join".

java.util.concurrent.CancellationException: join

	at java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:454)
	at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2139)
	at software.amazon.awssdk.http.crt.internal.CrtResponseHandlerTest.onResponseComplete_publisherCancelled_closesStream(CrtResponseHandlerTest.java:71)
	at java.base/java.lang.reflect.Method.invoke(Method.java:565)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1604)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1604)
Caused by: java.util.concurrent.CancellationException: subscription has been cancelled.

"subscription has been cancelled");
verify(httpStream).close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
Expand All @@ -45,7 +47,7 @@ HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher
}

@Test
void abortStream_shouldCloseStream() throws IOException {
void abortStream_shouldCancelAndCloseStream() throws IOException {
HttpHeader[] httpHeaders = getHttpHeaders();

responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
Expand All @@ -61,11 +63,13 @@ void abortStream_shouldCloseStream() throws IOException {
abortableInputStream.read();
abortableInputStream.abort();

verify(httpStream).close();
InOrder inOrder = Mockito.inOrder(httpStream);
inOrder.verify(httpStream).cancel();
inOrder.verify(httpStream).close();
}

@Test
void closeStream_shouldCloseStream() throws IOException {
void closeStream_shouldCloseStreamWithoutCancel() throws IOException {
HttpHeader[] httpHeaders = getHttpHeaders();

responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
Expand All @@ -85,13 +89,16 @@ void closeStream_shouldCloseStream() throws IOException {
}

@Test
void cancelFuture_shouldCloseStream() {
void cancelFuture_shouldCancelAndCloseStream() {
HttpHeader[] httpHeaders = getHttpHeaders();

responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);

requestFuture.completeExceptionally(new RuntimeException());
verify(httpStream).close();

InOrder inOrder = Mockito.inOrder(httpStream);
inOrder.verify(httpStream).cancel();
inOrder.verify(httpStream).close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal;

import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.crt.internal.response.ResponseHandlerHelper;

@ExtendWith(MockitoExtension.class)
class ResponseHandlerHelperTest {

@Mock
private HttpStreamBase stream;

private ResponseHandlerHelper helper;

@BeforeEach
void setUp() {
helper = new ResponseHandlerHelper(SdkHttpResponse.builder());
// Register the stream via onResponseHeaders
HttpHeader[] headers = { new HttpHeader("Content-Length", "1") };
helper.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.getValue(), headers);
}

@Test
void releaseConnection_shouldOnlyCallClose() {
helper.releaseConnection();

verify(stream, never()).cancel();
verify(stream).close();
}

@Test
void closeConnection_shouldCallCancelThenClose() {
helper.closeConnection();

InOrder inOrder = Mockito.inOrder(stream);
inOrder.verify(stream).cancel();
inOrder.verify(stream).close();
}

@Test
void releaseConnection_calledTwice_shouldOnlyCloseOnce() {
helper.releaseConnection();
helper.releaseConnection();

verify(stream, Mockito.times(1)).close();
}

@Test
void closeConnection_calledTwice_shouldOnlyCloseOnce() {
helper.closeConnection();
helper.closeConnection();

verify(stream, Mockito.times(1)).cancel();
verify(stream, Mockito.times(1)).close();
}

@Test
void releaseConnection_afterCloseConnection_shouldBeNoOp() {
helper.closeConnection();
helper.releaseConnection();

verify(stream, Mockito.times(1)).cancel();
verify(stream, Mockito.times(1)).close();
}

@Test
void closeConnection_afterReleaseConnection_shouldBeNoOp() {
helper.releaseConnection();
helper.closeConnection();

verify(stream, never()).cancel();
verify(stream, Mockito.times(1)).close();
}

@Test
void incrementWindow_afterReleaseConnection_shouldBeNoOp() {
helper.releaseConnection();
helper.incrementWindow(1024);

verify(stream, never()).incrementWindow(1024);
}

@Test
void incrementWindow_afterCloseConnection_shouldBeNoOp() {
helper.closeConnection();
helper.incrementWindow(1024);

verify(stream, never()).incrementWindow(1024);
}

@Test
void incrementWindow_beforeClose_shouldWork() {
helper.incrementWindow(1024);

verify(stream).incrementWindow(1024);
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<rxjava3.version>3.1.5</rxjava3.version>
<commons-codec.verion>1.17.1</commons-codec.verion>
<jmh.version>1.37</jmh.version>
<awscrt.version>0.44.0</awscrt.version>
<awscrt.version>0.45.1</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.10.3</junit5.version>
Expand Down
Loading
Loading