Skip to content

Commit 0b4a1bf

Browse files
committed
Use async stream API in async code path
1 parent 3a694c5 commit 0b4a1bf

17 files changed

Lines changed: 943 additions & 197 deletions

http-clients/aws-crt-client/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,32 @@
213213
</archive>
214214
</configuration>
215215
</plugin>
216+
<!-- The Reactive Streams TCK tests are based on TestNG. See http://maven.apache.org/surefire/maven-surefire-plugin/examples/testng.html#Running_TestNG_and_JUnit_Tests -->
217+
<plugin>
218+
<groupId>org.apache.maven.plugins</groupId>
219+
<artifactId>maven-surefire-plugin</artifactId>
220+
<version>${maven.surefire.version}</version>
221+
<configuration>
222+
<properties>
223+
<property>
224+
<name>junit</name>
225+
<value>false</value>
226+
</property>
227+
</properties>
228+
</configuration>
229+
<dependencies>
230+
<dependency>
231+
<groupId>org.apache.maven.surefire</groupId>
232+
<artifactId>surefire-junit-platform</artifactId>
233+
<version>${maven.surefire.version}</version>
234+
</dependency>
235+
<dependency>
236+
<groupId>org.apache.maven.surefire</groupId>
237+
<artifactId>surefire-testng</artifactId>
238+
<version>${maven.surefire.version}</version>
239+
</dependency>
240+
</dependencies>
241+
</plugin>
216242
</plugins>
217243
</build>
218244
</project>

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,9 @@ public HttpExecuteResponse call() throws IOException {
126126
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
127127

128128
try {
129-
CrtStreamHandler streamHandler = new CrtStreamHandler();
130-
responseFuture = new CrtRequestExecutor().execute(context, streamHandler);
131-
132-
// Write the request body from the caller thread via the stream handler,
133-
// which guards against concurrent stream close with a synchronized block.
134-
// This avoids blocking the CRT event loop thread in InputStream.read().
135-
writeRequestBody(streamHandler);
129+
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
130+
responseFuture = result.responseFuture();
131+
writeRequestBody(result.streamHandler());
136132

137133
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
138134
builder.response(response);

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,33 +26,32 @@
2626
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
2727
import software.amazon.awssdk.http.SdkCancellationException;
2828
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
29-
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
29+
import software.amazon.awssdk.http.crt.internal.request.CrtRequestBodyPublisherSubscriber;
3030
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
3131
import software.amazon.awssdk.metrics.MetricCollector;
3232
import software.amazon.awssdk.metrics.NoOpMetricCollector;
33-
import software.amazon.awssdk.utils.Logger;
3433

3534
@SdkInternalApi
3635
public final class CrtAsyncRequestExecutor {
3736

38-
private static final Logger log = Logger.loggerFor(CrtAsyncRequestExecutor.class);
39-
4037
public CompletableFuture<Void> execute(CrtAsyncRequestContext executionContext) {
4138
AsyncExecuteRequest asyncRequest = executionContext.sdkRequest();
4239
CompletableFuture<Void> requestFuture = createAsyncExecutionFuture(asyncRequest);
40+
ResponseHandlerErrorNotifier errorNotifier = new ResponseHandlerErrorNotifier(asyncRequest.responseHandler());
4341

4442
try {
45-
doExecute(executionContext, asyncRequest, requestFuture);
43+
doExecute(executionContext, asyncRequest, requestFuture, errorNotifier);
4644
} catch (Throwable t) {
47-
reportAsyncFailure(t, requestFuture, asyncRequest.responseHandler());
45+
reportAsyncFailure(t, requestFuture, errorNotifier);
4846
}
4947

5048
return requestFuture;
5149
}
5250

5351
private void doExecute(CrtAsyncRequestContext executionContext,
5452
AsyncExecuteRequest asyncRequest,
55-
CompletableFuture<Void> requestFuture) {
53+
CompletableFuture<Void> requestFuture,
54+
ResponseHandlerErrorNotifier errorNotifier) {
5655
MetricCollector metricCollector = executionContext.metricCollector();
5756
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
5857

@@ -67,28 +66,48 @@ private void doExecute(CrtAsyncRequestContext executionContext,
6766

6867
HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);
6968

70-
CrtStreamHandler streamHandler = new CrtStreamHandler();
69+
CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
70+
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);
7171

7272
HttpStreamBaseResponseHandler crtResponseHandler =
73-
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler);
73+
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler, errorNotifier);
7474

75-
CompletableFuture<HttpStreamBase> streamFuture =
76-
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
75+
CrtRequestBodyPublisherSubscriber bodySubscriber =
76+
new CrtRequestBodyPublisherSubscriber(streamHandler, requestFuture, errorNotifier);
7777

7878
long finalAcquireStartTime = acquireStartTime;
7979

80-
streamFuture.whenComplete((stream, throwable) -> {
81-
if (shouldPublishMetrics) {
82-
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
83-
}
80+
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, true)
81+
.handle((stream, throwable) -> {
82+
if (shouldPublishMetrics) {
83+
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
84+
}
85+
86+
if (throwable != null) {
87+
handleAcquireFailure(throwable, streamFuture, requestFuture, errorNotifier);
88+
return null;
89+
}
90+
try {
91+
stream.activate();
92+
streamFuture.complete(stream);
93+
asyncRequest.requestContentPublisher().subscribe(bodySubscriber);
94+
} catch (Throwable t) {
95+
handleAcquireFailure(t, streamFuture, requestFuture, errorNotifier);
96+
}
97+
return null;
98+
}).exceptionally(t -> {
99+
handleAcquireFailure(t, streamFuture, requestFuture, errorNotifier);
100+
return null;
101+
});
102+
}
84103

85-
if (throwable != null) {
86-
Throwable toThrow = wrapCrtException(throwable);
87-
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
88-
} else {
89-
streamHandler.setStream(stream);
90-
}
91-
});
104+
private void handleAcquireFailure(Throwable t,
105+
CompletableFuture<HttpStreamBase> streamFuture,
106+
CompletableFuture<Void> requestFuture,
107+
ResponseHandlerErrorNotifier errorNotifier) {
108+
Throwable toThrow = wrapCrtException(t);
109+
streamFuture.completeExceptionally(toThrow);
110+
reportAsyncFailure(toThrow, requestFuture, errorNotifier);
92111
}
93112

