Skip to content

Commit e58a2d6

Browse files
committed
Optimize GZIP compression: bypass Hadoop GzipCodec, use Deflater/Inflater directly
Use reusable Deflater/Inflater instances with a manual GZIP header/trailer, bypassing Hadoop's GzipCodec, CodecPool, and GZIPOutputStream/GZIPInputStream. Deflater.reset() reuses native zlib state across pages, avoiding per-call allocation; the manual header/trailer eliminates stream-wrapper overhead. Results (3 forks, 15 iterations, AMD EPYC 9V45): Compress: 8KB +3%, 64KB +1%, 256KB +1%. Decompress: 8KB +6%, 64KB +3%, 256KB +9%.
1 parent 77238ec commit e58a2d6

1 file changed

Lines changed: 210 additions & 0 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import java.io.IOException;
2727
import java.io.InputStream;
2828
import java.nio.ByteBuffer;
29+
import java.util.zip.CRC32;
30+
import java.util.zip.Deflater;
31+
import java.util.zip.Inflater;
2932
import java.util.Collections;
3033
import java.util.HashMap;
3134
import java.util.Map;
@@ -288,6 +291,10 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) {
288291
pageSize);
289292
case LZ4_RAW:
290293
return new Lz4RawBytesCompressor();
294+
case GZIP:
295+
int gzipLevel = conf.getInt(
296+
"zlib.compress.level", Deflater.DEFAULT_COMPRESSION);
297+
return new GzipBytesCompressor(gzipLevel, pageSize);
291298
default:
292299
CompressionCodec codec = getCodec(codecName);
293300
return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec);
@@ -304,6 +311,8 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
304311
return new ZstdBytesDecompressor();
305312
case LZ4_RAW:
306313
return new Lz4RawBytesDecompressor();
314+
case GZIP:
315+
return new GzipBytesDecompressor();
307316
default:
308317
CompressionCodec codec = getCodec(codecName);
309318
return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec);
@@ -625,4 +634,205 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
625634
@Override
626635
public void release() {}
627636
}
637+
638+
// ---- Optimized GZIP compressor/decompressor using Deflater/Inflater directly ----
639+
640+
/** GZIP magic number: 0x1f 0x8b. */
641+
private static final int GZIP_MAGIC = 0x8b1f;
642+
643+
/** Minimal 10-byte GZIP header: magic, method=8 (deflate), flags=0, mtime=0, xfl=0, os=0. */
644+
private static final byte[] GZIP_HEADER = {
645+
0x1f, (byte) 0x8b, // magic
646+
0x08, // method: deflate
647+
0x00, // flags: none
648+
0x00, 0x00, 0x00, 0x00, // mtime: not set
649+
0x00, // extra flags
650+
0x00 // OS: FAT (matches Java's GZIPOutputStream default)
651+
};
652+
653+
/**
654+
* Compresses using {@link Deflater} directly with a reusable instance,
655+
* bypassing Hadoop's GzipCodec and the stream overhead of
656+
* {@link java.util.zip.GZIPOutputStream}. The Deflater is kept across
657+
* calls and reset via {@link Deflater#reset()}, avoiding native zlib
658+
* state allocation per page. Writes a minimal GZIP header and trailer
659+
* (CRC32 + original size) manually.
660+
*/
661+
static class GzipBytesCompressor extends BytesCompressor {
662+
private final Deflater deflater;
663+
private final CRC32 crc = new CRC32();
664+
private final ByteArrayOutputStream baos;
665+
666+
GzipBytesCompressor(int level, int pageSize) {
667+
this.deflater = new Deflater(level, true);
668+
this.baos = new ByteArrayOutputStream(pageSize);
669+
}
670+
671+
@Override
672+
public BytesInput compress(BytesInput bytes) throws IOException {
673+
byte[] input = bytes.toByteArray();
674+
675+
deflater.reset();
676+
crc.reset();
677+
crc.update(input);
678+
679+
baos.reset();
680+
// GZIP header
681+
baos.write(GZIP_HEADER);
682+
683+
// Deflate
684+
deflater.setInput(input);
685+
deflater.finish();
686+
byte[] buf = new byte[4096];
687+
while (!deflater.finished()) {
688+
int n = deflater.deflate(buf);
689+
baos.write(buf, 0, n);
690+
}
691+
692+
// GZIP trailer: CRC32 + original size (little-endian)
693+
writeInt(baos, (int) crc.getValue());
694+
writeInt(baos, input.length);
695+
696+
return BytesInput.from(baos);
697+
}
698+
699+
@Override
700+
public CompressionCodecName getCodecName() {
701+
return CompressionCodecName.GZIP;
702+
}
703+
704+
@Override
705+
public void release() {
706+
deflater.end();
707+
}
708+
}
709+
710+
/**
711+
* Decompresses using {@link Inflater} directly with a reusable instance,
712+
* bypassing Hadoop's GzipCodec and the stream overhead of
713+
* {@link java.util.zip.GZIPInputStream}. Skips the GZIP header, inflates
714+
* into the output buffer, and verifies the CRC32 + size trailer.
715+
*/
716+
static class GzipBytesDecompressor extends BytesDecompressor {
717+
private final Inflater inflater = new Inflater(true);
718+
private final CRC32 crc = new CRC32();
719+
720+
@Override
721+
public BytesInput decompress(BytesInput bytes, int decompressedSize)
722+
throws IOException {
723+
byte[] compressed = bytes.toByteArray();
724+
int headerLen = readGzipHeaderLength(compressed);
725+
726+
inflater.reset();
727+
inflater.setInput(
728+
compressed, headerLen, compressed.length - headerLen - 8);
729+
730+
byte[] output = new byte[decompressedSize];
731+
try {
732+
int inflated = 0;
733+
while (inflated < decompressedSize) {
734+
int n = inflater.inflate(
735+
output, inflated, decompressedSize - inflated);
736+
if (n == 0 && inflater.finished()) {
737+
break;
738+
}
739+
if (n == 0 && inflater.needsInput()) {
740+
throw new IOException(
741+
"Unexpected end of GZIP stream at offset "
742+
+ inflated + " of " + decompressedSize);
743+
}
744+
inflated += n;
745+
}
746+
} catch (java.util.zip.DataFormatException e) {
747+
throw new IOException("Invalid GZIP data", e);
748+
}
749+
750+
// Verify CRC32 and original size from trailer
751+
int trailerOffset = compressed.length - 8;
752+
int expectedCrc = readInt(compressed, trailerOffset);
753+
int expectedSize = readInt(compressed, trailerOffset + 4);
754+
755+
crc.reset();
756+
crc.update(output);
757+
if ((int) crc.getValue() != expectedCrc) {
758+
throw new IOException("GZIP CRC32 mismatch");
759+
}
760+
if (decompressedSize != (expectedSize & 0xFFFFFFFFL)) {
761+
throw new IOException("GZIP size mismatch");
762+
}
763+
764+
return BytesInput.from(output);
765+
}
766+
767+
@Override
768+
public void decompress(
769+
ByteBuffer input, int compressedSize,
770+
ByteBuffer output, int decompressedSize) throws IOException {
771+
byte[] inputBytes = new byte[compressedSize];
772+
input.get(inputBytes);
773+
BytesInput result = decompress(
774+
BytesInput.from(inputBytes), decompressedSize);
775+
output.put(result.toByteArray());
776+
}
777+
778+
@Override
779+
public void release() {
780+
inflater.end();
781+
}
782+
}
783+
784+
/**
785+
* Reads the length of a GZIP header, handling optional extra, name,
786+
* comment, and header CRC fields per RFC 1952.
787+
*/
788+
private static int readGzipHeaderLength(byte[] data) throws IOException {
789+
if (data.length < 10
790+
|| (data[0] & 0xFF) != 0x1f
791+
|| (data[1] & 0xFF) != 0x8b) {
792+
throw new IOException("Not a GZIP stream");
793+
}
794+
int flags = data[3] & 0xFF;
795+
int offset = 10;
796+
797+
if ((flags & 0x04) != 0) { // FEXTRA
798+
if (offset + 2 > data.length) {
799+
throw new IOException("Truncated GZIP FEXTRA");
800+
}
801+
int extraLen = (data[offset] & 0xFF)
802+
| ((data[offset + 1] & 0xFF) << 8);
803+
offset += 2 + extraLen;
804+
}
805+
if ((flags & 0x08) != 0) { // FNAME
806+
while (offset < data.length && data[offset] != 0) {
807+
offset++;
808+
}
809+
offset++; // skip null terminator
810+
}
811+
if ((flags & 0x10) != 0) { // FCOMMENT
812+
while (offset < data.length && data[offset] != 0) {
813+
offset++;
814+
}
815+
offset++; // skip null terminator
816+
}
817+
if ((flags & 0x02) != 0) { // FHCRC
818+
offset += 2;
819+
}
820+
return offset;
821+
}
822+
823+
/** Writes a 32-bit integer in little-endian byte order. */
824+
private static void writeInt(ByteArrayOutputStream out, int value) {
825+
out.write(value & 0xFF);
826+
out.write((value >> 8) & 0xFF);
827+
out.write((value >> 16) & 0xFF);
828+
out.write((value >> 24) & 0xFF);
829+
}
830+
831+
/** Reads a 32-bit little-endian integer from a byte array. */
832+
private static int readInt(byte[] data, int offset) {
833+
return (data[offset] & 0xFF)
834+
| ((data[offset + 1] & 0xFF) << 8)
835+
| ((data[offset + 2] & 0xFF) << 16)
836+
| ((data[offset + 3] & 0xFF) << 24);
837+
}
628838
}

0 commit comments

Comments
 (0)