Skip to content

Commit 6b35e0b

Browse files
authored
New unified stream manager (#509)
1 parent 9ee809c commit 6b35e0b

11 files changed

Lines changed: 728 additions & 26 deletions

File tree

crt/aws-c-io

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
package software.amazon.awssdk.crt.http;
6+
7+
import software.amazon.awssdk.crt.CrtRuntimeException;
8+
9+
import java.util.concurrent.CompletableFuture;
10+
11+
/**
12+
* Manages a Pool of HTTP/1.1 Streams. Creates and manages HTTP/1.1 connections
13+
* under the hood. Will grab a connection from HttpClientConnectionManager to
14+
* make request on it, and will return it back until the request finishes.
15+
*/
16+
public class Http1StreamManager implements AutoCloseable {
17+
18+
private HttpClientConnectionManager connectionManager = null;
19+
20+
/**
21+
* Factory function for Http1StreamManager instances
22+
*
23+
* @param options the connection manager options configure to connection manager under the hood
24+
* @return a new instance of an Http1StreamManager
25+
*/
26+
public static Http1StreamManager create(HttpClientConnectionManagerOptions options) {
27+
return new Http1StreamManager(options);
28+
}
29+
30+
private Http1StreamManager(HttpClientConnectionManagerOptions options) {
31+
this.connectionManager = HttpClientConnectionManager.create(options);
32+
}
33+
34+
public CompletableFuture<Void> getShutdownCompleteFuture() {
35+
return this.connectionManager.getShutdownCompleteFuture();
36+
}
37+
38+
/**
39+
* Request an HTTP/1.1 HttpStream from StreamManager.
40+
*
41+
* @param request HttpRequest. The Request to make to the Server.
42+
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
43+
* @return A future for a HttpStream that will be completed when the stream is
44+
* acquired.
45+
* @throws CrtRuntimeException Exception happens from acquiring stream.
46+
*/
47+
public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
48+
HttpStreamBaseResponseHandler streamHandler) {
49+
return this.acquireStream((HttpRequestBase) request, streamHandler);
50+
}
51+
52+
/**
53+
* Request an HTTP/1.1 HttpStream from StreamManager.
54+
*
55+
* @param request HttpRequestBase. The Request to make to the Server.
56+
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
57+
* @return A future for a HttpStream that will be completed when the stream is
58+
* acquired.
59+
* @throws CrtRuntimeException Exception happens from acquiring stream.
60+
*/
61+
public CompletableFuture<HttpStream> acquireStream(HttpRequestBase request,
62+
HttpStreamBaseResponseHandler streamHandler) {
63+
CompletableFuture<HttpStream> completionFuture = new CompletableFuture<>();
64+
HttpClientConnectionManager connManager = this.connectionManager;
65+
66+
connManager.acquireConnection().whenComplete((conn, throwable) -> {
67+
if (throwable != null) {
68+
completionFuture.completeExceptionally(throwable);
69+
} else {
70+
try {
71+
HttpStreamBase stream = conn.makeRequest(request, new HttpStreamBaseResponseHandler() {
72+
@Override
73+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
74+
HttpHeader[] nextHeaders) {
75+
streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
76+
}
77+
78+
@Override
79+
public void onResponseHeadersDone(HttpStreamBase stream, int blockType) {
80+
streamHandler.onResponseHeadersDone(stream, blockType);
81+
}
82+
83+
@Override
84+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
85+
return streamHandler.onResponseBody(stream, bodyBytesIn);
86+
}
87+
88+
@Override
89+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
90+
streamHandler.onResponseComplete(stream, errorCode);
91+
/* Release the connection back */
92+
connManager.releaseConnection(conn);
93+
}
94+
});
95+
completionFuture.complete((HttpStream) stream);
96+
/* Active the stream for user */
97+
try {
98+
stream.activate();
99+
} catch (CrtRuntimeException e) {
100+
/* If activate failed, complete callback will not be invoked */
101+
streamHandler.onResponseComplete(stream, e.errorCode);
102+
/* Release the connection back */
103+
connManager.releaseConnection(conn);
104+
}
105+
} catch (Exception ex) {
106+
connManager.releaseConnection(conn);
107+
completionFuture.completeExceptionally(ex);
108+
}
109+
}
110+
});
111+
return completionFuture;
112+
}
113+
114+
/**
115+
* @return concurrency metrics for the current manager
116+
*/
117+
public HttpManagerMetrics getManagerMetrics() {
118+
return this.connectionManager.getManagerMetrics();
119+
}
120+
121+
/**
122+
* @return maximum number of connections this manager will pool
123+
*/
124+
public int getMaxConnections() {
125+
return this.connectionManager.getMaxConnections();
126+
}
127+
128+
@Override
129+
public void close() {
130+
this.connectionManager.close();
131+
}
132+
}

