Skip to content

Commit 7af9713

Browse files
authored
Fix CRT HTTP client connection leak when aborting response stream (#6876)
* Fix CRT HTTP client connection leak when aborting response stream The stream manager refactoring replaced connection.shutdown() + connection.close() with stream.close(), which only releases the refcount without forcing the connection to shut down. This caused connections to leak when customers called abort() before fully consuming a GetObject response stream. Split ResponseHandlerHelper.closeStream() into two methods: - releaseConnection(): calls stream.close() to return the connection to the pool (used on successful completion) - closeConnection(): calls stream.cancel() then stream.close() to force-shutdown the connection (used on error/abort paths) Bump aws-crt version to 0.45.1 for the new stream.cancel() API. * Fix broken tests
1 parent 56ce0ac commit 7af9713

File tree

11 files changed

+403
-22
lines changed

11 files changed

+403
-22
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed a connection leak in the CRT HTTP client that occurred when aborting a response stream before fully consuming it (e.g., calling `abort()` on a `GetObject` `ResponseInputStream`)."
6+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
9696
writeFuture.whenComplete((result, failure) -> {
9797
if (failure != null) {
9898
failResponseHandlerAndFuture(failure);
99-
responseHandlerHelper.closeStream();
99+
responseHandlerHelper.closeConnection();
100100
return;
101101
}
102102
responseHandlerHelper.incrementWindow(bodyBytesIn.length);
@@ -118,20 +118,19 @@ private void onSuccessfulResponseComplete() {
118118
responsePublisher.complete().whenComplete((result, failure) -> {
119119
if (failure != null) {
120120
failResponseHandlerAndFuture(failure);
121-
responseHandlerHelper.closeStream();
122121
return;
123122
}
124123
completionFuture.complete(null);
125124
});
126-
responseHandlerHelper.closeStream();
125+
responseHandlerHelper.releaseConnection();
127126
}
128127

129128
private void onFailedResponseComplete(HttpException error) {
130129
log.debug(() -> "HTTP response encountered an error.", error);
131130
Throwable toThrow = wrapWithIoExceptionIfRetryable(error);
132131
responsePublisher.error(toThrow);
133132
failResponseHandlerAndFuture(toThrow);
134-
responseHandlerHelper.closeStream();
133+
responseHandlerHelper.closeConnection();
135134
}
136135

137136
private void failResponseHandlerAndFuture(Throwable error) {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int
6666

6767
// Propagate cancellation
6868
requestCompletionFuture.exceptionally(t -> {
69-
responseHandlerHelper.closeStream();
69+
responseHandlerHelper.closeConnection();
7070
return null;
7171
});
7272
}
@@ -76,7 +76,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
7676
if (inputStreamSubscriber == null) {
7777
inputStreamSubscriber =
7878
AbortableInputStreamSubscriber.builder()
79-
.doAfterClose(() -> responseHandlerHelper.closeStream())
79+
.doAfterClose(() -> responseHandlerHelper.closeConnection())
8080
.build();
8181
simplePublisher.subscribe(inputStreamSubscriber);
8282
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
@@ -97,7 +97,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
9797
log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future",
9898
failure);
9999
requestCompletionFuture.completeExceptionally(failure);
100-
responseHandlerHelper.closeStream();
100+
responseHandlerHelper.closeConnection();
101101
return;
102102
}
103103
responseHandlerHelper.incrementWindow(bodyBytesIn.length);
@@ -120,7 +120,7 @@ private void onFailedResponseComplete(int errorCode) {
120120
Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
121121
simplePublisher.error(toThrow);
122122
requestCompletionFuture.completeExceptionally(toThrow);
123-
responseHandlerHelper.closeStream();
123+
responseHandlerHelper.closeConnection();
124124
}
125125

126126
private void onSuccessfulResponseComplete() {
@@ -130,6 +130,6 @@ private void onSuccessfulResponseComplete() {
130130
requestCompletionFuture.complete(responseBuilder.build());
131131
// requestCompletionFuture has been completed at this point, no need to notify the future
132132
simplePublisher.complete();
133-
responseHandlerHelper.closeStream();
133+
responseHandlerHelper.releaseConnection();
134134
}
135135
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,31 @@ public void incrementWindow(int windowSize) {
6060
}
6161
}
6262

63-
public void closeStream() {
63+
/**
64+
* Release the connection back to the pool so that it may be reused. This should be called when the request
65+
* completes successfully and the response has been fully consumed.
66+
*/
67+
public void releaseConnection() {
6468
synchronized (streamLock) {
6569
if (!streamClosed && stream != null) {
6670
streamClosed = true;
6771
stream.close();
6872
}
6973
}
7074
}
75+
76+
/**
77+
* Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the
78+
* connection pool. This should be called on error paths or when the stream is aborted before the response is
79+
* fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract.
80+
*/
81+
public void closeConnection() {
82+
synchronized (streamLock) {
83+
if (!streamClosed && stream != null) {
84+
streamClosed = true;
85+
stream.cancel();
86+
stream.close();
87+
}
88+
}
89+
}
7190
}

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.jupiter.api.extension.ExtendWith;
3232
import org.junit.jupiter.params.ParameterizedTest;
3333
import org.junit.jupiter.params.provider.ValueSource;
34+
import org.mockito.InOrder;
3435
import org.mockito.Mock;
3536
import org.mockito.Mockito;
3637
import org.mockito.junit.jupiter.MockitoExtension;
@@ -90,14 +91,16 @@ void nonServerError_shouldCloseStream(int statusCode) {
9091
}
9192

9293
@Test
93-
void failedToGetResponse_shouldCloseStream() {
94+
void failedToGetResponse_shouldCancelAndCloseStream() {
9495
HttpHeader[] httpHeaders = getHttpHeaders();
9596
responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
9697
httpHeaders);
9798

9899
responseHandler.onResponseComplete(httpStream, 1);
99100
assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class);
100-
verify(httpStream).close();
101+
InOrder inOrder = Mockito.inOrder(httpStream);
102+
inOrder.verify(httpStream).cancel();
103+
inOrder.verify(httpStream).close();
101104
}
102105

103106
@Test
@@ -115,7 +118,7 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {
115118
}
116119

117120
@Test
118-
void publisherWritesFutureFails_shouldCloseStream() {
121+
void publisherWritesFutureFails_shouldCancelAndCloseStream() {
119122
SimplePublisher<ByteBuffer> simplePublisher = Mockito.mock(SimplePublisher.class);
120123
CompletableFuture<Void> future = new CompletableFuture<>();
121124
when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future);
@@ -137,7 +140,9 @@ void publisherWritesFutureFails_shouldCloseStream() {
137140
// we don't verify here because it behaves differently in async and sync
138141
}
139142

140-
verify(httpStream).close();
143+
InOrder inOrder = Mockito.inOrder(httpStream);
144+
inOrder.verify(httpStream).cancel();
145+
inOrder.verify(httpStream).close();
141146
verify(httpStream, never()).incrementWindow(anyInt());
142147
}
143148

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ void onResponseComplete_publisherCancelled_closesStream() {
6868
crtResponseHandler.onResponseBody(httpStream, "{}".getBytes(StandardCharsets.UTF_8));
6969

7070
crtResponseHandler.onResponseComplete(httpStream, 0);
71-
assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining(
71+
assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasStackTraceContaining(
7272
"subscription has been cancelled");
7373
verify(httpStream).close();
7474
}

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.concurrent.CompletableFuture;
2525
import org.apache.commons.lang3.RandomStringUtils;
2626
import org.junit.jupiter.api.Test;
27+
import org.mockito.InOrder;
28+
import org.mockito.Mockito;
2729
import software.amazon.awssdk.crt.http.HttpHeader;
2830
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
2931
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
@@ -45,7 +47,7 @@ HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher
4547
}
4648

4749
@Test
48-
void abortStream_shouldCloseStream() throws IOException {
50+
void abortStream_shouldCancelAndCloseStream() throws IOException {
4951
HttpHeader[] httpHeaders = getHttpHeaders();
5052

5153
responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
@@ -61,11 +63,13 @@ void abortStream_shouldCloseStream() throws IOException {
6163
abortableInputStream.read();
6264
abortableInputStream.abort();
6365

64-
verify(httpStream).close();
66+
InOrder inOrder = Mockito.inOrder(httpStream);
67+
inOrder.verify(httpStream).cancel();
68+
inOrder.verify(httpStream).close();
6569
}
6670

6771
@Test
68-
void closeStream_shouldCloseStream() throws IOException {
72+
void closeStream_shouldCloseStreamWithoutCancel() throws IOException {
6973
HttpHeader[] httpHeaders = getHttpHeaders();
7074

7175
responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(),
@@ -85,13 +89,16 @@ void closeStream_shouldCloseStream() throws IOException {
8589
}
8690

8791
@Test
88-
void cancelFuture_shouldCloseStream() {
92+
void cancelFuture_shouldCancelAndCloseStream() {
8993
HttpHeader[] httpHeaders = getHttpHeaders();
9094

9195
responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
9296
httpHeaders);
9397

9498
requestFuture.completeExceptionally(new RuntimeException());
95-
verify(httpStream).close();
99+
100+
InOrder inOrder = Mockito.inOrder(httpStream);
101+
inOrder.verify(httpStream).cancel();
102+
inOrder.verify(httpStream).close();
96103
}
97104
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal;
17+
18+
import static org.mockito.Mockito.never;
19+
import static org.mockito.Mockito.verify;
20+
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.api.extension.ExtendWith;
24+
import org.mockito.InOrder;
25+
import org.mockito.Mock;
26+
import org.mockito.Mockito;
27+
import org.mockito.junit.jupiter.MockitoExtension;
28+
import software.amazon.awssdk.crt.http.HttpHeader;
29+
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
30+
import software.amazon.awssdk.crt.http.HttpStreamBase;
31+
import software.amazon.awssdk.http.SdkHttpResponse;
32+
import software.amazon.awssdk.http.crt.internal.response.ResponseHandlerHelper;
33+
34+
@ExtendWith(MockitoExtension.class)
35+
class ResponseHandlerHelperTest {
36+
37+
@Mock
38+
private HttpStreamBase stream;
39+
40+
private ResponseHandlerHelper helper;
41+
42+
@BeforeEach
43+
void setUp() {
44+
helper = new ResponseHandlerHelper(SdkHttpResponse.builder());
45+
// Register the stream via onResponseHeaders
46+
HttpHeader[] headers = { new HttpHeader("Content-Length", "1") };
47+
helper.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.getValue(), headers);
48+
}
49+
50+
@Test
51+
void releaseConnection_shouldOnlyCallClose() {
52+
helper.releaseConnection();
53+
54+
verify(stream, never()).cancel();
55+
verify(stream).close();
56+
}
57+
58+
@Test
59+
void closeConnection_shouldCallCancelThenClose() {
60+
helper.closeConnection();
61+
62+
InOrder inOrder = Mockito.inOrder(stream);
63+
inOrder.verify(stream).cancel();
64+
inOrder.verify(stream).close();
65+
}
66+
67+
@Test
68+
void releaseConnection_calledTwice_shouldOnlyCloseOnce() {
69+
helper.releaseConnection();
70+
helper.releaseConnection();
71+
72+
verify(stream, Mockito.times(1)).close();
73+
}
74+
75+
@Test
76+
void closeConnection_calledTwice_shouldOnlyCloseOnce() {
77+
helper.closeConnection();
78+
helper.closeConnection();
79+
80+
verify(stream, Mockito.times(1)).cancel();
81+
verify(stream, Mockito.times(1)).close();
82+
}
83+
84+
@Test
85+
void releaseConnection_afterCloseConnection_shouldBeNoOp() {
86+
helper.closeConnection();
87+
helper.releaseConnection();
88+
89+
verify(stream, Mockito.times(1)).cancel();
90+
verify(stream, Mockito.times(1)).close();
91+
}
92+
93+
@Test
94+
void closeConnection_afterReleaseConnection_shouldBeNoOp() {
95+
helper.releaseConnection();
96+
helper.closeConnection();
97+
98+
verify(stream, never()).cancel();
99+
verify(stream, Mockito.times(1)).close();
100+
}
101+
102+
@Test
103+
void incrementWindow_afterReleaseConnection_shouldBeNoOp() {
104+
helper.releaseConnection();
105+
helper.incrementWindow(1024);
106+
107+
verify(stream, never()).incrementWindow(1024);
108+
}
109+
110+
@Test
111+
void incrementWindow_afterCloseConnection_shouldBeNoOp() {
112+
helper.closeConnection();
113+
helper.incrementWindow(1024);
114+
115+
verify(stream, never()).incrementWindow(1024);
116+
}
117+
118+
@Test
119+
void incrementWindow_beforeClose_shouldWork() {
120+
helper.incrementWindow(1024);
121+
122+
verify(stream).incrementWindow(1024);
123+
}
124+
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@
131131
<rxjava3.version>3.1.5</rxjava3.version>
132132
<commons-codec.verion>1.17.1</commons-codec.verion>
133133
<jmh.version>1.37</jmh.version>
134-
<awscrt.version>0.44.0</awscrt.version>
134+
<awscrt.version>0.45.1</awscrt.version>
135135

136136
<!--Test dependencies -->
137137
<junit5.version>5.10.3</junit5.version>

0 commit comments

Comments
 (0)