Skip to content

Commit 797374c

Browse files
committed
H2 streams initiated by the standard async client can be gracefully cancelled or time out without closing the underlying connection
1 parent e005c48 commit 797374c

File tree

5 files changed

+260
-26
lines changed

5 files changed

+260
-26
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.testing.async;
28+
29+
import static org.apache.hc.core5.util.ReflectionUtils.determineJRELevel;
30+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
31+
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.Future;
34+
35+
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
36+
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
37+
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
38+
import org.apache.hc.client5.http.config.ConnectionConfig;
39+
import org.apache.hc.client5.http.config.RequestConfig;
40+
import org.apache.hc.client5.http.config.TlsConfig;
41+
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
42+
import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
43+
import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
44+
import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
45+
import org.apache.hc.core5.http.HttpHost;
46+
import org.apache.hc.core5.http.URIScheme;
47+
import org.apache.hc.core5.http2.H2StreamTimeoutException;
48+
import org.apache.hc.core5.http2.HttpVersionPolicy;
49+
import org.apache.hc.core5.pool.PoolStats;
50+
import org.apache.hc.core5.util.Timeout;
51+
import org.junit.jupiter.api.Assertions;
52+
import org.junit.jupiter.api.Disabled;
53+
import org.junit.jupiter.api.Nested;
54+
import org.junit.jupiter.api.Test;
55+
56+
abstract class AbstractTestHttp2StreamResponseTimeout extends AbstractIntegrationTestBase {
57+
58+
public AbstractTestHttp2StreamResponseTimeout(final URIScheme scheme) {
59+
this(scheme, false);
60+
}
61+
62+
public AbstractTestHttp2StreamResponseTimeout(final URIScheme scheme, final boolean useUnixDomainSocket) {
63+
super(scheme, ClientProtocolLevel.STANDARD, ServerProtocolLevel.H2_ONLY, useUnixDomainSocket);
64+
}
65+
66+
void checkAssumptions() {
67+
}
68+
69+
@Test
70+
void testResponseTimeout() throws Exception {
71+
checkAssumptions();
72+
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
73+
final HttpHost target = startServer();
74+
75+
final TestAsyncClient client = startClient();
76+
final PoolingAsyncClientConnectionManager connManager = client.getConnectionManager();
77+
connManager.setDefaultConnectionConfig(ConnectionConfig.custom()
78+
.setSocketTimeout(Timeout.ofMinutes(1))
79+
.build());
80+
connManager.setDefaultTlsConfig(TlsConfig.custom()
81+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
82+
.build());
83+
84+
final SimpleHttpRequest request1 = SimpleRequestBuilder.get()
85+
.setHttpHost(target)
86+
.setPath("/random/1024")
87+
.setRequestConfig(RequestConfig.custom()
88+
.setUnixDomainSocket(getUnixDomainSocket())
89+
.build())
90+
.build();
91+
final SimpleHttpRequest request2 = SimpleRequestBuilder.get()
92+
.setHttpHost(target)
93+
.setPath("/random/1024?delay=1000")
94+
.setRequestConfig(RequestConfig.custom()
95+
.setUnixDomainSocket(getUnixDomainSocket())
96+
.setResponseTimeout(Timeout.ofMilliseconds(100))
97+
.build())
98+
.build();
99+
100+
final Future<SimpleHttpResponse> future1 = client.execute(request1, null, null);
101+
final Future<SimpleHttpResponse> future2 = client.execute(request2, null, null);
102+
final SimpleHttpResponse response1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
103+
Assertions.assertNotNull(response1);
104+
final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
105+
future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
106+
Assertions.assertInstanceOf(H2StreamTimeoutException.class, exception.getCause());
107+
108+
final PoolStats totalStats = connManager.getTotalStats();
109+
Assertions.assertTrue(totalStats.getAvailable() > 0);
110+
}
111+
112+
}
113+
114+
@Disabled
115+
public class TestHttp2StreamResponseTimeout {
116+
117+
@Nested
118+
class Http extends AbstractTestHttp2StreamResponseTimeout {
119+
public Http() {
120+
super(URIScheme.HTTP, false);
121+
}
122+
}
123+
124+
@Nested
125+
class Https extends AbstractTestHttp2StreamResponseTimeout {
126+
public Https() {
127+
super(URIScheme.HTTPS, false);
128+
}
129+
}
130+
131+
@Nested
132+
class Uds extends AbstractTestHttp2StreamResponseTimeout {
133+
public Uds() {
134+
super(URIScheme.HTTP, true);
135+
}
136+
137+
@Override
138+
void checkAssumptions() {
139+
assumeTrue(determineJRELevel() >= 16, "Async UDS requires Java 16+");
140+
}
141+
}
142+
143+
}

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.net.InetSocketAddress;
3030
import java.util.concurrent.CancellationException;
31+
import java.util.concurrent.ExecutionException;
3132
import java.util.concurrent.Future;
3233
import java.util.function.Consumer;
3334

