Skip to content

Commit 964f1ae

Browse files
arturobernalgfitekonea
authored andcommitted
Implement shared FIFO execution queue for H2 client
Introduce one shared per-client queue to cap concurrently executing requests and enqueue overflow. Ensure queued starts release the slot on any terminal path, including synchronous start failures.
1 parent e005c48 commit 964f1ae

15 files changed

+1052
-572
lines changed

httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.List;
3232
import java.util.concurrent.Future;
3333
import java.util.concurrent.atomic.AtomicBoolean;
34-
import java.util.concurrent.atomic.AtomicInteger;
3534

3635
import org.apache.hc.client5.http.HttpRoute;
3736
import org.apache.hc.client5.http.async.AsyncExecRuntime;
@@ -64,7 +63,7 @@ public final class InternalTestHttpAsyncExecRuntime extends InternalHttpAsyncExe
6463
public InternalTestHttpAsyncExecRuntime(final AsyncClientConnectionManager manager,
6564
final ConnectionInitiator connectionInitiator,
6665
final TlsConfig tlsConfig) {
67-
super(LOG, manager, connectionInitiator, null, tlsConfig, -1, new AtomicInteger());
66+
super(LOG, manager, connectionInitiator, null, tlsConfig);
6867
this.cancelled = new AtomicBoolean();
6968
}
7069

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,12 +328,12 @@ public final H2AsyncClientBuilder disableRequestPriority() {
328328
}
329329

330330
/**
331-
* Sets a hard cap on the number of requests allowed to be queued/in-flight
332-
* within the internal async execution pipeline. When the limit is reached,
333-
* new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
331+
* Sets the maximum number of concurrent request executions within the
332+
* internal async execution pipeline. Requests beyond this limit are queued
333+
* in FIFO order and dispatched as in-flight executions complete.
334334
* A value {@code <= 0} means unlimited (default).
335335
*
336-
* @param max maximum number of queued requests; {@code <= 0} to disable the cap
336+
* @param max maximum number of concurrent executions; {@code <= 0} to disable the cap
337337
* @return this builder
338338
* @since 5.7
339339
*/

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -910,12 +910,12 @@ public final HttpAsyncClientBuilder setTlsRequired(final boolean tlsRequired) {
910910
}
911911

912912
/**
913-
* Sets a hard cap on the number of requests allowed to be queued/in-flight
914-
* within the internal async execution pipeline. When the limit is reached,
915-
* new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
916-
* A value <= 0 means unlimited (default).
913+
* Sets the maximum number of concurrent request executions within the
914+
* internal async execution pipeline. Requests beyond this limit are queued
915+
* in FIFO order and dispatched as in-flight executions complete.
916+
* A value {@code <= 0} means unlimited (default).
917917
*
918-
* @param max maximum number of queued requests; <= 0 to disable the cap
918+
* @param max maximum number of concurrent executions; {@code <= 0} to disable the cap
919919
* @return this builder
920920
* @since 5.7
921921
*/

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.io.Closeable;
3030
import java.util.List;
3131
import java.util.concurrent.ThreadFactory;
32-
import java.util.concurrent.atomic.AtomicInteger;
3332

3433
import org.apache.hc.client5.http.HttpRoute;
3534
import org.apache.hc.client5.http.async.AsyncExecRuntime;
@@ -56,11 +55,6 @@
5655

5756
/**
5857
* Internal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}.
59-
* <p>
60-
* Concurrent message exchanges with the same connection route executed by
61-
* this client will get automatically multiplexed over a single physical HTTP/2
62-
* connection.
63-
* </p>
6458
*
6559
* @since 5.0
6660
*/
@@ -71,8 +65,12 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
7165
private static final Logger LOG = LoggerFactory.getLogger(InternalH2AsyncClient.class);
7266
private final HttpRoutePlanner routePlanner;
7367
private final InternalH2ConnPool connPool;
74-
private final int maxQueuedRequests;
75-
private final AtomicInteger queuedRequests;
68+
69+
/**
70+
* One shared FIFO queue per client instance.
71+
* {@code null} means unlimited / no throttling.
72+
*/
73+
private final SharedRequestExecutionQueue executionQueue;
7674