src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ private Http2StreamManager(Http2StreamManagerOptions options) {
117117
proxyAuthorizationPassword != null ? proxyAuthorizationPassword.getBytes(UTF8) : null,
118118
noProxyHosts != null ? noProxyHosts.getBytes(UTF8) : null,
119119
connectionManagerOptions.isManualWindowManagement(),
120+
connectionManagerOptions.getWindowSize(),
120121
monitoringThroughputThresholdInBytesPerSecond,
121122
monitoringFailureIntervalInSeconds,
122123
maxConnections,
@@ -158,9 +159,8 @@ public CompletableFuture<Http2Stream> acquireStream(HttpRequest request,
158159
return this.acquireStream((HttpRequestBase) request, streamHandler);
159160
}
160161

161-
private CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
162-
HttpStreamBaseResponseHandler streamHandler) {
163-
162+
public CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
163+
HttpStreamBaseResponseHandler streamHandler) {
164164
CompletableFuture<Http2Stream> completionFuture = new CompletableFuture<>();
165165
AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null);
166166
if (isNull()) {
@@ -259,6 +259,7 @@ private static native long http2StreamManagerNew(Http2StreamManager thisObj,
259259
byte[] proxyAuthorizationPassword,
260260
byte[] noProxyHosts,
261261
boolean isManualWindowManagement,
262+
long windowSize,
262263
long monitoringThroughputThresholdInBytesPerSecond,
263264
int monitoringFailureIntervalInSeconds,
264265
int maxConns,

src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.List;
44
import java.util.ArrayList;
5+
import java.net.URI;
56

67
/**
78
* Contains all the configuration options for a Http2StreamManager
@@ -12,6 +13,7 @@ public class Http2StreamManagerOptions {
1213
public static final int DEFAULT_MAX = Integer.MAX_VALUE;
1314
public static final int DEFAULT_MAX_CONNECTIONS = 2;
1415
public static final int DEFAULT_CONNECTION_PING_TIMEOUT_MS = 3000;
16+
private static final String HTTPS = "https";
1517

1618
private HttpClientConnectionManagerOptions connectionManagerOptions;
1719

@@ -249,8 +251,9 @@ public void validateOptions() {
249251
throw new IllegalArgumentException("Connection manager options are required.");
250252
}
251253
connectionManagerOptions.validateOptions();
252-
if ((connectionManagerOptions.getTlsConnectionOptions() != null
253-
|| connectionManagerOptions.getTlsContext() != null) && priorKnowledge) {
254+
URI uri = connectionManagerOptions.getUri();
255+
boolean useTls = HTTPS.equals(uri.getScheme());
256+
if (useTls && priorKnowledge) {
254257
throw new IllegalArgumentException("HTTP/2 prior knowledge cannot be set when TLS is used.");
255258
}
256259
if ((connectionManagerOptions.getTlsConnectionOptions() == null
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
package software.amazon.awssdk.crt.http;
7+
8+
import java.util.concurrent.CompletableFuture;
9+
10+
/**
11+
* Manages a Pool of HTTP Streams. Wraps either Http1StreamManager or Http2StreamManager
12+
* depending on the expected protocol configured via HttpStreamManagerOptions.
13+
*/
14+
public class HttpStreamManager implements AutoCloseable {
15+
16+
private Http1StreamManager h1StreamManager = null;
17+
private Http2StreamManager h2StreamManager = null;
18+
private CompletableFuture<Void> shutdownComplete = null;
19+
20+
/**
21+
* Factory function for HttpStreamManager instances
22+
*
23+
* @param options the stream manager options to configure the manager
24+
* @return a new instance of an HttpStreamManager
25+
*/
26+
public static HttpStreamManager create(HttpStreamManagerOptions options) {
27+
return new HttpStreamManager(options);
28+
}
29+
30+
private HttpStreamManager(HttpStreamManagerOptions options) {
31+
if (options.getExpectedProtocol() == HttpVersion.UNKNOWN) {
32+
throw new IllegalArgumentException("UNKNOWN protocol is not supported. Please specify either HTTP_2 or HTTP_1_1/HTTP_1_0.");
33+
}
34+
35+
if (options.getExpectedProtocol() == HttpVersion.HTTP_2) {
36+
this.h2StreamManager = Http2StreamManager.create(options.getHTTP2StreamManagerOptions());
37+
this.shutdownComplete = this.h2StreamManager.getShutdownCompleteFuture();
38+
} else {
39+
this.h1StreamManager = Http1StreamManager.create(options.getHTTP1ConnectionManagerOptions());
40+
this.shutdownComplete = this.h1StreamManager.getShutdownCompleteFuture();
41+
}
42+
}
43+
44+
/**
45+
* Request an HttpStream from StreamManager.
46+
*
47+
* @param request HttpRequestBase. The Request to make to the Server.
48+
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
49+
* @return A future for a HttpStreamBase that will be completed when the stream is
50+
* acquired.
51+
*/
52+
public CompletableFuture<HttpStreamBase> acquireStream(HttpRequestBase request,
53+
HttpStreamBaseResponseHandler streamHandler) {
54+
if (this.h2StreamManager != null) {
55+
return this.h2StreamManager.acquireStream(request, streamHandler)
56+
.thenApply(stream -> (HttpStreamBase) stream);
57+
} else {
58+
return this.h1StreamManager.acquireStream(request, streamHandler)
59+
.thenApply(stream -> (HttpStreamBase) stream);
60+
}
61+
}
62+
63+
public CompletableFuture<Void> getShutdownCompleteFuture() {
64+
return shutdownComplete;
65+
}
66+
67+
/**
68+
* @return concurrency metrics for the current manager
69+
*/
70+
public HttpManagerMetrics getManagerMetrics() {
71+
if (this.h2StreamManager != null) {
72+
return this.h2StreamManager.getManagerMetrics();
73+
} else {
74+
return this.h1StreamManager.getManagerMetrics();
75+
}
76+
}
77+
78+
/**
79+
* @return maximum number of connections this connection manager will pool
80+
*/
81+
public int getMaxConnections() {
82+
if (this.h2StreamManager != null) {
83+
return this.h2StreamManager.getMaxConnections();
84+
} else {
85+
return this.h1StreamManager.getMaxConnections();
86+
}
87+
}
88+
89+
@Override
90+
public void close() {
91+
if (this.h1StreamManager != null) {
92+
this.h1StreamManager.close();
93+
}
94+
if (this.h2StreamManager != null) {
95+
this.h2StreamManager.close();
96+
}
97+
}
98+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
package software.amazon.awssdk.crt.http;
6+
7+
8+
/**
9+
* Contains all the configuration options for a Http2StreamManager
10+
* instance
11+
*/
12+
public class HttpStreamManagerOptions {
13+
14+
private HttpClientConnectionManagerOptions h1ConnectionManagerOptions;
15+
private Http2StreamManagerOptions h2StreamManagerOptions;
16+
/**
17+
* The expected protocol for the stream manager.
18+
*
19+
* - UNKNOWN: Default to use HTTP/2, but if server returns an HTTP/1.1 connection back, fallback to the HTTP/1.1 pool
20+
* - HTTP2: ONLY HTTP/2
21+
* - HTTP_1_1/HTTP_1_0: ONLY HTTP/1 and HTTP/1.1
22+
*/
23+
private HttpVersion expectedProtocol;
24+
25+
/**
26+
* Default constructor
27+
*/
28+
public HttpStreamManagerOptions() {
29+
this.expectedProtocol = HttpVersion.UNKNOWN;
30+
}
31+
32+
/**
33+
* The connection manager options for the HTTP/1.1 stream manager. Controls the behavior for HTTP/1 connections.
34+
*
35+
* @param connectionManagerOptions The connection manager options for the underlying HTTP/1.1 stream manager
36+
* @return this
37+
*/
38+
public HttpStreamManagerOptions withHTTP1ConnectionManagerOptions(HttpClientConnectionManagerOptions connectionManagerOptions) {
39+
this.h1ConnectionManagerOptions = connectionManagerOptions;
40+
return this;
41+
}
42+
43+
/**
44+
* @return The connection manager options for the HTTP/1.1 stream manager.
45+
*/
46+
public HttpClientConnectionManagerOptions getHTTP1ConnectionManagerOptions() {
47+
return h1ConnectionManagerOptions;
48+
}
49+
50+
/**
51+
* The stream manager options for the HTTP/2 stream manager. Controls the behavior for HTTP/2 connections.
52+
*
53+
* @param streamManagerOptions The stream manager options for the underlying HTTP/2 stream manager
54+
* @return this
55+
*/
56+
public HttpStreamManagerOptions withHTTP2StreamManagerOptions(Http2StreamManagerOptions streamManagerOptions) {
57+
this.h2StreamManagerOptions = streamManagerOptions;
58+
return this;
59+
}
60+
61+
/**
62+
* @return The stream manager options for the HTTP/2 stream manager.
63+
*/
64+
public Http2StreamManagerOptions getHTTP2StreamManagerOptions() {
65+
return h2StreamManagerOptions;
66+
}
67+
68+
/**
69+
* The expected protocol for whole stream manager. Default to UNKNOWN.
70+
*
71+
* - UNKNOWN: Default to use HTTP/2, but if server returns an HTTP/1.1 connection back, fallback to the HTTP/1.1 pool
72+
* - HTTP2: ONLY HTTP/2
73+
* - HTTP_1_1/HTTP_1_0: ONLY HTTP/1 and HTTP/1.1
74+
*
75+
* @param expectedProtocol The stream manager options for the underlying HTTP/2 stream manager
76+
* @return this
77+
*/
78+
public HttpStreamManagerOptions withExpectedProtocol(HttpVersion expectedProtocol) {
79+
this.expectedProtocol = expectedProtocol;
80+
return this;
81+
}
82+
83+
/**
84+
* @return The expected protocol for whole stream manager
85+
*/
86+
public HttpVersion getExpectedProtocol() {
87+
return expectedProtocol;
88+
}
89+
}

src/native/http2_stream_manager.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_
107107
jbyteArray jni_proxy_authorization_password,
108108
jbyteArray jni_no_proxy_hosts,
109109
jboolean jni_manual_window_management,
110+
jlong jni_init_window_size,
110111
jlong jni_monitoring_throughput_threshold_in_bytes_per_second,
111112
jint jni_monitoring_failure_interval_in_seconds,
112113
jint jni_max_conns,
@@ -196,6 +197,7 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_
196197
.monitoring_options = NULL,
197198
.host = endpoint,
198199
.port = port,
200+
.initial_window_size = (size_t)jni_init_window_size,
199201
.shutdown_complete_callback = &s_on_stream_manager_shutdown_complete_callback,
200202
.shutdown_complete_user_data = binding,
201203
.enable_read_back_pressure = jni_manual_window_management,

0 commit comments

Comments
 (0)