@@ -53,6 +54,7 @@
5354
import org.apache.hc.core5.http.URIScheme;
5455
import org.apache.hc.core5.http.nio.support.AsyncClientPipeline;
5556
import org.apache.hc.core5.http.support.BasicRequestBuilder;
57+
import org.apache.hc.core5.http2.H2StreamResetException;
5658
import org.apache.hc.core5.http2.HttpVersionPolicy;
5759
import org.apache.hc.core5.pool.PoolStats;
5860
import org.apache.hc.core5.reactor.ConnectionInitiator;
@@ -206,4 +208,65 @@ public void completed(final Message<HttpResponse, byte[]> result) {
206208
}
207209
}
208210

211+
@Test
212+
void testExecutionCancellation_http12_connectionAlive() throws Exception {
213+
configureServer(bootstrap -> {
214+
bootstrap.setServerProtocolLevel(ServerProtocolLevel.H2_ONLY);
215+
bootstrap.register("/random/*", AsyncRandomHandler::new);
216+
});
217+
final HttpHost target = startServer();
218+
219+
final TestAsyncClient client = startClient();
220+
final ConnectionInitiator connectionInitiator = client.getImplementation();
221+
final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager();
222+
for (int i = 0; i < REQ_NUM; i++) {
223+
final HttpClientContext context = HttpClientContext.create();
224+
225+
final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime(
226+
connectionManager,
227+
connectionInitiator,
228+
TlsConfig.custom()
229+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
230+
.build());
231+
final Future<Boolean> connectFuture = testRuntime.leaseAndConnect(target, context);
232+
Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
233+
234+
final BasicFuture<Message<HttpResponse, byte[]>> resultFuture = new BasicFuture<>(null);
235+
final Cancellable cancellable = testRuntime.execute(
236+
"test-" + i,
237+
AsyncClientPipeline.assemble()
238+
.request(createRequest(target))
239+
.noContent()
240+
.response().asByteArray()
241+
.result(new FutureContribution<Message<HttpResponse, byte[]>>(resultFuture) {
242+
243+
@Override
244+
public void completed(final Message<HttpResponse, byte[]> result) {
245+
resultFuture.completed(result);
246+
}
247+
248+
})
249+
.create(),
250+
context);
251+
// sleep a bit
252+
Thread.sleep(i % 10);
253+
cancellable.cancel();
254+
255+
// The message exchange is expected to get aborted
256+
try {
257+
resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
258+
} catch (final ExecutionException ex) {
259+
Assertions.assertInstanceOf(H2StreamResetException.class, ex.getCause());
260+
}
261+
Assertions.assertFalse(testRuntime.isAborted());
262+
// The underlying connection is expected to stay valid
263+
Assertions.assertTrue(testRuntime.isEndpointConnected());
264+
testRuntime.markConnectionReusable(null, TimeValue.ofMinutes(1));
265+
testRuntime.releaseEndpoint();
266+
267+
final PoolStats totalStats = connectionManager.getTotalStats();
268+
Assertions.assertTrue(totalStats.getAvailable() > 0);
269+
}
270+
}
271+
209272
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import java.io.InterruptedIOException;
3131
import java.util.concurrent.RejectedExecutionException;
32-
import java.util.concurrent.atomic.AtomicBoolean;
3332
import java.util.concurrent.atomic.AtomicInteger;
3433
import java.util.concurrent.atomic.AtomicReference;
3534

