Skip to content

Commit aa1a256

Browse files
committed
fix(aws-crt-http-client): cancel stream on abort/timeout to evict connection from pool
1 parent eeb61ca commit aa1a256

4 files changed

Lines changed: 203 additions & 0 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT HTTP Client",
4+
"contributor": "",
5+
"description": "Call aws_http_stream_cancel on abort or timeout so the CRT connection pool evicts the connection instead of reusing it."
6+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ private void doExecute(CrtAsyncRequestContext executionContext,
7373
CompletableFuture<HttpStreamBase> streamFuture =
7474
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
7575

76+
// Cancels the stream on failure so the pool does not reuse it.
77+
requestFuture.whenComplete((r, t) -> {
78+
if (t != null) {
79+
streamFuture.thenAccept(HttpStreamBase::cancel);
80+
}
81+
});
82+
7683
long finalAcquireStartTime = acquireStartTime;
7784

7885
streamFuture.whenComplete((stream, throwable) -> {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture<Sdk
6464
CompletableFuture<HttpStreamBase> streamFuture =
6565
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
6666

67+
// Cancels the stream on failure so the pool does not reuse it.
68+
requestFuture.whenComplete((r, t) -> {
69+
if (t != null) {
70+
streamFuture.thenAccept(HttpStreamBase::cancel);
71+
}
72+
});
73+
6774
long finalAcquireStartTime = acquireStartTime;
6875

6976
streamFuture.whenComplete((streamBase, throwable) -> {
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.any;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
21+
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
import static software.amazon.awssdk.http.HttpTestUtils.createProvider;
24+
import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;
25+
26+
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
27+
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
28+
import java.io.ByteArrayInputStream;
29+
import java.net.URI;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.ScheduledExecutorService;
33+
import java.util.concurrent.TimeUnit;
34+
import org.junit.jupiter.api.AfterAll;
35+
import org.junit.jupiter.api.BeforeAll;
36+
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.api.extension.RegisterExtension;
38+
import software.amazon.awssdk.http.ExecutableHttpRequest;
39+
import software.amazon.awssdk.http.HttpExecuteRequest;
40+
import software.amazon.awssdk.http.HttpMetric;
41+
import software.amazon.awssdk.http.RecordingResponseHandler;
42+
import software.amazon.awssdk.http.SdkHttpClient;
43+
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
44+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
45+
import software.amazon.awssdk.metrics.MetricCollector;
46+
47+
/**
48+
* Verifies connection pool behavior when requests are aborted in between.
49+
*/
50+
public class AwsCrtHttpClientAbortBehaviorTest {
51+
52+
@RegisterExtension
53+
static WireMockExtension mockServer = WireMockExtension.newInstance()
54+
.options(wireMockConfig().dynamicPort())
55+
.build();
56+
57+
private static ScheduledExecutorService scheduler;
58+
59+
@BeforeAll
60+
static void setup() {
61+
scheduler = Executors.newScheduledThreadPool(1);
62+
}
63+
64+
@AfterAll
65+
static void tearDown() {
66+
scheduler.shutdown();
67+
}
68+
69+
/**
70+
* Verifies that aborting in-flight requests evicts connections from the pool —
71+
* the next request succeeds and LEASED_CONCURRENCY is 1.
72+
*/
73+
@Test
74+
void syncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
75+
URI uri = URI.create("http://localhost:" + wm.getHttpPort());
76+
77+
try (SdkHttpClient client = AwsCrtHttpClient.builder().maxConcurrency(3).build()) {
78+
stubUnresponsiveServer();
79+
executeAndAbort(client, uri, 3);
80+
81+
// allow cancel() callbacks to complete before asserting pool state
82+
Thread.sleep(200);
83+
84+
stubResponsiveServer();
85+
int successCount = 0;
86+
MetricCollector collector = MetricCollector.create("test");
87+
for (int i = 0; i < 3; i++) {
88+
try {
89+
client.prepareRequest(syncRequest(uri, i == 0 ? collector : null)).call();
90+
successCount++;
91+
} catch (Exception e) {
92+
// connection not evicted
93+
}
94+
}
95+
96+
assertThat(successCount).as("%d/%d requests succeeded after aborts", successCount, 3).isEqualTo(3);
97+
assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
98+
.as("LEASED_CONCURRENCY must be 1 after aborts, not %d", 4)
99+
.containsExactly(1);
100+
}
101+
}
102+
103+
/**
104+
* Verifies that when an async request future completes exceptionally, the connection is
105+
* evicted from the pool and LEASED_CONCURRENCY is 1 for the next request.
106+
*/
107+
@Test
108+
void asyncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
109+
URI uri = URI.create("http://localhost:" + wm.getHttpPort());
110+
111+
try (SdkAsyncHttpClient client = AwsCrtAsyncHttpClient.builder().maxConcurrency(3).build()) {
112+
stubUnresponsiveServer();
113+
for (int i = 0; i < 3; i++) {
114+
RecordingResponseHandler recorder = new RecordingResponseHandler();
115+
CompletableFuture<Void> future = client.execute(AsyncExecuteRequest.builder()
116+
.request(createRequest(uri))
117+
.requestContentPublisher(createProvider(""))
118+
.responseHandler(recorder)
119+
.build());
120+
// abort() equivalent for async: complete the future exceptionally after stream is acquired
121+
scheduler.schedule(() -> future.completeExceptionally(new RuntimeException("timeout")),
122+
100, TimeUnit.MILLISECONDS);
123+
try {
124+
future.get(2, TimeUnit.SECONDS);
125+
} catch (Exception e) {
126+
// expected
127+
}
128+
// wait for the response handler to finish so cancel() has completed before next iteration
129+
try {
130+
recorder.completeFuture().get(2, TimeUnit.SECONDS);
131+
} catch (Exception e) {
132+
// expected — handler receives the error
133+
}
134+
}
135+
136+
stubResponsiveServer();
137+
MetricCollector collector = MetricCollector.create("test");
138+
RecordingResponseHandler recorder = new RecordingResponseHandler();
139+
client.execute(AsyncExecuteRequest.builder()
140+
.request(createRequest(uri))
141+
.requestContentPublisher(createProvider(""))
142+
.responseHandler(recorder)
143+
.metricCollector(collector)
144+
.build());
145+
recorder.completeFuture().get(5, TimeUnit.SECONDS);
146+
147+
assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
148+
.as("LEASED_CONCURRENCY must be 1 after exceptionally-completed futures, not %d", 4)
149+
.containsExactly(1);
150+
}
151+
}
152+
153+
private void executeAndAbort(SdkHttpClient client, URI uri, int count) {
154+
for (int i = 0; i < count; i++) {
155+
ExecutableHttpRequest req = client.prepareRequest(syncRequest(uri, null));
156+
// abort() must be called from another thread while call() is blocking
157+
scheduler.schedule(req::abort, 100, TimeUnit.MILLISECONDS);
158+
try {
159+
req.call();
160+
} catch (Exception e) {
161+
// expected — aborted
162+
}
163+
}
164+
}
165+
166+
private HttpExecuteRequest syncRequest(URI uri, MetricCollector collector) {
167+
HttpExecuteRequest.Builder builder = HttpExecuteRequest.builder()
168+
.request(createRequest(uri))
169+
.contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]));
170+
if (collector != null) {
171+
builder.metricCollector(collector);
172+
}
173+
return builder.build();
174+
}
175+
176+
private void stubUnresponsiveServer() {
177+
mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(5000).withBody("slow")));
178+
}
179+
180+
private void stubResponsiveServer() {
181+
mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withStatus(200).withBody("OK")));
182+
}
183+
}

0 commit comments

Comments
 (0)