Commit 404ed02

Add JMH CompressionBenchmark for isolated codec throughput measurement
Benchmarks raw compress/decompress throughput for each supported codec (SNAPPY, ZSTD, LZ4_RAW, GZIP) at page sizes 8KB, 64KB, and 256KB using the heap-based CodecFactory path. Input data mixes sequential, repeated, low-range random, and full random patterns for realistic compression ratios.
1 parent 5bf53dd commit 404ed02
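
The commit message's claim about realistic compression ratios is easy to sanity-check outside JMH. A minimal sketch using only the JDK's Deflater; this harness is illustrative and not part of the commit, and generatePageData is the package-private helper added in the file below:

package org.apache.parquet.benchmarks;

import java.util.zip.Deflater;

// Illustrative harness, not part of this commit: compresses the generated
// page data with the JDK Deflater to estimate its compression ratio.
public class PageDataRatioCheck {
  public static void main(String[] args) {
    byte[] input = CompressionBenchmark.generatePageData(65536, 42L);
    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    // 2x the input size is a safe upper bound for deflate output.
    byte[] out = new byte[input.length * 2];
    int compressedLen = 0;
    while (!deflater.finished()) {
      compressedLen += deflater.deflate(out, compressedLen, out.length - compressedLen);
    }
    deflater.end();
    System.out.printf("compression ratio: %.2fx%n", (double) input.length / compressedLen);
  }
}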

1 file changed: 153 additions & 0 deletions

@@ -0,0 +1,153 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.benchmarks;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

/**
 * Isolated JMH benchmarks for raw Parquet compression and decompression throughput.
 *
 * <p>Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor}
 * and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec,
 * using the heap-based {@link CodecFactory} path. Input data is generated to approximate
 * realistic Parquet page content (a mix of sequential, repeated, and random byte patterns).
 *
 * <p>This benchmark isolates the codec hot path from file I/O, encoding, and other
 * Parquet overhead, making it ideal for measuring compression-specific optimizations.
 */
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 5, time = 3)
@State(Scope.Thread)
public class CompressionBenchmark {

  @Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP"})
  public String codec;

  @Param({"8192", "65536", "262144"})
  public int pageSize;

  private byte[] uncompressedData;
  private byte[] compressedData;
  private int decompressedSize;

  private CompressionCodecFactory.BytesInputCompressor compressor;
  private CompressionCodecFactory.BytesInputDecompressor decompressor;
  private CodecFactory factory;

  @Setup(Level.Trial)
  public void setup() throws IOException {
    uncompressedData = generatePageData(pageSize, 42L);
    decompressedSize = uncompressedData.length;

    Configuration conf = new Configuration();
    factory = new CodecFactory(conf, pageSize);
    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);

    compressor = factory.getCompressor(codecName);
    decompressor = factory.getDecompressor(codecName);

    // Pre-compress for the decompression benchmark; copy to a stable byte array
    // since the compressor may reuse its internal buffer.
    BytesInput compressed = compressor.compress(BytesInput.from(uncompressedData));
    compressedData = compressed.toByteArray();
  }

  @TearDown(Level.Trial)
  public void tearDown() {
    factory.release();
  }

  @Benchmark
  public BytesInput compress() throws IOException {
    return compressor.compress(BytesInput.from(uncompressedData));
  }

  @Benchmark
  public byte[] decompress() throws IOException {
    // Force materialization of the decompressed data. Without this, codecs using
    // the stream-based HeapBytesDecompressor (e.g. GZIP) would return a lazy
    // StreamBytesInput, deferring the actual work. toByteArray() is essentially
    // free for our optimized implementations (returns the existing byte[]).
    return decompressor.decompress(BytesInput.from(compressedData), decompressedSize).toByteArray();
  }

  /**
   * Generates byte data that approximates realistic Parquet page content.
   * Mixes sequential runs, repeated values, low-range random, and full random
   * to produce a realistic compression ratio (~2-4x for fast codecs).
   */
  static byte[] generatePageData(int size, long seed) {
    Random random = new Random(seed);
    byte[] data = new byte[size];
    int i = 0;
    while (i < size) {
      int patternType = random.nextInt(4);
      int chunkSize = Math.min(random.nextInt(256) + 64, size - i);
      switch (patternType) {
        case 0: // Sequential bytes (highly compressible)
          for (int j = 0; j < chunkSize && i < size; j++) {
            data[i++] = (byte) (j & 0xFF);
          }
          break;
        case 1: // Repeated value (highly compressible)
          byte val = (byte) random.nextInt(256);
          for (int j = 0; j < chunkSize && i < size; j++) {
            data[i++] = val;
          }
          break;
        case 2: // Small range random (moderately compressible)
          for (int j = 0; j < chunkSize && i < size; j++) {
            data[i++] = (byte) random.nextInt(16);
          }
          break;
        case 3: // Full random (low compressibility)
          byte[] randomChunk = new byte[chunkSize];
          random.nextBytes(randomChunk);
          int toCopy = Math.min(chunkSize, size - i);
          System.arraycopy(randomChunk, 0, data, i, toCopy);
          i += toCopy;
          break;
      }
    }
    return data;
  }
}
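
To run just one cell of the parameter matrix, the benchmark can also be launched programmatically through JMH's runner API. A minimal sketch; the RunCompressionBenchmark wrapper is illustrative and not part of this commit, and the same selection is available on the JMH command line as -p codec=ZSTD -p pageSize=65536:

package org.apache.parquet.benchmarks;

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Illustrative launcher, not part of this commit: runs only the ZSTD / 64 KB
// cell of CompressionBenchmark through the JMH runner API.
public class RunCompressionBenchmark {
  public static void main(String[] args) throws RunnerException {
    Options opts = new OptionsBuilder()
        .include(CompressionBenchmark.class.getSimpleName())
        .param("codec", "ZSTD")
        .param("pageSize", "65536")
        .build();
    new Runner(opts).run();
  }
}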
