Skip to content

Commit c1632dd

Browse files
committed
Add BROTLI direct bypass in DirectCodecFactory using jbrotli JNI
Bypass the Hadoop BrotliCodec/stream wrapper for BROTLI compression and decompression by calling org.meteogroup.jbrotli's native JNI bindings directly, with ByteBuffer support, via reflection (brotli-codec remains a runtime-scope dependency). This eliminates intermediate buffer copies and the BrotliStreamCompressor state-machine overhead.

Changes:
- DirectCodecFactory: Add BrotliDirectCompressor (quality=1, matching the Hadoop default) and BrotliDirectDecompressor, both using the one-shot jbrotli API via reflection.
- Load the native library eagerly, with a graceful fallback to the Hadoop codec path.
- CompressionBenchmark: Switch from the heap CodecFactory to DirectCodecFactory to benchmark the actual production code path.

Results at 64KB page size:
- Compress: 6,746 -> 9,662 ops/s (1.43x speedup)
- Decompress: 2,534 -> 2,786 ops/s (1.10x speedup)
1 parent 8b464dc commit c1632dd

2 files changed

Lines changed: 141 additions & 3 deletions

File tree

parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/CompressionBenchmark.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.parquet.bytes.BytesInput;
26+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
2627
import org.apache.parquet.compression.CompressionCodecFactory;
2728
import org.apache.parquet.hadoop.CodecFactory;
2829
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -45,8 +46,9 @@
4546
*
4647
* <p>Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor}
4748
* and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec,
48-
* using the heap-based {@link CodecFactory} path. Input data is generated to approximate
49-
* realistic Parquet page content (a mix of sequential, repeated, and random byte patterns).
49+
* using the direct-memory {@link CodecFactory} path (same as actual Parquet file I/O).
50+
* Input data is generated to approximate realistic Parquet page content (a mix of
51+
* sequential, repeated, and random byte patterns).
5052
*
5153
* <p>This benchmark isolates the codec hot path from file I/O, encoding, and other
5254
* Parquet overhead, making it ideal for measuring compression-specific optimizations.
@@ -79,7 +81,7 @@ public void setup() throws IOException {
7981
decompressedSize = uncompressedData.length;
8082

8183
Configuration conf = new Configuration();
82-
factory = new CodecFactory(conf, pageSize);
84+
factory = CodecFactory.createDirectCodecFactory(conf, DirectByteBufferAllocator.getInstance(), pageSize);
8385
CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
8486

8587
compressor = factory.getCompressor(codecName);

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.github.luben.zstd.ZstdCompressCtx;
2222
import com.github.luben.zstd.ZstdDecompressCtx;
2323
import java.io.IOException;
24+
import java.lang.reflect.Constructor;
2425
import java.lang.reflect.InvocationTargetException;
2526
import java.lang.reflect.Method;
2627
import java.nio.ByteBuffer;
@@ -61,6 +62,14 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
6162
private static final Method DECOMPRESS_METHOD;
6263
private static final Method CREATE_DIRECT_DECOMPRESSOR_METHOD;
6364

65+
// Brotli JNI bypass via reflection (brotli-codec is a runtime-only dependency)
// These handles are populated once by the static initializer. BROTLI_NATIVE_AVAILABLE is
// set to true only after every lookup below has succeeded, so it is the single flag to
// check before using any of the other handles; if initialization failed part-way, some of
// them may still be null.
66+
private static final boolean BROTLI_NATIVE_AVAILABLE;
67+
private static final Method BROTLI_DECOMPRESS_METHOD; // BrotliDeCompressor.deCompress(ByteBuffer, ByteBuffer)
68+
private static final Method BROTLI_COMPRESS_METHOD; // BrotliCompressor.compress(Parameter, ByteBuffer, ByteBuffer)
69+
private static final Constructor<?> BROTLI_DECOMPRESSOR_CTOR; // BrotliDeCompressor()
70+
private static final Constructor<?> BROTLI_COMPRESSOR_CTOR; // BrotliCompressor()
71+
private static final Object BROTLI_COMPRESS_PARAMETER; // Brotli.Parameter instance (quality=1)
72+
6473
static {
6574
Class<?> tempClass = null;
6675
Method tempCreateMethod = null;
@@ -76,6 +85,46 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {
7685
DIRECT_DECOMPRESSION_CODEC_CLASS = tempClass;
7786
CREATE_DIRECT_DECOMPRESSOR_METHOD = tempCreateMethod;
7887
DECOMPRESS_METHOD = tempDecompressMethod;
88+
89+
// Initialize Brotli JNI bypass via reflection
90+
boolean brotliLoaded = false;
91+
Method brotliDecompress = null;
92+
Method brotliCompress = null;
93+
Constructor<?> brotliDecompressorCtor = null;
94+
Constructor<?> brotliCompressorCtor = null;
95+
Object brotliParam = null;
96+
try {
97+
// Load native library
98+
Class<?> loaderClass = Class.forName("org.meteogroup.jbrotli.libloader.BrotliLibraryLoader");
99+
loaderClass.getMethod("loadBrotli").invoke(null);
100+
101+
// BrotliDeCompressor: no-arg ctor + deCompress(ByteBuffer, ByteBuffer) -> int
102+
Class<?> decompClass = Class.forName("org.meteogroup.jbrotli.BrotliDeCompressor");
103+
brotliDecompressorCtor = decompClass.getConstructor();
104+
brotliDecompress = decompClass.getMethod("deCompress", ByteBuffer.class, ByteBuffer.class);
105+
106+
// BrotliCompressor: no-arg ctor + compress(Parameter, ByteBuffer, ByteBuffer) -> int
107+
Class<?> compClass = Class.forName("org.meteogroup.jbrotli.BrotliCompressor");
108+
Class<?> paramClass = Class.forName("org.meteogroup.jbrotli.Brotli$Parameter");
109+
Class<?> modeClass = Class.forName("org.meteogroup.jbrotli.Brotli$Mode");
110+
brotliCompressorCtor = compClass.getConstructor();
111+
brotliCompress = compClass.getMethod("compress", paramClass, ByteBuffer.class, ByteBuffer.class);
112+
113+
// Create Parameter(Mode.GENERIC, quality=1, lgwin=22, lgblock=0)
114+
Object genericMode = modeClass.getField("GENERIC").get(null);
115+
Constructor<?> paramCtor = paramClass.getConstructor(modeClass, int.class, int.class, int.class);
116+
brotliParam = paramCtor.newInstance(genericMode, 1, 22, 0);
117+
118+
brotliLoaded = true;
119+
} catch (Throwable t) {
120+
LOG.debug("Brotli native library not available, falling back to Hadoop codec", t);
121+
}
122+
BROTLI_NATIVE_AVAILABLE = brotliLoaded;
123+
BROTLI_DECOMPRESS_METHOD = brotliDecompress;
124+
BROTLI_COMPRESS_METHOD = brotliCompress;
125+
BROTLI_DECOMPRESSOR_CTOR = brotliDecompressorCtor;
126+
BROTLI_COMPRESSOR_CTOR = brotliCompressorCtor;
127+
BROTLI_COMPRESS_PARAMETER = brotliParam;
79128
}
80129

81130
/**
@@ -105,6 +154,11 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName)
105154
return new ZstdCompressor();
106155
case LZ4_RAW:
107156
return new Lz4RawCompressor();
157+
case BROTLI:
158+
if (BROTLI_NATIVE_AVAILABLE) {
159+
return new BrotliDirectCompressor();
160+
}
161+
return super.createCompressor(codecName);
108162
default:
109163
return super.createCompressor(codecName);
110164
}
@@ -119,6 +173,11 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecN
119173
return new ZstdDecompressor();
120174
case LZ4_RAW:
121175
return new Lz4RawDecompressor();
176+
case BROTLI:
177+
if (BROTLI_NATIVE_AVAILABLE) {
178+
return new BrotliDirectDecompressor();
179+
}
180+
// fall through to default Hadoop codec path
122181
case GZIP:
123182
case UNCOMPRESSED:
124183
return super.createDecompressor(codecName);
@@ -491,6 +550,83 @@ void closeCompressor() {
491550
}
492551
}
493552

553+
/**
554+
* Direct-memory Brotli decompressor using jbrotli's native JNI bindings via reflection,
555+
* bypassing the Hadoop BrotliCodec/stream wrapper overhead.
556+
*/
557+
private class BrotliDirectDecompressor extends BaseDecompressor {
558+
private final Object decompressor;
559+
560+
BrotliDirectDecompressor() {
561+
try {
562+
this.decompressor = BROTLI_DECOMPRESSOR_CTOR.newInstance();
563+
} catch (ReflectiveOperationException e) {
564+
throw new DirectCodecPool.ParquetCompressionCodecException("Failed to create Brotli decompressor", e);
565+
}
566+
}
567+
568+
@Override
569+
int decompress(ByteBuffer input, ByteBuffer output) throws IOException {
570+
try {
571+
return (int) BROTLI_DECOMPRESS_METHOD.invoke(decompressor, input, output);
572+
} catch (InvocationTargetException e) {
573+
throw new IOException("Brotli decompression failed", e.getCause());
574+
} catch (IllegalAccessException e) {
575+
throw new IOException("Brotli decompression failed", e);
576+
}
577+
}
578+
579+
@Override
580+
void closeDecompressor() {
581+
// no-op: BrotliDeCompressor has no resources to release
582+
}
583+
}
584+
585+
/**
586+
* Direct-memory Brotli compressor using jbrotli's native JNI bindings via reflection,
587+
* bypassing the Hadoop BrotliCodec/stream wrapper overhead.
588+
* Uses quality=1 by default (fast compression, matching Hadoop's BrotliCompressor default).
589+
*/
590+
private class BrotliDirectCompressor extends BaseCompressor {
591+
private final Object compressor;
592+
593+
BrotliDirectCompressor() {
594+
try {
595+
this.compressor = BROTLI_COMPRESSOR_CTOR.newInstance();
596+
} catch (ReflectiveOperationException e) {
597+
throw new DirectCodecPool.ParquetCompressionCodecException("Failed to create Brotli compressor", e);
598+
}
599+
}
600+
601+
@Override
602+
public CompressionCodecName getCodecName() {
603+
return CompressionCodecName.BROTLI;
604+
}
605+
606+
@Override
607+
int maxCompressedSize(int size) {
608+
// Brotli worst case: input size + (input size >> 2) + 1K overhead for small inputs
609+
// This is a conservative upper bound matching the Brotli spec
610+
return size + (size >> 2) + 1024;
611+
}
612+
613+
@Override
614+
int compress(ByteBuffer input, ByteBuffer output) throws IOException {
615+
try {
616+
return (int) BROTLI_COMPRESS_METHOD.invoke(compressor, BROTLI_COMPRESS_PARAMETER, input, output);
617+
} catch (InvocationTargetException e) {
618+
throw new IOException("Brotli compression failed", e.getCause());
619+
} catch (IllegalAccessException e) {
620+
throw new IOException("Brotli compression failed", e);
621+
}
622+
}
623+
624+
@Override
625+
void closeCompressor() {
626+
// no-op: BrotliCompressor has no resources to release
627+
}
628+
}
629+
494630
/**
495631
* @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead
496632
*/

0 commit comments

Comments
 (0)