Skip to content

Commit 7b530f9

Browse files
zyclonitevietj
authored andcommitted
Move http2 compression handling from connection to stream level
1 parent 4fceeb6 commit 7b530f9

2 files changed

Lines changed: 119 additions & 12 deletions

File tree

vertx-core/src/main/java/io/vertx/core/http/impl/http2/codec/VertxCompressorHttp2ConnectionEncoder.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,26 @@
3535

3636
public class VertxCompressorHttp2ConnectionEncoder implements Http2FrameWriter, Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
3737

38-
private Http2ConnectionEncoder delegate;
38+
private final Http2ConnectionEncoder delegate;
3939
private final Http2ConnectionEncoder plainEncoder;
40+
private final Http2Connection.PropertyKey encoderKey;
4041

4142
public VertxCompressorHttp2ConnectionEncoder(Http2ConnectionEncoder plainEncoder, CompressionOptions[] compressionOptions) {
4243
this.delegate = new CompressorHttp2ConnectionEncoder(plainEncoder, compressionOptions);
4344
this.plainEncoder = plainEncoder;
45+
this.encoderKey = plainEncoder.connection().newKey();
4446
}
4547

46-
private void beforeWritingHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
48+
private Http2ConnectionEncoder selectEncoder(ChannelHandlerContext ctx, int streamId, Http2Headers responseHeaders) {
4749
String contentEncodingToApply = determineContentEncodingToApply(ctx, streamId, responseHeaders);
4850
if (contentEncodingToApply == null || contentEncodingToApply.equalsIgnoreCase(IDENTITY.toString())) {
4951
if (responseHeaders.contains(CONTENT_ENCODING, IDENTITY)) {
5052
responseHeaders.remove(CONTENT_ENCODING);
5153
}
52-
delegate = plainEncoder;
54+
return plainEncoder;
5355
} else {
5456
responseHeaders.set(CONTENT_ENCODING, contentEncodingToApply);
57+
return delegate;
5558
}
5659
}
5760

@@ -69,16 +72,42 @@ private <T, R> R ifType(Object obj, Class<T> type, Function<T, R> then) {
6972
return obj != null && type.isAssignableFrom(obj.getClass()) ? then.apply(type.cast(obj)) : null;
7073
}
7174

75+
private void storeEncoder(int streamId, Http2ConnectionEncoder encoder) {
76+
io.netty.handler.codec.http2.Http2Stream stream = delegate.connection().stream(streamId);
77+
if (stream != null) {
78+
stream.setProperty(encoderKey, encoder);
79+
}
80+
}
81+
82+
private Http2ConnectionEncoder getStoredEncoder(int streamId) {
83+
io.netty.handler.codec.http2.Http2Stream stream = delegate.connection().stream(streamId);
84+
if (stream != null) {
85+
Http2ConnectionEncoder encoder = stream.getProperty(encoderKey);
86+
if (encoder != null) {
87+
return encoder;
88+
}
89+
}
90+
return delegate;
91+
}
92+
7293
@Override
7394
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
74-
beforeWritingHeaders(ctx, streamId, headers);
75-
return delegate.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
95+
Http2ConnectionEncoder encoder = selectEncoder(ctx, streamId, headers);
96+
storeEncoder(streamId, encoder);
97+
return encoder.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
7698
}
7799

78100
@Override
79101
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise) {
80-
beforeWritingHeaders(ctx, streamId, headers);
81-
return delegate.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise);
102+
Http2ConnectionEncoder encoder = selectEncoder(ctx, streamId, headers);
103+
storeEncoder(streamId, encoder);
104+
return encoder.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream, promise);
105+
}
106+
107+
@Override
108+
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
109+
Http2ConnectionEncoder encoder = getStoredEncoder(streamId);
110+
return encoder.writeData(ctx, streamId, data, padding, endStream, promise);
82111
}
83112

84113
@Override
@@ -166,11 +195,6 @@ public void close() {
166195
delegate.close();
167196
}
168197

