Skip to content

Commit ba21d05

Browse files
committed
Extract CrtStreamHandler to manage stream lifecycle
Move stream lifecycle operations (writeData, incrementWindow, releaseConnection, closeConnection) from ResponseHandlerHelper into a dedicated CrtStreamHandler class. This separates header parsing from stream management and makes the shared stream guard explicit between the request executor and response handler. ResponseHandlerHelper now only handles response header parsing.
1 parent 051f869 commit ba21d05

13 files changed

Lines changed: 303 additions & 265 deletions

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtHttpClient.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.function.Consumer;
2727
import software.amazon.awssdk.annotations.SdkPublicApi;
2828
import software.amazon.awssdk.crt.http.HttpException;
29-
import software.amazon.awssdk.crt.http.HttpStreamBase;
3029
import software.amazon.awssdk.crt.http.HttpStreamManager;
3130
import software.amazon.awssdk.http.ContentStreamProvider;
3231
import software.amazon.awssdk.http.ExecutableHttpRequest;
@@ -39,6 +38,7 @@
3938
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
4039
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
4140
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
41+
import software.amazon.awssdk.http.crt.internal.CrtStreamHandler;
4242
import software.amazon.awssdk.http.crt.internal.CrtUtils;
4343
import software.amazon.awssdk.utils.AttributeMap;
4444
import software.amazon.awssdk.utils.CompletableFutureUtils;
@@ -126,13 +126,13 @@ public HttpExecuteResponse call() throws IOException {
126126
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();
127127

128128
try {
129-
CrtRequestExecutor.ExecutionResult execution = new CrtRequestExecutor().execute(context);
130-
responseFuture = execution.responseFuture();
129+
CrtStreamHandler streamHandler = new CrtStreamHandler();
130+
responseFuture = new CrtRequestExecutor().execute(context, streamHandler);
131131

132-
// Wait for the stream to be acquired, then write the request body from the caller thread.
132+
// Write the request body from the caller thread via the stream handler,
133+
// which guards against concurrent stream close with a synchronized block.
133134
// This avoids blocking the CRT event loop thread in InputStream.read().
134-
HttpStreamBase stream = CompletableFutureUtils.joinInterruptibly(execution.streamFuture());
135-
writeRequestBody(stream);
135+
writeRequestBody(streamHandler);
136136

137137
SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
138138
builder.response(response);
@@ -169,21 +169,21 @@ public HttpExecuteResponse call() throws IOException {
169169
}
170170
}
171171

172-
private void writeRequestBody(HttpStreamBase stream) throws IOException {
172+
private void writeRequestBody(CrtStreamHandler streamHandler) throws IOException {
173173
ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null);
174174
if (provider == null) {
175175
return;
176176
}
177177

178+
streamHandler.waitForStream();
178179
try (InputStream inputStream = provider.newStream()) {
179180
byte[] buf = new byte[WRITE_BUFFER_SIZE];
180181
int read;
181-
182182
while ((read = inputStream.read(buf, 0, buf.length)) >= 0) {
183183
byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read);
184-
CompletableFutureUtils.joinInterruptibly(stream.writeData(chunk, false));
184+
CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(chunk, false));
185185
}
186-
CompletableFutureUtils.joinInterruptibly(stream.writeData(null, true));
186+
CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(null, true));
187187
}
188188
}
189189

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ private void doExecute(CrtAsyncRequestContext executionContext,
6767

6868
HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);
6969

70+
CrtStreamHandler streamHandler = new CrtStreamHandler();
71+
7072
HttpStreamBaseResponseHandler crtResponseHandler =
71-
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler());
73+
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler);
7274

