Commit c79ed9c
Fix compression codec config gaps from Hadoop bypass: honor ZSTD bufferPool, route GZIP in DirectCodecFactory
Respect parquet.compression.codec.zstd.bufferPool.enabled in the optimized ZstdBytesCompressor/Decompressor (was hardcoded to RecyclingBufferPool). Route GZIP decompression through the optimized path in DirectCodecFactory instead of falling back to the Hadoop codec pool. Remove dead GZIP/ZSTD branches from cacheKey(). Document ISA-L native library bypass in GZIP Javadocs. Replace obsolete Hadoop codec caching tests with end-to-end compression level verification tests.
1 parent 40f02a4 commit c79ed9c
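For reference, all three configuration keys touched by this change are plain Hadoop Configuration entries read by CodecFactory. A minimal sketch of setting them (not part of this commit; the class name and page size below are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.CodecFactory;

public class CodecConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // ZSTD: compression level, and whether zstd-jni streams draw their internal
    // buffers from RecyclingBufferPool (true, the default) or NoPool (false).
    conf.set("parquet.compression.codec.zstd.level", "3");
    conf.setBoolean("parquet.compression.codec.zstd.bufferPool.enabled", false);

    // GZIP: zlib level consumed by the Deflater-based compressor.
    conf.set("zlib.compress.level", "6");

    CodecFactory factory = new CodecFactory(conf, 1024 * 1024); // page size is illustrative
    // ... obtain per-codec compressors via factory.getCompressor(...) ...
    factory.release();
  }
}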

3 files changed: 100 additions & 56 deletions

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java

Lines changed: 45 additions & 17 deletions
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.hadoop;
 
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.NoPool;
 import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
 import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
@@ -285,12 +287,17 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) {
       case SNAPPY:
         return new SnappyBytesCompressor();
       case ZSTD:
+        BufferPool zstdCompressPool = conf.getBoolean(
+                ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED,
+                ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)
+            ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
         return new ZstdBytesCompressor(
             conf.getInt(
                 ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
             conf.getInt(
                 ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS),
-            pageSize);
+            pageSize,
+            zstdCompressPool);
       case LZ4_RAW:
         return new Lz4RawBytesCompressor();
       case GZIP:
@@ -310,7 +317,11 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
       case SNAPPY:
         return new SnappyBytesDecompressor();
       case ZSTD:
-        return new ZstdBytesDecompressor();
+        BufferPool zstdDecompressPool = conf.getBoolean(
+                ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED,
+                ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)
+            ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
+        return new ZstdBytesDecompressor(zstdDecompressPool);
       case LZ4_RAW:
         return new Lz4RawBytesDecompressor();
       case GZIP:
@@ -356,15 +367,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
   private String cacheKey(CompressionCodecName codecName) {
     String level = null;
     switch (codecName) {
-      case GZIP:
-        level = conf.get("zlib.compress.level");
-        break;
       case BROTLI:
         level = conf.get("compression.brotli.quality");
         break;
-      case ZSTD:
-        level = conf.get("parquet.compression.codec.zstd.level");
-        break;
       default:
         // compression level is not supported; ignore it
     }
@@ -472,28 +477,32 @@ public void release() {}
   /**
    * Compresses using zstd-jni's {@link ZstdOutputStreamNoFinalizer} directly,
    * bypassing the Hadoop codec framework ({@code ZstandardCodec}, {@code CodecPool},
-   * {@code CompressionOutputStream} wrapper). Uses {@link RecyclingBufferPool} for the
-   * internal 128KB output buffer, matching the streaming API's natural buffer size.
+   * {@code CompressionOutputStream} wrapper). Uses a configurable {@link BufferPool}
+   * (defaulting to {@link RecyclingBufferPool}) for the internal 128KB output buffer,
+   * matching the streaming API's natural buffer size. The buffer pool strategy is
+   * controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled} config.
    * This avoids the overhead of Hadoop codec instantiation and compressor pool management
    * while using the same underlying ZSTD streaming path, which is well-optimized for all
    * input sizes including large pages (256KB+).
    */
   static class ZstdBytesCompressor extends BytesCompressor {
     private final int level;
     private final int workers;
+    private final BufferPool bufferPool;
     private final ByteArrayOutputStream compressedOutBuffer;
 
-    ZstdBytesCompressor(int level, int workers, int pageSize) {
+    ZstdBytesCompressor(int level, int workers, int pageSize, BufferPool bufferPool) {
       this.level = level;
       this.workers = workers;
+      this.bufferPool = bufferPool;
       this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
     }
 
     @Override
     public BytesInput compress(BytesInput bytes) throws IOException {
       compressedOutBuffer.reset();
       try (ZstdOutputStreamNoFinalizer zos =
-          new ZstdOutputStreamNoFinalizer(compressedOutBuffer, RecyclingBufferPool.INSTANCE, level)) {
+          new ZstdOutputStreamNoFinalizer(compressedOutBuffer, bufferPool, level)) {
         if (workers > 0) {
           zos.setWorkers(workers);
         }
@@ -515,16 +524,23 @@ public void release() {
 
   /**
    * Decompresses using zstd-jni's {@link ZstdInputStreamNoFinalizer} directly,
-   * bypassing the Hadoop codec framework. Uses {@link RecyclingBufferPool} for internal
-   * buffers, matching the streaming decompression path. Reads the full decompressed output
-   * in a single pass via {@link InputStream#readNBytes(int)}.
+   * bypassing the Hadoop codec framework. Uses a configurable {@link BufferPool}
+   * for internal buffers, matching the streaming decompression path. The buffer pool
+   * strategy is controlled by the {@code parquet.compression.codec.zstd.bufferPool.enabled}
+   * config. Reads the full decompressed output in a single pass via
+   * {@link InputStream#readNBytes(int)}.
    */
   static class ZstdBytesDecompressor extends BytesDecompressor {
+    private final BufferPool bufferPool;
+
+    ZstdBytesDecompressor(BufferPool bufferPool) {
+      this.bufferPool = bufferPool;
+    }
 
     @Override
     public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException {
       try (ZstdInputStreamNoFinalizer zis =
-          new ZstdInputStreamNoFinalizer(bytes.toInputStream(), RecyclingBufferPool.INSTANCE)) {
+          new ZstdInputStreamNoFinalizer(bytes.toInputStream(), bufferPool)) {
         byte[] output = new byte[decompressedSize];
         int offset = 0;
         while (offset < decompressedSize) {
@@ -546,7 +562,7 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
       input.get(inputBytes);
       ByteArrayInputStream bais = new ByteArrayInputStream(inputBytes);
       try (ZstdInputStreamNoFinalizer zis =
-          new ZstdInputStreamNoFinalizer(bais, RecyclingBufferPool.INSTANCE)) {
+          new ZstdInputStreamNoFinalizer(bais, bufferPool)) {
         byte[] outputBytes = new byte[decompressedSize];
         int offset = 0;
         while (offset < decompressedSize) {
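As context for the ZSTD hunks above, here is a self-contained sketch of the zstd-jni stream API that the optimized compressor and decompressor build on. The pool choice mirrors what the bufferPool.enabled flag selects; the sample data and level are illustrative:

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.NoPool;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ZstdBufferPoolSketch {
  public static void main(String[] args) throws IOException {
    // bufferPool.enabled=true  -> RecyclingBufferPool.INSTANCE (internal buffers reused)
    // bufferPool.enabled=false -> NoPool.INSTANCE (fresh allocation per stream)
    BufferPool pool = RecyclingBufferPool.INSTANCE; // swap in NoPool.INSTANCE to disable pooling

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (ZstdOutputStreamNoFinalizer zos = new ZstdOutputStreamNoFinalizer(sink, pool, 3)) {
      zos.write("hello zstd".getBytes(StandardCharsets.UTF_8));
    }

    try (ZstdInputStreamNoFinalizer zis =
        new ZstdInputStreamNoFinalizer(new ByteArrayInputStream(sink.toByteArray()), pool)) {
      System.out.println(new String(zis.readAllBytes(), StandardCharsets.UTF_8)); // hello zstd
    }
  }
}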
@@ -659,6 +675,13 @@ public void release() {}
    * calls and reset via {@link Deflater#reset()}, avoiding native zlib
    * state allocation per page. Writes a minimal GZIP header and trailer
    * (CRC32 + original size) manually.
+   *
+   * <p>Note: this implementation always uses Java's built-in {@link Deflater}
+   * (java.util.zip / JDK zlib). It does <em>not</em> use Hadoop native libraries,
+   * so hardware-accelerated compression via Intel ISA-L will not be used even if
+   * the native libraries are installed. The overhead reduction from bypassing the
+   * Hadoop codec framework typically outweighs the ISA-L advantage for the page
+   * sizes used by Parquet.
    */
   static class GzipBytesCompressor extends BytesCompressor {
     private final Deflater deflater;
@@ -714,6 +737,11 @@ public void release() {
    * bypassing Hadoop's GzipCodec and the stream overhead of
    * {@link java.util.zip.GZIPInputStream}. Skips the GZIP header, inflates
    * into the output buffer, and verifies the CRC32 + size trailer.
+   *
+   * <p>Note: this implementation always uses Java's built-in {@link Inflater}
+   * (java.util.zip / JDK zlib). It does <em>not</em> use Hadoop native libraries,
+   * so hardware-accelerated decompression via Intel ISA-L will not be used even if
+   * the native libraries are installed.
    */
   static class GzipBytesDecompressor extends BytesDecompressor {
     private final Inflater inflater = new Inflater(true);
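The manual GZIP framing these Javadoc notes describe can be reproduced with java.util.zip alone. A simplified, self-contained sketch of the technique (the committed compressor reuses a single Deflater via reset() rather than creating one per call, and manages its buffers differently):

import java.io.ByteArrayOutputStream;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

public class RawGzipSketch {
  // Fixed 10-byte GZIP header: magic 0x1f 0x8b, CM=8 (deflate), no flags, zero mtime, default OS
  private static final byte[] GZIP_HEADER = {(byte) 0x1f, (byte) 0x8b, 8, 0, 0, 0, 0, 0, 0, 0};

  public static byte[] gzip(byte[] input, int level) {
    Deflater deflater = new Deflater(level, true); // nowrap=true: raw deflate, no zlib wrapper
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      out.write(GZIP_HEADER, 0, GZIP_HEADER.length);

      deflater.setInput(input);
      deflater.finish();
      byte[] buf = new byte[8 * 1024];
      while (!deflater.finished()) {
        int n = deflater.deflate(buf);
        out.write(buf, 0, n);
      }

      // Trailer: CRC32 of the uncompressed bytes, then their length, both little-endian
      CRC32 crc = new CRC32();
      crc.update(input, 0, input.length);
      writeIntLE(out, (int) crc.getValue());
      writeIntLE(out, input.length);
      return out.toByteArray();
    } finally {
      deflater.end();
    }
  }

  private static void writeIntLE(ByteArrayOutputStream out, int v) {
    out.write(v);
    out.write(v >>> 8);
    out.write(v >>> 16);
    out.write(v >>> 24);
  }
}

The result is a valid GZIP member that java.util.zip.GZIPInputStream can read back.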

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java

Lines changed: 3 additions & 2 deletions
@@ -105,8 +105,6 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) {
         return new ZstdCompressor();
       case LZ4_RAW:
         return new Lz4RawCompressor();
-        // todo: create class similar to the SnappyCompressor for zlib and exclude it as
-        // snappy is above since it also generates allocateDirect calls.
       default:
         return super.createCompressor(codecName);
     }
@@ -121,6 +119,9 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) {
         return new ZstdDecompressor();
       case LZ4_RAW:
         return new Lz4RawDecompressor();
+      case GZIP:
+      case UNCOMPRESSED:
+        return super.createDecompressor(codecName);
       default:
         CompressionCodec codec = getCodec(codecName);
         if (codec == null) {

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java

Lines changed: 52 additions & 37 deletions

@@ -28,7 +28,6 @@
 import java.util.Random;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
@@ -235,53 +234,69 @@ public void compressionCodecs() {
     }
   }
 
-  static class PublicCodecFactory extends CodecFactory {
-    // To make getCodec public
-
-    public PublicCodecFactory(Configuration configuration, int pageSize) {
-      super(configuration, pageSize);
-    }
-
-    public org.apache.hadoop.io.compress.CompressionCodec getCodec(CompressionCodecName name) {
-      return super.getCodec(name);
-    }
-  }
-
   @Test
-  public void cachingKeysGzip() {
-    Configuration config_zlib_2 = new Configuration();
-    config_zlib_2.set("zlib.compress.level", "2");
+  public void compressionLevelGzip() throws IOException {
+    Configuration config_zlib_1 = new Configuration();
+    config_zlib_1.set("zlib.compress.level", "1");
 
-    Configuration config_zlib_5 = new Configuration();
-    config_zlib_5.set("zlib.compress.level", "5");
+    Configuration config_zlib_9 = new Configuration();
+    config_zlib_9.set("zlib.compress.level", "9");
 
-    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, pageSize);
-    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, pageSize);
+    // Generate compressible (low-entropy) data so different levels produce different sizes
+    byte[] data = new byte[64 * 1024];
+    Random random = new Random(42);
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) ('a' + random.nextInt(16));
+    }
 
-    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
-    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
-    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.GZIP);
+    final CodecFactory codecFactory_1 = new CodecFactory(config_zlib_1, pageSize);
+    final CodecFactory codecFactory_9 = new CodecFactory(config_zlib_9, pageSize);
 
-    Assert.assertEquals(codec_2_1, codec_2_2);
-    Assert.assertNotEquals(codec_2_1, codec_5_1);
+    BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.GZIP);
+    BytesInputCompressor compressor_9 = codecFactory_9.getCompressor(CompressionCodecName.GZIP);
+
+    long size_1 = compressor_1.compress(BytesInput.from(data)).size();
+    long size_9 = compressor_9.compress(BytesInput.from(data)).size();
+
+    // Level 9 should produce smaller (or equal) output than level 1
+    Assert.assertTrue(
+        "Expected level 9 (" + size_9 + ") <= level 1 (" + size_1 + ")",
+        size_9 <= size_1);
+
+    codecFactory_1.release();
+    codecFactory_9.release();
   }
 
   @Test
-  public void cachingKeysZstd() {
-    Configuration config_zstd_2 = new Configuration();
-    config_zstd_2.set("parquet.compression.codec.zstd.level", "2");
+  public void compressionLevelZstd() throws IOException {
+    Configuration config_zstd_1 = new Configuration();
+    config_zstd_1.set("parquet.compression.codec.zstd.level", "1");
 
-    Configuration config_zstd_5 = new Configuration();
-    config_zstd_5.set("parquet.compression.codec.zstd.level", "5");
+    Configuration config_zstd_19 = new Configuration();
+    config_zstd_19.set("parquet.compression.codec.zstd.level", "19");
 
-    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, pageSize);
-    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, pageSize);
+    // Generate compressible (low-entropy) data so different levels produce different sizes
+    byte[] data = new byte[64 * 1024];
+    Random random = new Random(42);
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) ('a' + random.nextInt(16));
+    }
 
-    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
-    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
-    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.ZSTD);
+    final CodecFactory codecFactory_1 = new CodecFactory(config_zstd_1, pageSize);
+    final CodecFactory codecFactory_19 = new CodecFactory(config_zstd_19, pageSize);
 
-    Assert.assertEquals(codec_2_1, codec_2_2);
-    Assert.assertNotEquals(codec_2_1, codec_5_1);
+    BytesInputCompressor compressor_1 = codecFactory_1.getCompressor(CompressionCodecName.ZSTD);
+    BytesInputCompressor compressor_19 = codecFactory_19.getCompressor(CompressionCodecName.ZSTD);
+
+    long size_1 = compressor_1.compress(BytesInput.from(data)).size();
+    long size_19 = compressor_19.compress(BytesInput.from(data)).size();
+
+    // Level 19 should produce smaller (or equal) output than level 1
+    Assert.assertTrue(
+        "Expected level 19 (" + size_19 + ") <= level 1 (" + size_1 + ")",
+        size_19 <= size_1);
+
+    codecFactory_1.release();
+    codecFactory_19.release();
   }
 }
