Skip to content

Commit 7acd52f

Browse files
committed
Use async stream API in async code path
1 parent 3a694c5 commit 7acd52f

15 files changed

Lines changed: 845 additions & 172 deletions

http-clients/aws-crt-client/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,32 @@
213213
</archive>
214214
</configuration>
215215
</plugin>
216+
<!-- The Reactive Streams TCK tests are based on TestNG. See http://maven.apache.org/surefire/maven-surefire-plugin/examples/testng.html#Running_TestNG_and_JUnit_Tests -->
217+
<plugin>
218+
<groupId>org.apache.maven.plugins</groupId>
219+
<artifactId>maven-surefire-plugin</artifactId>
220+
<version>${maven.surefire.version}</version>
221+
<configuration>
222+
<properties>
223+
<property>
224+
<name>junit</name>
225+
<value>false</value>
226+
</property>
227+
</properties>
228+
</configuration>
229+
<dependencies>
230+
<dependency>
231+
<groupId>org.apache.maven.surefire</groupId>
232+
<artifactId>surefire-junit-platform</artifactId>
233+
<version>${maven.surefire.version}</version>
234+
</dependency>
235+
<dependency>
236+
<groupId>org.apache.maven.surefire</groupId>
237+
<artifactId>surefire-testng</artifactId>
238+
<version>${maven.surefire.version}</version>
239+
</dependency>
240+
</dependencies>
241+
</plugin>
216242
</plugins>
217243
</build>
218244
</project>

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,9 @@ public HttpExecuteResponse call() throws IOException {
126126
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
127127

128128
try {
129-
CrtStreamHandler streamHandler = new CrtStreamHandler();
130-
responseFuture = new CrtRequestExecutor().execute(context, streamHandler);
131-
132-
// Write the request body from the caller thread via the stream handler,
133-
// which guards against concurrent stream close with a synchronized block.
134-
// This avoids blocking the CRT event loop thread in InputStream.read().
135-
writeRequestBody(streamHandler);
129+
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
130+
responseFuture = result.responseFuture();
131+
writeRequestBody(result.streamHandler());
136132

137133
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
138134
builder.response(response);

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import software.amazon.awssdk.http.SdkCancellationException;
2828
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
2929
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
30+
import software.amazon.awssdk.http.crt.internal.request.CrtRequestBodyPublisherSubscriber;
3031
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
3132
import software.amazon.awssdk.metrics.MetricCollector;
3233
import software.amazon.awssdk.metrics.NoOpMetricCollector;
@@ -67,28 +68,43 @@ private void doExecute(CrtAsyncRequestContext executionContext,
6768

6869
HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);
6970

70-
CrtStreamHandler streamHandler = new CrtStreamHandler();
71+
CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
72+
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);
7173

7274
HttpStreamBaseResponseHandler crtResponseHandler =
7375
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler);
7476

75-
CompletableFuture<HttpStreamBase> streamFuture =
76-
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
77-
7877
long finalAcquireStartTime = acquireStartTime;
7978

80-
streamFuture.whenComplete((stream, throwable) -> {
81-
if (shouldPublishMetrics) {
82-
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
83-
}
84-
85-
if (throwable != null) {
86-
Throwable toThrow = wrapCrtException(throwable);
87-
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
88-
} else {
89-
streamHandler.setStream(stream);
90-
}
91-
});
79+
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, true)
80+
.whenComplete((stream, throwable) -> {
81+
if (shouldPublishMetrics) {
82+
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
83+
}
84+
85+
if (throwable != null) {
86+
Throwable toThrow = wrapCrtException(throwable);
87+
streamFuture.completeExceptionally(toThrow);
88+
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
89+
return;
90+
}
91+
try {
92+
// CRT's stream manager activates the stream after completing this future, but
93+
// writeData requires an active stream. Activate before completing streamFuture
94+
// so any caller chained on streamFuture cannot race ahead. activate() is idempotent.
95+
stream.activate();
96+
streamFuture.complete(stream);
97+
// Subscribe after activate(): request(1) may synchronously deliver onNext, which
98+
// calls writeData and requires the stream to already be active.
99+
asyncRequest.requestContentPublisher()
100+
.subscribe(new CrtRequestBodyPublisherSubscriber(streamHandler,
101+
requestFuture,
102+
asyncRequest.responseHandler()));
103+
} catch (Throwable t) {
104+
streamFuture.completeExceptionally(t);
105+
reportAsyncFailure(t, requestFuture, asyncRequest.responseHandler());
106+
}
107+
});
92108
}
93109

