diff --git a/.changes/next-release/bugfix-AWSCRTHTTPClient-ab4c77b.json b/.changes/next-release/bugfix-AWSCRTHTTPClient-ab4c77b.json new file mode 100644 index 00000000000..8823e62ed3f --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPClient-ab4c77b.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT HTTP Client", + "contributor": "", + "description": "Fix connection pool leak in AwsCrtHttpClient when threads are externally interrupted." +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java index 5aebc6f24fd..9c6d769e48f 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java @@ -126,6 +126,15 @@ public HttpExecuteResponse call() throws IOException { return builder.build(); } catch (CompletionException e) { Throwable cause = e.getCause(); + + // Complete the future exceptionally to trigger connection cleanup in the response handler. + // Handles thread-interrupt case where joinInterruptibly throws due to + // InterruptedException. Without this, the + // Ensures that closeConnection() is invoked to prevent leaking the connection from the pool. + if (responseFuture != null) { + responseFuture.completeExceptionally(cause != null ? cause : e); + } + if (cause instanceof IOException) { throw (IOException) cause; } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java index a049e2943cb..b9d679bb81c 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java @@ -22,13 +22,25 @@ import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static org.assertj.core.api.Assertions.assertThat; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.common.FileSource; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.extension.Parameters; +import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformer; +import com.github.tomakehurst.wiremock.http.Request; +import com.github.tomakehurst.wiremock.http.ResponseDefinition; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import java.net.URI; import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -219,4 +231,112 @@ void asyncGetObject_fullyConsumedAndClosed_connectionIsReused( assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue(); } } + + /** + * Verifies that when a thread is externally interrupted while blocked on a CRT sync HTTP request, + * the connection is properly cleaned up and returned to the pool. + */ + @Test + @Timeout(30) + void syncCrtGetObject_threadInterruptDuringRequest_doesNotLeakConnection() throws Exception { + CountDownLatch requestReceived = new CountDownLatch(1); + CountDownLatch releaseResponse = new CountDownLatch(1); + + // Transformer that blocks the GET response until signaled, giving us a deterministic + // window to interrupt the request thread while it's blocked on future.get(). + ResponseDefinitionTransformer blockingTransformer = new ResponseDefinitionTransformer() { + @Override + public ResponseDefinition transform(Request request, ResponseDefinition responseDefinition, + FileSource files, Parameters parameters) { + if ("GET".equals(request.getMethod().getName())) { + requestReceived.countDown(); + try { + releaseResponse.await(25, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return responseDefinition; + } + + @Override + public String getName() { + return "blocking-get-transformer"; + } + + @Override + public boolean applyGlobally() { + return true; + } + }; + + WireMockServer server = new WireMockServer( + WireMockConfiguration.options().dynamicPort().extensions(blockingTransformer)); + server.start(); + + try { + // Use a large body so the response is still being streamed when the connection is reused + server.stubFor(com.github.tomakehurst.wiremock.client.WireMock.get(anyUrl()) + .willReturn(aResponse().withStatus(200).withBody(LARGE_BODY))); + server.stubFor(com.github.tomakehurst.wiremock.client.WireMock.head(anyUrl()) + .willReturn(aResponse().withStatus(200))); + + SdkHttpClient httpClient = AwsCrtHttpClient.builder() + .connectionAcquisitionTimeout(CONNECTION_ACQUIRE_TIMEOUT) + .maxConcurrency(1) + .build(); + + try (S3Client s3 = S3Client.builder() + .httpClient(httpClient) + .region(Region.US_EAST_1) + .endpointOverride(URI.create("http://localhost:" + server.port())) + .forcePathStyle(true) + .credentialsProvider(credentials()) + .overrideConfiguration(c -> c + .apiCallTimeout(Duration.ofSeconds(60)) + .apiCallAttemptTimeout(Duration.ofSeconds(59))) + .build()) { + + AtomicReference threadException = new AtomicReference<>(); + + Thread requestThread = new Thread(() -> { + try { + s3.getObject(r -> r.bucket(BUCKET).key(KEY)); + } catch (Throwable e) { + threadException.set(e); + } + }); + + requestThread.start(); + + // Wait until the server has received the GET — at this point the request thread + // is guaranteed to be blocked on future.get() since the response is held. + assertThat(requestReceived.await(10, TimeUnit.SECONDS)) + .as("Server should have received the GET request") + .isTrue(); + + // Interrupt the blocked thread — this is the key action under test + requestThread.interrupt(); + + // Wait for the request thread to finish processing the interrupt + requestThread.join(10_000); + assertThat(requestThread.isAlive()).isFalse(); + assertThat(threadException.get()).isNotNull(); + + // Unblock the transformer so the server sends the response. + releaseResponse.countDown(); + + // Brief wait for the CRT native layer to process the connection closure + Thread.sleep(500); + + // Verify the pool recovered — if the connection leaked or is in a dirty state, + // this times out at the connectionAcquisitionTimeout (5s). + HeadObjectResponse headResponse = s3.headObject(r -> r.bucket(BUCKET).key(KEY)); + assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue(); + } + } finally { + releaseResponse.countDown(); + server.stop(); + } + } }