Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-ea797a6.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fix connection pool exhaustion in the CRT HTTP client where connections were not released after a request abort or timeout."
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@
<!-- Ignore usage of S3MetaRequest in CrtS3ClientUploadBenchmark. !-->
<suppress checks="Regexp"
files="software.amazon.awssdk.s3benchmarks.CrtS3ClientUploadBenchmark.java"/>

<!-- ResponseHandlerHelper has helper method closeConnection() which handles safe closing of connection -->
<suppress id="NoCrtStreamCancel"
files=".*ResponseHandlerHelper\.java$"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,15 @@
<property name="ignoreComments" value="true"/>
</module>

<!-- Checks that we don't call HttpStreamBase.cancel() directly -->
<module name="Regexp">
<property name="id" value="NoCrtStreamCancel"/>
<property name="format" value="HttpStreamBase\s*(\.|::)\s*cancel"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Don't call HttpStreamBase.cancel() directly. Use ResponseHandlerHelper.closeConnection() or the response handler's closeConnection() method, which is idempotent and pairs cancel() with close() correctly."/>
<property name="ignoreComments" value="true"/>
</module>

<!-- Checks that we don't implement AutoCloseable/Closeable -->
<module name="Regexp">
<property name="format" value="(class|interface).*(implements|extends).*[^\w](Closeable|AutoCloseable)[^\w]"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequestBase;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
Expand Down Expand Up @@ -67,12 +66,23 @@ private void doExecute(CrtAsyncRequestContext executionContext,

HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);

HttpStreamBaseResponseHandler crtResponseHandler =
CrtResponseAdapter crtResponseHandler =
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler());

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);

// Capture the stream as soon as it is acquired, so closeConnection() works even before
// onResponseHeaders fires (e.g. when the server is unresponsive).
streamFuture.thenAccept(crtResponseHandler::onAcquireStream);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((stream, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
Expand Down Expand Up @@ -57,13 +56,25 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture<Sdk
acquireStartTime = System.nanoTime();
}

HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);

// Capture the stream as soon as it is acquired, so closeConnection() works even before
// onResponseHeaders fires (e.g. when the server is unresponsive).
streamFuture.thenAccept(crtResponseHandler::onAcquireStream);

// Evict the connection from the pool on failure so it is not reused.
requestFuture.whenComplete((r, t) -> {
if (t != null) {
crtResponseHandler.closeConnection();
}
});

