Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-ab4c77b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fix connection pool leak in AwsCrtHttpClient when threads are externally interrupted."
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public HttpExecuteResponse call() throws IOException {
return builder.build();
} catch (CompletionException e) {
Throwable cause = e.getCause();

// Complete the future exceptionally to trigger connection cleanup in the response handler.
// Handles thread-interrupt case where joinInterruptibly throws due to
// InterruptedException. Without this, the
// Ensures that closeConnection() is invoked to prevent leaking the connection from the pool.
if (responseFuture != null) {
responseFuture.completeExceptionally(cause != null ? cause : e);
}

if (cause instanceof IOException) {
throw (IOException) cause;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,25 @@
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static org.assertj.core.api.Assertions.assertThat;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.common.FileSource;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.Parameters;
import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformer;
import com.github.tomakehurst.wiremock.http.Request;
import com.github.tomakehurst.wiremock.http.ResponseDefinition;
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -219,4 +231,112 @@ void asyncGetObject_fullyConsumedAndClosed_connectionIsReused(
assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue();
}
}

/**
* Verifies that when a thread is externally interrupted while blocked on a CRT sync HTTP request,
* the connection is properly cleaned up and returned to the pool.
*/
@Test
@Timeout(30)
void syncCrtGetObject_threadInterruptDuringRequest_doesNotLeakConnection() throws Exception {
CountDownLatch requestReceived = new CountDownLatch(1);
CountDownLatch releaseResponse = new CountDownLatch(1);

// Transformer that blocks the GET response until signaled, giving us a deterministic
// window to interrupt the request thread while it's blocked on future.get().
ResponseDefinitionTransformer blockingTransformer = new ResponseDefinitionTransformer() {
@Override
public ResponseDefinition transform(Request request, ResponseDefinition responseDefinition,
FileSource files, Parameters parameters) {
if ("GET".equals(request.getMethod().getName())) {
requestReceived.countDown();
try {
releaseResponse.await(25, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return responseDefinition;
}

@Override
public String getName() {
return "blocking-get-transformer";
}

@Override
public boolean applyGlobally() {
return true;
}
};

WireMockServer server = new WireMockServer(
WireMockConfiguration.options().dynamicPort().extensions(blockingTransformer));
server.start();

try {
// Use a large body so the response is still being streamed when the connection is reused
server.stubFor(com.github.tomakehurst.wiremock.client.WireMock.get(anyUrl())
.willReturn(aResponse().withStatus(200).withBody(LARGE_BODY)));
server.stubFor(com.github.tomakehurst.wiremock.client.WireMock.head(anyUrl())
.willReturn(aResponse().withStatus(200)));

SdkHttpClient httpClient = AwsCrtHttpClient.builder()
.connectionAcquisitionTimeout(CONNECTION_ACQUIRE_TIMEOUT)
.maxConcurrency(1)
.build();

try (S3Client s3 = S3Client.builder()
.httpClient(httpClient)
.region(Region.US_EAST_1)
.endpointOverride(URI.create("http://localhost:" + server.port()))
.forcePathStyle(true)
.credentialsProvider(credentials())
.overrideConfiguration(c -> c
.apiCallTimeout(Duration.ofSeconds(60))
.apiCallAttemptTimeout(Duration.ofSeconds(59)))
.build()) {

AtomicReference<Throwable> threadException = new AtomicReference<>();

Thread requestThread = new Thread(() -> {
try {
s3.getObject(r -> r.bucket(BUCKET).key(KEY));
} catch (Throwable e) {
threadException.set(e);
}
});

requestThread.start();

// Wait until the server has received the GET — at this point the request thread
// is guaranteed to be blocked on future.get() since the response is held.
assertThat(requestReceived.await(10, TimeUnit.SECONDS))
.as("Server should have received the GET request")
.isTrue();

// Interrupt the blocked thread — this is the key action under test
requestThread.interrupt();

// Wait for the request thread to finish processing the interrupt
requestThread.join(10_000);
assertThat(requestThread.isAlive()).isFalse();
assertThat(threadException.get()).isNotNull();

// Unblock the transformer so the server sends the response.
releaseResponse.countDown();

// Brief wait for the CRT native layer to process the connection closure
Thread.sleep(500);

// Verify the pool recovered — if the connection leaked or is in a dirty state,
// this times out at the connectionAcquisitionTimeout (5s).
HeadObjectResponse headResponse = s3.headObject(r -> r.bucket(BUCKET).key(KEY));
assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue();
}
} finally {
releaseResponse.countDown();
server.stop();
}
}
}
Loading