7375
CompletableFuture<HttpStreamBase> streamFuture =
7476
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
@@ -83,6 +85,8 @@ private void doExecute(CrtAsyncRequestContext executionContext,
8385
if (throwable != null) {
8486
Throwable toThrow = wrapCrtException(throwable);
8587
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
88+
} else {
89+
streamHandler.setStream(stream);
8690
}
8791
});
8892
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
2323
import software.amazon.awssdk.crt.http.HttpRequest;
2424
import software.amazon.awssdk.crt.http.HttpStreamBase;
25-
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
2625
import software.amazon.awssdk.http.SdkHttpFullResponse;
2726
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
2827
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
@@ -32,23 +31,22 @@
3231
@SdkInternalApi
3332
public final class CrtRequestExecutor {
3433

35-
public ExecutionResult execute(CrtRequestContext executionContext) {
34+
public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext,
35+
CrtStreamHandler streamHandler) {
3636
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
37-
CompletableFuture<HttpStreamBase> streamFuture;
3837

3938
try {
40-
streamFuture = doExecute(executionContext, responseFuture);
39+
doExecute(executionContext, responseFuture, streamHandler);
4140
} catch (Throwable t) {
4241
responseFuture.completeExceptionally(t);
43-
streamFuture = new CompletableFuture<>();
44-
streamFuture.completeExceptionally(t);
4542
}
4643

47-
return new ExecutionResult(streamFuture, responseFuture);
44+
return responseFuture;
4845
}
4946

50-
private CompletableFuture<HttpStreamBase> doExecute(CrtRequestContext executionContext,
51-
CompletableFuture<SdkHttpFullResponse> responseFuture) {
47+
private void doExecute(CrtRequestContext executionContext,
48+
CompletableFuture<SdkHttpFullResponse> responseFuture,
49+
CrtStreamHandler streamHandler) {
5250
MetricCollector metricCollector = executionContext.metricCollector();
5351
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
5452

@@ -58,7 +56,8 @@ private CompletableFuture<HttpStreamBase> doExecute(CrtRequestContext executionC
5856
acquireStartTime = System.nanoTime();
5957
}
6058

61-
HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(responseFuture);
59+
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
60+
new InputStreamAdaptingHttpStreamResponseHandler(responseFuture, streamHandler);
6261

6362
HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
6463

@@ -77,32 +76,9 @@ private CompletableFuture<HttpStreamBase> doExecute(CrtRequestContext executionC
7776
if (throwable != null) {
7877
Throwable toThrow = wrapCrtException(throwable);
7978
responseFuture.completeExceptionally(toThrow);
79+
} else {
80+
streamHandler.setStream(streamBase);
8081
}
8182
});
82-
83-
return streamFuture;
84-
}
85-
86-
/**
87-
* Holds the result of submitting a request to CRT: the stream (for writing body data via
88-
* {@code writeData}) and the response future (for reading the response).
89-
*/
90-
public static final class ExecutionResult {
91-
private final CompletableFuture<HttpStreamBase> streamFuture;
92-
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
93-
94-
ExecutionResult(CompletableFuture<HttpStreamBase> streamFuture,
95-
CompletableFuture<SdkHttpFullResponse> responseFuture) {
96-
this.streamFuture = streamFuture;
97-
this.responseFuture = responseFuture;
98-
}
99-
100-
public CompletableFuture<HttpStreamBase> streamFuture() {
101-
return streamFuture;
102-
}
103-
104-
public CompletableFuture<SdkHttpFullResponse> responseFuture() {
105-
return responseFuture;
106-
}
10783
}
10884
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal;
17+
18+
import java.io.IOException;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.CountDownLatch;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
import software.amazon.awssdk.crt.http.HttpStreamBase;
23+
24+
/**
25+
* Manages the lifecycle of a CRT HTTP stream, providing thread-safe access to stream operations.
26+
* Shared between the request executor (for writing body data) and the response handler (for
27+
* incrementing the window and releasing/closing the connection).
28+
*/
29+
@SdkInternalApi
30+
public final class CrtStreamHandler {
31+
32+
private final Object streamLock = new Object();
33+
private final CountDownLatch streamLatch = new CountDownLatch(1);
34+
private HttpStreamBase stream;
35+
private boolean streamClosed;
36+
37+
/**
38+
* Sets the stream. Called once when the stream is acquired from the connection pool.
39+
*/
40+
public void setStream(HttpStreamBase stream) {
41+
this.stream = stream;
42+
streamLatch.countDown();
43+
}
44+
45+
/**
46+
* Blocks until the stream has been acquired.
47+
*/
48+
public void waitForStream() {
49+
try {
50+
streamLatch.await();
51+
} catch (InterruptedException e) {
52+
Thread.currentThread().interrupt();
53+
throw new RuntimeException("Interrupted while waiting for stream", e);
54+
}
55+
}
56+
57+
/**
58+
* Write data to the stream. The caller must ensure the stream is ready (via {@link #waitForStream()})
59+
* before calling this method.
60+
*/
61+
public CompletableFuture<Void> writeData(byte[] data, boolean endStream) {
62+
if (streamLatch.getCount() != 0) {
63+
CompletableFuture<Void> future = new CompletableFuture<>();
64+
future.completeExceptionally(
65+
new IllegalStateException("writeData called before stream is ready. Call waitForStream() first."));
66+
return future;
67+
}
68+
synchronized (streamLock) {
69+
if (streamClosed) {
70+
CompletableFuture<Void> future = new CompletableFuture<>();
71+
future.completeExceptionally(
72+
new IOException("Stream is already closed, cannot write data."));
73+
return future;
74+
}
75+
return stream.writeData(data, endStream);
76+
}
77+
}
78+
79+
public void incrementWindow(int windowSize) {
80+
if (streamLatch.getCount() != 0) {
81+
throw new IllegalStateException("incrementWindow called before stream is ready.");
82+
}
83+
synchronized (streamLock) {
84+
if (!streamClosed) {
85+
stream.incrementWindow(windowSize);
86+
}
87+
}
88+
}
89+
90+
/**
91+
* Release the connection back to the pool so that it may be reused. This should be called when the request
92+
* completes successfully and the response has been fully consumed.
93+
*/
94+
public void releaseConnection() {
95+
synchronized (streamLock) {
96+
if (!streamClosed && stream != null) {
97+
streamClosed = true;
98+
stream.close();
99+
}
100+
}
101+
}
102+
103+
/**
104+
* Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the
105+
* connection pool. This should be called on error paths or when the stream is aborted before the response is
106+
* fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract.
107+
*/
108+
public void closeConnection() {
109+
synchronized (streamLock) {
110+
if (!streamClosed && stream != null) {
111+
streamClosed = true;
112+
stream.cancel();
113+
stream.close();
114+
}
115+
}
116+
}
117+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.http.SdkHttpResponse;
3131
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
3232
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
33+
import software.amazon.awssdk.http.crt.internal.CrtStreamHandler;
3334
import software.amazon.awssdk.utils.Logger;
3435
import software.amazon.awssdk.utils.Validate;
3536
import software.amazon.awssdk.utils.async.SimplePublisher;
@@ -48,31 +49,44 @@ public final class CrtResponseAdapter implements HttpStreamBaseResponseHandler {
4849
private final SimplePublisher<ByteBuffer> responsePublisher;
4950
private final SdkHttpResponse.Builder responseBuilder;
5051
private final ResponseHandlerHelper responseHandlerHelper;
52+
private final CrtStreamHandler streamHandler;
5153

5254
private CrtResponseAdapter(CompletableFuture<Void> completionFuture,
53-
SdkAsyncHttpResponseHandler responseHandler) {
54-
this(completionFuture, responseHandler, new SimplePublisher<>());
55+
SdkAsyncHttpResponseHandler responseHandler,
56+
CrtStreamHandler streamHandler) {
57+
this(completionFuture, responseHandler, new SimplePublisher<>(), streamHandler);
5558
}
5659

5760
@SdkTestInternalApi
5861
public CrtResponseAdapter(CompletableFuture<Void> completionFuture,
5962
SdkAsyncHttpResponseHandler responseHandler,
6063
SimplePublisher<ByteBuffer> simplePublisher) {
64+
this(completionFuture, responseHandler, simplePublisher, new CrtStreamHandler());
65+
}
66+
67+
@SdkTestInternalApi
68+
public CrtResponseAdapter(CompletableFuture<Void> completionFuture,
69+
SdkAsyncHttpResponseHandler responseHandler,
70+
SimplePublisher<ByteBuffer> simplePublisher,
71+
CrtStreamHandler streamHandler) {
6172
this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
6273
this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
6374
this.responseBuilder = SdkHttpResponse.builder();
6475
this.responsePublisher = simplePublisher;
76+
this.streamHandler = streamHandler;
6577
this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder);
6678
}
6779

6880
public static HttpStreamBaseResponseHandler toCrtResponseHandler(
6981
CompletableFuture<Void> requestFuture,
70-
SdkAsyncHttpResponseHandler responseHandler) {
71-
return new CrtResponseAdapter(requestFuture, responseHandler);
82+
SdkAsyncHttpResponseHandler responseHandler,
83+
CrtStreamHandler streamHandler) {
84+
return new CrtResponseAdapter(requestFuture, responseHandler, streamHandler);
7285
}
7386

7487
@Override
7588
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
89+
streamHandler.setStream(stream);
7690
responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, headerType, nextHeaders);
7791
}
7892

