Skip to content

Commit 14bc392

Browse files
committed
Use async stream API in async code path
1 parent 3a694c5 commit 14bc392

15 files changed

Lines changed: 725 additions & 172 deletions

http-clients/aws-crt-client/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,32 @@
213213
</archive>
214214
</configuration>
215215
</plugin>
216+
<!-- The Reactive Streams TCK tests are based on TestNG. See http://maven.apache.org/surefire/maven-surefire-plugin/examples/testng.html#Running_TestNG_and_JUnit_Tests -->
217+
<plugin>
218+
<groupId>org.apache.maven.plugins</groupId>
219+
<artifactId>maven-surefire-plugin</artifactId>
220+
<version>${maven.surefire.version}</version>
221+
<configuration>
222+
<properties>
223+
<property>
224+
<name>junit</name>
225+
<value>false</value>
226+
</property>
227+
</properties>
228+
</configuration>
229+
<dependencies>
230+
<dependency>
231+
<groupId>org.apache.maven.surefire</groupId>
232+
<artifactId>surefire-junit-platform</artifactId>
233+
<version>${maven.surefire.version}</version>
234+
</dependency>
235+
<dependency>
236+
<groupId>org.apache.maven.surefire</groupId>
237+
<artifactId>surefire-testng</artifactId>
238+
<version>${maven.surefire.version}</version>
239+
</dependency>
240+
</dependencies>
241+
</plugin>
216242
</plugins>
217243
</build>
218244
</project>

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,9 @@ public HttpExecuteResponse call() throws IOException {
126126
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
127127

128128
try {
129-
CrtStreamHandler streamHandler = new CrtStreamHandler();
130-
responseFuture = new CrtRequestExecutor().execute(context, streamHandler);
131-
132-
// Write the request body from the caller thread via the stream handler,
133-
// which guards against concurrent stream close with a synchronized block.
134-
// This avoids blocking the CRT event loop thread in InputStream.read().
135-
writeRequestBody(streamHandler);
129+
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
130+
responseFuture = result.responseFuture();
131+
writeRequestBody(result.streamHandler());
136132

137133
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
138134
builder.response(response);

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import software.amazon.awssdk.http.SdkCancellationException;
2828
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
2929
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
30+
import software.amazon.awssdk.http.crt.internal.request.CrtRequestBodyPublisherSubscriber;
3031
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
3132
import software.amazon.awssdk.metrics.MetricCollector;
3233
import software.amazon.awssdk.metrics.NoOpMetricCollector;
@@ -67,28 +68,37 @@ private void doExecute(CrtAsyncRequestContext executionContext,
6768

6869
HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);
6970

70-
CrtStreamHandler streamHandler = new CrtStreamHandler();
71+
CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
72+
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);
7173

7274
HttpStreamBaseResponseHandler crtResponseHandler =
7375
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler);
7476

75-
CompletableFuture<HttpStreamBase> streamFuture =
76-
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
77-
7877
long finalAcquireStartTime = acquireStartTime;
7978

80-
streamFuture.whenComplete((stream, throwable) -> {
81-
if (shouldPublishMetrics) {
82-
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
83-
}
84-
85-
if (throwable != null) {
86-
Throwable toThrow = wrapCrtException(throwable);
87-
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
88-
} else {
89-
streamHandler.setStream(stream);
90-
}
91-
});
79+
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, true)
80+
.whenComplete((stream, throwable) -> {
81+
if (shouldPublishMetrics) {
82+
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
83+
}
84+
85+
if (throwable != null) {
86+
Throwable toThrow = wrapCrtException(throwable);
87+
streamFuture.completeExceptionally(toThrow);
88+
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
89+
} else {
90+
streamFuture.complete(stream);
91+
// CRT's stream manager activates the stream after completing this future, but
92+
// writeData requires an active stream. activate() is idempotent.
93+
stream.activate();
94+
// Subscribe after activate(): request(1) may synchronously deliver onNext, which
95+
// calls writeData and requires the stream to already be active.
96+
asyncRequest.requestContentPublisher()
97+
.subscribe(new CrtRequestBodyPublisherSubscriber(streamHandler,
98+
requestFuture,
99+
asyncRequest.responseHandler()));
100+
}
101+
});
92102
}
93103

