Skip to content

Commit 93b9cde

Browse files
zyclonitevietj
authored andcommitted
Move http2 compression handling from connection to stream level
1 parent 325d607 commit 93b9cde

2 files changed

Lines changed: 119 additions & 12 deletions

File tree

vertx-core/src/main/java/io/vertx/core/http/impl/VertxCompressorHttp2ConnectionEncoder.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,47 @@
3333

3434
public class VertxCompressorHttp2ConnectionEncoder implements Http2FrameWriter, Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
3535

36-
private Http2ConnectionEncoder delegate;
36+
private final Http2ConnectionEncoder delegate;
3737
private final Http2ConnectionEncoder plainEncoder;
38+
private final Http2Connection.PropertyKey encoderKey;
3839

3940
public VertxCompressorHttp2ConnectionEncoder(Http2ConnectionEncoder plainEncoder, CompressionOptions[] compressionOptions) {
4041
this.delegate = new CompressorHttp2ConnectionEncoder(plainEncoder, compressionOptions);
4142
this.plainEncoder = plainEncoder;
43+
this.encoderKey = plainEncoder.connection().newKey();
4244
}
4345

44-
private void beforeWritingHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
46+
private Http2ConnectionEncoder selectEncoder(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
4547
String contentEncodingToApply = determineContentEncodingToApply(ctx, streamId, responseHeaders);
4648
if (contentEncodingToApply == null || contentEncodingToApply.equalsIgnoreCase(IDENTITY.toString())) {
4749
if (responseHeaders.contains(CONTENT_ENCODING, IDENTITY)) {
4850
responseHeaders.remove(CONTENT_ENCODING);
4951
}
50-
delegate = plainEncoder;
52+
return plainEncoder;
5153
} else {
5254
responseHeaders.set(CONTENT_ENCODING, contentEncodingToApply);
55+
return delegate;
5356
}
5457
}
5558

59+
private void storeEncoder(int streamId, Http2ConnectionEncoder encoder) {
60+
io.netty.handler.codec.http2.Http2Stream stream = delegate.connection().stream(streamId);
61+
if (stream != null) {
62+
stream.setProperty(encoderKey, encoder);
63+
}
64+
}
65+
66+
private Http2ConnectionEncoder getStoredEncoder(int streamId) {
67+
io.netty.handler.codec.http2.Http2Stream stream = delegate.connection().stream(streamId);
68+
if (stream != null) {
69+
Http2ConnectionEncoder encoder = stream.getProperty(encoderKey);
70+
if (encoder != null) {
71+
return encoder;
72+
}
73+
}
74+
return delegate;
75+
}
76+
5677
private String determineContentEncodingToApply(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
5778
if (responseHeaders.contains(CONTENT_ENCODING)) {
5879
return null;
@@ -69,14 +90,22 @@ private <T, R> R ifType(Object obj, Class<T> type, Function<T, R> then) {
6990

7091
@Override
7192
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
72-
beforeWritingHeaders(ctx, streamId, headers);
73-
return delegate.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
93+
Http2ConnectionEncoder encoder = selectEncoder(ctx, streamId, headers);
94+
storeEncoder(streamId, encoder);
95+
return encoder.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
7496
}
7597

7698
@Override
7799
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise) {
78-
beforeWritingHeaders(ctx, streamId, headers);
79-
return delegate.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise);
100+
Http2ConnectionEncoder encoder = selectEncoder(ctx, streamId, headers);
101+
storeEncoder(streamId, encoder);
102+
return encoder.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise);
103+
}
104+
105+
@Override
106+
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
107+
Http2ConnectionEncoder encoder = getStoredEncoder(streamId);
108+
return encoder.writeData(ctx, streamId, data, padding, endStream, promise);
80109
}
81110

82111
@Override
@@ -164,11 +193,6 @@ public void close() {
164193
delegate.close();
165194
}
166195

