Skip to content

Commit 051f869

Browse files
committed
Switch from the CRT pull-based HttpRequestBodyStream model to the
push-based HttpStreamBase.writeData() API to avoid potential deadlock issue when request body InputStream blocks.
1 parent bd50931 commit 051f869

7 files changed

Lines changed: 94 additions & 109 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread."
6+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
import static software.amazon.awssdk.http.HttpMetric.HTTP_CLIENT_NAME;
1919

2020
import java.io.IOException;
21+
import java.io.InputStream;
2122
import java.time.Duration;
23+
import java.util.Arrays;
2224
import java.util.concurrent.CompletableFuture;
2325
import java.util.concurrent.CompletionException;
2426
import java.util.function.Consumer;
2527
import software.amazon.awssdk.annotations.SdkPublicApi;
2628
import software.amazon.awssdk.crt.http.HttpException;
29+
import software.amazon.awssdk.crt.http.HttpStreamBase;
2730
import software.amazon.awssdk.crt.http.HttpStreamManager;
31+
import software.amazon.awssdk.http.ContentStreamProvider;
2832
import software.amazon.awssdk.http.ExecutableHttpRequest;
2933
import software.amazon.awssdk.http.HttpExecuteRequest;
3034
import software.amazon.awssdk.http.HttpExecuteResponse;
@@ -35,6 +39,7 @@
3539
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
3640
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
3741
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
42+
import software.amazon.awssdk.http.crt.internal.CrtUtils;
3843
import software.amazon.awssdk.utils.AttributeMap;
3944
import software.amazon.awssdk.utils.CompletableFutureUtils;
4045

@@ -107,6 +112,8 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
107112
}
108113

109114
private static final class CrtHttpRequest implements ExecutableHttpRequest {
115+
private static final int WRITE_BUFFER_SIZE = 16 * 1024;
116+
110117
private final CrtRequestContext context;
111118
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;
112119

@@ -119,7 +126,14 @@ public HttpExecuteResponse call() throws IOException {
119126
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
120127

121128
try {
122-
responseFuture = new CrtRequestExecutor().execute(context);
129+
CrtRequestExecutor.ExecutionResult execution = new CrtRequestExecutor().execute(context);
130+
responseFuture = execution.responseFuture();
131+
132+
// Wait for the stream to be acquired, then write the request body from the caller thread.
133+
// This avoids blocking the CRT event loop thread in InputStream.read().
134+
HttpStreamBase stream = CompletableFutureUtils.joinInterruptibly(execution.streamFuture());
135+
writeRequestBody(stream);
136+
123137
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
124138
builder.response(response);
125139
builder.responseBody(response.content().orElse(null));
@@ -140,6 +154,10 @@ public HttpExecuteResponse call() throws IOException {
140154
}
141155

142156
if (cause instanceof HttpException) {
157+
Throwable wrapped = CrtUtils.wrapCrtException(cause);
158+
if (wrapped instanceof IOException) {
159+
throw (IOException) wrapped;
160+
}
143161
throw (HttpException) cause;
144162
}
145163

@@ -151,6 +169,24 @@ public HttpExecuteResponse call() throws IOException {
151169
}
152170
}
153171

172+
private void writeRequestBody(HttpStreamBase stream) throws IOException {
173+
ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null);
174+
if (provider == null) {
175+
return;
176+
}
177+
178+
try (InputStream inputStream = provider.newStream()) {
179+
byte[] buf = new byte[WRITE_BUFFER_SIZE];
180+
int read;
181+
182+
while ((read = inputStream.read(buf, 0, buf.length)) >= 0) {
183+
byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read);
184+
CompletableFutureUtils.joinInterruptibly(stream.writeData(chunk, false));
185+
}
186+
CompletableFutureUtils.joinInterruptibly(stream.writeData(null, true));
187+
}
188+
}
189+
154190
@Override
155191
public void abort() {
156192
if (responseFuture != null) {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,37 +32,40 @@
3232
@SdkInternalApi
3333
public final class CrtRequestExecutor {
3434

35-
public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext) {
36-
CompletableFuture<SdkHttpFullResponse> requestFuture = new CompletableFuture<>();
35+
public ExecutionResult execute(CrtRequestContext executionContext) {
36+
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
37+
CompletableFuture<HttpStreamBase> streamFuture;
3738

3839
try {
39-
doExecute(executionContext, requestFuture);
40+
streamFuture = doExecute(executionContext, responseFuture);
4041
} catch (Throwable t) {
41-
requestFuture.completeExceptionally(t);
42+
responseFuture.completeExceptionally(t);
43+
streamFuture = new CompletableFuture<>();
44+
streamFuture.completeExceptionally(t);
4245
}
4346

44-
return requestFuture;
47+
return new ExecutionResult(streamFuture, responseFuture);
4548
}
4649

47-
private void doExecute(CrtRequestContext executionContext, CompletableFuture<SdkHttpFullResponse> requestFuture) {
50+
private CompletableFuture<HttpStreamBase> doExecute(CrtRequestContext executionContext,
51+
CompletableFuture<SdkHttpFullResponse> responseFuture) {
4852
MetricCollector metricCollector = executionContext.metricCollector();
4953
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
5054

5155
long acquireStartTime = 0;
5256

5357
if (shouldPublishMetrics) {
54-
// go ahead and get acquireStartTime for the concurrency timer as early as possible,
55-
// so it's as accurate as possible, but only do it in a branch since clock_gettime()
56-
// results in a full sys call barrier (multiple mutexes and a hw interrupt).
5758
acquireStartTime = System.nanoTime();
5859
}
5960

60-
HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
61+
HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(responseFuture);
6162

6263
HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
6364

65+
boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent();
66+
6467
CompletableFuture<HttpStreamBase> streamFuture =
65-
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
68+
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody);
6669

6770
long finalAcquireStartTime = acquireStartTime;
6871

@@ -73,8 +76,33 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture<Sdk
7376

7477
if (throwable != null) {
7578
Throwable toThrow = wrapCrtException(throwable);
76-
requestFuture.completeExceptionally(toThrow);
79+
responseFuture.completeExceptionally(toThrow);
7780
}
7881
});
82+
83+
return streamFuture;
84+
}
85+
86+
/**
87+
* Holds the result of submitting a request to CRT: the stream (for writing body data via
88+
* {@code writeData}) and the response future (for reading the response).
89+
*/
90+
public static final class ExecutionResult {
91+
private final CompletableFuture<HttpStreamBase> streamFuture;
92+
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
93+
94+
ExecutionResult(CompletableFuture<HttpStreamBase> streamFuture,
95+
CompletableFuture<SdkHttpFullResponse> responseFuture) {
96+
this.streamFuture = streamFuture;
97+
this.responseFuture = responseFuture;
98+
}
99+
100+
public CompletableFuture<HttpStreamBase> streamFuture() {
101+
return streamFuture;
102+
}
103+
104+
public CompletableFuture<SdkHttpFullResponse> responseFuture() {
105+
return responseFuture;
106+
}
79107
}
80108
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,7 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) {
7878
HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest));
7979

