Skip to content

Commit c336661

Browse files
committed
HTTP/2 multiplex implementation
Motivation: Vert.x HTTP/2 implementation has originally been written against the HTTP/2 codec directly. Since then Netty has implemented codecs that more or less do what Vert.x HTTP/2 implementation do and would relieve Vert.x implementation of this responsibility. On the long run, the frame codec and the multiplex handler is the preferred API to go with.
1 parent 6089d8e commit c336661

33 files changed

Lines changed: 2176 additions & 48 deletions

vertx-core/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, HttpCli
3434
obj.setHttp2UpgradeMaxContentLength(((Number)member.getValue()).intValue());
3535
}
3636
break;
37+
case "http2MultiplexImplementation":
38+
if (member.getValue() instanceof Boolean) {
39+
obj.setHttp2MultiplexImplementation((Boolean)member.getValue());
40+
}
41+
break;
3742
case "keepAlive":
3843
if (member.getValue() instanceof Boolean) {
3944
obj.setKeepAlive((Boolean)member.getValue());
@@ -162,6 +167,7 @@ static void toJson(HttpClientOptions obj, java.util.Map<String, Object> json) {
162167
json.put("http2ConnectionWindowSize", obj.getHttp2ConnectionWindowSize());
163168
json.put("http2KeepAliveTimeout", obj.getHttp2KeepAliveTimeout());
164169
json.put("http2UpgradeMaxContentLength", obj.getHttp2UpgradeMaxContentLength());
170+
json.put("http2MultiplexImplementation", obj.getHttp2MultiplexImplementation());
165171
json.put("keepAlive", obj.isKeepAlive());
166172
json.put("keepAliveTimeout", obj.getKeepAliveTimeout());
167173
json.put("pipelining", obj.isPipelining());

vertx-core/src/main/generated/io/vertx/core/http/HttpServerOptionsConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, HttpSer
184184
obj.setStrictThreadMode((Boolean)member.getValue());
185185
}
186186
break;
187+
case "http2MultiplexImplementation":
188+
if (member.getValue() instanceof Boolean) {
189+
obj.setHttp2MultiplexImplementation((Boolean)member.getValue());
190+
}
191+
break;
187192
}
188193
}
189194
}
@@ -239,5 +244,6 @@ static void toJson(HttpServerOptions obj, java.util.Map<String, Object> json) {
239244
json.put("http2RstFloodWindowDurationTimeUnit", obj.getHttp2RstFloodWindowDurationTimeUnit().name());
240245
}
241246
json.put("strictThreadMode", obj.getStrictThreadMode());
247+
json.put("http2MultiplexImplementation", obj.getHttp2MultiplexImplementation());
242248
}
243249
}

