Skip to content

Commit a673d6f

Browse files
committed
refactor code and address feedback
1 parent 0b4a1bf commit a673d6f

10 files changed

Lines changed: 124 additions & 71 deletions

File tree

.changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"type": "bugfix",
33
"category": "AWS CRT HTTP Client",
44
"contributor": "",
5-
"description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread."
5+
"description": "Fixed a potential deadlock in both `AwsCrtHttpClient` and `AwsCrtAsyncHttpClient` that could occur when the request body source delivered data on the CRT event loop thread. For sync, this happened when a blocking `InputStream` (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body. For async, this happened when the user-supplied `Publisher<ByteBuffer>` scheduled `onNext` back onto the same event loop. Request body data is now pushed via the CRT push-based `writeData` API: sync writes from the caller thread, async writes from a Reactive Streams subscriber outside the event loop."
66
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ private AwsCrtHttpClient(DefaultBuilder builder, AttributeMap config) {
6969
}
7070
}
7171

72-
public static AwsCrtHttpClient.Builder builder() {
72+
public static Builder builder() {
7373
return new DefaultBuilder();
7474
}
7575

@@ -124,45 +124,53 @@ private CrtHttpRequest(CrtRequestContext context) {
124124
@Override
125125
public HttpExecuteResponse call() throws IOException {
126126
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
127+
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
128+
responseFuture = result.responseFuture();
127129

128130
try {
129-
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
130-
responseFuture = result.responseFuture();
131131
writeRequestBody(result.streamHandler());
132132

133133
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
134134
builder.response(response);
135135
builder.responseBody(response.content().orElse(null));
136136
return builder.build();
137-
} catch (CompletionException e) {
138-
Throwable cause = e.getCause();
139-
140-
// Complete the future exceptionally to trigger connection cleanup in the response handler.
141-
// Handles thread-interrupt case where joinInterruptibly throws due to
142-
// InterruptedException. Without this, the
143-
// Ensures that closeConnection() is invoked to prevent leaking the connection from the pool.
144-
if (responseFuture != null) {
145-
responseFuture.completeExceptionally(cause != null ? cause : e);
146-
}
147-
148-
if (cause instanceof IOException) {
149-
throw (IOException) cause;
150-
}
151-
152-
if (cause instanceof HttpException) {
153-
Throwable wrapped = CrtUtils.wrapCrtException(cause);
154-
if (wrapped instanceof IOException) {
155-
throw (IOException) wrapped;
156-
}
157-
throw (HttpException) cause;
158-
}
137+
} catch (Throwable t) {
138+
// CompletionException is the wrapper from joinInterruptibly; direct throws
139+
// (e.g., IOException from inputStream.read in writeRequestBody) arrive unwrapped.
140+
Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
141+
142+
// Tear down the stream so the connection is not leaked back to the pool.
143+
// closeConnection() is idempotent and a no-op if the stream is not yet acquired
144+
// or is already closed.
145+
result.streamHandler().closeConnection();
146+
responseFuture.completeExceptionally(cause);
147+
148+
throw mapToIoExceptionOrRethrow(cause);
149+
}
150+
}
159151

160-
if (cause instanceof InterruptedException) {
161-
Thread.currentThread().interrupt();
162-
throw new IOException("Request was cancelled", cause);
152+
private static IOException mapToIoExceptionOrRethrow(Throwable cause) {
153+
if (cause instanceof IOException) {
154+
return (IOException) cause;
155+
}
156+
if (cause instanceof HttpException) {
157+
Throwable wrapped = CrtUtils.wrapCrtException(cause);
158+
if (wrapped instanceof IOException) {
159+
return (IOException) wrapped;
163160
}
164-
throw new RuntimeException(e.getCause());
161+
throw (HttpException) cause;
162+
}
163+
if (cause instanceof InterruptedException) {
164+
Thread.currentThread().interrupt();
165+
return new IOException("Request was cancelled", cause);
166+
}
167+
if (cause instanceof RuntimeException) {
168+
throw (RuntimeException) cause;
169+
}
170+
if (cause instanceof Error) {
171+
throw (Error) cause;
165172
}
173+
return new IOException(cause);
166174
}
167175

168176
private void writeRequestBody(CrtStreamHandler streamHandler) throws IOException {
@@ -194,14 +202,14 @@ public void abort() {
194202
/**
195203
* Builder that allows configuration of the AWS CRT HTTP implementation.
196204
*/
197-
public interface Builder extends SdkHttpClient.Builder<AwsCrtHttpClient.Builder> {
205+
public interface Builder extends SdkHttpClient.Builder<Builder> {
198206

199207
/**
200208
* The Maximum number of allowed concurrent requests. For HTTP/1.1 this is the same as max connections.
201209
* @param maxConcurrency maximum concurrency per endpoint
202210
* @return The builder of the method chaining.
203211
*/
204-
AwsCrtHttpClient.Builder maxConcurrency(Integer maxConcurrency);
212+
Builder maxConcurrency(Integer maxConcurrency);
205213

206214
/**
207215
* Configures the number of unread bytes that can be buffered in the
@@ -211,22 +219,22 @@ public interface Builder extends SdkHttpClient.Builder<AwsCrtHttpClient.Builder>
211219
* @param readBufferSize The number of bytes that can be buffered.
212220
* @return The builder of the method chaining.
213221
*/
214-
AwsCrtHttpClient.Builder readBufferSizeInBytes(Long readBufferSize);
222+
Builder readBufferSizeInBytes(Long readBufferSize);
215223

216224
/**
217225
* Sets the http proxy configuration to use for this client.
218226
* @param proxyConfiguration The http proxy configuration to use
219227
* @return The builder of the method chaining.
220228
*/
221-
AwsCrtHttpClient.Builder proxyConfiguration(ProxyConfiguration proxyConfiguration);
229+
Builder proxyConfiguration(ProxyConfiguration proxyConfiguration);
222230

223231
/**
224232
* Sets the http proxy configuration to use for this client.
225233
*
226234
* @param proxyConfigurationBuilderConsumer The consumer of the proxy configuration builder object.
227235
* @return the builder for method chaining.
228236
*/
229-
AwsCrtHttpClient.Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConfigurationBuilderConsumer);
237+
Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConfigurationBuilderConsumer);
230238

231239
/**
232240
* Configure the health checks for all connections established by this client.
@@ -246,7 +254,7 @@ public interface Builder extends SdkHttpClient.Builder<AwsCrtHttpClient.Builder>
246254
* @param healthChecksConfiguration The health checks config to use
247255
* @return The builder of the method chaining.
248256
*/
249-
AwsCrtHttpClient.Builder connectionHealthConfiguration(ConnectionHealthConfiguration healthChecksConfiguration);
257+
Builder connectionHealthConfiguration(ConnectionHealthConfiguration healthChecksConfiguration);
250258

251259
/**
252260
* A convenience method that creates an instance of the {@link ConnectionHealthConfiguration} builder, avoiding the
@@ -256,29 +264,29 @@ public interface Builder extends SdkHttpClient.Builder<AwsCrtHttpClient.Builder>
256264
* @return The builder of the method chaining.
257265
* @see #connectionHealthConfiguration(ConnectionHealthConfiguration)
258266
*/
259-
AwsCrtHttpClient.Builder connectionHealthConfiguration(Consumer<ConnectionHealthConfiguration.Builder>
267+
Builder connectionHealthConfiguration(Consumer<ConnectionHealthConfiguration.Builder>
260268
healthChecksConfigurationBuilder);
261269

262270
/**
263271
* Configure the maximum amount of time that a connection should be allowed to remain open while idle.
264272
* @param connectionMaxIdleTime the maximum amount of connection idle time
265273
* @return The builder of the method chaining.
266274
*/
267-
AwsCrtHttpClient.Builder connectionMaxIdleTime(Duration connectionMaxIdleTime);
275+
Builder connectionMaxIdleTime(Duration connectionMaxIdleTime);
268276

269277
/**
270278
* The amount of time to wait when initially establishing a connection before giving up and timing out.
271279
* @param connectionTimeout timeout
272280
* @return The builder of the method chaining.
273281
*/
274-
AwsCrtHttpClient.Builder connectionTimeout(Duration connectionTimeout);
282+
Builder connectionTimeout(Duration connectionTimeout);
275283

276284
/**
277285
* The amount of time to wait when acquiring a connection from the pool before giving up and timing out.
278286
* @param connectionAcquisitionTimeout the timeout duration
279287
* @return this builder for method chaining.
280288
*/
281-
AwsCrtHttpClient.Builder connectionAcquisitionTimeout(Duration connectionAcquisitionTimeout);
289+
Builder connectionAcquisitionTimeout(Duration connectionAcquisitionTimeout);
282290

283291
/**
284292
* Configure whether to enable {@code tcpKeepAlive} and relevant configuration for all connections established by this
@@ -292,7 +300,7 @@ AwsCrtHttpClient.Builder connectionHealthConfiguration(Consumer<ConnectionHealth
292300
* @param tcpKeepAliveConfiguration The TCP keep-alive configuration to use
293301
* @return The builder of the method chaining.
294302
*/
295-
AwsCrtHttpClient.Builder tcpKeepAliveConfiguration(TcpKeepAliveConfiguration tcpKeepAliveConfiguration);
303+
Builder tcpKeepAliveConfiguration(TcpKeepAliveConfiguration tcpKeepAliveConfiguration);
296304

297305
/**
298306
* Configure whether to enable {@code tcpKeepAlive} and relevant configuration for all connections established by this
@@ -306,7 +314,7 @@ AwsCrtHttpClient.Builder connectionHealthConfiguration(Consumer<ConnectionHealth
306314
* @return The builder of the method chaining.
307315
* @see #tcpKeepAliveConfiguration(TcpKeepAliveConfiguration)
308316
*/
309-
AwsCrtHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
317+
Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
310318
tcpKeepAliveConfigurationBuilder);
311319

312320
/**
@@ -325,15 +333,15 @@ AwsCrtHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfigur
325333
* @param postQuantumTlsEnabled whether to prefer Post Quantum TLS
326334
* @return The builder of the method chaining.
327335
*/
328-
AwsCrtHttpClient.Builder postQuantumTlsEnabled(Boolean postQuantumTlsEnabled);
336+
Builder postQuantumTlsEnabled(Boolean postQuantumTlsEnabled);
329337
}
330338

331339
/**
332340
* Factory that allows more advanced configuration of the AWS CRT HTTP implementation.
333341
* Use {@link #builder()} to configure and construct an immutable instance of the factory.
334342
*/
335343
private static final class DefaultBuilder
336-
extends AwsCrtClientBuilderBase<AwsCrtHttpClient.Builder> implements AwsCrtHttpClient.Builder {
344+
extends AwsCrtClientBuilderBase<Builder> implements Builder {
337345

338346

339347
@Override

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,22 @@ private void doExecute(CrtAsyncRequestContext executionContext,
8989
}
9090
try {
9191
stream.activate();
92-
streamFuture.complete(stream);
93-
asyncRequest.requestContentPublisher().subscribe(bodySubscriber);
9492
} catch (Throwable t) {
93+
// Stream is acquired but not activated and not yet published via
94+
// streamFuture. No other path can reach it, so clean it up directly.
95+
stream.cancel();
96+
stream.close();
9597
handleAcquireFailure(t, streamFuture, requestFuture, errorNotifier);
98+
return null;
9699
}
100+
streamFuture.complete(stream);
101+
asyncRequest.requestContentPublisher().subscribe(bodySubscriber);
97102
return null;
98103
}).exceptionally(t -> {
104+
// Reached when the handle lambda throws (e.g., publisher.subscribe).
105+
// closeConnection is a no-op if the stream isn't in streamFuture yet;
106+
// otherwise it tears down the published stream.
107+
streamHandler.closeConnection();
99108
handleAcquireFailure(t, streamFuture, requestFuture, errorNotifier);
100109
return null;
101110
});

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,18 @@ private void doExecute(CrtRequestContext executionContext,
8181
}
8282
try {
8383
streamBase.activate();
84-
streamFuture.complete(streamBase);
8584
} catch (Throwable t) {
85+
// Stream is acquired but not activated and not yet published via
86+
// streamFuture. No other path can reach it, so clean it up directly.
87+
streamBase.cancel();
88+
streamBase.close();
8689
handleAcquireFailure(t, streamFuture, responseFuture);
90+
return null;
8791
}
92+
streamFuture.complete(streamBase);
8893
return null;
8994
}).exceptionally(t -> {
95+
// Defensive: only reached if the handle lambda itself throws.
9096
handleAcquireFailure(t, streamFuture, responseFuture);
9197
return null;
9298
});

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private static Throwable unwrap(Throwable t) {
106106

107107
public void incrementWindow(int windowSize) {
108108
synchronized (streamLock) {
109-
HttpStreamBase s = streamFuture.getNow(null);
109+
HttpStreamBase s = streamIfAvailable();
110110
if (!streamClosed && s != null) {
111111
s.incrementWindow(windowSize);
112112
}
@@ -119,7 +119,7 @@ public void incrementWindow(int windowSize) {
119119
*/
120120
public void releaseConnection() {
121121
synchronized (streamLock) {
122-
HttpStreamBase s = streamFuture.getNow(null);
122+
HttpStreamBase s = streamIfAvailable();
123123
if (!streamClosed && s != null) {
124124
streamClosed = true;
125125
s.close();
@@ -134,12 +134,24 @@ public void releaseConnection() {
134134
*/
135135
public void closeConnection() {
136136
synchronized (streamLock) {
137-
HttpStreamBase s = streamFuture.getNow(null);
137+
HttpStreamBase s = streamIfAvailable();
138138
if (!streamClosed && s != null) {
139139
streamClosed = true;
140140
s.cancel();
141141
s.close();
142142
}
143143
}
144144
}
145+
146+
/**
147+
* Returns the acquired stream if {@link #streamFuture} completed normally, otherwise {@code null}.
148+
* Tolerates exceptional or pending completion (in contrast to {@link CompletableFuture#getNow}, which
149+
* throws {@link CompletionException} when the future is exceptional).
150+
*/
151+
private HttpStreamBase streamIfAvailable() {
152+
if (!streamFuture.isDone() || streamFuture.isCompletedExceptionally()) {
153+
return null;
154+
}
155+
return streamFuture.getNow(null);
156+
}
145157
}

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientWireMockTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,36 @@ public void sharedEventLoopGroup_closeOneClient_shouldNotAffectOtherClients() th
113113
}
114114
}
115115

116+
@Test
117+
public void contentStreamReadThrows_propagatesIoExceptionAndDoesNotLeakConnection() throws Exception {
118+
try (SdkHttpClient client = AwsCrtHttpClient.builder().maxConcurrency(1).build()) {
119+
URI uri = URI.create("http://localhost:" + mockServer.port());
120+
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(randomAlphabetic(10))));
121+
SdkHttpRequest request = createRequest(uri);
122+
123+
IOException readError = new IOException("simulated read failure");
124+
ExecutableHttpRequest failing = client.prepareRequest(
125+
HttpExecuteRequest.builder()
126+
.request(request)
127+
.contentStreamProvider(() -> new java.io.InputStream() {
128+
@Override
129+
public int read() throws IOException {
130+
throw readError;
131+
}
132+
})
133+
.build());
134+
135+
assertThatThrownBy(failing::call)
136+
.isInstanceOf(IOException.class)
137+
.isSameAs(readError);
138+
139+
// If the connection leaked, this second request would hang since maxConcurrency=1.
140+
// Use a short overall test timeout to fail fast if the leak regresses.
141+
HttpExecuteResponse second = makeSimpleRequest(client, null);
142+
assertThat(second.httpResponse().statusCode()).isEqualTo(200);
143+
}
144+
}
145+
116146
@Test
117147
public void abortRequest_shouldFailTheExceptionWithIOException() throws Exception {
118148
try (SdkHttpClient client = AwsCrtHttpClient.create()) {

0 commit comments

Comments
 (0)