From 1f0718d923446ba00ad7d4c73572f1751fd79676 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Fri, 17 Apr 2026 13:26:18 -0700 Subject: [PATCH 1/2] Fix CRT HTTP client connection leak when aborting response stream The stream manager refactoring replaced connection.shutdown() + connection.close() with stream.close(), which only releases the refcount without forcing the connection to shut down. This caused connections to leak when customers called abort() before fully consuming a GetObject response stream. Split ResponseHandlerHelper.closeStream() into two methods: - releaseConnection(): calls stream.close() to return the connection to the pool (used on successful completion) - closeConnection(): calls stream.cancel() then stream.close() to force-shutdown the connection (used on error/abort paths) Bump aws-crt version to 0.45.1 for the new stream.cancel() API. --- .../bugfix-AWSCRTHTTPClient-11244fc.json | 6 + .../internal/response/CrtResponseAdapter.java | 7 +- ...reamAdaptingHttpStreamResponseHandler.java | 10 +- .../response/ResponseHandlerHelper.java | 21 +- .../BaseHttpStreamResponseHandlerTest.java | 13 +- .../crt/internal/CrtResponseHandlerTest.java | 2 +- ...AdaptingHttpStreamResponseHandlerTest.java | 17 +- .../internal/ResponseHandlerHelperTest.java | 124 ++++++++++ pom.xml | 2 +- ...seInputStreamConnectionManagementTest.java | 222 ++++++++++++++++++ 10 files changed, 403 insertions(+), 21 deletions(-) create mode 100644 .changes/next-release/bugfix-AWSCRTHTTPClient-11244fc.json create mode 100644 http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java create mode 100644 services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java diff --git a/.changes/next-release/bugfix-AWSCRTHTTPClient-11244fc.json b/.changes/next-release/bugfix-AWSCRTHTTPClient-11244fc.json new file mode 100644 index 000000000000..5364b0157989 --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPClient-11244fc.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT HTTP Client", + "contributor": "", + "description": "Fixed a connection leak in the CRT HTTP client that occurred when aborting a response stream before fully consuming it (e.g., calling `abort()` on a `GetObject` `ResponseInputStream`)." +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index cac60a75fea9..1beaa872b5f4 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -96,7 +96,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { writeFuture.whenComplete((result, failure) -> { if (failure != null) { failResponseHandlerAndFuture(failure); - responseHandlerHelper.closeStream(); + responseHandlerHelper.closeConnection(); return; } responseHandlerHelper.incrementWindow(bodyBytesIn.length); @@ -118,12 +118,11 @@ private void onSuccessfulResponseComplete() { responsePublisher.complete().whenComplete((result, failure) -> { if (failure != null) { failResponseHandlerAndFuture(failure); - responseHandlerHelper.closeStream(); return; } completionFuture.complete(null); }); - responseHandlerHelper.closeStream(); + responseHandlerHelper.releaseConnection(); } private void onFailedResponseComplete(HttpException error) { @@ -131,7 +130,7 @@ private void onFailedResponseComplete(HttpException error) { Throwable toThrow = wrapWithIoExceptionIfRetryable(error); responsePublisher.error(toThrow); failResponseHandlerAndFuture(toThrow); - responseHandlerHelper.closeStream(); + responseHandlerHelper.closeConnection(); } private void failResponseHandlerAndFuture(Throwable error) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java index 1bee55cdc9ce..bb04d71fdb2e 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -66,7 +66,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int // Propagate cancellation requestCompletionFuture.exceptionally(t -> { - responseHandlerHelper.closeStream(); + responseHandlerHelper.closeConnection(); return null; }); } @@ -76,7 +76,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { if (inputStreamSubscriber == null) { inputStreamSubscriber = AbortableInputStreamSubscriber.builder() - .doAfterClose(() -> responseHandlerHelper.closeStream()) + .doAfterClose(() -> responseHandlerHelper.closeConnection()) .build(); simplePublisher.subscribe(inputStreamSubscriber); // For response with a payload, we need to complete the future here to allow downstream to retrieve the data from @@ -97,7 +97,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future", failure); requestCompletionFuture.completeExceptionally(failure); - responseHandlerHelper.closeStream(); + responseHandlerHelper.closeConnection(); return; } responseHandlerHelper.incrementWindow(bodyBytesIn.length); @@ -120,7 +120,7 @@ private void onFailedResponseComplete(int errorCode) { Throwable toThrow = wrapWithIoExceptionIfRetryable(new HttpException(errorCode)); simplePublisher.error(toThrow); requestCompletionFuture.completeExceptionally(toThrow); - responseHandlerHelper.closeStream(); + responseHandlerHelper.closeConnection(); } private void onSuccessfulResponseComplete() { @@ -130,6 +130,6 @@ private void onSuccessfulResponseComplete() { requestCompletionFuture.complete(responseBuilder.build()); // requestCompletionFuture has been completed at this point, no need to notify the future simplePublisher.complete(); - responseHandlerHelper.closeStream(); + responseHandlerHelper.releaseConnection(); } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java index 4b2b4a1c4626..b90b43210472 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java @@ -60,7 +60,11 @@ public void incrementWindow(int windowSize) { } } - public void closeStream() { + /** + * Release the connection back to the pool so that it may be reused. This should be called when the request + * completes successfully and the response has been fully consumed. + */ + public void releaseConnection() { synchronized (streamLock) { if (!streamClosed && stream != null) { streamClosed = true; @@ -68,4 +72,19 @@ public void closeStream() { } } } + + /** + * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the + * connection pool. This should be called on error paths or when the stream is aborted before the response is + * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. + */ + public void closeConnection() { + synchronized (streamLock) { + if (!streamClosed && stream != null) { + streamClosed = true; + stream.cancel(); + stream.close(); + } + } + } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java index 58d11e068c5b..9318f6af228a 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -90,14 +91,16 @@ void nonServerError_shouldCloseStream(int statusCode) { } @Test - void failedToGetResponse_shouldCloseStream() { + void failedToGetResponse_shouldCancelAndCloseStream() { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); responseHandler.onResponseComplete(httpStream, 1); assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class); - verify(httpStream).close(); + InOrder inOrder = Mockito.inOrder(httpStream); + inOrder.verify(httpStream).cancel(); + inOrder.verify(httpStream).close(); } @Test @@ -115,7 +118,7 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException { } @Test - void publisherWritesFutureFails_shouldCloseStream() { + void publisherWritesFutureFails_shouldCancelAndCloseStream() { SimplePublisher simplePublisher = Mockito.mock(SimplePublisher.class); CompletableFuture future = new CompletableFuture<>(); when(simplePublisher.send(any(ByteBuffer.class))).thenReturn(future); @@ -137,7 +140,9 @@ void publisherWritesFutureFails_shouldCloseStream() { // we don't verify here because it behaves differently in async and sync } - verify(httpStream).close(); + InOrder inOrder = Mockito.inOrder(httpStream); + inOrder.verify(httpStream).cancel(); + inOrder.verify(httpStream).close(); verify(httpStream, never()).incrementWindow(anyInt()); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java index e8865f5e928b..e1caaeb021a9 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java @@ -68,7 +68,7 @@ void onResponseComplete_publisherCancelled_closesStream() { crtResponseHandler.onResponseBody(httpStream, "{}".getBytes(StandardCharsets.UTF_8)); crtResponseHandler.onResponseComplete(httpStream, 0); - assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining( + assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasStackTraceContaining( "subscription has been cancelled"); verify(httpStream).close(); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java index 62906295ba5c..8c786624426b 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java @@ -24,6 +24,8 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpHeaderBlock; import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; @@ -45,7 +47,7 @@ HttpStreamBaseResponseHandler responseHandlerWithMockedPublisher(SimplePublisher } @Test - void abortStream_shouldCloseStream() throws IOException { + void abortStream_shouldCancelAndCloseStream() throws IOException { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(), @@ -61,11 +63,13 @@ void abortStream_shouldCloseStream() throws IOException { abortableInputStream.read(); abortableInputStream.abort(); - verify(httpStream).close(); + InOrder inOrder = Mockito.inOrder(httpStream); + inOrder.verify(httpStream).cancel(); + inOrder.verify(httpStream).close(); } @Test - void closeStream_shouldCloseStream() throws IOException { + void closeStream_shouldCloseStreamWithoutCancel() throws IOException { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(), @@ -85,13 +89,16 @@ void closeStream_shouldCloseStream() throws IOException { } @Test - void cancelFuture_shouldCloseStream() { + void cancelFuture_shouldCancelAndCloseStream() { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); requestFuture.completeExceptionally(new RuntimeException()); - verify(httpStream).close(); + + InOrder inOrder = Mockito.inOrder(httpStream); + inOrder.verify(httpStream).cancel(); + inOrder.verify(httpStream).close(); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java new file mode 100644 index 000000000000..1714b9930ec4 --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/ResponseHandlerHelperTest.java @@ -0,0 +1,124 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpHeaderBlock; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.http.crt.internal.response.ResponseHandlerHelper; + +@ExtendWith(MockitoExtension.class) +class ResponseHandlerHelperTest { + + @Mock + private HttpStreamBase stream; + + private ResponseHandlerHelper helper; + + @BeforeEach + void setUp() { + helper = new ResponseHandlerHelper(SdkHttpResponse.builder()); + // Register the stream via onResponseHeaders + HttpHeader[] headers = { new HttpHeader("Content-Length", "1") }; + helper.onResponseHeaders(stream, 200, HttpHeaderBlock.MAIN.getValue(), headers); + } + + @Test + void releaseConnection_shouldOnlyCallClose() { + helper.releaseConnection(); + + verify(stream, never()).cancel(); + verify(stream).close(); + } + + @Test + void closeConnection_shouldCallCancelThenClose() { + helper.closeConnection(); + + InOrder inOrder = Mockito.inOrder(stream); + inOrder.verify(stream).cancel(); + inOrder.verify(stream).close(); + } + + @Test + void releaseConnection_calledTwice_shouldOnlyCloseOnce() { + helper.releaseConnection(); + helper.releaseConnection(); + + verify(stream, Mockito.times(1)).close(); + } + + @Test + void closeConnection_calledTwice_shouldOnlyCloseOnce() { + helper.closeConnection(); + helper.closeConnection(); + + verify(stream, Mockito.times(1)).cancel(); + verify(stream, Mockito.times(1)).close(); + } + + @Test + void releaseConnection_afterCloseConnection_shouldBeNoOp() { + helper.closeConnection(); + helper.releaseConnection(); + + verify(stream, Mockito.times(1)).cancel(); + verify(stream, Mockito.times(1)).close(); + } + + @Test + void closeConnection_afterReleaseConnection_shouldBeNoOp() { + helper.releaseConnection(); + helper.closeConnection(); + + verify(stream, never()).cancel(); + verify(stream, Mockito.times(1)).close(); + } + + @Test + void incrementWindow_afterReleaseConnection_shouldBeNoOp() { + helper.releaseConnection(); + helper.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_afterCloseConnection_shouldBeNoOp() { + helper.closeConnection(); + helper.incrementWindow(1024); + + verify(stream, never()).incrementWindow(1024); + } + + @Test + void incrementWindow_beforeClose_shouldWork() { + helper.incrementWindow(1024); + + verify(stream).incrementWindow(1024); + } +} diff --git a/pom.xml b/pom.xml index 6751394b044b..5d05e099967b 100644 --- a/pom.xml +++ b/pom.xml @@ -131,7 +131,7 @@ 3.1.5 1.17.1 1.37 - 0.44.0 + 0.45.1 5.10.3 diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java new file mode 100644 index 000000000000..a049e2943cb5 --- /dev/null +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java @@ -0,0 +1,222 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.functionaltests; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.head; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import java.net.URI; +import java.time.Duration; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.utils.IoUtils; + +/** + * Verifies GetObject response stream connection management. Tests that: + * + * All tests use maxConcurrency=1 so that a leaked connection causes the next request to time out. + */ +@WireMockTest +class GetObjectResponseInputStreamConnectionManagementTest { + + private static final String BUCKET = "test-bucket"; + private static final String KEY = "test-key"; + private static final byte[] LARGE_BODY = new byte[24 * 1024 * 1024]; + private static final byte[] SMALL_BODY = "hello".getBytes(); + private static final Duration CONNECTION_ACQUIRE_TIMEOUT = Duration.ofSeconds(5); + + private static StaticCredentialsProvider credentials() { + return StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")); + } + + private static void stubSlowGetAndHead() { + // Drip-feed the response body over 10 seconds so that abort() is called while the body is still in-flight. + // Without this, localhost is fast enough that the full body arrives before abort(), masking the leak. + stubFor(get(anyUrl()).willReturn(aResponse().withStatus(200) + .withBody(LARGE_BODY) + .withChunkedDribbleDelay(100, 10_000))); + stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200))); + } + + private static void stubGetAndHead() { + stubFor(get(anyUrl()).willReturn(aResponse().withStatus(200).withBody(SMALL_BODY))); + stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200))); + } + + private static String previousMaxConnections; + + @BeforeAll + static void setUpMaxConnections() { + previousMaxConnections = System.getProperty("http.maxConnections"); + System.setProperty("http.maxConnections", "1"); + } + + @AfterAll + static void restoreMaxConnections() { + if (previousMaxConnections == null) { + System.clearProperty("http.maxConnections"); + } else { + System.setProperty("http.maxConnections", previousMaxConnections); + } + } + + static Stream syncHttpClients() { + return Stream.of( + Arguments.of("Apache", + ApacheHttpClient.builder().connectionAcquisitionTimeout(CONNECTION_ACQUIRE_TIMEOUT) + .maxConnections(1).build()), + Arguments.of("UrlConnection", + UrlConnectionHttpClient.builder().build()), + Arguments.of("CrtSync", + AwsCrtHttpClient.builder().connectionAcquisitionTimeout(CONNECTION_ACQUIRE_TIMEOUT) + .maxConcurrency(1).build()) + ); + } + + static Stream asyncHttpClients() { + return Stream.of( + Arguments.of("Netty", + NettyNioAsyncHttpClient.builder().connectionAcquisitionTimeout(CONNECTION_ACQUIRE_TIMEOUT) + .maxConcurrency(1).build()), + Arguments.of("CrtAsync", + AwsCrtAsyncHttpClient.builder().connectionAcquisitionTimeout(CONNECTION_ACQUIRE_TIMEOUT) + .maxConcurrency(1).build()) + ); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("syncHttpClients") + void syncGetObject_abortBeforeFullyConsumed_doesNotLeakConnection( + String name, SdkHttpClient httpClient, WireMockRuntimeInfo wm) throws Exception { + stubSlowGetAndHead(); + + try (S3Client s3 = S3Client.builder() + .httpClient(httpClient) + .region(Region.US_EAST_1) + .endpointOverride(URI.create(wm.getHttpBaseUrl())) + .forcePathStyle(true) + .credentialsProvider(credentials()) + .build()) { + + ResponseInputStream response = s3.getObject(r -> r.bucket(BUCKET).key(KEY)); + response.read(); + response.abort(); + + HeadObjectResponse headResponse = s3.headObject(r -> r.bucket(BUCKET).key(KEY)); + assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue(); + } + } + + @ParameterizedTest(name = "{0}") + @MethodSource("asyncHttpClients") + void asyncGetObject_abortBeforeFullyConsumed_doesNotLeakConnection( + String name, SdkAsyncHttpClient httpClient, WireMockRuntimeInfo wm) throws Exception { + stubSlowGetAndHead(); + + try (S3AsyncClient s3 = S3AsyncClient.builder() + .httpClient(httpClient) + .region(Region.US_EAST_1) + .endpointOverride(URI.create(wm.getHttpBaseUrl())) + .forcePathStyle(true) + .credentialsProvider(credentials()) + .build()) { + + ResponseInputStream response = + s3.getObject(r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream()).join(); + response.read(); + response.abort(); + + HeadObjectResponse headResponse = s3.headObject(r -> r.bucket(BUCKET).key(KEY)).join(); + assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue(); + } + } + + @ParameterizedTest(name = "{0}") + @MethodSource("syncHttpClients") + void syncGetObject_fullyConsumedAndClosed_connectionIsReused( + String name, SdkHttpClient httpClient, WireMockRuntimeInfo wm) throws Exception { + stubGetAndHead(); + + try (S3Client s3 = S3Client.builder() + .httpClient(httpClient) + .region(Region.US_EAST_1) + .endpointOverride(URI.create(wm.getHttpBaseUrl())) + .forcePathStyle(true) + .credentialsProvider(credentials()) + .build()) { + + ResponseInputStream response = s3.getObject(r -> r.bucket(BUCKET).key(KEY)); + IoUtils.drainInputStream(response); + response.close(); + + HeadObjectResponse headResponse = s3.headObject(r -> r.bucket(BUCKET).key(KEY)); + assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue(); + } + } + + @ParameterizedTest(name = "{0}") + @MethodSource("asyncHttpClients") + void asyncGetObject_fullyConsumedAndClosed_connectionIsReused( + String name, SdkAsyncHttpClient httpClient, WireMockRuntimeInfo wm) throws Exception { + stubGetAndHead(); + + try (S3AsyncClient s3 = S3AsyncClient.builder() + .httpClient(httpClient) + .region(Region.US_EAST_1) + .endpointOverride(URI.create(wm.getHttpBaseUrl())) + .forcePathStyle(true) + .credentialsProvider(credentials()) + .build()) { + + ResponseInputStream response = + s3.getObject(r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream()).join(); + IoUtils.drainInputStream(response); + response.close(); + + HeadObjectResponse headResponse = s3.headObject(r -> r.bucket(BUCKET).key(KEY)).join(); + assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue(); + } + } +} From bae27d183bf4cf66e7fbf647822c920d59f859a0 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Fri, 17 Apr 2026 15:17:13 -0700 Subject: [PATCH 2/2] Fix broken tests --- .../awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java index fcb0b9c13490..147ca786703e 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java @@ -208,7 +208,6 @@ public void toBlockingInputStream_requestFailedMidwayDueToIoError_shouldThrowExc stubFor(get(anyUrl()) .inScenario("SucceedThenFail") .whenScenarioStateIs("first request") - .willSetStateTo("second request") .willReturn(aResponse() .withFault(Fault.RANDOM_DATA_THEN_CLOSE))); ResponseInputStream stream = s3AsyncClient.getObject(