long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((streamBase, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public CrtResponseAdapter(CompletableFuture<Void> completionFuture,
this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder);
}

public static HttpStreamBaseResponseHandler toCrtResponseHandler(
public static CrtResponseAdapter toCrtResponseHandler(
CompletableFuture<Void> requestFuture,
SdkAsyncHttpResponseHandler responseHandler) {
return new CrtResponseAdapter(requestFuture, responseHandler);
Expand Down Expand Up @@ -145,4 +145,12 @@ private void callResponseHandlerOnError(Throwable error) {
log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e);
}
}

public void onAcquireStream(HttpStreamBase stream) {
responseHandlerHelper.onAcquireStream(stream);
}

public void closeConnection() {
responseHandlerHelper.closeConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,12 @@ private void onSuccessfulResponseComplete() {
simplePublisher.complete();
responseHandlerHelper.releaseConnection();
}

public void onAcquireStream(HttpStreamBase stream) {
responseHandlerHelper.onAcquireStream(stream);
}

public void closeConnection() {
responseHandlerHelper.closeConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@ public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) {
this.responseBuilder = responseBuilder;
}

public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
/**
* Set the stream reference as soon as it is acquired from the pool, so that closeConnection can
* cancel it even if onResponseHeaders has not yet fired (e.g. the server is unresponsive).
*/
public void onAcquireStream(HttpStreamBase stream) {
synchronized (streamLock) {
if (this.stream == null) {
this.stream = stream;
}
}
}

public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
onAcquireStream(stream);
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.http.HttpTestUtils.createProvider;
import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;

import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.RecordingResponseHandler;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.metrics.MetricCollector;

/**
* Verifies connection pool behavior when requests are aborted in between.
*/
public class AwsCrtHttpClientAbortBehaviorTest {

@RegisterExtension
static WireMockExtension mockServer = WireMockExtension.newInstance()
.options(wireMockConfig().dynamicPort())
.build();

private static ScheduledExecutorService scheduler;

@BeforeAll
static void setup() {
scheduler = Executors.newScheduledThreadPool(1);
}

@AfterAll
static void tearDown() {
scheduler.shutdown();
}

/**
* Verifies that aborting in-flight requests evicts connections from the pool —
* the next request succeeds and LEASED_CONCURRENCY is 1.
*/
@Test
void syncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
URI uri = URI.create("http://localhost:" + wm.getHttpPort());

try (SdkHttpClient client = AwsCrtHttpClient.builder().maxConcurrency(3).build()) {
stubUnresponsiveServer();
executeAndAbort(client, uri, 3);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why do we need to send 3 requests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use 3 to match maxConcurrency(3) and verify all 3 dead connections are evicted and the pool can accept 3 new requests.


// allow cancel() callbacks to complete before asserting pool state
Thread.sleep(200);

stubResponsiveServer();
int successCount = 0;
MetricCollector collector = MetricCollector.create("test");
for (int i = 0; i < 3; i++) {
try {
client.prepareRequest(syncRequest(uri, i == 0 ? collector : null)).call();
successCount++;
} catch (Exception e) {
// connection not evicted
}
}

assertThat(successCount).as("%d/%d requests succeeded after aborts", successCount, 3).isEqualTo(3);
assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
.as("LEASED_CONCURRENCY must be 1 after aborts, not %d", 4)
.containsExactly(1);
}
}

/**
* Verifies that when an async request future completes exceptionally, the connection is
* evicted from the pool and LEASED_CONCURRENCY is 1 for the next request.
*/
@Test
void asyncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
URI uri = URI.create("http://localhost:" + wm.getHttpPort());

try (SdkAsyncHttpClient client = AwsCrtAsyncHttpClient.builder().maxConcurrency(3).build()) {
stubUnresponsiveServer();
for (int i = 0; i < 3; i++) {
RecordingResponseHandler recorder = new RecordingResponseHandler();
CompletableFuture<Void> future = client.execute(AsyncExecuteRequest.builder()
.request(createRequest(uri))
.requestContentPublisher(createProvider(""))
.responseHandler(recorder)
.build());
// abort() equivalent for async: complete the future exceptionally after stream is acquired
scheduler.schedule(() -> future.completeExceptionally(new RuntimeException("timeout")),
100, TimeUnit.MILLISECONDS);
try {
future.get(2, TimeUnit.SECONDS);
} catch (Exception e) {
// expected
}
// wait for the response handler to finish so cancel() has completed before next iteration
try {
recorder.completeFuture().get(2, TimeUnit.SECONDS);
} catch (Exception e) {
// expected — handler receives the error
}
}

stubResponsiveServer();
MetricCollector collector = MetricCollector.create("test");
RecordingResponseHandler recorder = new RecordingResponseHandler();
client.execute(AsyncExecuteRequest.builder()
.request(createRequest(uri))
.requestContentPublisher(createProvider(""))
.responseHandler(recorder)
.metricCollector(collector)
.build());
recorder.completeFuture().get(5, TimeUnit.SECONDS);

assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
.as("LEASED_CONCURRENCY must be 1 after exceptionally-completed futures, not %d", 4)
.containsExactly(1);
}
}

private void executeAndAbort(SdkHttpClient client, URI uri, int count) {
for (int i = 0; i < count; i++) {
ExecutableHttpRequest req = client.prepareRequest(syncRequest(uri, null));
// abort() must be called from another thread while call() is blocking
scheduler.schedule(req::abort, 100, TimeUnit.MILLISECONDS);
try {
req.call();
} catch (Exception e) {
// expected — aborted
}
}
}

private HttpExecuteRequest syncRequest(URI uri, MetricCollector collector) {
HttpExecuteRequest.Builder builder = HttpExecuteRequest.builder()
.request(createRequest(uri))
.contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]));
if (collector != null) {
builder.metricCollector(collector);
}
return builder.build();
}

private void stubUnresponsiveServer() {
mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(5000).withBody("slow")));
}

private void stubResponsiveServer() {
mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withStatus(200).withBody("OK")));
}
}
Loading