Skip to content

Commit fe836a6

Browse files
committed
GH-3530: Bypass Hadoop codec abstraction to optimize compression performance
Some of the Parquet compression codecs rely on Hadoop's CompressionCodec. After evaluating with performance tests that isolate the CPU utilization it is clear that the Hadoop abstraction introduces considerable overhead. This PR improves that for Snappy, LZ4_RAW, ZSTD, GZIP, LZO, and BROTLI. It also migrates Brotli from jbrotli to brotli4j. Bypass Hadoop CompressionCodec for Snappy (xerial JNI), LZ4_RAW (airlift), ZSTD (zstd-jni), GZIP (JDK), LZO (airlift), and BROTLI (brotli4j) in both CodecFactory and DirectCodecFactory, eliminating per-page codec pool lookups, stream wrapper allocation, and unnecessary buffer copies. ZSTD: replace streaming ZstdOutputStreamNoFinalizer/ZstdInputStreamNoFinalizer with reusable ZstdCompressCtx/ZstdDecompressCtx single-call APIs. GZIP: bypass Hadoop's GzipCodec and its codec-pool/stream-wrapper overhead with direct JDK GZIPOutputStream/GZIPInputStream. Compression level is read from the existing "zlib.compress.level" Hadoop configuration key. LZO: bypass the GPL-licensed com.hadoop.compression.lzo.LzoCodec entirely using aircompressor's LzoHadoopStreams (Apache 2.0). The framing format (big-endian length-prefixed blocks) is wire-compatible with Hadoop's LzoCodec, so existing LZO Parquet files remain readable. Removes the GPL dependency for LZO support. Uncomment previously disabled LZO benchmarks and tests. BROTLI: migrate from abandoned brotli-codec (jbrotli, 2016, x86-only) to brotli4j 1.23.0 (com.aayushatharva.brotli4j) which supports 10 platforms including linux/darwin/windows aarch64. brotli4j is a runtime-only optional dependency accessed via reflection (Encoder.compress and Decoder.decompress) to avoid a compile-time dependency. Uses Decoder.decompress(byte[], int, int) instead of DirectDecompress to avoid loading classes that reference Netty. Remove non-aarch64 Maven profile guards and aarch64 test skips. ByteBuffer decompressors use native APIs with slice + manual position advancement pattern (matching DirectCodecFactory.BaseDecompressor): - Snappy: Snappy.uncompress(slice, slice) - ZSTD: Zstd.decompress(slice, slice) - LZ4_RAW: decompressor.decompress(slice, slice) - GZIP: ByteBufferInputStream.wrap(slice) -> GZIPInputStream - LZO: ByteBufferInputStream.wrap(slice) -> LzoHadoopInputStream - BROTLI: byte[] copy through Decoder.decompress (no direct ByteBuffer API) Add BytesInput.toByteArray() zero-copy override in ByteArrayBytesInput. Add benchmarks: CompressionBenchmark, CpuReadBenchmark, CpuWriteBenchmark, FileReadBenchmark, FileWriteBenchmark, InMemoryInputFile, InMemoryOutputFile, ConcurrentReadWriteBenchmark. Remove encoding/row-group benchmarks. Add 15 new tests in TestDirectCodecFactory, 3 new tests in TestBytesInput.
1 parent c7e7acb commit fe836a6

22 files changed

Lines changed: 2441 additions & 120 deletions

File tree

parquet-benchmarks/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,31 @@
8787
<artifactId>slf4j-api</artifactId>
8888
<version>${slf4j.version}</version>
8989
</dependency>
90+
<dependency>
91+
<groupId>com.aayushatharva.brotli4j</groupId>
92+
<artifactId>brotli4j</artifactId>
93+
<version>${brotli4j.version}</version>
94+
<scope>runtime</scope>
95+
</dependency>
9096
</dependencies>
9197

