Skip to content

Commit 57dd539

Browse files
authored
Fix connection pool leak in AwsCrtHttpClient when threads are externally interrupted. (#6896)
1 parent 5f8076b commit 57dd539

3 files changed

Lines changed: 135 additions & 0 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT HTTP Client",
4+
"contributor": "",
5+
"description": "Fix connection pool leak in AwsCrtHttpClient when threads are externally interrupted."
6+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,15 @@ public HttpExecuteResponse call() throws IOException {
126126
return builder.build();
127127
} catch (CompletionException e) {
128128
Throwable cause = e.getCause();
129+
130+
// Complete the future exceptionally to trigger connection cleanup in the response handler.
131+
// Handles thread-interrupt case where joinInterruptibly throws due to
132+
// InterruptedException. Without this, the
133+
// Ensures that closeConnection() is invoked to prevent leaking the connection from the pool.
134+
if (responseFuture != null) {
135+
responseFuture.completeExceptionally(cause != null ? cause : e);
136+
}
137+
129138
if (cause instanceof IOException) {
130139
throw (IOException) cause;
131140
}

services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/GetObjectResponseInputStreamConnectionManagementTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,25 @@
2222
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
2323
import static org.assertj.core.api.Assertions.assertThat;
2424

25+
import com.github.tomakehurst.wiremock.WireMockServer;
26+
import com.github.tomakehurst.wiremock.common.FileSource;
27+
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
28+
import com.github.tomakehurst.wiremock.extension.Parameters;
29+
import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformer;
30+
import com.github.tomakehurst.wiremock.http.Request;
31+
import com.github.tomakehurst.wiremock.http.ResponseDefinition;
2532
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
2633
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
2734
import java.net.URI;
2835
import java.time.Duration;
36+
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicReference;
2939
import java.util.stream.Stream;
3040
import org.junit.jupiter.api.AfterAll;
3141
import org.junit.jupiter.api.BeforeAll;
42+
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.api.Timeout;
3244
import org.junit.jupiter.params.ParameterizedTest;
3345
import org.junit.jupiter.params.provider.Arguments;
3446
import org.junit.jupiter.params.provider.MethodSource;
@@ -219,4 +231,112 @@ void asyncGetObject_fullyConsumedAndClosed_connectionIsReused(
219231
assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue();
220232
}
221233
}
234+
235+
/**
236+
* Verifies that when a thread is externally interrupted while blocked on a CRT sync HTTP request,
237+
* the connection is properly cleaned up and returned to the pool.
238+
*/
239+
@Test
240+
@Timeout(30)
241+
void syncCrtGetObject_threadInterruptDuringRequest_doesNotLeakConnection() throws Exception {
242+
CountDownLatch requestReceived = new CountDownLatch(1);
243+
CountDownLatch releaseResponse = new CountDownLatch(1);
244+
245+
// Transformer that blocks the GET response until signaled, giving us a deterministic
246+
// window to interrupt the request thread while it's blocked on future.get().
247+
ResponseDefinitionTransformer blockingTransformer = new ResponseDefinitionTransformer() {
248+
@Override
249+
public ResponseDefinition transform(Request request, ResponseDefinition responseDefinition,
250+
FileSource files, Parameters parameters) {
251+
if ("GET".equals(request.getMethod().getName())) {
252+
requestReceived.countDown();
253+
try {
254+
releaseResponse.await(25, TimeUnit.SECONDS);
255+
} catch (InterruptedException e) {
256+
Thread.currentThread().interrupt();
257+
}
258+
}
259+
return responseDefinition;
260+
}
261+
262+
@Override
263+
public String getName() {
264+
return "blocking-get-transformer";
265+
}
266+
267+
@Override
268+
public boolean applyGlobally() {
269+
return true;
270+
}
271+
};
272+
273+
WireMockServer server = new WireMockServer(
274+
WireMockConfiguration.options().dynamicPort().extensions(blockingTransformer));
275+
server.start();
276+
277+
try {
278+
// Use a large body so the response is still being streamed when the connection is reused
279+
server.stubFor(com.github.tomakehurst.wiremock.client.WireMock.get(anyUrl())
280+
.willReturn(aResponse().withStatus(200).withBody(LARGE_BODY)));
281+
server.stubFor(com.github.tomakehurst.wiremock.client.WireMock.head(anyUrl())
282+
.willReturn(aResponse().withStatus(200)));
283+
284+
SdkHttpClient httpClient = AwsCrtHttpClient.builder()
285+
.connectionAcquisitionTimeout(CONNECTION_ACQUIRE_TIMEOUT)
286+
.maxConcurrency(1)
287+
.build();
288+
289+
try (S3Client s3 = S3Client.builder()
290+
.httpClient(httpClient)
291+
.region(Region.US_EAST_1)
292+
.endpointOverride(URI.create("http://localhost:" + server.port()))
293+
.forcePathStyle(true)
294+
.credentialsProvider(credentials())
295+
.overrideConfiguration(c -> c
296+
.apiCallTimeout(Duration.ofSeconds(60))
297+
.apiCallAttemptTimeout(Duration.ofSeconds(59)))
298+
.build()) {
299+
300+
AtomicReference<Throwable> threadException = new AtomicReference<>();
301+
302+
Thread requestThread = new Thread(() -> {
303+
try {
304+
s3.getObject(r -> r.bucket(BUCKET).key(KEY));
305+
} catch (Throwable e) {
306+
threadException.set(e);
307+
}
308+
});
309+
310+
requestThread.start();
311+
312+
// Wait until the server has received the GET — at this point the request thread
313+
// is guaranteed to be blocked on future.get() since the response is held.
314+
assertThat(requestReceived.await(10, TimeUnit.SECONDS))
315+
.as("Server should have received the GET request")
316+
.isTrue();
317+
318+
// Interrupt the blocked thread — this is the key action under test
319+
requestThread.interrupt();
320+
321+
// Wait for the request thread to finish processing the interrupt
322+
requestThread.join(10_000);
323+
assertThat(requestThread.isAlive()).isFalse();
324+
assertThat(threadException.get()).isNotNull();
325+
326+
// Unblock the transformer so the server sends the response.
327+
releaseResponse.countDown();
328+
329+
// Brief wait for the CRT native layer to process the connection closure
330+
Thread.sleep(500);
331+
332+
// Verify the pool recovered — if the connection leaked or is in a dirty state,
333+
// this times out at the connectionAcquisitionTimeout (5s).
334+
HeadObjectResponse headResponse = s3.headObject(r -> r.bucket(BUCKET).key(KEY));
335+
assertThat(headResponse.sdkHttpResponse().isSuccessful()).isTrue();
336+
}
337+
} finally {
338+
releaseResponse.countDown();
339+
server.stop();
340+
}
341+
}
222342
}

0 commit comments

Comments
 (0)