Skip to content

Commit 0571b9e

Browse files
committed
GH-3530: Bypass Hadoop codec abstraction to optimize compression performance
Bypass Hadoop CompressionCodec for Snappy (xerial JNI), LZ4_RAW (airlift), ZSTD (zstd-jni), GZIP (JDK), LZO (airlift), and BROTLI (brotli4j) in both CodecFactory and DirectCodecFactory, eliminating per-page codec pool lookups, stream wrapper allocation, and unnecessary buffer copies. SNAPPY: direct byte-array JNI calls via Snappy.compress/uncompress, avoiding the Hadoop stream abstraction and intermediate direct ByteBuffer copies. ZSTD: replace streaming ZstdOutputStreamNoFinalizer/ZstdInputStreamNoFinalizer with reusable ZstdCompressCtx/ZstdDecompressCtx single-call APIs. Supports compression level and multi-threaded workers configuration. LZ4_RAW: direct airlift Lz4Compressor/Lz4Decompressor with reusable direct ByteBuffers, bypassing Hadoop's NonBlockedCompressor overhead. GZIP: bypass Hadoop's GzipCodec and codec-pool/stream-wrapper overhead with direct JDK GZIPOutputStream/GZIPInputStream. Compression level is read from the existing "zlib.compress.level" Hadoop configuration key. LZO: use aircompressor's LzoHadoopStreams directly, bypassing the GPL-licensed com.hadoop.compression.lzo.LzoCodec. Wire-compatible with Hadoop's LzoCodec. BROTLI: migrate from jbrotli (unmaintained) to brotli4j via reflection, using single-call Encoder.compress/Decoder.decompress byte-array APIs. End-to-end interop tests (TestCompressionInterop) validate that files written with the old Hadoop CompressionCodec path are readable by the new direct path and vice versa, for all 6 codecs including multi-row-group scenarios.
1 parent c7e7acb commit 0571b9e

23 files changed

Lines changed: 2965 additions & 120 deletions

File tree

parquet-benchmarks/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,31 @@
8787
<artifactId>slf4j-api</artifactId>
8888
<version>${slf4j.version}</version>
8989
</dependency>
90+
<dependency>
91+
<groupId>com.aayushatharva.brotli4j</groupId>
92+
<artifactId>brotli4j</artifactId>
93+
<version>${brotli4j.version}</version>
94+
<scope>runtime</scope>
95+
</dependency>
9096
</dependencies>
9197

9298
<build>
9399
<plugins>
94100
<plugin>
95101
<groupId>org.apache.maven.plugins</groupId>
96102
<artifactId>maven-compiler-plugin</artifactId>
103+
<configuration>
104+
<annotationProcessorPaths>
105+
<path>
106+
<groupId>org.openjdk.jmh</groupId>
107+
<artifactId>jmh-generator-annprocess</artifactId>
108+
<version>${jmh.version}</version>
109+
</path>
110+
</annotationProcessorPaths>
111+
<annotationProcessors>
112+
<annotationProcessor>org.openjdk.jmh.generators.BenchmarkProcessor</annotationProcessor>
113+
</annotationProcessors>
114+
</configuration>
97115
</plugin>
98116
<plugin>
99117
<groupId>org.apache.maven.plugins</groupId>
@@ -112,6 +130,12 @@
112130
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
113131
<mainClass>org.openjdk.jmh.Main</mainClass>
114132
</transformer>
133+
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
134+
<resource>META-INF/BenchmarkList</resource>
135+
</transformer>
136+
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
137+
<resource>META-INF/CompilerHints</resource>
138+
</transformer>
115139
</transformers>
116140
<artifactSet>
117141
<includes>

parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class BenchmarkFiles {
3636
public static final Path file_1M_BS512M_PS8M = new Path(TARGET_DIR + "/PARQUET-1M-BS512M_PS8M");
3737

3838
// different compression codecs
39-
// public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO");
39+
public static final Path file_1M_LZO = new Path(TARGET_DIR + "/PARQUET-1M-LZO");
4040
public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY");
4141
public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP");
4242

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.benchmarks;
20+
21+
import java.io.IOException;
22+
import org.apache.parquet.io.OutputFile;
23+
import org.apache.parquet.io.PositionOutputStream;
24+
25+
/**
26+
* A no-op {@link OutputFile} that discards all written data.
27+
* Useful for isolating CPU/encoding cost from filesystem I/O in write benchmarks.
28+
*/
29+
public final class BlackHoleOutputFile implements OutputFile {
30+
31+
public static final BlackHoleOutputFile INSTANCE = new BlackHoleOutputFile();
32+
33+
private BlackHoleOutputFile() {}
34+
35+
@Override
36+
public boolean supportsBlockSize() {
37+
return false;
38+
}
39+
40+
@Override
41+
public long defaultBlockSize() {
42+
return -1L;
43+
}
44+
45+
@Override
46+
public PositionOutputStream createOrOverwrite(long blockSizeHint) {
47+
return create(blockSizeHint);
48+
}
49+
50+
@Override
51+
public PositionOutputStream create(long blockSizeHint) {
52+
return new PositionOutputStream() {
53+
private long pos;
54+
55+
@Override
56+
public long getPos() throws IOException {
57+
return pos;
58+
}
59+
60+
@Override
61+
public void write(int b) throws IOException {
62+
++pos;
63+
}
64+
65+
@Override
66+
public void write(byte[] b, int off, int len) throws IOException {
67+
pos += len;
68+
}
69+
};
70+
}
71+
72+
@Override
73+
public String getPath() {
74+
return "/dev/null";
75+
}
76+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.benchmarks;
20+
21+
import java.io.IOException;
22+
import java.util.Random;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.parquet.bytes.BytesInput;
26+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
27+
import org.apache.parquet.compression.CompressionCodecFactory;
28+
import org.apache.parquet.hadoop.CodecFactory;
29+
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
30+
import org.openjdk.jmh.annotations.Benchmark;
31+
import org.openjdk.jmh.annotations.BenchmarkMode;
32+
import org.openjdk.jmh.annotations.Fork;
33+
import org.openjdk.jmh.annotations.Level;
34+
import org.openjdk.jmh.annotations.Measurement;
35+
import org.openjdk.jmh.annotations.Mode;
36+
import org.openjdk.jmh.annotations.OutputTimeUnit;
37+
import org.openjdk.jmh.annotations.Param;
38+
import org.openjdk.jmh.annotations.Scope;
39+
import org.openjdk.jmh.annotations.Setup;
40+
import org.openjdk.jmh.annotations.State;
41+
import org.openjdk.jmh.annotations.TearDown;
42+
import org.openjdk.jmh.annotations.Warmup;
43+
44+
/**
45+
* Isolated JMH benchmarks for raw Parquet compression and decompression throughput.
46+
*
47+
* <p>Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor}
48+
* and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec,
49+
* comparing the heap-based {@link CodecFactory} path (what all production users take)
50+
* against the direct-memory {@code DirectCodecFactory} path (off-heap ByteBuffers).
51+
*
52+
* <p>This benchmark isolates the codec hot path from file I/O, encoding, and other
53+
* Parquet overhead, making it ideal for measuring compression-specific optimizations.
54+
*/
55+
@BenchmarkMode(Mode.Throughput)
56+
@OutputTimeUnit(TimeUnit.SECONDS)
57+
@Fork(1)
58+
@Warmup(iterations = 2, time = 1)
59+
@Measurement(iterations = 3, time = 2)
60+
@State(Scope.Thread)
61+
public class CompressionBenchmark {
62+
63+
@Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP", "BROTLI", "LZO"})
64+
public String codec;
65+
66+
@Param({"65536", "131072", "262144", "1048576"})
67+
public int pageSize;
68+
69+
@Param({"HEAP", "DIRECT"})
70+
public String factoryType;
71+
72+
private byte[] uncompressedData;
73+
private byte[] compressedData;
74+
private int decompressedSize;
75+
76+
private CompressionCodecFactory.BytesInputCompressor compressor;
77+
private CompressionCodecFactory.BytesInputDecompressor decompressor;
78+
private CodecFactory factory;
79+
80+
@Setup(Level.Trial)
81+
public void setup() throws IOException {
82+
uncompressedData = generatePageData(pageSize, 42L);
83+
decompressedSize = uncompressedData.length;
84+
85+
Configuration conf = new Configuration();
86+
if ("DIRECT".equals(factoryType)) {
87+
factory = CodecFactory.createDirectCodecFactory(conf, DirectByteBufferAllocator.getInstance(), pageSize);
88+
} else {
89+
factory = new CodecFactory(conf, pageSize);
90+
}
91+
CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
92+
93+
compressor = factory.getCompressor(codecName);
94+
decompressor = factory.getDecompressor(codecName);
95+
96+
// Pre-compress for decompression benchmark; copy to a stable byte array
97+
// since the compressor may reuse its internal buffer.
98+
BytesInput compressed = compressor.compress(BytesInput.from(uncompressedData));
99+
compressedData = compressed.toByteArray();
100+
}
101+
102+
@TearDown(Level.Trial)
103+
public void tearDown() {
104+
factory.release();
105+
}
106+
107+
@Benchmark
108+
public BytesInput compress() throws IOException {
109+
return compressor.compress(BytesInput.from(uncompressedData));
110+
}
111+
112+
@Benchmark
113+
public byte[] decompress() throws IOException {
114+
// Force materialization of the decompressed data. Without this, codecs using
115+
// the stream-based HeapBytesDecompressor (e.g. GZIP) would return a lazy
116+
// StreamBytesInput, deferring the actual work. toByteArray() is essentially
117+
// free for our optimized implementations (returns the existing byte[]).
118+
return decompressor
119+
.decompress(BytesInput.from(compressedData), decompressedSize)
120+
.toByteArray();
121+
}
122+
123+
/**
124+
* Generates byte data that approximates realistic Parquet page content.
125+
* Mixes sequential runs, repeated values, low-range random, and full random
126+
* to produce a realistic compression ratio (~2-4x for fast codecs).
127+
*/
128+
static byte[] generatePageData(int size, long seed) {
129+
Random random = new Random(seed);
130+
byte[] data = new byte[size];
131+
int i = 0;
132+
while (i < size) {
133+
int patternType = random.nextInt(4);
134+
int chunkSize = Math.min(random.nextInt(256) + 64, size - i);
135+
switch (patternType) {
136+
case 0: // Sequential bytes (highly compressible)
137+
for (int j = 0; j < chunkSize && i < size; j++) {
138+
data[i++] = (byte) (j & 0xFF);
139+
}
140+
break;
141+
case 1: // Repeated value (highly compressible)
142+
byte val = (byte) random.nextInt(256);
143+
for (int j = 0; j < chunkSize && i < size; j++) {
144+
data[i++] = val;
145+
}
146+
break;
147+
case 2: // Small range random (moderately compressible)
148+
for (int j = 0; j < chunkSize && i < size; j++) {
149+
data[i++] = (byte) random.nextInt(16);
150+
}
151+
break;
152+
case 3: // Full random (low compressibility)
153+
byte[] randomChunk = new byte[chunkSize];
154+
random.nextBytes(randomChunk);
155+
int toCopy = Math.min(chunkSize, size - i);
156+
System.arraycopy(randomChunk, 0, data, i, toCopy);
157+
i += toCopy;
158+
break;
159+
}
160+
}
161+
return data;
162+
}
163+
}

0 commit comments

Comments
 (0)