7775
InternalH2AsyncClient(
7876
final DefaultConnectingIOReactor ioReactor,
@@ -94,20 +92,16 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
9492
defaultConfig, closeables);
9593
this.connPool = connPool;
9694
this.routePlanner = routePlanner;
97-
this.maxQueuedRequests = maxQueuedRequests;
98-
this.queuedRequests = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
95+
this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null;
9996
}
10097

10198
@Override
10299
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
103-
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, maxQueuedRequests, queuedRequests);
100+
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, executionQueue);
104101
}
105102

106103
@Override
107-
HttpRoute determineRoute(
108-
final HttpHost httpHost,
109-
final HttpRequest request,
110-
final HttpClientContext clientContext) throws HttpException {
104+
HttpRoute determineRoute(final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
111105
final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext);
112106
if (route.isTunnelled()) {
113107
throw new HttpException("HTTP/2 tunneling not supported");

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
package org.apache.hc.client5.http.impl.async;
2929

3030
import java.io.InterruptedIOException;
31-
import java.util.concurrent.RejectedExecutionException;
32-
import java.util.concurrent.atomic.AtomicInteger;
3331
import java.util.concurrent.atomic.AtomicReference;
3432

3533
import org.apache.hc.client5.http.EndpointInfo;
@@ -63,30 +61,27 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
6361
private final InternalH2ConnPool connPool;
6462
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
6563
private final AtomicReference<Endpoint> sessionRef;
66-
private final int maxQueued;
67-
private final AtomicInteger sharedQueued;
64+
private final SharedRequestExecutionQueue executionQueue;
6865
private volatile boolean reusable;
6966

7067
InternalH2AsyncExecRuntime(
7168
final Logger log,
7269
final InternalH2ConnPool connPool,
7370
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
74-
this(log, connPool, pushHandlerFactory, -1, null);
71+
this(log, connPool, pushHandlerFactory, null);
7572
}
7673

7774
InternalH2AsyncExecRuntime(
7875
final Logger log,
7976
final InternalH2ConnPool connPool,
8077
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
81-
final int maxQueued,
82-
final AtomicInteger sharedQueued) {
78+
final SharedRequestExecutionQueue executionQueue) {
8379
super();
8480
this.log = log;
8581
this.connPool = connPool;
8682
this.pushHandlerFactory = pushHandlerFactory;
8783
this.sessionRef = new AtomicReference<>();
88-
this.maxQueued = maxQueued;
89-
this.sharedQueued = sharedQueued;
84+
this.executionQueue = executionQueue;
9085
}
9186

9287
@Override
@@ -179,7 +174,6 @@ public boolean isEndpointConnected() {
179174
return endpoint != null && endpoint.session.isOpen();
180175
}
181176

182-
183177
Endpoint ensureValid() {
184178
final Endpoint endpoint = sessionRef.get();
185179
if (endpoint == null) {
@@ -261,41 +255,47 @@ public EndpointInfo getEndpointInfo() {
261255
return null;
262256
}
263257

264-
private boolean tryAcquireSlot() {
265-
if (sharedQueued == null || maxQueued <= 0) {
266-
return true;
258+
@Override
259+
public Cancellable execute(
260+
final String id,
261+
final AsyncClientExchangeHandler exchangeHandler,
262+
final HttpClientContext context) {
263+
264+
final Endpoint endpoint = ensureValid();
265+
final ComplexCancellable complexCancellable = new ComplexCancellable();
266+
267+
if (executionQueue == null) {
268+
startExecution(id, endpoint, exchangeHandler, context, complexCancellable);
269+
return complexCancellable;
267270
}
268-
for (;;) {
269-
final int q = sharedQueued.get();
270-
if (q >= maxQueued) {
271-
return false;
272-
}
273-
if (sharedQueued.compareAndSet(q, q + 1)) {
271+
272+
final Cancellable queued = executionQueue.enqueue(
273+
() -> {
274+
final AsyncClientExchangeHandler wrapped =
275+
new ReleasingAsyncClientExchangeHandler(exchangeHandler, executionQueue::completed);
276+
try {
277+
startExecution(id, endpoint, wrapped, context, complexCancellable);
278+
} catch (final RuntimeException ex) {
279+
wrapped.failed(ex);
280+
}
281+
},
282+
exchangeHandler::cancel);
283+
284+
return () -> {
285+
if (queued.cancel()) {
274286
return true;
275287
}
276-
}
277-
}
278-
279-
private void releaseSlot() {
280-
if (sharedQueued != null && maxQueued > 0) {
281-
sharedQueued.decrementAndGet();
282-
}
288+
return complexCancellable.cancel();
289+
};
283290
}
284291

285-
@Override
286-
public Cancellable execute(
292+
private void startExecution(
287293
final String id,
288-
final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
289-
final Endpoint endpoint = ensureValid();
290-
if (!tryAcquireSlot()) {
291-
exchangeHandler.failed(new RejectedExecutionException(
292-
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
293-
return Operations.nonCancellable();
294-
}
295-
final AsyncClientExchangeHandler actual = sharedQueued != null
296-
? new ReleasingAsyncClientExchangeHandler(exchangeHandler, this::releaseSlot)
297-
: exchangeHandler;
298-
final ComplexCancellable complexCancellable = new ComplexCancellable();
294+
final Endpoint endpoint,
295+
final AsyncClientExchangeHandler exchangeHandler,
296+
final HttpClientContext context,
297+
final ComplexCancellable complexCancellable) {
298+
299299
final IOSession session = endpoint.session;
300300
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
301301
final Timeout responseTimeout = requestConfig.getResponseTimeout();
@@ -306,7 +306,7 @@ public Cancellable execute(
306306
context.setProtocolVersion(HttpVersion.HTTP_2);
307307
session.enqueue(
308308
new RequestExecutionCommand(
309-
actual,
309+
exchangeHandler,
310310
pushHandlerFactory,
311311
context,
312312
streamControl -> {
@@ -330,7 +330,7 @@ public void completed(final IOSession ioSession) {
330330
context.setProtocolVersion(HttpVersion.HTTP_2);
331331
ioSession.enqueue(
332332
new RequestExecutionCommand(
333-
actual,
333+
exchangeHandler,
334334
pushHandlerFactory,
335335
context,
336336
streamControl -> {
@@ -342,17 +342,16 @@ public void completed(final IOSession ioSession) {
342342

343343
@Override
344344
public void failed(final Exception ex) {
345-
actual.failed(ex);
345+
exchangeHandler.failed(ex);
346346
}
347347

348348
@Override
349349
public void cancelled() {
350-
actual.failed(new InterruptedIOException());
350+
exchangeHandler.failed(new InterruptedIOException());
351351
}
352352

353353
});
354354
}
355-
return complexCancellable;
356355
}
357356

358357
@Override
@@ -384,7 +383,7 @@ public String getId() {
384383

385384
@Override
386385
public AsyncExecRuntime fork() {
387-
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, maxQueued, sharedQueued);
386+
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, executionQueue);
388387
}
389388

390389
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.io.Closeable;
3030
import java.util.List;
3131
import java.util.concurrent.ThreadFactory;
32-
import java.util.concurrent.atomic.AtomicInteger;
3332
import java.util.function.Function;
3433

3534
import org.apache.hc.client5.http.HttpRoute;
@@ -77,8 +76,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
7776
private final AsyncClientConnectionManager manager;
7877
private final HttpRoutePlanner routePlanner;
7978
private final TlsConfig tlsConfig;
80-
private final int maxQueuedRequests;
81-
private final AtomicInteger queuedCounter;
79+
80+
/**
81+
* One shared FIFO queue per client instance.
82+
* null means "unlimited" / no throttling.
83+
*/
84+
private final SharedRequestExecutionQueue executionQueue;
8285

8386
InternalHttpAsyncClient(
8487
final DefaultConnectingIOReactor ioReactor,
@@ -103,13 +106,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
103106
this.manager = manager;
104107
this.routePlanner = routePlanner;
105108
this.tlsConfig = tlsConfig;
106-
this.maxQueuedRequests = maxQueuedRequests;
107-
this.queuedCounter = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
109+
this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null;
108110
}
109111

110112
@Override
111113
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
112-
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, maxQueuedRequests, queuedCounter);
114+
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, executionQueue);
113115
}
114116

115117
@Override

0 commit comments

Comments
 (0)