94110
/**

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,26 @@
3131
@SdkInternalApi
3232
public final class CrtRequestExecutor {
3333

34-
public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext,
35-
CrtStreamHandler streamHandler) {
34+
public Result execute(CrtRequestContext executionContext) {
3635
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
36+
CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
37+
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);
3738

3839
try {
39-
doExecute(executionContext, responseFuture, streamHandler);
40+
doExecute(executionContext, responseFuture, streamHandler, streamFuture);
4041
} catch (Throwable t) {
42+
// Fail streamFuture too so any caller blocked in waitForStream() unblocks.
43+
streamFuture.completeExceptionally(t);
4144
responseFuture.completeExceptionally(t);
4245
}
4346

44-
return responseFuture;
47+
return new Result(responseFuture, streamHandler);
4548
}
4649

4750
private void doExecute(CrtRequestContext executionContext,
4851
CompletableFuture<SdkHttpFullResponse> responseFuture,
49-
CrtStreamHandler streamHandler) {
52+
CrtStreamHandler streamHandler,
53+
CompletableFuture<HttpStreamBase> streamFuture) {
5054
MetricCollector metricCollector = executionContext.metricCollector();
5155
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
5256

@@ -63,22 +67,51 @@ private void doExecute(CrtRequestContext executionContext,
6367

6468
boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent();
6569

66-
CompletableFuture<HttpStreamBase> streamFuture =
67-
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody);
68-
6970
long finalAcquireStartTime = acquireStartTime;
7071

71-
streamFuture.whenComplete((streamBase, throwable) -> {
72-
if (shouldPublishMetrics) {
73-
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
74-
}
75-
76-
if (throwable != null) {
77-
Throwable toThrow = wrapCrtException(throwable);
78-
responseFuture.completeExceptionally(toThrow);
79-
} else {
80-
streamHandler.setStream(streamBase);
81-
}
82-
});
72+
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody)
73+
.whenComplete((streamBase, throwable) -> {
74+
if (shouldPublishMetrics) {
75+
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
76+
}
77+
78+
if (throwable != null) {
79+
Throwable toThrow = wrapCrtException(throwable);
80+
// Fail streamFuture first so writers blocked in waitForStream() unblock before
81+
// responseFuture's completion handlers run.
82+
streamFuture.completeExceptionally(toThrow);
83+
responseFuture.completeExceptionally(toThrow);
84+
return;
85+
}
86+
try {
87+
// CRT's stream manager activates the stream after this future completes, but
88+
// writeData requires an active stream. Activate before completing streamFuture
89+
// so the caller cannot race ahead. activate() is idempotent at the C layer.
90+
streamBase.activate();
91+
streamFuture.complete(streamBase);
92+
} catch (Throwable t) {
93+
streamFuture.completeExceptionally(t);
94+
responseFuture.completeExceptionally(t);
95+
}
96+
});
97+
}
98+
99+
@SdkInternalApi
100+
public static final class Result {
101+
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
102+
private final CrtStreamHandler streamHandler;
103+
104+
private Result(CompletableFuture<SdkHttpFullResponse> responseFuture, CrtStreamHandler streamHandler) {
105+
this.responseFuture = responseFuture;
106+
this.streamHandler = streamHandler;
107+
}
108+
109+
public CompletableFuture<SdkHttpFullResponse> responseFuture() {
110+
return responseFuture;
111+
}
112+
113+
public CrtStreamHandler streamHandler() {
114+
return streamHandler;
115+
}
83116
}
84117
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtStreamHandler.java

Lines changed: 87 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,72 +17,116 @@
1717

1818
import java.io.IOException;
1919
import java.util.concurrent.CompletableFuture;
20-
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.CompletionException;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122
import software.amazon.awssdk.annotations.SdkInternalApi;
2223
import software.amazon.awssdk.crt.http.HttpStreamBase;
24+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
25+
import software.amazon.awssdk.utils.CompletableFutureUtils;
26+
import software.amazon.awssdk.utils.Logger;
2327

2428
/**
2529
* Manages the lifecycle of a CRT HTTP stream, providing thread-safe access to stream operations.
2630
* Shared between the request executor (for writing body data) and the response handler (for
2731
* incrementing the window and releasing/closing the connection).
32+
*
33+
* <p>The handler is constructed with a {@link CompletableFuture} representing stream acquisition.
34+
* The caller (request executor) completes that future once the underlying CRT stream manager has
35+
* either acquired the stream or failed. All operations on this handler chain off that future, so
36+
* writes issued before acquisition completes are queued.
2837
*/
2938
@SdkInternalApi
3039
public final class CrtStreamHandler {
3140

41+
private static final Logger log = Logger.loggerFor(CrtStreamHandler.class);
42+
3243
private final Object streamLock = new Object();
33-
private final CountDownLatch streamLatch = new CountDownLatch(1);
34-
private HttpStreamBase stream;
44+
private final CompletableFuture<HttpStreamBase> streamFuture;
45+
private final AtomicBoolean responseHandlerNotified = new AtomicBoolean(false);
3546
private boolean streamClosed;
3647

37-
/**
38-
* Sets the stream. Called once when the stream is acquired from the connection pool.
39-
*/
40-
public void setStream(HttpStreamBase stream) {
41-
this.stream = stream;
42-
streamLatch.countDown();
48+
public CrtStreamHandler(CompletableFuture<HttpStreamBase> streamFuture) {
49+
this.streamFuture = streamFuture;
4350
}
4451

4552
/**
46-
* Blocks until the stream has been acquired.
53+
* Atomically notifies the {@link SdkAsyncHttpResponseHandler#onError(Throwable)} callback at most
54+
* once across all callers sharing this handler. Returns {@code true} if this caller delivered the
55+
* notification, {@code false} if another caller already did. Exceptions thrown by the handler are
56+
* caught and logged so callers can proceed with their own cleanup.
4757
*/
48-
public void waitForStream() {
58+
public boolean tryNotifyResponseHandlerError(SdkAsyncHttpResponseHandler handler, Throwable t) {
59+
if (!responseHandlerNotified.compareAndSet(false, true)) {
60+
return false;
61+
}
4962
try {
50-
streamLatch.await();
51-
} catch (InterruptedException e) {
52-
Thread.currentThread().interrupt();
53-
throw new RuntimeException("Interrupted while waiting for stream", e);
63+
handler.onError(t);
64+
} catch (Exception e) {
65+
log.error(() -> "SdkAsyncHttpResponseHandler " + handler + " threw an exception in onError. It will be "
66+
+ "ignored.", e);
5467
}
68+
return true;
69+
}
70+
71+
/**
72+
* Blocks until the stream has been acquired or acquisition has failed. Returns the acquired
73+
* stream on success. If acquisition failed, the failure cause is rethrown wrapped in a
74+
* {@link CompletionException} so callers can use the same handling as for response futures.
75+
*/
76+
public HttpStreamBase waitForStream() {
77+
return CompletableFutureUtils.joinInterruptibly(streamFuture);
5578
}
5679

5780
/**
58-
* Write data to the stream. The caller must ensure the stream is ready (via {@link #waitForStream()})
59-
* before calling this method.
81+
* Write data to the stream. The returned future chains on stream acquisition: if the stream
82+
* is not yet ready, the write is queued until the {@code streamFuture} passed to the
83+
* constructor completes. Failures from either stream acquisition or the underlying CRT write
84+
* are propagated as the original cause (not wrapped in {@link CompletionException}) so callers
85+
* see the same exception type whether the failure happens before or after {@code thenCompose}-
86+
* style chaining.
6087
*/
6188
public CompletableFuture<Void> writeData(byte[] data, boolean endStream) {
62-
if (streamLatch.getCount() != 0) {
63-
CompletableFuture<Void> future = new CompletableFuture<>();
64-
future.completeExceptionally(
65-
new IllegalStateException("writeData called before stream is ready. Call waitForStream() first."));
66-
return future;
67-
}
68-
synchronized (streamLock) {
69-
if (streamClosed) {
70-
CompletableFuture<Void> future = new CompletableFuture<>();
71-
future.completeExceptionally(
72-
new IOException("Stream is already closed, cannot write data."));
73-
return future;
89+
CompletableFuture<Void> result = new CompletableFuture<>();
90+
streamFuture.whenComplete((s, t) -> {
91+
if (t != null) {
92+
result.completeExceptionally(unwrap(t));
93+
return;
7494
}
75-
return stream.writeData(data, endStream);
76-
}
95+
try {
96+
CompletableFuture<Void> writeFuture;
97+
synchronized (streamLock) {
98+
if (streamClosed) {
99+
result.completeExceptionally(new IOException("Stream is already closed, cannot write data."));
100+
return;
101+
}
102+
writeFuture = s.writeData(data, endStream);
103+
}
104+
writeFuture.whenComplete((v, err) -> {
105+
if (err != null) {
106+
result.completeExceptionally(unwrap(err));
107+
} else {
108+
result.complete(null);
109+
}
110+
});
111+
} catch (Throwable th) {
112+
// s.writeData can throw synchronously (e.g., AWS_ERROR_HTTP_STREAM_NOT_ACTIVATED).
113+
// Tear down the stream so the caller's connection is not leaked back to the pool.
114+
closeConnection();
115+
result.completeExceptionally(th);
116+
}
117+
});
118+
return result;
119+
}
120+
121+
private static Throwable unwrap(Throwable t) {
122+
return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
77123
}
78124

79125
public void incrementWindow(int windowSize) {
80-
if (streamLatch.getCount() != 0) {
81-
throw new IllegalStateException("incrementWindow called before stream is ready.");
82-
}
83126
synchronized (streamLock) {
84-
if (!streamClosed) {
85-
stream.incrementWindow(windowSize);
127+
HttpStreamBase s = streamFuture.getNow(null);
128+
if (!streamClosed && s != null) {
129+
s.incrementWindow(windowSize);
86130
}
87131
}
88132
}
@@ -93,9 +137,10 @@ public void incrementWindow(int windowSize) {
93137
*/
94138
public void releaseConnection() {
95139
synchronized (streamLock) {
96-
if (!streamClosed && stream != null) {
140+
HttpStreamBase s = streamFuture.getNow(null);
141+
if (!streamClosed && s != null) {
97142
streamClosed = true;
98-
stream.close();
143+
s.close();
99144
}
100145
}
101146
}
@@ -107,10 +152,11 @@ public void releaseConnection() {
107152
*/
108153
public void closeConnection() {
109154
synchronized (streamLock) {
110-
if (!streamClosed && stream != null) {
155+
HttpStreamBase s = streamFuture.getNow(null);
156+
if (!streamClosed && s != null) {
111157
streamClosed = true;
112-
stream.cancel();
113-
stream.close();
158+
s.cancel();
159+
s.close();
114160
}
115161
}
116162
}

0 commit comments

Comments
 (0)