
Commit 56273ae

Fix compression codec config gaps from Hadoop bypass: honor ZSTD bufferPool, route GZIP in DirectCodecFactory
- Respect parquet.compression.codec.zstd.bufferPool.enabled in the optimized ZstdBytesCompressor/Decompressor (previously hardcoded to RecyclingBufferPool).
- Route GZIP decompression through the optimized path in DirectCodecFactory instead of falling back to the Hadoop codec pool.
- Remove dead GZIP/ZSTD branches from cacheKey().
- Document the ISA-L native library bypass in the GZIP Javadocs.
- Replace the obsolete Hadoop codec caching tests with end-to-end compression level verification tests.
Parent: e58a2d6

3 files changed, 92 additions and 54 deletions
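
For orientation, the ZSTD settings this commit touches are ordinary Hadoop Configuration keys. Below is a minimal sketch, not part of the commit, of how a caller might exercise them; the key names are taken from the diffs below, and the CodecFactory usage mirrors the updated tests (the public constructor and getCompressor signature are assumptions based on that usage):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ZstdConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Disable ZSTD buffer pooling; with this commit the optimized path
    // now honors the flag instead of always using RecyclingBufferPool.
    conf.setBoolean("parquet.compression.codec.zstd.bufferPool.enabled", false);
    conf.set("parquet.compression.codec.zstd.level", "3");

    CodecFactory factory = new CodecFactory(conf, 64 * 1024); // page size estimate
    BytesInputCompressor zstd = factory.getCompressor(CompressionCodecName.ZSTD);
    long size = zstd.compress(BytesInput.from(new byte[8192])).size();
    System.out.println("compressed size: " + size);
    factory.release();
  }
}
```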


parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java

Lines changed: 45 additions & 17 deletions
```diff
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.hadoop;
 
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.NoPool;
 import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
 import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
```
```diff
@@ -283,12 +285,17 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) {
       case SNAPPY:
         return new SnappyBytesCompressor();
       case ZSTD:
+        BufferPool zstdCompressPool = conf.getBoolean(
+                    ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED,
+                    ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)
+                ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
         return new ZstdBytesCompressor(
             conf.getInt(
                 ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
             conf.getInt(
                 ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_WORKERS),
-            pageSize);
+            pageSize,
+            zstdCompressPool);
       case LZ4_RAW:
         return new Lz4RawBytesCompressor();
       case GZIP:
```
```diff
@@ -308,7 +315,11 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
       case SNAPPY:
         return new SnappyBytesDecompressor();
       case ZSTD:
-        return new ZstdBytesDecompressor();
+        BufferPool zstdDecompressPool = conf.getBoolean(
+                    ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED,
+                    ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)
+                ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
+        return new ZstdBytesDecompressor(zstdDecompressPool);
       case LZ4_RAW:
         return new Lz4RawBytesDecompressor();
       case GZIP:
```
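
The only behavioral knob in these two hunks is which BufferPool the ternary selects: RecyclingBufferPool reuses the streams' internal buffers (the compressor Javadoc below mentions a 128KB output buffer) across pages, while NoPool allocates fresh buffers per stream. A standalone sketch of that choice against the zstd-jni API; the constructor shape matches the one used in the diff, and the boolean flag is illustrative:

```java
import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.NoPool;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import java.io.ByteArrayOutputStream;

public class BufferPoolChoice {
  public static void main(String[] args) throws Exception {
    // Stands in for parquet.compression.codec.zstd.bufferPool.enabled.
    boolean poolingEnabled = true;
    // RecyclingBufferPool caches internal buffers for reuse across streams;
    // NoPool allocates new buffers for every stream.
    BufferPool pool = poolingEnabled ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (ZstdOutputStreamNoFinalizer zos = new ZstdOutputStreamNoFinalizer(out, pool, 3)) {
      zos.write(new byte[4096]);
    }
    System.out.println("compressed to " + out.size() + " bytes");
  }
}
```

Pooling mainly pays off when many small pages are compressed in sequence, since each stream would otherwise allocate its own working buffers.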
```diff
@@ -354,15 +365,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
   private String cacheKey(CompressionCodecName codecName) {
     String level = null;
     switch (codecName) {
-      case GZIP:
-        level = conf.get("zlib.compress.level");
-        break;
       case BROTLI:
         level = conf.get("compression.brotli.quality");
         break;
-      case ZSTD:
-        level = conf.get("parquet.compression.codec.zstd.level");
-        break;
       default:
         // compression level is not supported; ignore it
     }
```

With GZIP and ZSTD now served by the direct compressor paths above and never reaching the Hadoop codec cache, their level settings no longer affect cacheKey(), so these branches were dead code.
```diff
@@ -470,28 +475,32 @@ public void release() {}
   /**
    * Compresses using zstd-jni's {@link ZstdOutputStreamNoFinalizer} directly,
    * bypassing the Hadoop codec framework ({@code ZstandardCodec}, {@code CodecPool},
-   * {@code CompressionOutputStream} wrapper). Uses {@link RecyclingBufferPool} for the
-   * internal 128KB output buffer, matching the streaming API's natural buffer size.
+   * {@code CompressionOutputStream} wrapper). Uses a configurable {@link BufferPool}
+   * (defaulting to {@link RecyclingBufferPool}) for the internal 128KB output buffer,
+   * matching the streaming API's natural buffer size. The buffer pool strategy is
+   * controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled} config.
    * This avoids the overhead of Hadoop codec instantiation and compressor pool management
    * while using the same underlying ZSTD streaming path, which is well-optimized for all
    * input sizes including large pages (256KB+).
    */
   static class ZstdBytesCompressor extends BytesCompressor {
     private final int level;
     private final int workers;
+    private final BufferPool bufferPool;
     private final ByteArrayOutputStream compressedOutBuffer;
 
-    ZstdBytesCompressor(int level, int workers, int pageSize) {
+    ZstdBytesCompressor(int level, int workers, int pageSize, BufferPool bufferPool) {
       this.level = level;
       this.workers = workers;
+      this.bufferPool = bufferPool;
       this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
     }
 
     @Override
     public BytesInput compress(BytesInput bytes) throws IOException {
       compressedOutBuffer.reset();
       try (ZstdOutputStreamNoFinalizer zos =
-          new ZstdOutputStreamNoFinalizer(compressedOutBuffer, RecyclingBufferPool.INSTANCE, level)) {
+          new ZstdOutputStreamNoFinalizer(compressedOutBuffer, bufferPool, level)) {
         if (workers > 0) {
           zos.setWorkers(workers);
         }
```
```diff
@@ -513,16 +522,23 @@ public void release() {
 
   /**
    * Decompresses using zstd-jni's {@link ZstdInputStreamNoFinalizer} directly,
-   * bypassing the Hadoop codec framework. Uses {@link RecyclingBufferPool} for internal
-   * buffers, matching the streaming decompression path. Reads the full decompressed output
-   * in a single pass via {@link InputStream#readNBytes(int)}.
+   * bypassing the Hadoop codec framework. Uses a configurable {@link BufferPool}
+   * for internal buffers, matching the streaming decompression path. The buffer pool
+   * strategy is controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled}
+   * config. Reads the full decompressed output in a single pass via
+   * {@link InputStream#readNBytes(int)}.
    */
   static class ZstdBytesDecompressor extends BytesDecompressor {
+    private final BufferPool bufferPool;
+
+    ZstdBytesDecompressor(BufferPool bufferPool) {
+      this.bufferPool = bufferPool;
+    }
 
     @Override
     public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException {
       try (ZstdInputStreamNoFinalizer zis =
-          new ZstdInputStreamNoFinalizer(bytes.toInputStream(), RecyclingBufferPool.INSTANCE)) {
+          new ZstdInputStreamNoFinalizer(bytes.toInputStream(), bufferPool)) {
         byte[] output = new byte[decompressedSize];
         int offset = 0;
         while (offset < decompressedSize) {
```
```diff
@@ -544,7 +560,7 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
       input.get(inputBytes);
       ByteArrayInputStream bais = new ByteArrayInputStream(inputBytes);
       try (ZstdInputStreamNoFinalizer zis =
-          new ZstdInputStreamNoFinalizer(bais, RecyclingBufferPool.INSTANCE)) {
+          new ZstdInputStreamNoFinalizer(bais, bufferPool)) {
         byte[] outputBytes = new byte[decompressedSize];
         int offset = 0;
         while (offset < decompressedSize) {
```
```diff
@@ -657,6 +673,13 @@ public void release() {}
    * calls and reset via {@link Deflater#reset()}, avoiding native zlib
    * state allocation per page. Writes a minimal GZIP header and trailer
    * (CRC32 + original size) manually.
+   *
+   * <p>Note: this implementation always uses Java's built-in {@link Deflater}
+   * (java.util.zip / JDK zlib). It does <em>not</em> use Hadoop native libraries,
+   * so hardware-accelerated compression via Intel ISA-L will not be used even if
+   * the native libraries are installed. The overhead reduction from bypassing the
+   * Hadoop codec framework typically outweighs the ISA-L advantage for the page
+   * sizes used by Parquet.
    */
   static class GzipBytesCompressor extends BytesCompressor {
     private final Deflater deflater;
```
```diff
@@ -712,6 +735,11 @@ public void release() {
    * bypassing Hadoop's GzipCodec and the stream overhead of
    * {@link java.util.zip.GZIPInputStream}. Skips the GZIP header, inflates
    * into the output buffer, and verifies the CRC32 + size trailer.
+   *
+   * <p>Note: this implementation always uses Java's built-in {@link Inflater}
+   * (java.util.zip / JDK zlib). It does <em>not</em> use Hadoop native libraries,
+   * so hardware-accelerated decompression via Intel ISA-L will not be used even if
+   * the native libraries are installed.
    */
   static class GzipBytesDecompressor extends BytesDecompressor {
     private final Inflater inflater = new Inflater(true);
```
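
As background for the GZIP Javadoc notes in the two hunks above: the optimized path keeps one raw (nowrap) Deflater alive and writes the GZIP framing itself. The following is a self-contained sketch of that general technique using only the JDK, assuming nothing about the class internals beyond what the Javadoc states; it is not the committed code:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

public class ManualGzipFraming {
  // 10-byte GZIP header: magic 0x1f 0x8b, CM=8 (deflate), zeroed FLG/MTIME/XFL/OS.
  private static final byte[] GZIP_HEADER = {0x1f, (byte) 0x8b, 8, 0, 0, 0, 0, 0, 0, 0};

  public static byte[] gzip(Deflater deflater, byte[] input) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.writeBytes(GZIP_HEADER);

    deflater.reset(); // reuse native zlib state across pages
    deflater.setInput(input);
    deflater.finish();
    byte[] buf = new byte[8 * 1024];
    while (!deflater.finished()) {
      int n = deflater.deflate(buf);
      out.write(buf, 0, n);
    }

    CRC32 crc = new CRC32();
    crc.update(input);
    writeIntLE(out, (int) crc.getValue()); // CRC32 of the uncompressed data
    writeIntLE(out, input.length); // original size mod 2^32
    return out.toByteArray();
  }

  private static void writeIntLE(ByteArrayOutputStream out, int v) {
    out.write(v);
    out.write(v >>> 8);
    out.write(v >>> 16);
    out.write(v >>> 24);
  }

  public static void main(String[] args) {
    // nowrap=true produces raw deflate with no zlib wrapper, as GZIP requires.
    Deflater deflater = new Deflater(6, true);
    byte[] framed = gzip(deflater, "hello hello hello".getBytes());
    System.out.println(framed.length + " bytes of GZIP output");
    deflater.end();
  }
}
```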

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java

Lines changed: 3 additions & 2 deletions
```diff
@@ -105,8 +105,6 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) {
         return new ZstdCompressor();
       case LZ4_RAW:
         return new Lz4RawCompressor();
-        // todo: create class similar to the SnappyCompressor for zlib and exclude it as
-        // snappy is above since it also generates allocateDirect calls.
       default:
         return super.createCompressor(codecName);
     }
```
```diff
@@ -121,6 +119,9 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) {
         return new ZstdDecompressor();
       case LZ4_RAW:
         return new Lz4RawDecompressor();
+      case GZIP:
+      case UNCOMPRESSED:
+        return super.createDecompressor(codecName);
       default:
         CompressionCodec codec = getCodec(codecName);
         if (codec == null) {
```
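
With this routing, GZIP decompression from a DirectCodecFactory is expected to reach CodecFactory's optimized Inflater-based GzipBytesDecompressor rather than a pooled Hadoop codec. A hypothetical round-trip sketch of that path; the DirectCodecFactory constructor shape and the package placement are assumptions based on the existing test setup:

```java
// Hypothetical test sketch, placed in the org.apache.parquet.hadoop package
// (as TestDirectCodecFactory is) since DirectCodecFactory is not public API.
@Test
public void gzipRoundTripThroughDirectFactory() throws IOException {
  DirectCodecFactory factory =
      new DirectCodecFactory(new Configuration(), new HeapByteBufferAllocator(), 64 * 1024);

  BytesInputCompressor compressor = factory.getCompressor(CompressionCodecName.GZIP);
  BytesInputDecompressor decompressor = factory.getDecompressor(CompressionCodecName.GZIP);

  byte[] raw = new byte[16 * 1024]; // zeros compress well
  BytesInput compressed = BytesInput.copy(compressor.compress(BytesInput.from(raw)));
  // After this change, decompress() runs through CodecFactory's optimized
  // GzipBytesDecompressor instead of a pooled Hadoop codec.
  BytesInput roundTrip = decompressor.decompress(compressed, raw.length);
  Assert.assertEquals(raw.length, roundTrip.size());
  factory.release();
}
```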

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java

Lines changed: 44 additions & 35 deletions
```diff
@@ -28,7 +28,6 @@
 import java.util.Random;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
```
```diff
@@ -235,53 +234,63 @@ public void compressionCodecs() {
     }
   }
 
-  static class PublicCodecFactory extends CodecFactory {
-    // To make getCodec public
+  @Test
+  public void compressionLevelGzip() throws IOException {
+    Configuration config_zlib_1 = new Configuration();
+    config_zlib_1.set("zlib.compress.level", "1");
 
-    public PublicCodecFactory(Configuration configuration, int pageSize) {
-      super(configuration, pageSize);
-    }
+    Configuration config_zlib_9 = new Configuration();
+    config_zlib_9.set("zlib.compress.level", "9");
 
-    public org.apache.hadoop.io.compress.CompressionCodec getCodec(CompressionCodecName name) {
-      return super.getCodec(name);
-    }
-  }
+    // Generate compressible data so different levels produce different sizes
+    byte[] data = new byte[64 * 1024];
+    new Random(42).nextBytes(data);
 
-  @Test
-  public void cachingKeysGzip() {
-    Configuration config_zlib_2 = new Configuration();
-    config_zlib_2.set("zlib.compress.level", "2");
+    final CodecFactory codecFactory_1 = new CodecFactory(config_zlib_1, pageSize);
+    final CodecFactory codecFactory_9 = new CodecFactory(config_zlib_9, pageSize);
 
-    Configuration config_zlib_5 = new Configuration();
-    config_zlib_5.set("zlib.compress.level", "5");
+    BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.GZIP);
+    BytesInputCompressor compressor_9 = codecFactory_9.getCompressor(CompressionCodecName.GZIP);
 
-    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, pageSize);
-    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, pageSize);
+    long size_1 = compressor_1.compress(BytesInput.from(data)).size();
+    long size_9 = compressor_9.compress(BytesInput.from(data)).size();
 
-    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
-    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
-    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.GZIP);
+    // Level 9 should produce smaller (or equal) output than level 1
+    Assert.assertTrue(
+        "Expected level 9 (" + size_9 + ") <= level 1 (" + size_1 + ")",
+        size_9 <= size_1);
 
-    Assert.assertEquals(codec_2_1, codec_2_2);
-    Assert.assertNotEquals(codec_2_1, codec_5_1);
+    codecFactory_1.release();
+    codecFactory_9.release();
   }
 
   @Test
-  public void cachingKeysZstd() {
-    Configuration config_zstd_2 = new Configuration();
-    config_zstd_2.set("parquet.compression.codec.zstd.level", "2");
+  public void compressionLevelZstd() throws IOException {
+    Configuration config_zstd_1 = new Configuration();
+    config_zstd_1.set("parquet.compression.codec.zstd.level", "1");
+
+    Configuration config_zstd_19 = new Configuration();
+    config_zstd_19.set("parquet.compression.codec.zstd.level", "19");
+
+    // Generate compressible data so different levels produce different sizes
+    byte[] data = new byte[64 * 1024];
+    new Random(42).nextBytes(data);
+
+    final CodecFactory codecFactory_1 = new CodecFactory(config_zstd_1, pageSize);
+    final CodecFactory codecFactory_19 = new CodecFactory(config_zstd_19, pageSize);
 
-    Configuration config_zstd_5 = new Configuration();
-    config_zstd_5.set("parquet.compression.codec.zstd.level", "5");
+    BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.ZSTD);
+    BytesInputCompressor compressor_19 = codecFactory_19.getCompressor(CompressionCodecName.ZSTD);
 
-    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, pageSize);
-    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, pageSize);
+    long size_1 = compressor_1.compress(BytesInput.from(data)).size();
+    long size_19 = compressor_19.compress(BytesInput.from(data)).size();
 
-    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
-    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
-    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.ZSTD);
+    // Level 19 should produce smaller (or equal) output than level 1
+    Assert.assertTrue(
+        "Expected level 19 (" + size_19 + ") <= level 1 (" + size_1 + ")",
+        size_19 <= size_1);
 
-    Assert.assertEquals(codec_2_1, codec_2_2);
-    Assert.assertNotEquals(codec_2_1, codec_5_1);
+    codecFactory_1.release();
+    codecFactory_19.release();
   }
 }
```