vertx-core/src/main/java/io/vertx/core/http/HttpClientOptions.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ public class HttpClientOptions extends ClientOptionsBase {
157157
*/
158158
public static final String DEFAULT_NAME = "__vertx.DEFAULT";
159159

160+
/**
161+
* Use HTTP/2 multiplex implementation = {@code false}
162+
*/
163+
public static final boolean DEFAULT_HTTP_2_MULTIPLEX_IMPLEMENTATION = false;
164+
160165
private boolean verifyHost = true;
161166
private boolean keepAlive;
162167
private int keepAliveTimeout;
@@ -166,6 +171,7 @@ public class HttpClientOptions extends ClientOptionsBase {
166171
private int http2ConnectionWindowSize;
167172
private int http2KeepAliveTimeout;
168173
private int http2UpgradeMaxContentLength;
174+
private boolean http2MultiplexImplementation;
169175

170176
private boolean decompressionSupported;
171177
private String defaultHost;
@@ -221,6 +227,7 @@ public HttpClientOptions(HttpClientOptions other) {
221227
this.http2ConnectionWindowSize = other.http2ConnectionWindowSize;
222228
this.http2KeepAliveTimeout = other.getHttp2KeepAliveTimeout();
223229
this.http2UpgradeMaxContentLength = other.getHttp2UpgradeMaxContentLength();
230+
this.http2MultiplexImplementation = other.getHttp2MultiplexImplementation();
224231
this.decompressionSupported = other.decompressionSupported;
225232
this.defaultHost = other.defaultHost;
226233
this.defaultPort = other.defaultPort;
@@ -272,6 +279,7 @@ private void init() {
272279
http2ConnectionWindowSize = DEFAULT_HTTP2_CONNECTION_WINDOW_SIZE;
273280
http2KeepAliveTimeout = DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT;
274281
http2UpgradeMaxContentLength = DEFAULT_HTTP2_UPGRADE_MAX_CONTENT_LENGTH;
282+
http2MultiplexImplementation = DEFAULT_HTTP_2_MULTIPLEX_IMPLEMENTATION;
275283
decompressionSupported = DEFAULT_DECOMPRESSION_SUPPORTED;
276284
defaultHost = DEFAULT_DEFAULT_HOST;
277285
defaultPort = DEFAULT_DEFAULT_PORT;
@@ -559,6 +567,24 @@ public HttpClientOptions setHttp2UpgradeMaxContentLength(int http2UpgradeMaxCont
559567
return this;
560568
}
561569

570+
/**
571+
* @return whether to use the HTTP/2 implementation based on multiplexed channel
572+
*/
573+
public boolean getHttp2MultiplexImplementation() {
574+
return http2MultiplexImplementation;
575+
}
576+
577+
/**
578+
* Set which HTTP/2 implementation to use
579+
*
580+
* @param http2MultiplexImplementation whether to use the HTTP/2 multiplex implementation
581+
* @return a reference to this, so the API can be used fluently
582+
*/
583+
public HttpClientOptions setHttp2MultiplexImplementation(boolean http2MultiplexImplementation) {
584+
this.http2MultiplexImplementation = http2MultiplexImplementation;
585+
return this;
586+
}
587+
562588
/**
563589
* Is keep alive enabled on the client?
564590
*

vertx-core/src/main/java/io/vertx/core/http/HttpServerOptions.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,11 @@ public class HttpServerOptions extends NetServerOptions {
206206
*/
207207
public static final boolean DEFAULT_STRICT_THREAD_MODE_STRICT = false;
208208

209+
/**
210+
* Use HTTP/2 multiplex implementation = {@code false}
211+
*/
212+
public static final boolean DEFAULT_HTTP_2_MULTIPLEX_IMPLEMENTATION = false;
213+
209214
private boolean compressionSupported;
210215
private int compressionLevel;
211216
private int compressionContentSizeThreshold;
@@ -239,6 +244,7 @@ public class HttpServerOptions extends NetServerOptions {
239244
private int http2RstFloodWindowDuration;
240245
private TimeUnit http2RstFloodWindowDurationTimeUnit;
241246
private boolean strictThreadMode;
247+
private boolean http2MultiplexImplementation;
242248

243249
/**
244250
* Default constructor
@@ -289,6 +295,7 @@ public HttpServerOptions(HttpServerOptions other) {
289295
this.http2RstFloodWindowDuration = other.http2RstFloodWindowDuration;
290296
this.http2RstFloodWindowDurationTimeUnit = other.http2RstFloodWindowDurationTimeUnit;
291297
this.strictThreadMode = other.strictThreadMode;
298+
this.http2MultiplexImplementation = other.http2MultiplexImplementation;
292299
}
293300

294301
/**
@@ -346,6 +353,7 @@ private void init() {
346353
http2RstFloodMaxRstFramePerWindow = DEFAULT_HTTP2_RST_FLOOD_MAX_RST_FRAME_PER_WINDOW;
347354
http2RstFloodWindowDuration = DEFAULT_HTTP2_RST_FLOOD_WINDOW_DURATION;
348355
http2RstFloodWindowDurationTimeUnit = DEFAULT_HTTP2_RST_FLOOD_WINDOW_DURATION_TIME_UNIT;
356+
http2MultiplexImplementation = DEFAULT_HTTP_2_MULTIPLEX_IMPLEMENTATION;
349357
}
350358

351359
/**
@@ -1285,4 +1293,22 @@ public HttpServerOptions setStrictThreadMode(boolean strictThreadMode) {
12851293
this.strictThreadMode = strictThreadMode;
12861294
return this;
12871295
}
1296+
1297+
/**
1298+
* @return whether to use the HTTP/2 implementation based on multiplexed channel
1299+
*/
1300+
public boolean getHttp2MultiplexImplementation() {
1301+
return http2MultiplexImplementation;
1302+
}
1303+
1304+
/**
1305+
* Set which HTTP/2 implementation to use
1306+
*
1307+
* @param http2MultiplexImplementation whether to use the HTTP/2 multiplex implementation
1308+
* @return a reference to this, so the API can be used fluently
1309+
*/
1310+
public HttpServerOptions setHttp2MultiplexImplementation(boolean http2MultiplexImplementation) {
1311+
this.http2MultiplexImplementation = http2MultiplexImplementation;
1312+
return this;
1313+
}
12881314
}

vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.vertx.core.http.HttpVersion;
2727
import io.vertx.core.http.impl.http2.Http2ClientChannelInitializer;
2828
import io.vertx.core.http.impl.http2.codec.Http2CodecClientChannelInitializer;
29+
import io.vertx.core.http.impl.http2.multiplex.Http2MultiplexClientChannelInitializer;
2930
import io.vertx.core.internal.ContextInternal;
3031
import io.vertx.core.internal.PromiseInternal;
3132
import io.vertx.core.internal.http.HttpHeadersInternal;
@@ -39,6 +40,7 @@
3940
import java.util.ArrayList;
4041
import java.util.List;
4142
import java.util.Map;
43+
import java.util.concurrent.TimeUnit;
4244

4345
import static io.vertx.core.http.HttpMethod.OPTIONS;
4446

@@ -75,7 +77,24 @@ public HttpChannelConnector(HttpClientBase client,
7577
HostAndPort authority,
7678
SocketAddress server,
7779
boolean pooled,
78-
long maxLifetime) {
80+
long maxLifetimeMillis) {
81+
82+
Http2ClientChannelInitializer http2ChannelInitializer;
83+
if (client.options.getHttp2MultiplexImplementation()) {
84+
http2ChannelInitializer = new Http2MultiplexClientChannelInitializer(
85+
HttpUtils.fromVertxSettings(client.options.getInitialSettings()),
86+
client.metrics,
87+
metrics,
88+
TimeUnit.SECONDS.toMillis(client.options.getHttp2KeepAliveTimeout()),
89+
maxLifetimeMillis,
90+
authority,
91+
client.options.getHttp2MultiplexingLimit(),
92+
client.options.isDecompressionSupported(),
93+
client.options.getLogActivity());
94+
} else {
95+
http2ChannelInitializer = new Http2CodecClientChannelInitializer(client, metrics, pooled, maxLifetimeMillis, authority);
96+
}
97+
7998
this.client = client;
8099
this.netClient = netClient;
81100
this.metrics = metrics;
@@ -88,8 +107,8 @@ public HttpChannelConnector(HttpClientBase client,
88107
this.authority = authority;
89108
this.server = server;
90109
this.pooled = pooled;
91-
this.maxLifetime = maxLifetime;
92-
this.http2ChannelInitializer = new Http2CodecClientChannelInitializer(client, metrics, pooled, maxLifetime, authority);
110+
this.maxLifetime = maxLifetimeMillis;
111+
this.http2ChannelInitializer = http2ChannelInitializer;
93112
}
94113

95114
public SocketAddress server() {

vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnectionInitializer.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.vertx.core.http.HttpServerOptions;
2424
import io.vertx.core.http.impl.http2.Http2ServerChannelInitializer;
2525
import io.vertx.core.http.impl.http2.codec.Http2CodecServerChannelInitializer;
26+
import io.vertx.core.http.impl.http2.multiplex.Http2MultiplexServerChannelInitializer;
2627
import io.vertx.core.internal.ContextInternal;
2728
import io.vertx.core.internal.tls.SslContextManager;
2829
import io.vertx.core.internal.net.SslChannelProvider;
@@ -79,17 +80,31 @@ public class HttpServerConnectionInitializer {
7980
compressionManager = null;
8081
}
8182

82-
Http2ServerChannelInitializer http2ChannelInitializer = new Http2CodecServerChannelInitializer(
83-
this,
84-
(HttpServerMetrics) server.getMetrics(),
85-
options,
86-
compressionManager,
87-
streamContextSupplier,
88-
connectionHandler,
89-
serverOrigin,
90-
metric,
91-
options.getLogActivity()
92-
);
83+
Http2ServerChannelInitializer http2ChannelInitalizer;
84+
if (options.getHttp2MultiplexImplementation()) {
85+
http2ChannelInitalizer = new Http2MultiplexServerChannelInitializer(
86+
context,
87+
compressionManager,
88+
options.isDecompressionSupported(),
89+
(HttpServerMetrics) server.getMetrics(),
90+
metric,
91+
streamContextSupplier,
92+
connectionHandler,
93+
HttpUtils.fromVertxInitialSettings(true, options.getInitialSettings()),
94+
options.getLogActivity());
95+
} else {
96+
http2ChannelInitalizer = new Http2CodecServerChannelInitializer(
97+
this,
98+
(HttpServerMetrics) server.getMetrics(),
99+
options,
100+
compressionManager,
101+
streamContextSupplier,
102+
connectionHandler,
103+
serverOrigin,
104+
metric,
105+
options.getLogActivity()
106+
);
107+
}
93108

94109
this.context = context;
95110
this.threadingModel = threadingModel;
@@ -103,7 +118,7 @@ public class HttpServerConnectionInitializer {
103118
this.metric = metric;
104119
this.compressionManager = compressionManager;
105120
this.compressionContentSizeThreshold = options.getCompressionContentSizeThreshold();
106-
this.http2ChannelInitializer = http2ChannelInitializer;
121+
this.http2ChannelInitializer = http2ChannelInitalizer;
107122
}
108123

109124
void configurePipeline(Channel ch, SslChannelProvider sslChannelProvider, SslContextManager sslContextManager) {

vertx-core/src/main/java/io/vertx/core/http/impl/headers/HeadersMultiMap.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,15 @@ public Integer getInt(CharSequence name) {
483483

484484
@Override
485485
public int getInt(CharSequence name, int defaultValue) {
486-
throw new UnsupportedOperationException();
486+
String value = get(name);
487+
if (value == null) {
488+
return defaultValue;
489+
}
490+
try {
491+
return Integer.parseInt(value);
492+
} catch (NumberFormatException e) {
493+
return defaultValue;
494+
}
487495
}
488496

489497
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/http2/Http2ServerResponse.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.netty.handler.codec.http.HttpResponseStatus;
1818
import io.netty.handler.codec.http.HttpStatusClass;
1919
import io.netty.handler.stream.ChunkedInput;
20+
import io.netty.handler.stream.ChunkedNioFile;
2021
import io.vertx.codegen.annotations.Nullable;
2122
import io.vertx.core.Future;
2223
import io.vertx.core.Handler;
@@ -105,7 +106,6 @@ void handleException(Throwable cause) {
105106
}
106107

107108
void handleClose() {
108-
Handler<Throwable> exceptionHandler;
109109
Handler<Void> endHandler;
110110
Handler<Void> closeHandler;
111111
synchronized (conn) {
@@ -638,7 +638,11 @@ private Future<Void> sendFileInternal(long offset, long length, long size, Rando
638638
if (file != null) {
639639
channel = file.getChannel();
640640
}
641-
chunkedFile = new UncloseableChunkedNioFile(channel, actualOffset, actualLength);
641+
if (close) {
642+
chunkedFile = new ChunkedNioFile(channel, actualOffset, actualLength, 8192);
643+
} else {
644+
chunkedFile = new UncloseableChunkedNioFile(channel, actualOffset, actualLength);
645+
}
642646
} catch (IOException e) {
643647
return context.failedFuture(e);
644648
}
@@ -666,6 +670,7 @@ private Future<Void> sendFileInternal(ChunkedInput<ByteBuf> file) {
666670
}
667671
checkSendHeaders(false);
668672
Promise<Void> promise = context.promise();
673+
ended = true;
669674
stream.sendFile(file, promise);
670675
Future<Void> future = promise.future();
671676
Handler<Void> bodyEndHandler = this.bodyEndHandler;

vertx-core/src/main/java/io/vertx/core/http/impl/http2/codec/Http2CodecServerChannelInitializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ public class Http2CodecServerChannelInitializer implements Http2ServerChannelIni
2929

3030
private final HttpServerConnectionInitializer initializer;
3131
private final HttpServerMetrics serverMetrics;
32+
private final Object metric;
3233
private final HttpServerOptions options;
3334
private final CompressionManager compressionManager;
3435
private final Supplier<ContextInternal> streamContextSupplier;
3536
private final Handler<HttpServerConnection> connectionHandler;
3637
private final String serverOrigin;
37-
private final Object metric;
3838
private final boolean logEnabled;
3939

4040
public Http2CodecServerChannelInitializer(HttpServerConnectionInitializer initializer,

0 commit comments

Comments
 (0)