94104
/**

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,24 @@
3131
@SdkInternalApi
3232
public final class CrtRequestExecutor {
3333

34-
public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext,
35-
CrtStreamHandler streamHandler) {
34+
public Result execute(CrtRequestContext executionContext) {
3635
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
36+
CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
37+
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);
3738

3839
try {
39-
doExecute(executionContext, responseFuture, streamHandler);
40+
doExecute(executionContext, responseFuture, streamHandler, streamFuture);
4041
} catch (Throwable t) {
4142
responseFuture.completeExceptionally(t);
4243
}
4344

44-
return responseFuture;
45+
return new Result(responseFuture, streamHandler);
4546
}
4647

4748
private void doExecute(CrtRequestContext executionContext,
4849
CompletableFuture<SdkHttpFullResponse> responseFuture,
49-
CrtStreamHandler streamHandler) {
50+
CrtStreamHandler streamHandler,
51+
CompletableFuture<HttpStreamBase> streamFuture) {
5052
MetricCollector metricCollector = executionContext.metricCollector();
5153
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
5254

@@ -63,22 +65,42 @@ private void doExecute(CrtRequestContext executionContext,
6365

6466
boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent();
6567

66-
CompletableFuture<HttpStreamBase> streamFuture =
67-
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody);
68-
6968
long finalAcquireStartTime = acquireStartTime;
7069

71-
streamFuture.whenComplete((streamBase, throwable) -> {
72-
if (shouldPublishMetrics) {
73-
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
74-
}
75-
76-
if (throwable != null) {
77-
Throwable toThrow = wrapCrtException(throwable);
78-
responseFuture.completeExceptionally(toThrow);
79-
} else {
80-
streamHandler.setStream(streamBase);
81-
}
82-
});
70+
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody)
71+
.whenComplete((streamBase, throwable) -> {
72+
if (shouldPublishMetrics) {
73+
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
74+
}
75+
76+
if (throwable != null) {
77+
Throwable toThrow = wrapCrtException(throwable);
78+
// Fail streamFuture first so writers blocked in waitForStream() unblock before
79+
// responseFuture's completion handlers run.
80+
streamFuture.completeExceptionally(toThrow);
81+
responseFuture.completeExceptionally(toThrow);
82+
} else {
83+
streamFuture.complete(streamBase);
84+
}
85+
});
86+
}
87+
88+
@SdkInternalApi
89+
public static final class Result {
90+
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
91+
private final CrtStreamHandler streamHandler;
92+
93+
private Result(CompletableFuture<SdkHttpFullResponse> responseFuture, CrtStreamHandler streamHandler) {
94+
this.responseFuture = responseFuture;
95+
this.streamHandler = streamHandler;
96+
}
97+
98+
public CompletableFuture<SdkHttpFullResponse> responseFuture() {
99+
return responseFuture;
100+
}
101+
102+
public CrtStreamHandler streamHandler() {
103+
return streamHandler;
104+
}
83105
}
84106
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java

Lines changed: 80 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,72 +17,109 @@
1717

1818
import java.io.IOException;
1919
import java.util.concurrent.CompletableFuture;
20-
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.CompletionException;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122
import software.amazon.awssdk.annotations.SdkInternalApi;
2223
import software.amazon.awssdk.crt.http.HttpStreamBase;
24+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
25+
import software.amazon.awssdk.utils.CompletableFutureUtils;
26+
import software.amazon.awssdk.utils.Logger;
2327

2428
/**
2529
* Manages the lifecycle of a CRT HTTP stream, providing thread-safe access to stream operations.
2630
* Shared between the request executor (for writing body data) and the response handler (for
2731
* incrementing the window and releasing/closing the connection).
32+
*
33+
* <p>The handler is constructed with a {@link CompletableFuture} representing stream acquisition.
34+
* The caller (request executor) completes that future once the underlying CRT stream manager has
35+
* either acquired the stream or failed. All operations on this handler chain off that future, so
36+
* writes issued before acquisition completes are queued.
2837
*/
2938
@SdkInternalApi
3039
public final class CrtStreamHandler {
3140

41+
private static final Logger log = Logger.loggerFor(CrtStreamHandler.class);
42+
3243
private final Object streamLock = new Object();
33-
private final CountDownLatch streamLatch = new CountDownLatch(1);
34-
private HttpStreamBase stream;
44+
private final CompletableFuture<HttpStreamBase> streamFuture;
45+
private final AtomicBoolean responseHandlerNotified = new AtomicBoolean(false);
3546
private boolean streamClosed;
3647

37-
/**
38-
* Sets the stream. Called once when the stream is acquired from the connection pool.
39-
*/
40-
public void setStream(HttpStreamBase stream) {
41-
this.stream = stream;
42-
streamLatch.countDown();
48+
public CrtStreamHandler(CompletableFuture<HttpStreamBase> streamFuture) {
49+
this.streamFuture = streamFuture;
4350
}
4451

4552
/**
46-
* Blocks until the stream has been acquired.
53+
* Atomically notifies the {@link SdkAsyncHttpResponseHandler#onError(Throwable)} callback at most
54+
* once across all callers sharing this handler. Returns {@code true} if this caller delivered the
55+
* notification, {@code false} if another caller already did. Exceptions thrown by the handler are
56+
* caught and logged so callers can proceed with their own cleanup.
4757
*/
48-
public void waitForStream() {
58+
public boolean tryNotifyResponseHandlerError(SdkAsyncHttpResponseHandler handler, Throwable t) {
59+
if (!responseHandlerNotified.compareAndSet(false, true)) {
60+
return false;
61+
}
4962
try {
50-
streamLatch.await();
51-
} catch (InterruptedException e) {
52-
Thread.currentThread().interrupt();
53-
throw new RuntimeException("Interrupted while waiting for stream", e);
63+
handler.onError(t);
64+
} catch (Exception e) {
65+
log.error(() -> "SdkAsyncHttpResponseHandler " + handler + " threw an exception in onError. It will be "
66+
+ "ignored.", e);
5467
}
68+
return true;
69+
}
70+
71+
/**
72+
* Blocks until the stream has been acquired or acquisition has failed. Returns the acquired
73+
* stream on success. If acquisition failed, the failure cause is rethrown wrapped in a
74+
* {@link CompletionException} so callers can use the same handling as for response futures.
75+
*/
76+
public HttpStreamBase waitForStream() {
77+
return CompletableFutureUtils.joinInterruptibly(streamFuture);
5578
}
5679

5780
/**
58-
* Write data to the stream. The caller must ensure the stream is ready (via {@link #waitForStream()})
59-
* before calling this method.
81+
* Write data to the stream. The returned future chains on stream acquisition: if the stream
82+
* is not yet ready, the write is queued until the {@code streamFuture} passed to the
83+
* constructor completes. Failures from either stream acquisition or the underlying CRT write
84+
* are propagated as the original cause (not wrapped in {@link CompletionException}) so callers
85+
* see the same exception type whether the failure happens before or after {@code thenCompose}-
86+
* style chaining.
6087
*/
6188
public CompletableFuture<Void> writeData(byte[] data, boolean endStream) {
62-
if (streamLatch.getCount() != 0) {
63-
CompletableFuture<Void> future = new CompletableFuture<>();
64-
future.completeExceptionally(
65-
new IllegalStateException("writeData called before stream is ready. Call waitForStream() first."));
66-
return future;
67-
}
68-
synchronized (streamLock) {
69-
if (streamClosed) {
70-
CompletableFuture<Void> future = new CompletableFuture<>();
71-
future.completeExceptionally(
72-
new IOException("Stream is already closed, cannot write data."));
73-
return future;
89+
CompletableFuture<Void> result = new CompletableFuture<>();
90+
streamFuture.whenComplete((s, t) -> {
91+
if (t != null) {
92+
result.completeExceptionally(unwrap(t));
93+
return;
7494
}
75-
return stream.writeData(data, endStream);
76-
}
95+
CompletableFuture<Void> writeFuture;
96+
synchronized (streamLock) {
97+
if (streamClosed) {
98+
result.completeExceptionally(new IOException("Stream is already closed, cannot write data."));
99+
return;
100+
}
101+
writeFuture = s.writeData(data, endStream);
102+
}
103+
writeFuture.whenComplete((v, err) -> {
104+
if (err != null) {
105+
result.completeExceptionally(unwrap(err));
106+
} else {
107+
result.complete(null);
108+
}
109+
});
110+
});
111+
return result;
112+
}
113+
114+
private static Throwable unwrap(Throwable t) {
115+
return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
77116
}
78117

79118
public void incrementWindow(int windowSize) {
80-
if (streamLatch.getCount() != 0) {
81-
throw new IllegalStateException("incrementWindow called before stream is ready.");
82-
}
83119
synchronized (streamLock) {
84-
if (!streamClosed) {
85-
stream.incrementWindow(windowSize);
120+
HttpStreamBase s = streamFuture.getNow(null);
121+
if (!streamClosed && s != null) {
122+
s.incrementWindow(windowSize);
86123
}
87124
}
88125
}
@@ -93,9 +130,10 @@ public void incrementWindow(int windowSize) {
93130
*/
94131
public void releaseConnection() {
95132
synchronized (streamLock) {
96-
if (!streamClosed && stream != null) {
133+
HttpStreamBase s = streamFuture.getNow(null);
134+
if (!streamClosed && s != null) {
97135
streamClosed = true;
98-
stream.close();
136+
s.close();
99137
}
100138
}
101139
}
@@ -107,10 +145,11 @@ public void releaseConnection() {
107145
*/
108146
public void closeConnection() {
109147
synchronized (streamLock) {
110-
if (!streamClosed && stream != null) {
148+
HttpStreamBase s = streamFuture.getNow(null);
149+
if (!streamClosed && s != null) {
111150
streamClosed = true;
112-
stream.cancel();
113-
stream.close();
151+
s.cancel();
152+
s.close();
114153
}
115154
}
116155
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,12 @@ public static HttpRequestBase toAsyncCrtRequest(CrtAsyncRequestContext request)
5050
.map(value -> "?" + value)
5151
.orElse("");
5252
String path = encodedPath + encodedQueryString;
53-
CrtRequestBodyAdapter crtRequestBodyAdapter = new CrtRequestBodyAdapter(sdkExecuteRequest.requestContentPublisher(),
54-
request.readBufferSize());
5553
HttpHeader[] crtHeaderArray = asArray(createAsyncHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest,
5654
request.protocol()));
5755
return new HttpRequest(method,
5856
path,
5957
crtHeaderArray,
60-
crtRequestBodyAdapter);
58+
null);
6159
}
6260

6361
public static HttpRequest toCrtRequest(CrtRequestContext request) {

0 commit comments

Comments
 (0)