@@ -347,25 +346,23 @@ private Cancellable doExecute(
347346
if (!isH2 && responseTimeout != null) {
348347
endpoint.setSocketTimeout(responseTimeout);
349348
}
350-
endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
351-
if (isH2 || requestConfig.isHardCancellationEnabled()) {
352-
return new Cancellable() {
353-
354-
private final AtomicBoolean cancelled = new AtomicBoolean();
355-
356-
@Override
357-
public boolean cancel() {
358-
if (cancelled.compareAndSet(false, true)) {
359-
exchangeHandler.cancel();
360-
return true;
361-
}
362-
return false;
363-
}
364-
365-
};
366-
} else {
367-
return Operations.nonCancellable();
349+
final ComplexCancellable complexCancellable = new ComplexCancellable();
350+
endpoint.execute(
351+
id,
352+
exchangeHandler,
353+
pushHandlerFactory,
354+
context,
355+
isH2 ? streamControl -> {
356+
streamControl.setTimeout(responseTimeout);
357+
complexCancellable.setDependency(streamControl);
358+
} : null);
359+
if (!isH2 && requestConfig.isHardCancellationEnabled()) {
360+
complexCancellable.setDependency(() -> {
361+
exchangeHandler.cancel();
362+
return true;
363+
});
368364
}
365+
return complexCancellable;
369366
}
370367

371368
@Override

httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
5151
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
5252
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
53-
import org.apache.hc.client5.http.protocol.HttpClientContext;
5453
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
5554
import org.apache.hc.core5.annotation.Contract;
5655
import org.apache.hc.core5.annotation.Internal;
@@ -59,11 +58,13 @@
5958
import org.apache.hc.core5.concurrent.CallbackContribution;
6059
import org.apache.hc.core5.concurrent.ComplexFuture;
6160
import org.apache.hc.core5.concurrent.FutureCallback;
61+
import org.apache.hc.core5.function.Callback;
6262
import org.apache.hc.core5.function.Resolver;
6363
import org.apache.hc.core5.http.HttpConnection;
6464
import org.apache.hc.core5.http.HttpHost;
6565
import org.apache.hc.core5.http.HttpVersion;
6666
import org.apache.hc.core5.http.ProtocolVersion;
67+
import org.apache.hc.core5.http.StreamControl;
6768
import org.apache.hc.core5.http.URIScheme;
6869
import org.apache.hc.core5.http.config.Lookup;
6970
import org.apache.hc.core5.http.config.RegistryBuilder;
@@ -813,7 +814,8 @@ public void execute(
813814
final String exchangeId,
814815
final AsyncClientExchangeHandler exchangeHandler,
815816
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
816-
final HttpContext context) {
817+
final HttpContext context,
818+
final Callback<StreamControl> initiationCallback) {
817819
final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
818820
if (LOG.isDebugEnabled()) {
819821
LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
@@ -824,14 +826,19 @@ public void execute(
824826
exchangeHandler,
825827
pushHandlerFactory,
826828
context,
827-
streamControl -> {
828-
final HttpClientContext clientContext = HttpClientContext.cast(context);
829-
final Timeout responseTimeout = clientContext.getRequestConfigOrDefault().getResponseTimeout();
830-
streamControl.setTimeout(responseTimeout);
831-
}),
829+
initiationCallback),
832830
Command.Priority.NORMAL);
833831
}
834832

833+
@Override
834+
public void execute(
835+
final String id,
836+
final AsyncClientExchangeHandler exchangeHandler,
837+
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
838+
final HttpContext context) {
839+
execute(id, exchangeHandler, pushHandlerFactory, context, null);
840+
}
841+
835842
@Override
836843
public EndpointInfo getInfo() {
837844
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();

httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.hc.core5.annotation.ThreadingBehavior;
3636
import org.apache.hc.core5.concurrent.BasicFuture;
3737
import org.apache.hc.core5.concurrent.FutureCallback;
38+
import org.apache.hc.core5.function.Callback;
39+
import org.apache.hc.core5.http.StreamControl;
3840
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
3941
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
4042
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
@@ -69,6 +71,28 @@ public abstract void execute(
6971
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
7072
HttpContext context);
7173

74+
/**
75+
* Initiates a message exchange using the given handler.
76+
*
77+
* @param id unique operation ID or {@code null}.
78+
* @param exchangeHandler the message exchange handler.
79+
* @param pushHandlerFactory the push handler factory.
80+
* @param context the execution context.
81+
* @param initiationCallback Optional callback for message exchanges
82+
* executed over a separate stream. The callback
83+
* provides a interface allowing to control
84+
* the process of message exchange execution.
85+
* @since 5.7
86+
*/
87+
public void execute(
88+
final String id,
89+
final AsyncClientExchangeHandler exchangeHandler,
90+
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
91+
final HttpContext context,
92+
final Callback<StreamControl> initiationCallback) {
93+
execute(id, exchangeHandler, pushHandlerFactory, context);
94+
}
95+
7296
/**
7397
* Determines if the connection to the remote endpoint is still open and valid.
7498
*/

0 commit comments

Comments
 (0)