9298
<build>
9399
<plugins>
94100
<plugin>
95101
<groupId>org.apache.maven.plugins</groupId>
96102
<artifactId>maven-compiler-plugin</artifactId>
103+
<configuration>
104+
<annotationProcessorPaths>
105+
<path>
106+
<groupId>org.openjdk.jmh</groupId>
107+
<artifactId>jmh-generator-annprocess</artifactId>
108+
<version>${jmh.version}</version>
109+
</path>
110+
</annotationProcessorPaths>
111+
<annotationProcessors>
112+
<annotationProcessor>org.openjdk.jmh.generators.BenchmarkProcessor</annotationProcessor>
113+
</annotationProcessors>
114+
</configuration>
97115
</plugin>
98116
<plugin>
99117
<groupId>org.apache.maven.plugins</groupId>
@@ -112,6 +130,12 @@
112130
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
113131
<mainClass>org.openjdk.jmh.Main</mainClass>
114132
</transformer>
133+
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
134+
<resource>META-INF/BenchmarkList</resource>
135+
</transformer>
136+
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
137+
<resource>META-INF/CompilerHints</resource>
138+
</transformer>
115139
</transformers>
116140
<artifactSet>
117141
<includes>

parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class BenchmarkFiles {
3636
public static final Path file_1M_BS512M_PS8M = new Path(TARGET_DIR + "/PARQUET-1M-BS512M_PS8M");
3737

3838
// different compression codecs
39-
// public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO");
39+
public static final Path file_1M_LZO = new Path(TARGET_DIR + "/PARQUET-1M-LZO");
4040
public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY");
4141
public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP");
4242

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.benchmarks;
20+
21+
import java.io.IOException;
22+
import org.apache.parquet.io.OutputFile;
23+
import org.apache.parquet.io.PositionOutputStream;
24+
25+
/**
26+
* A no-op {@link OutputFile} that discards all written data.
27+
* Useful for isolating CPU/encoding cost from filesystem I/O in write benchmarks.
28+
*/
29+
public final class BlackHoleOutputFile implements OutputFile {
30+
31+
public static final BlackHoleOutputFile INSTANCE = new BlackHoleOutputFile();
32+
33+
private BlackHoleOutputFile() {}
34+
35+
@Override
36+
public boolean supportsBlockSize() {
37+
return false;
38+
}
39+
40+
@Override
41+
public long defaultBlockSize() {
42+
return -1L;
43+
}
44+
45+
@Override
46+
public PositionOutputStream createOrOverwrite(long blockSizeHint) {
47+
return create(blockSizeHint);
48+
}
49+
50+
@Override
51+
public PositionOutputStream create(long blockSizeHint) {
52+
return new PositionOutputStream() {
53+
private long pos;
54+
55+
@Override
56+
public long getPos() throws IOException {
57+
return pos;
58+
}
59+
60+
@Override
61+
public void write(int b) throws IOException {
62+
++pos;
63+
}
64+
65+
@Override
66+
public void write(byte[] b, int off, int len) throws IOException {
67+
pos += len;
68+
}
69+
};
70+
}
71+
72+
@Override
73+
public String getPath() {
74+
return "/dev/null";
75+
}
76+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.benchmarks;
20+
21+
import java.io.IOException;
22+
import java.util.Random;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.parquet.bytes.BytesInput;
26+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
27+
import org.apache.parquet.compression.CompressionCodecFactory;
28+
import org.apache.parquet.hadoop.CodecFactory;
29+
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
30+
import org.openjdk.jmh.annotations.Benchmark;
31+
import org.openjdk.jmh.annotations.BenchmarkMode;
32+
import org.openjdk.jmh.annotations.Fork;
33+
import org.openjdk.jmh.annotations.Level;
34+
import org.openjdk.jmh.annotations.Measurement;
35+
import org.openjdk.jmh.annotations.Mode;
36+
import org.openjdk.jmh.annotations.OutputTimeUnit;
37+
import org.openjdk.jmh.annotations.Param;
38+
import org.openjdk.jmh.annotations.Scope;
39+
import org.openjdk.jmh.annotations.Setup;
40+
import org.openjdk.jmh.annotations.State;
41+
import org.openjdk.jmh.annotations.TearDown;
42+
import org.openjdk.jmh.annotations.Warmup;
43+
44+
/**
45+
* Isolated JMH benchmarks for raw Parquet compression and decompression throughput.
46+
*
47+
* <p>Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor}
48+
* and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec,
49+
* comparing the heap-based {@link CodecFactory} path (what all production users take)
50+
* against the direct-memory {@link DirectCodecFactory} path (off-heap ByteBuffers).
51+
*
52+
* <p>This benchmark isolates the codec hot path from file I/O, encoding, and other
53+
* Parquet overhead, making it ideal for measuring compression-specific optimizations.
54+
*/
55+
@BenchmarkMode(Mode.Throughput)
56+
@OutputTimeUnit(TimeUnit.SECONDS)
57+
@Fork(1)
58+
@Warmup(iterations = 2, time = 1)
59+
@Measurement(iterations = 3, time = 2)
60+
@State(Scope.Thread)
61+
public class CompressionBenchmark {
62+
63+
@Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP", "BROTLI", "LZO"})
64+
public String codec;
65+
66+
@Param({"65536", "131072", "262144", "1048576"})
67+
public int pageSize;
68+
69+
@Param({"HEAP", "DIRECT"})
70+
public String factoryType;
71+
72+
private byte[] uncompressedData;
73+
private byte[] compressedData;
74+
private int decompressedSize;
75+
76+
private CompressionCodecFactory.BytesInputCompressor compressor;
77+
private CompressionCodecFactory.BytesInputDecompressor decompressor;
78+
private CodecFactory factory;
79+
80+
@Setup(Level.Trial)
81+
public void setup() throws IOException {
82+
uncompressedData = generatePageData(pageSize, 42L);
83+
decompressedSize = uncompressedData.length;
84+
85+
Configuration conf = new Configuration();
86+
if ("DIRECT".equals(factoryType)) {
87+
factory = CodecFactory.createDirectCodecFactory(conf, DirectByteBufferAllocator.getInstance(), pageSize);
88+
} else {
89+
factory = new CodecFactory(conf, pageSize);
90+
}
91+
CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
92+
93+
compressor = factory.getCompressor(codecName);
94+
decompressor = factory.getDecompressor(codecName);
95+
96+
// Pre-compress for decompression benchmark; copy to a stable byte array
97+
// since the compressor may reuse its internal buffer.
98+
BytesInput compressed = compressor.compress(BytesInput.from(uncompressedData));
99+
compressedData = compressed.toByteArray();
100+
}
101+
102+
@TearDown(Level.Trial)
103+
public void tearDown() {
104+
factory.release();
105+
}
106+
107+
@Benchmark
108+
public BytesInput compress() throws IOException {
109+
return compressor.compress(BytesInput.from(uncompressedData));
110+
}
111+
112+
@Benchmark
113+
public byte[] decompress() throws IOException {
114+
// Force materialization of the decompressed data. Without this, codecs using
115+
// the stream-based HeapBytesDecompressor (e.g. GZIP) would return a lazy
116+
// StreamBytesInput, deferring the actual work. toByteArray() is essentially
117+
// free for our optimized implementations (returns the existing byte[]).
118+
return decompressor
119+
.decompress(BytesInput.from(compressedData), decompressedSize)
120+
.toByteArray();
121+
}
122+
123+
/**
124+
* Generates byte data that approximates realistic Parquet page content.
125+
* Mixes sequential runs, repeated values, low-range random, and full random
126+
* to produce a realistic compression ratio (~2-4x for fast codecs).
127+
*/
128+
static byte[] generatePageData(int size, long seed) {
129+
Random random = new Random(seed);
130+
byte[] data = new byte[size];
131+
int i = 0;
132+
while (i < size) {
133+
int patternType = random.nextInt(4);
134+
int chunkSize = Math.min(random.nextInt(256) + 64, size - i);
135+
switch (patternType) {
136+
case 0: // Sequential bytes (highly compressible)
137+
for (int j = 0; j < chunkSize && i < size; j++) {
138+
data[i++] = (byte) (j & 0xFF);
139+
}
140+
break;
141+
case 1: // Repeated value (highly compressible)
142+
byte val = (byte) random.nextInt(256);
143+
for (int j = 0; j < chunkSize && i < size; j++) {
144+
data[i++] = val;
145+
}
146+
break;
147+
case 2: // Small range random (moderately compressible)
148+
for (int j = 0; j < chunkSize && i < size; j++) {
149+
data[i++] = (byte) random.nextInt(16);
150+
}
151+
break;
152+
case 3: // Full random (low compressibility)
153+
byte[] randomChunk = new byte[chunkSize];
154+
random.nextBytes(randomChunk);
155+
int toCopy = Math.min(chunkSize, size - i);
156+
System.arraycopy(randomChunk, 0, data, i, toCopy);
157+
i += toCopy;
158+
break;
159+
}
160+
}
161+
return data;
162+
}
163+
}

0 commit comments

Comments
 (0)