@@ -89,17 +103,16 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
89103
CompletableFuture<Void> writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn));
90104

91105
if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
92-
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
93106
return bodyBytesIn.length;
94107
}
95108

96109
writeFuture.whenComplete((result, failure) -> {
97110
if (failure != null) {
98111
failResponseHandlerAndFuture(failure);
99-
responseHandlerHelper.closeConnection();
112+
streamHandler.closeConnection();
100113
return;
101114
}
102-
responseHandlerHelper.incrementWindow(bodyBytesIn.length);
115+
streamHandler.incrementWindow(bodyBytesIn.length);
103116
});
104117

105118
return 0;
@@ -122,15 +135,15 @@ private void onSuccessfulResponseComplete() {
122135
}
123136
completionFuture.complete(null);
124137
});
125-
responseHandlerHelper.releaseConnection();
138+
streamHandler.releaseConnection();
126139
}
127140

128141
private void onFailedResponseComplete(HttpException error) {
129142
log.debug(() -> "HTTP response encountered an error.", error);
130143
Throwable toThrow = wrapWithIoExceptionIfRetryable(error);
131144
responsePublisher.error(toThrow);
132145
failResponseHandlerAndFuture(toThrow);
133-
responseHandlerHelper.closeConnection();
146+
streamHandler.closeConnection();
134147
}
135148

136149
private void failResponseHandlerAndFuture(Throwable error) {

0 commit comments

Comments
 (0)