94113
/**
@@ -113,18 +132,10 @@ private CompletableFuture<Void> createAsyncExecutionFuture(AsyncExecuteRequest r
113132
return future;
114133
}
115134

116-
/**
117-
* Notify the provided response handler and future of the failure.
118-
*/
119135
private void reportAsyncFailure(Throwable cause,
120136
CompletableFuture<Void> executeFuture,
121-
SdkAsyncHttpResponseHandler responseHandler) {
122-
try {
123-
responseHandler.onError(cause);
124-
} catch (Exception e) {
125-
log.error(() -> "SdkAsyncHttpResponseHandler " + responseHandler + " threw an exception in onError. It will be "
126-
+ "ignored.", e);
127-
}
137+
ResponseHandlerErrorNotifier errorNotifier) {
138+
errorNotifier.tryNotifyError(cause);
128139
executeFuture.completeExceptionally(cause);
129140
}
130141
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,26 @@
3131
@SdkInternalApi
3232
public final class CrtRequestExecutor {
3333

34-
public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext,
35-
CrtStreamHandler streamHandler) {
34+
public Result execute(CrtRequestContext executionContext) {
3635
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
36+
CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
37+
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);
3738

3839
try {
39-
doExecute(executionContext, responseFuture, streamHandler);
40+
doExecute(executionContext, responseFuture, streamHandler, streamFuture);
4041
} catch (Throwable t) {
42+
// Fail streamFuture too so any caller blocked in waitForStream() unblocks.
43+
streamFuture.completeExceptionally(t);
4144
responseFuture.completeExceptionally(t);
4245
}
4346

44-
return responseFuture;
47+
return new Result(responseFuture, streamHandler);
4548
}
4649

4750
private void doExecute(CrtRequestContext executionContext,
4851
CompletableFuture<SdkHttpFullResponse> responseFuture,
49-
CrtStreamHandler streamHandler) {
52+
CrtStreamHandler streamHandler,
53+
CompletableFuture<HttpStreamBase> streamFuture) {
5054
MetricCollector metricCollector = executionContext.metricCollector();
5155
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
5256

@@ -63,22 +67,55 @@ private void doExecute(CrtRequestContext executionContext,
6367

6468
boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent();
6569

66-
CompletableFuture<HttpStreamBase> streamFuture =
67-
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody);
68-
6970
long finalAcquireStartTime = acquireStartTime;
7071

71-
streamFuture.whenComplete((streamBase, throwable) -> {
72-
if (shouldPublishMetrics) {
73-
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
74-
}
75-
76-
if (throwable != null) {
77-
Throwable toThrow = wrapCrtException(throwable);
78-
responseFuture.completeExceptionally(toThrow);
79-
} else {
80-
streamHandler.setStream(streamBase);
81-
}
82-
});
72+
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody)
73+
.handle((streamBase, throwable) -> {
74+
if (shouldPublishMetrics) {
75+
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
76+
}
77+
78+
if (throwable != null) {
79+
handleAcquireFailure(throwable, streamFuture, responseFuture);
80+
return null;
81+
}
82+
try {
83+
streamBase.activate();
84+
streamFuture.complete(streamBase);
85+
} catch (Throwable t) {
86+
handleAcquireFailure(t, streamFuture, responseFuture);
87+
}
88+
return null;
89+
}).exceptionally(t -> {
90+
handleAcquireFailure(t, streamFuture, responseFuture);
91+
return null;
92+
});
93+
}
94+
95+
private void handleAcquireFailure(Throwable t,
96+
CompletableFuture<HttpStreamBase> streamFuture,
97+
CompletableFuture<SdkHttpFullResponse> responseFuture) {
98+
Throwable toThrow = wrapCrtException(t);
99+
streamFuture.completeExceptionally(toThrow);
100+
responseFuture.completeExceptionally(toThrow);
101+
}
102+
103+
@SdkInternalApi
104+
public static final class Result {
105+
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
106+
private final CrtStreamHandler streamHandler;
107+
108+
private Result(CompletableFuture<SdkHttpFullResponse> responseFuture, CrtStreamHandler streamHandler) {
109+
this.responseFuture = responseFuture;
110+
this.streamHandler = streamHandler;
111+
}
112+
113+
public CompletableFuture<SdkHttpFullResponse> responseFuture() {
114+
return responseFuture;
115+
}
116+
117+
public CrtStreamHandler streamHandler() {
118+
return streamHandler;
119+
}
83120
}
84121
}

0 commit comments

Comments
 (0)