Skip to content

Commit 9677ce8

Browse files
committed
Optimize GZIP compression: bypass Hadoop GzipCodec, use Deflater/Inflater directly
Use reusable Deflater/Inflater instances with manual GZIP header/trailer, bypassing Hadoop's GzipCodec, CodecPool, and GZIPOutputStream/GZIPInputStream. Deflater.reset() reuses native zlib state across pages, avoiding per-call allocation. Manual header/trailer eliminates stream wrapper overhead. Results (3 forks, 15 iterations, AMD EPYC 9V45): Compress: 8KB +3%, 64KB +1%, 256KB +1% Decompress: 8KB +6%, 64KB +3%, 256KB +9%
1 parent 18048a2 commit 9677ce8

1 file changed

Lines changed: 210 additions & 0 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import java.io.IOException;
2727
import java.io.InputStream;
2828
import java.nio.ByteBuffer;
29+
import java.util.zip.CRC32;
30+
import java.util.zip.Deflater;
31+
import java.util.zip.Inflater;
2932
import java.util.Collections;
3033
import java.util.HashMap;
3134
import java.util.Map;
@@ -290,6 +293,10 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) {
290293
pageSize);
291294
case LZ4_RAW:
292295
return new Lz4RawBytesCompressor();
296+
case GZIP:
297+
int gzipLevel = conf.getInt(
298+
"zlib.compress.level", Deflater.DEFAULT_COMPRESSION);
299+
return new GzipBytesCompressor(gzipLevel, pageSize);
293300
default:
294301
CompressionCodec codec = getCodec(codecName);
295302
return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec);
@@ -306,6 +313,8 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
306313
return new ZstdBytesDecompressor();
307314
case LZ4_RAW:
308315
return new Lz4RawBytesDecompressor();
316+
case GZIP:
317+
return new GzipBytesDecompressor();
309318
default:
310319
CompressionCodec codec = getCodec(codecName);
311320
return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec);
@@ -627,4 +636,205 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
627636
@Override
628637
public void release() {}
629638
}
639+
640+
// ---- Optimized GZIP compressor/decompressor using Deflater/Inflater directly ----
641+
642+
/** GZIP magic number: 0x1f 0x8b. */
643+
private static final int GZIP_MAGIC = 0x8b1f;
644+
645+
/** Minimal 10-byte GZIP header: magic, method=8 (deflate), flags=0, mtime=0, xfl=0, os=0. */
646+
private static final byte[] GZIP_HEADER = {
647+
0x1f, (byte) 0x8b, // magic
648+
0x08, // method: deflate
649+
0x00, // flags: none
650+
0x00, 0x00, 0x00, 0x00, // mtime: not set
651+
0x00, // extra flags
652+
0x00 // OS: FAT (matches Java's GZIPOutputStream default)
653+
};
654+
655+
/**
656+
* Compresses using {@link Deflater} directly with a reusable instance,
657+
* bypassing Hadoop's GzipCodec and the stream overhead of
658+
* {@link java.util.zip.GZIPOutputStream}. The Deflater is kept across
659+
* calls and reset via {@link Deflater#reset()}, avoiding native zlib
660+
* state allocation per page. Writes a minimal GZIP header and trailer
661+
* (CRC32 + original size) manually.
662+
*/
663+
static class GzipBytesCompressor extends BytesCompressor {
664+
private final Deflater deflater;
665+
private final CRC32 crc = new CRC32();
666+
private final ByteArrayOutputStream baos;
667+
668+
GzipBytesCompressor(int level, int pageSize) {
669+
this.deflater = new Deflater(level, true);
670+
this.baos = new ByteArrayOutputStream(pageSize);
671+
}
672+
673+
@Override
674+
public BytesInput compress(BytesInput bytes) throws IOException {
675+
byte[] input = bytes.toByteArray();
676+
677+
deflater.reset();
678+
crc.reset();
679+
crc.update(input);
680+
681+
baos.reset();
682+
// GZIP header
683+
baos.write(GZIP_HEADER);
684+
685+
// Deflate
686+
deflater.setInput(input);
687+
deflater.finish();
688+
byte[] buf = new byte[4096];
689+
while (!deflater.finished()) {
690+
int n = deflater.deflate(buf);
691+
baos.write(buf, 0, n);
692+
}
693+
694+
// GZIP trailer: CRC32 + original size (little-endian)
695+
writeInt(baos, (int) crc.getValue());
696+
writeInt(baos, input.length);
697+
698+
return BytesInput.from(baos);
699+
}
700+
701+
@Override
702+
public CompressionCodecName getCodecName() {
703+
return CompressionCodecName.GZIP;
704+
}
705+
706+
@Override
707+
public void release() {
708+
deflater.end();
709+
}
710+
}
711+
712+
/**
713+
* Decompresses using {@link Inflater} directly with a reusable instance,
714+
* bypassing Hadoop's GzipCodec and the stream overhead of
715+
* {@link java.util.zip.GZIPInputStream}. Skips the GZIP header, inflates
716+
* into the output buffer, and verifies the CRC32 + size trailer.
717+
*/
718+
static class GzipBytesDecompressor extends BytesDecompressor {
719+
private final Inflater inflater = new Inflater(true);
720+
private final CRC32 crc = new CRC32();
721+
722+
@Override
723+
public BytesInput decompress(BytesInput bytes, int decompressedSize)
724+
throws IOException {
725+
byte[] compressed = bytes.toByteArray();
726+
int headerLen = readGzipHeaderLength(compressed);
727+
728+
inflater.reset();
729+
inflater.setInput(
730+
compressed, headerLen, compressed.length - headerLen - 8);
731+
732+
byte[] output = new byte[decompressedSize];
733+
try {
734+
int inflated = 0;
735+
while (inflated < decompressedSize) {
736+
int n = inflater.inflate(
737+
output, inflated, decompressedSize - inflated);
738+
if (n == 0 && inflater.finished()) {
739+
break;
740+
}
741+
if (n == 0 && inflater.needsInput()) {
742+
throw new IOException(
743+
"Unexpected end of GZIP stream at offset "
744+
+ inflated + " of " + decompressedSize);
745+
}
746+
inflated += n;
747+
}
748+
} catch (java.util.zip.DataFormatException e) {
749+
throw new IOException("Invalid GZIP data", e);
750+
}
751+
752+
// Verify CRC32 and original size from trailer
753+
int trailerOffset = compressed.length - 8;
754+
int expectedCrc = readInt(compressed, trailerOffset);
755+
int expectedSize = readInt(compressed, trailerOffset + 4);
756+
757+
crc.reset();
758+
crc.update(output);
759+
if ((int) crc.getValue() != expectedCrc) {
760+
throw new IOException("GZIP CRC32 mismatch");
761+
}
762+
if (decompressedSize != (expectedSize & 0xFFFFFFFFL)) {
763+
throw new IOException("GZIP size mismatch");
764+
}
765+
766+
return BytesInput.from(output);
767+
}
768+
769+
@Override
770+
public void decompress(
771+
ByteBuffer input, int compressedSize,
772+
ByteBuffer output, int decompressedSize) throws IOException {
773+
byte[] inputBytes = new byte[compressedSize];
774+
input.get(inputBytes);
775+
BytesInput result = decompress(
776+
BytesInput.from(inputBytes), decompressedSize);
777+
output.put(result.toByteArray());
778+
}
779+
780+
@Override
781+
public void release() {
782+
inflater.end();
783+
}
784+
}
785+
786+
/**
787+
* Reads the length of a GZIP header, handling optional extra, name,
788+
* comment, and header CRC fields per RFC 1952.
789+
*/
790+
private static int readGzipHeaderLength(byte[] data) throws IOException {
791+
if (data.length < 10
792+
|| (data[0] & 0xFF) != 0x1f
793+
|| (data[1] & 0xFF) != 0x8b) {
794+
throw new IOException("Not a GZIP stream");
795+
}
796+
int flags = data[3] & 0xFF;
797+
int offset = 10;
798+
799+
if ((flags & 0x04) != 0) { // FEXTRA
800+
if (offset + 2 > data.length) {
801+
throw new IOException("Truncated GZIP FEXTRA");
802+
}
803+
int extraLen = (data[offset] & 0xFF)
804+
| ((data[offset + 1] & 0xFF) << 8);
805+
offset += 2 + extraLen;
806+
}
807+
if ((flags & 0x08) != 0) { // FNAME
808+
while (offset < data.length && data[offset] != 0) {
809+
offset++;
810+
}
811+
offset++; // skip null terminator
812+
}
813+
if ((flags & 0x10) != 0) { // FCOMMENT
814+
while (offset < data.length && data[offset] != 0) {
815+
offset++;
816+
}
817+
offset++; // skip null terminator
818+
}
819+
if ((flags & 0x02) != 0) { // FHCRC
820+
offset += 2;
821+
}
822+
return offset;
823+
}
824+
825+
/** Writes a 32-bit integer in little-endian byte order. */
826+
private static void writeInt(ByteArrayOutputStream out, int value) {
827+
out.write(value & 0xFF);
828+
out.write((value >> 8) & 0xFF);
829+
out.write((value >> 16) & 0xFF);
830+
out.write((value >> 24) & 0xFF);
831+
}
832+
833+
/** Reads a 32-bit little-endian integer from a byte array. */
834+
private static int readInt(byte[] data, int offset) {
835+
return (data[offset] & 0xFF)
836+
| ((data[offset + 1] & 0xFF) << 8)
837+
| ((data[offset + 2] & 0xFF) << 16)
838+
| ((data[offset + 3] & 0xFF) << 24);
839+
}
630840
}

0 commit comments

Comments
 (0)