8080
String finalEncodedPath = encodedPath + encodedQueryString;
81-
return sdkExecuteRequest.contentStreamProvider()
82-
.map(provider -> new HttpRequest(method,
83-
finalEncodedPath,
84-
crtHeaderArray,
85-
new CrtRequestInputStreamAdapter(provider)))
86-
.orElse(new HttpRequest(method,
87-
finalEncodedPath,
88-
crtHeaderArray, null));
81+
return new HttpRequest(method, finalEncodedPath, crtHeaderArray, null);
8982
}
9083

9184
private static HttpHeader[] asArray(List<HttpHeader> crtHeaderList) {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestInputStreamAdapter.java

Lines changed: 0 additions & 78 deletions
This file was deleted.

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutorTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() {
8787
.request(HttpExecuteRequest.builder().build())
8888
.build();
8989

90-
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
90+
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
9191

9292
assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
9393
}
@@ -98,11 +98,11 @@ public void execute_acquireStreamFails_wrapsWithIOException() {
9898
CrtRequestContext context = crtRequestContext();
9999
CompletableFuture<HttpStreamBase> completableFuture = new CompletableFuture<>();
100100

101-
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
101+
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
102102
.thenReturn(completableFuture);
103103
completableFuture.completeExceptionally(exception);
104104

105-
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
105+
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
106106

107107
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
108108
}
@@ -113,10 +113,10 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable)
113113
CrtRequestContext context = crtRequestContext();
114114
CompletableFuture<HttpStreamBase> completableFuture = CompletableFutureUtils.failedFuture(throwable);
115115

116-
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
116+
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
117117
.thenReturn(completableFuture);
118118

119-
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
119+
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
120120
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class);
121121
}
122122

@@ -130,10 +130,10 @@ public void execute_httpException_mapsToCorrectException(Entry<Integer, Class<?
130130
HttpException exception = new HttpException(errorCode);
131131
CompletableFuture<HttpStreamBase> completableFuture = CompletableFutureUtils.failedFuture(exception);
132132

133-
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
133+
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
134134
.thenReturn(completableFuture);
135135

136-
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
136+
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
137137
assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass);
138138
}
139139

@@ -143,10 +143,10 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() {
143143
CrtRequestContext context = crtRequestContext();
144144
CompletableFuture<HttpStreamBase> completableFuture = CompletableFutureUtils.failedFuture(exception);
145145

146-
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
146+
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
147147
.thenReturn(completableFuture);
148148

149-
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
149+
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
150150
assertThatThrownBy(executeFuture::join).hasCause(exception);
151151
}
152152

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@
131131
<rxjava3.version>3.1.5</rxjava3.version>
132132
<commons-codec.verion>1.17.1</commons-codec.verion>
133133
<jmh.version>1.37</jmh.version>
134-
<awscrt.version>0.45.1</awscrt.version>
134+
<awscrt.version>1.0.0-SNAPSHOT</awscrt.version>
135135

136136
<!--Test dependencies -->
137137
<junit5.version>5.10.3</junit5.version>

0 commit comments

Comments
 (0)