Skip to content

Commit 2176d23

Browse files
authored
max concurrent streams for h2 stream manager (#977)
1 parent 5c71165 commit 2176d23

5 files changed

Lines changed: 128 additions & 3 deletions

File tree

src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ private Http2StreamManager(Http2StreamManagerOptions options) {
123123
maxConnections,
124124
idealConcurrentStreamsPerConnection,
125125
maxConcurrentStreamsPerConnection,
126+
options.getMaxConcurrentStreams(),
126127
options.hasPriorKnowledge(),
127128
options.shouldCloseConnectionOnServerError(),
128129
options.getConnectionPingPeriodMs(),
@@ -265,6 +266,7 @@ private static native long http2StreamManagerNew(Http2StreamManager thisObj,
265266
int maxConns,
266267
int ideal_concurrent_streams_per_connection,
267268
int max_concurrent_streams_per_connection,
269+
int max_concurrent_streams,
268270
boolean priorKnowledge,
269271
boolean closeConnectionOnServerError,
270272
int connectionPingPeriodMs,

src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
* instance
1010
*/
1111
public class Http2StreamManagerOptions {
12-
public static final int DEFAULT_MAX_WINDOW_SIZE = Integer.MAX_VALUE;
1312
public static final int DEFAULT_MAX = Integer.MAX_VALUE;
14-
public static final int DEFAULT_MAX_CONNECTIONS = 2;
1513
public static final int DEFAULT_CONNECTION_PING_TIMEOUT_MS = 3000;
1614
private static final String HTTPS = "https";
1715

@@ -20,6 +18,7 @@ public class Http2StreamManagerOptions {
2018
private int idealConcurrentStreamsPerConnection = 100;
2119
private boolean connectionManualWindowManagement = false;
2220
private int maxConcurrentStreamsPerConnection = DEFAULT_MAX;
21+
private int maxConcurrentStreams = 0;
2322

2423
private boolean priorKnowledge = false;
2524
private boolean closeConnectionOnServerError = false;
@@ -108,6 +107,28 @@ public int getMaxConcurrentStreamsPerConnection() {
108107
return maxConcurrentStreamsPerConnection;
109108
}
110109

110+
/**
111+
* The max number of concurrent streams that can be active across all connections
112+
* at the same time. 0 means no limit (default). When this limit is reached, the
113+
* stream manager will wait for existing streams to complete before creating new
114+
* ones, even if connections have available capacity.
115+
*
116+
* @param maxConcurrentStreams The max number of concurrent streams across all connections
117+
* @return this
118+
*/
119+
public Http2StreamManagerOptions withMaxConcurrentStreams(int maxConcurrentStreams) {
120+
this.maxConcurrentStreams = maxConcurrentStreams;
121+
return this;
122+
}
123+
124+
/**
125+
* @return The max number of concurrent streams across all connections.
126+
* 0 means no limit (default).
127+
*/
128+
public int getMaxConcurrentStreams() {
129+
return maxConcurrentStreams;
130+
}
131+
111132
/**
112133
* @return The connection level manual flow control enabled or not.
113134
*/

src/native/http2_stream_manager.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_
113113
jint jni_max_conns,
114114
jint jni_ideal_concurrent_streams_per_connection,
115115
jint jni_max_concurrent_streams_per_connection,
116+
jint jni_max_concurrent_streams,
116117
jboolean jni_prior_knowledge,
117118
jboolean jni_close_connection_on_server_error,
118119
jint jni_connection_ping_period_ms,
@@ -207,6 +208,7 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_
207208
.ideal_concurrent_streams_per_connection = (size_t)jni_ideal_concurrent_streams_per_connection,
208209
.max_concurrent_streams_per_connection = (size_t)jni_max_concurrent_streams_per_connection,
209210
.max_connections = (size_t)jni_max_conns,
211+
.max_concurrent_streams = (size_t)jni_max_concurrent_streams,
210212
};
211213

212214
struct aws_http_connection_monitoring_options monitoring_options;

src/test/java/software/amazon/awssdk/crt/test/Http2StreamManagerTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,4 +191,104 @@ public void testStreamManagerMetrics() throws Exception {
191191
CrtResource.logNativeResources();
192192
CrtResource.waitForNoResources();
193193
}
194+
195+
@Test
196+
public void testMaxConcurrentStreamsEnforcement() throws Exception {
197+
skipIfAndroid();
198+
skipIfNetworkUnavailable();
199+
200+
URI uri = new URI(endpoint);
201+
int maxConcurrentStreams = 1;
202+
int numRequestsToMake = 20;
203+
204+
try (EventLoopGroup eventLoopGroup = new EventLoopGroup(1);
205+
HostResolver resolver = new HostResolver(eventLoopGroup);
206+
ClientBootstrap bootstrap = new ClientBootstrap(eventLoopGroup, resolver);
207+
SocketOptions sockOpts = new SocketOptions();
208+
TlsContextOptions tlsOpts = TlsContextOptions.createDefaultClient().withAlpnList("h2");
209+
TlsContext tlsContext = createHttpClientTlsContext(tlsOpts)) {
210+
211+
Http2StreamManagerOptions options = new Http2StreamManagerOptions();
212+
options.withMaxConcurrentStreams(maxConcurrentStreams)
213+
.withMaxConcurrentStreamsPerConnection(100)
214+
.withIdealConcurrentStreamsPerConnection(100);
215+
216+
HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions();
217+
connectionManagerOptions.withClientBootstrap(bootstrap)
218+
.withSocketOptions(sockOpts)
219+
.withTlsContext(tlsContext)
220+
.withUri(uri)
221+
.withMaxConnections(2);
222+
options.withConnectionManagerOptions(connectionManagerOptions);
223+
224+
Http2StreamManager streamManager = Http2StreamManager.create(options);
225+
226+
try {
227+
Http2Request request = createHttp2Request("GET", endpoint, path, EMPTY_BODY);
228+
229+
final AtomicInteger completedRequests = new AtomicInteger(0);
230+
final AtomicInteger maxConcurrentActive = new AtomicInteger(0);
231+
final AtomicInteger currentActive = new AtomicInteger(0);
232+
final CountDownLatch allRequestsComplete = new CountDownLatch(numRequestsToMake);
233+
234+
// Acquire multiple streams rapidly
235+
for (int i = 0; i < numRequestsToMake; i++) {
236+
streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
237+
@Override
238+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
239+
HttpHeader[] nextHeaders) {
240+
// Track concurrent active streams
241+
int active = currentActive.incrementAndGet();
242+
int prevMax = maxConcurrentActive.get();
243+
while (active > prevMax) {
244+
if (maxConcurrentActive.compareAndSet(prevMax, active)) {
245+
break;
246+
}
247+
prevMax = maxConcurrentActive.get();
248+
}
249+
}
250+
251+
@Override
252+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
253+
currentActive.decrementAndGet();
254+
completedRequests.incrementAndGet();
255+
stream.close();
256+
allRequestsComplete.countDown();
257+
}
258+
}).whenComplete((stream, throwable) -> {
259+
if (throwable != null) {
260+
allRequestsComplete.countDown();
261+
}
262+
});
263+
}
264+
265+
// Wait for all requests to complete (with timeout)
266+
boolean completed = allRequestsComplete.await(60, TimeUnit.SECONDS);
267+
Assert.assertTrue("Requests did not complete within timeout", completed);
268+
269+
// Verify all requests completed
270+
Assert.assertEquals(numRequestsToMake, completedRequests.get());
271+
272+
// Verify that max concurrent streams limit was respected
273+
// The actual concurrent count should not exceed our limit significantly
274+
// (allowing small margin for timing)
275+
int observedMax = maxConcurrentActive.get();
276+
Log.log(Log.LogLevel.Info, Log.LogSubject.HttpConnectionManager,
277+
String.format("Max concurrent streams observed: %d (limit: %d)", observedMax, maxConcurrentStreams));
278+
279+
// The observed max should be close to our limit (allowing some flexibility for race conditions)
280+
Assert.assertTrue(
281+
String.format("Expected max concurrent streams around %d, but observed %d",
282+
maxConcurrentStreams, observedMax),
283+
observedMax <= maxConcurrentStreams);
284+
285+
} finally {
286+
streamManager.close();
287+
streamManager.getShutdownCompleteFuture().get(60, TimeUnit.SECONDS);
288+
}
289+
}
290+
291+
CrtResource.logNativeResources();
292+
CrtResource.waitForNoResources();
293+
}
194294
}

0 commit comments

Comments
 (0)