167-
@Override
168-
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
169-
return delegate.writeData(ctx, streamId, data, padding, endStream, promise);
170-
}
171-
172196
@Override
173197
public void consumeReceivedSettings(Http2Settings settings) {
174198
if (delegate instanceof Http2SettingsReceivedConsumer) {

vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import java.util.Map;
9090
import java.util.Set;
9191
import java.util.concurrent.CompletableFuture;
92+
import java.util.concurrent.ConcurrentHashMap;
9293
import java.util.concurrent.CountDownLatch;
9394
import java.util.concurrent.TimeUnit;
9495
import java.util.concurrent.atomic.AtomicBoolean;
@@ -2259,6 +2260,88 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
22592260
await();
22602261
}
22612262

2263+
@Test
2264+
public void testResponseCompressionEnabledMixedStreams() throws Exception {
2265+
waitFor(6);
2266+
String expected = TestUtils.randomAlphaString(1000);
2267+
server.close();
2268+
server = vertx.createHttpServer(new HttpServerOptions(serverOptions).setCompressionSupported(true));
2269+
server.requestHandler(req -> {
2270+
req.response().end(expected);
2271+
});
2272+
startServer();
2273+
TestClient client = new TestClient();
2274+
client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> {
2275+
// Stream 1: with compression
2276+
int id1 = request.nextStreamId();
2277+
// Stream 2: without compression
2278+
int id2 = request.nextStreamId();
2279+
// Stream 3: with compression
2280+
int id3 = request.nextStreamId();
2281+
Map<Integer, ByteArrayOutputStream> streamData = new ConcurrentHashMap<>();
2282+
streamData.put(id1, new ByteArrayOutputStream());
2283+
streamData.put(id2, new ByteArrayOutputStream());
2284+
streamData.put(id3, new ByteArrayOutputStream());
2285+
request.decoder.frameListener(new Http2EventAdapter() {
2286+
@Override
2287+
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
2288+
vertx.runOnContext(v -> {
2289+
if (streamId == id1 || streamId == id3) {
2290+
// Stream with accept-encoding: gzip should be compressed
2291+
assertEquals("gzip", headers.get(HttpHeaderNames.CONTENT_ENCODING).toString());
2292+
} else {
2293+
// Stream without accept-encoding should not be compressed
2294+
assertFalse(headers.contains(HttpHeaderNames.CONTENT_ENCODING));
2295+
}
2296+
complete();
2297+
});
2298+
}
2299+
@Override
2300+
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
2301+
byte[] bytes = new byte[data.readableBytes()];
2302+
data.readBytes(bytes);
2303+
ByteArrayOutputStream buf = streamData.get(streamId);
2304+
buf.write(bytes, 0, bytes.length);
2305+
if (endOfStream) {
2306+
byte[] allBytes = buf.toByteArray();
2307+
vertx.runOnContext(v -> {
2308+
if (streamId == id1 || streamId == id3) {
2309+
// Compressed stream - decode gzip
2310+
String decoded;
2311+
try {
2312+
GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(allBytes));
2313+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
2314+
while (true) {
2315+
int i = in.read();
2316+
if (i == -1) {
2317+
break;
2318+
}
2319+
baos.write(i);
2320+
}
2321+
decoded = baos.toString();
2322+
} catch (IOException e) {
2323+
fail(e);
2324+
return;
2325+
}
2326+
assertEquals(expected, decoded);
2327+
} else {
2328+
// Uncompressed stream - plain text
2329+
assertEquals(expected, new String(allBytes, StandardCharsets.UTF_8));
2330+
}
2331+
complete();
2332+
});
2333+
}
2334+
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
2335+
}
2336+
});
2337+
request.encoder.writeHeaders(request.context, id1, GET("/").add("accept-encoding", "gzip"), 0, true, request.context.newPromise());
2338+
request.encoder.writeHeaders(request.context, id2, GET("/"), 0, true, request.context.newPromise());
2339+
request.encoder.writeHeaders(request.context, id3, GET("/").add("accept-encoding", "gzip"), 0, true, request.context.newPromise());
2340+
request.context.flush();
2341+
});
2342+
await();
2343+
}
2344+
22622345
@Test
22632346
public void testRequestCompressionEnabled() throws Exception {
22642347
String expected = TestUtils.randomAlphaString(1000);

0 commit comments

Comments
 (0)