Skip to content

Commit 32ac414

Browse files
committed
Enable header validation and align protocol with design
1 parent 975d520 commit 32ac414

File tree

10 files changed

+149
-25
lines changed

10 files changed

+149
-25
lines changed

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ public static class Builder {
855855
private int sslSessionTimeout = defaultSslSessionTimeout();
856856
private @Nullable SslContext sslContext;
857857
private @Nullable SslEngineFactory sslEngineFactory;
858-
private boolean http2Enabled = true;
858+
private boolean http2Enabled = false;
859859

860860
// cookie store
861861
private CookieStore cookieStore = new ThreadSafeCookieStore();

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
3939
import io.netty.handler.codec.http2.Http2MultiplexHandler;
4040
import io.netty.handler.codec.http2.Http2Settings;
41+
import io.netty.handler.codec.http2.Http2StreamChannel;
4142
import io.netty.handler.logging.LogLevel;
4243
import io.netty.handler.logging.LoggingHandler;
4344
import io.netty.handler.proxy.ProxyHandler;
@@ -566,6 +567,14 @@ public static boolean isHttp2(Channel channel) {
566567
return channel.pipeline().get(HTTP2_MULTIPLEX) != null;
567568
}
568569

570+
/**
571+
* Checks whether the given channel is an HTTP/2 stream child channel.
572+
* Stream channels are single-use and don't support HTTP/1.1 operations like draining or pipeline modification.
573+
*/
574+
public static boolean isHttp2StreamChannel(Channel channel) {
575+
return channel instanceof Http2StreamChannel;
576+
}
577+
569578
/**
570579
* Returns the shared {@link Http2Handler} instance for use with stream child channels.
571580
*/

client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.handler.codec.http.HttpResponseStatus;
2626
import io.netty.handler.codec.http.HttpVersion;
2727
import io.netty.handler.codec.http2.Http2DataFrame;
28+
import io.netty.handler.codec.http2.Http2GoAwayFrame;
2829
import io.netty.handler.codec.http2.Http2Headers;
2930
import io.netty.handler.codec.http2.Http2HeadersFrame;
3031
import io.netty.handler.codec.http2.Http2ResetFrame;
@@ -73,11 +74,18 @@ public void handleRead(final Channel channel, final NettyResponseFuture<?> futur
7374
AsyncHandler<?> handler = future.getAsyncHandler();
7475
try {
7576
if (e instanceof Http2HeadersFrame) {
76-
handleHttp2HeadersFrame((Http2HeadersFrame) e, channel, future, handler);
77+
Http2HeadersFrame headersFrame = (Http2HeadersFrame) e;
78+
if (headersFrame.headers().status() != null) {
79+
handleHttp2HeadersFrame(headersFrame, channel, future, handler);
80+
} else {
81+
handleHttp2TrailingHeadersFrame(headersFrame, channel, future, handler);
82+
}
7783
} else if (e instanceof Http2DataFrame) {
7884
handleHttp2DataFrame((Http2DataFrame) e, channel, future, handler);
7985
} else if (e instanceof Http2ResetFrame) {
8086
handleHttp2ResetFrame((Http2ResetFrame) e, channel, future);
87+
} else if (e instanceof Http2GoAwayFrame) {
88+
handleHttp2GoAwayFrame((Http2GoAwayFrame) e, channel, future);
8189
}
8290
} catch (Exception t) {
8391
if (hasIOExceptionFilters && t instanceof IOException
@@ -104,7 +112,7 @@ private void handleHttp2HeadersFrame(Http2HeadersFrame headersFrame, Channel cha
104112
HttpResponseStatus nettyStatus = HttpResponseStatus.valueOf(statusCode);
105113

106114
// Build HTTP/1.1-style headers, skipping HTTP/2 pseudo-headers (start with ':')
107-
HttpHeaders responseHeaders = new DefaultHttpHeaders(false);
115+
HttpHeaders responseHeaders = new DefaultHttpHeaders();
108116
h2Headers.forEach(entry -> {
109117
CharSequence name = entry.getKey();
110118
if (name.length() > 0 && name.charAt(0) != ':') {
@@ -122,7 +130,7 @@ private void handleHttp2HeadersFrame(Http2HeadersFrame headersFrame, Channel cha
122130

123131
if (!interceptors.exitAfterIntercept(channel, future, handler, syntheticResponse, status, responseHeaders)) {
124132
boolean abort = handler.onStatusReceived(status) == State.ABORT;
125-
if (!abort && !responseHeaders.isEmpty()) {
133+
if (!abort) {
126134
abort = handler.onHeadersReceived(responseHeaders) == State.ABORT;
127135
}
128136
if (abort) {
@@ -156,6 +164,32 @@ private void handleHttp2DataFrame(Http2DataFrame dataFrame, Channel channel,
156164
}
157165
}
158166

167+
/**
168+
* Processes trailing HTTP/2 HEADERS frame (no :status pseudo-header), which carries trailer headers
169+
* sent after the DATA frames. Delegates to {@link AsyncHandler#onTrailingHeadersReceived}.
170+
*/
171+
private void handleHttp2TrailingHeadersFrame(Http2HeadersFrame headersFrame, Channel channel,
172+
NettyResponseFuture<?> future, AsyncHandler<?> handler) throws Exception {
173+
Http2Headers h2Headers = headersFrame.headers();
174+
175+
HttpHeaders trailingHeaders = new DefaultHttpHeaders();
176+
h2Headers.forEach(entry -> {
177+
CharSequence name = entry.getKey();
178+
if (name.length() > 0 && name.charAt(0) != ':') {
179+
trailingHeaders.add(name, entry.getValue());
180+
}
181+
});
182+
183+
boolean abort = false;
184+
if (!trailingHeaders.isEmpty()) {
185+
abort = handler.onTrailingHeadersReceived(trailingHeaders) == State.ABORT;
186+
}
187+
188+
if (abort || headersFrame.isEndStream()) {
189+
finishUpdate(future, channel, false);
190+
}
191+
}
192+
159193
/**
160194
* Processes an HTTP/2 RST_STREAM frame, which indicates the server aborted the stream.
161195
*/
@@ -164,6 +198,23 @@ private void handleHttp2ResetFrame(Http2ResetFrame resetFrame, Channel channel,
164198
readFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
165199
}
166200

201+
/**
202+
* Processes an HTTP/2 GOAWAY frame, which indicates the server is shutting down the connection.
203+
* The parent connection is removed from the pool to prevent new streams from being created on it.
204+
* The current stream's future is failed so the request can be retried on a new connection.
205+
*/
206+
private void handleHttp2GoAwayFrame(Http2GoAwayFrame goAwayFrame, Channel channel, NettyResponseFuture<?> future) {
207+
long errorCode = goAwayFrame.errorCode();
208+
209+
// Remove the parent connection from the pool so no new streams are opened on it
210+
Channel parentChannel = (channel instanceof Http2StreamChannel)
211+
? ((Http2StreamChannel) channel).parent()
212+
: channel;
213+
channelManager.removeAll(parentChannel);
214+
215+
readFailed(channel, future, new IOException("HTTP/2 connection GOAWAY received, error code: " + errorCode));
216+
}
217+
167218
/**
168219
* Overrides the base {@link AsyncHttpClientHandler#finishUpdate} to correctly handle HTTP/2
169220
* connection pooling. HTTP/2 stream channels are single-use — after the stream completes,

client/src/main/java/org/asynchttpclient/netty/handler/intercept/ConnectSuccessInterceptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.asynchttpclient.netty.handler.intercept;
1717

1818
import io.netty.channel.Channel;
19+
import io.netty.handler.codec.http2.Http2StreamChannel;
1920
import io.netty.util.concurrent.Future;
2021
import org.asynchttpclient.Request;
2122
import org.asynchttpclient.netty.NettyResponseFuture;
@@ -40,6 +41,12 @@ public class ConnectSuccessInterceptor {
4041
}
4142

4243
public boolean exitAfterHandlingConnect(Channel channel, NettyResponseFuture<?> future, Request request, ProxyServer proxyServer) {
44+
// CONNECT tunneling is an HTTP/1.1 concept — it should never occur on HTTP/2 stream channels.
45+
if (channel instanceof Http2StreamChannel) {
46+
LOGGER.warn("CONNECT success on HTTP/2 stream channel is unexpected — ignoring");
47+
return false;
48+
}
49+
4350
if (future.isKeepAlive()) {
4451
future.attachChannel(channel, true);
4552
}

client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.asynchttpclient.netty.handler.intercept;
1717

1818
import io.netty.channel.Channel;
19+
import io.netty.handler.codec.http2.Http2StreamChannel;
1920
import org.asynchttpclient.netty.NettyResponseFuture;
2021
import org.asynchttpclient.netty.OnLastHttpContentCallback;
2122
import org.asynchttpclient.netty.channel.Channels;
@@ -32,14 +33,21 @@ class Continue100Interceptor {
3233
public boolean exitAfterHandling100(final Channel channel, final NettyResponseFuture<?> future) {
3334
future.setHeadersAlreadyWrittenOnContinue(true);
3435
future.setDontWriteBodyBecauseExpectContinue(false);
35-
// directly send the body
36-
Channels.setAttribute(channel, new OnLastHttpContentCallback(future) {
37-
@Override
38-
public void call() {
39-
Channels.setAttribute(channel, future);
40-
requestSender.writeRequest(future, channel);
41-
}
42-
});
36+
37+
if (channel instanceof Http2StreamChannel) {
38+
// HTTP/2 stream channels don't produce LastHttpContent.
39+
// Directly write the body on the stream channel.
40+
requestSender.writeRequest(future, channel);
41+
} else {
42+
// HTTP/1.1: wait for LastHttpContent before sending the body
43+
Channels.setAttribute(channel, new OnLastHttpContentCallback(future) {
44+
@Override
45+
public void call() {
46+
Channels.setAttribute(channel, future);
47+
requestSender.writeRequest(future, channel);
48+
}
49+
});
50+
}
4351
return true;
4452
}
4553
}

client/src/main/java/org/asynchttpclient/netty/handler/intercept/ProxyUnauthorized407Interceptor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.asynchttpclient.netty.channel.ChannelManager;
3030
import org.asynchttpclient.netty.channel.ChannelState;
3131
import org.asynchttpclient.netty.request.NettyRequestSender;
32+
import io.netty.handler.codec.http2.Http2StreamChannel;
3233
import org.asynchttpclient.ntlm.NtlmEngine;
3334
import org.asynchttpclient.proxy.ProxyServer;
3435
import org.asynchttpclient.spnego.SpnegoEngine;
@@ -170,7 +171,11 @@ public boolean exitAfterHandling407(Channel channel, NettyResponseFuture<?> futu
170171
final Request nextRequest = nextRequestBuilder.build();
171172

172173
LOGGER.debug("Sending proxy authentication to {}", request.getUri());
173-
if (future.isKeepAlive()
174+
if (channel instanceof Http2StreamChannel) {
175+
// HTTP/2 stream channels are single-use — close the stream and send the auth retry.
176+
channel.close();
177+
requestSender.sendNextRequest(nextRequest, future);
178+
} else if (future.isKeepAlive()
174179
&& !HttpUtil.isTransferEncodingChunked(httpRequest)
175180
&& !HttpUtil.isTransferEncodingChunked(response)) {
176181
future.setConnectAllowed(true);

client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.asynchttpclient.netty.NettyResponseFuture;
3131
import org.asynchttpclient.netty.channel.ChannelManager;
3232
import org.asynchttpclient.netty.request.NettyRequestSender;
33+
import io.netty.handler.codec.http2.Http2StreamChannel;
3334
import org.asynchttpclient.uri.Uri;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
@@ -159,7 +160,12 @@ public boolean exitAfterHandlingRedirect(Channel channel, NettyResponseFuture<?>
159160

160161
LOGGER.debug("Sending redirect to {}", newUri);
161162

162-
if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(response)) {
163+
if (channel instanceof Http2StreamChannel) {
164+
// HTTP/2 stream channels are single-use and close immediately after the response.
165+
// No draining needed — just close the stream and send the next request.
166+
channel.close();
167+
requestSender.sendNextRequest(nextRequest, future);
168+
} else if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(response)) {
163169
if (sameBase) {
164170
future.setReuseChannel(true);
165171
// we can't directly send the next request because we still have to received LastContent

client/src/main/java/org/asynchttpclient/netty/handler/intercept/Unauthorized401Interceptor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.asynchttpclient.netty.channel.ChannelManager;
2929
import org.asynchttpclient.netty.channel.ChannelState;
3030
import org.asynchttpclient.netty.request.NettyRequestSender;
31+
import io.netty.handler.codec.http2.Http2StreamChannel;
3132
import org.asynchttpclient.ntlm.NtlmEngine;
3233
import org.asynchttpclient.spnego.SpnegoEngine;
3334
import org.asynchttpclient.spnego.SpnegoEngineException;
@@ -162,7 +163,11 @@ public boolean exitAfterHandling401(Channel channel, NettyResponseFuture<?> futu
162163
final Request nextRequest = future.getCurrentRequest().toBuilder().setHeaders(requestHeaders).build();
163164

164165
LOGGER.debug("Sending authentication to {}", request.getUri());
165-
if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(httpRequest) && !HttpUtil.isTransferEncodingChunked(response)) {
166+
if (channel instanceof Http2StreamChannel) {
167+
// HTTP/2 stream channels are single-use — close the stream and send the auth retry.
168+
channel.close();
169+
requestSender.sendNextRequest(nextRequest, future);
170+
} else if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(httpRequest) && !HttpUtil.isTransferEncodingChunked(response)) {
166171
future.setReuseChannel(true);
167172
requestSender.drainChannelAndExecuteNextRequest(channel, future, nextRequest);
168173
} else {

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.asynchttpclient.netty.channel.NettyChannelConnector;
6464
import org.asynchttpclient.netty.channel.NettyConnectListener;
6565
import org.asynchttpclient.netty.request.body.NettyBody;
66+
import org.asynchttpclient.netty.request.body.NettyDirectBody;
6667
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
6768
import org.asynchttpclient.proxy.ProxyServer;
6869
import org.asynchttpclient.proxy.ProxyType;
@@ -480,6 +481,19 @@ protected void initChannel(Http2StreamChannel streamCh) {
480481
Channels.setAttribute(streamChannel, future);
481482
future.attachChannel(streamChannel, false);
482483
try {
484+
AsyncHandler<T> asyncHandler = future.getAsyncHandler();
485+
try {
486+
asyncHandler.onRequestSend(future.getNettyRequest());
487+
} catch (Exception e) {
488+
LOGGER.error("onRequestSend crashed", e);
489+
abort(parentChannel, future, e);
490+
return;
491+
}
492+
493+
if (asyncHandler instanceof TransferCompletionHandler) {
494+
configureTransferAdapter(asyncHandler, future.getNettyRequest().getHttpRequest());
495+
}
496+
483497
sendHttp2Frames(future, streamChannel);
484498
scheduleReadTimeout(future);
485499
} catch (Exception e) {
@@ -499,14 +513,18 @@ protected void initChannel(Http2StreamChannel streamCh) {
499513
* :scheme, :authority) plus all regular request headers, then writes them as a
500514
* {@link DefaultHttp2HeadersFrame}. If the request has a body, writes it as a
501515
* {@link DefaultHttp2DataFrame} with {@code endStream=true}.
516+
* <p>
517+
* Currently supports in-memory bodies ({@link DefaultFullHttpRequest} content and
518+
* {@link org.asynchttpclient.netty.request.body.NettyDirectBody}). Streaming bodies
519+
* (file uploads, input streams) are not yet supported over HTTP/2.
502520
*/
503521
private <T> void sendHttp2Frames(NettyResponseFuture<T> future, Http2StreamChannel streamChannel) {
504522
NettyRequest nettyRequest = future.getNettyRequest();
505523
HttpRequest httpRequest = nettyRequest.getHttpRequest();
506524
Uri uri = future.getUri();
507525

508526
// Build HTTP/2 pseudo-headers + regular headers
509-
Http2Headers h2Headers = new DefaultHttp2Headers(false)
527+
Http2Headers h2Headers = new DefaultHttp2Headers()
510528
.method(httpRequest.method().name())
511529
.path(uri.getNonEmptyPath() + (uri.getQuery() != null ? "?" + uri.getQuery() : ""))
512530
.scheme(uri.getScheme())
@@ -521,7 +539,8 @@ private <T> void sendHttp2Frames(NettyResponseFuture<T> future, Http2StreamChann
521539
}
522540
});
523541

524-
// Determine if we have a body to write
542+
// Determine if we have a body to write.
543+
// Support both DefaultFullHttpRequest (inline content) and NettyDirectBody (byte array/buffer bodies).
525544
ByteBuf bodyBuf = null;
526545
if (httpRequest instanceof DefaultFullHttpRequest) {
527546
ByteBuf content = ((DefaultFullHttpRequest) httpRequest).content();
@@ -530,6 +549,20 @@ private <T> void sendHttp2Frames(NettyResponseFuture<T> future, Http2StreamChann
530549
}
531550
}
532551

552+
NettyBody nettyBody = nettyRequest.getBody();
553+
if (bodyBuf == null && nettyBody != null) {
554+
if (nettyBody instanceof NettyDirectBody) {
555+
ByteBuf directBuf = ((NettyDirectBody) nettyBody).byteBuf();
556+
if (directBuf != null && directBuf.isReadable()) {
557+
bodyBuf = directBuf;
558+
}
559+
} else {
560+
throw new UnsupportedOperationException(
561+
"Streaming request bodies (" + nettyBody.getClass().getSimpleName()
562+
+ ") are not yet supported over HTTP/2. Use an in-memory body or disable HTTP/2.");
563+
}
564+
}
565+
533566
boolean hasBody = bodyBuf != null;
534567

535568
// Write HEADERS frame (endStream=true when there is no body)

client/src/test/java/org/asynchttpclient/BasicHttp2Test.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -587,19 +587,19 @@ public void http2DisabledFallsBackToHttp11() throws Exception {
587587
}
588588

589589
@Test
590-
public void http2IsEnabledByDefault() {
590+
public void http2IsDisabledByDefault() {
591591
AsyncHttpClientConfig defaultConfig = config().build();
592-
assertTrue(defaultConfig.isHttp2Enabled(),
593-
"HTTP/2 should be enabled by default");
592+
assertFalse(defaultConfig.isHttp2Enabled(),
593+
"HTTP/2 should be disabled by default for backward compatibility");
594594
}
595595

596596
@Test
597-
public void http2CanBeDisabledViaConfig() {
598-
AsyncHttpClientConfig configWithHttp2Disabled = config()
599-
.setHttp2Enabled(false)
597+
public void http2CanBeEnabledViaConfig() {
598+
AsyncHttpClientConfig configWithHttp2Enabled = config()
599+
.setHttp2Enabled(true)
600600
.build();
601-
assertFalse(configWithHttp2Disabled.isHttp2Enabled(),
602-
"HTTP/2 should be disabled when setHttp2Enabled(false) is called");
601+
assertTrue(configWithHttp2Enabled.isHttp2Enabled(),
602+
"HTTP/2 should be enabled when setHttp2Enabled(true) is called");
603603
}
604604

605605
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)