169-
@Override
170-
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) {
171-
return delegate.writeData(ctx, streamId, data, padding, endStream, promise);
172-
}
173-
174198
@Override
175199
public void consumeReceivedSettings(Http2Settings settings) {
176200
if (delegate instanceof Http2SettingsReceivedConsumer) {

vertx-core/src/test/java/io/vertx/tests/http/Http2ServerTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import java.util.List;
9090
import java.util.Map;
9191
import java.util.Set;
92+
import java.util.concurrent.ConcurrentHashMap;
9293
import java.util.concurrent.CompletableFuture;
9394
import java.util.concurrent.TimeUnit;
9495
import java.util.concurrent.atomic.AtomicBoolean;
@@ -2257,6 +2258,88 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
22572258
await();
22582259
}
22592260

2261+
@Test
2262+
public void testResponseCompressionEnabledMixedStreams() throws Exception {
2263+
waitFor(6);
2264+
String expected = TestUtils.randomAlphaString(1000);
2265+
server.close();
2266+
server = vertx.createHttpServer(new HttpServerOptions(serverOptions).setCompressionSupported(true));
2267+
server.requestHandler(req -> {
2268+
req.response().end(expected);
2269+
});
2270+
startServer();
2271+
TestClient client = new TestClient();
2272+
client.connect(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, request -> {
2273+
// Stream 1: with compression
2274+
int id1 = request.nextStreamId();
2275+
// Stream 2: without compression
2276+
int id2 = request.nextStreamId();
2277+
// Stream 3: with compression
2278+
int id3 = request.nextStreamId();
2279+
Map<Integer, ByteArrayOutputStream> streamData = new ConcurrentHashMap<>();
2280+
streamData.put(id1, new ByteArrayOutputStream());
2281+
streamData.put(id2, new ByteArrayOutputStream());
2282+
streamData.put(id3, new ByteArrayOutputStream());
2283+
request.decoder.frameListener(new Http2EventAdapter() {
2284+
@Override
2285+
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
2286+
vertx.runOnContext(v -> {
2287+
if (streamId == id1 || streamId == id3) {
2288+
// Stream with accept-encoding: gzip should be compressed
2289+
Assert.assertEquals("gzip", headers.get(HttpHeaderNames.CONTENT_ENCODING).toString());
2290+
} else {
2291+
// Stream without accept-encoding should not be compressed
2292+
Assert.assertFalse(headers.contains(HttpHeaderNames.CONTENT_ENCODING));
2293+
}
2294+
complete();
2295+
});
2296+
}
2297+
@Override
2298+
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
2299+
byte[] bytes = new byte[data.readableBytes()];
2300+
data.readBytes(bytes);
2301+
ByteArrayOutputStream buf = streamData.get(streamId);
2302+
buf.write(bytes, 0, bytes.length);
2303+
if (endOfStream) {
2304+
byte[] allBytes = buf.toByteArray();
2305+
vertx.runOnContext(v -> {
2306+
if (streamId == id1 || streamId == id3) {
2307+
// Compressed stream - decode gzip
2308+
String decoded;
2309+
try {
2310+
GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(allBytes));
2311+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
2312+
while (true) {
2313+
int i = in.read();
2314+
if (i == -1) {
2315+
break;
2316+
}
2317+
baos.write(i);
2318+
}
2319+
decoded = baos.toString();
2320+
} catch (IOException e) {
2321+
fail(e);
2322+
return;
2323+
}
2324+
Assert.assertEquals(expected, decoded);
2325+
} else {
2326+
// Uncompressed stream - plain text
2327+
Assert.assertEquals(expected, new String(allBytes, StandardCharsets.UTF_8));
2328+
}
2329+
complete();
2330+
});
2331+
}
2332+
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
2333+
}
2334+
});
2335+
request.encoder.writeHeaders(request.context, id1, GET("/").add("accept-encoding", "gzip"), 0, true, request.context.newPromise());
2336+
request.encoder.writeHeaders(request.context, id2, GET("/"), 0, true, request.context.newPromise());
2337+
request.encoder.writeHeaders(request.context, id3, GET("/").add("accept-encoding", "gzip"), 0, true, request.context.newPromise());
2338+
request.context.flush();
2339+
});
2340+
await();
2341+
}
2342+
22602343
@Test
22612344
public void testRequestCompressionEnabled() throws Exception {
22622345
String expected = TestUtils.randomAlphaString(1000);

0 commit comments

Comments
 (0)