2323import com .github .luben .zstd .RecyclingBufferPool ;
2424import com .github .luben .zstd .ZstdInputStreamNoFinalizer ;
2525import com .github .luben .zstd .ZstdOutputStreamNoFinalizer ;
26+ import io .airlift .compress .lz4 .Lz4Compressor ;
27+ import io .airlift .compress .lz4 .Lz4Decompressor ;
2628import java .io .ByteArrayInputStream ;
2729import java .io .ByteArrayOutputStream ;
2830import java .io .IOException ;
2931import java .io .InputStream ;
3032import java .nio .ByteBuffer ;
31- import java .util .zip .CRC32 ;
32- import java .util .zip .Deflater ;
33- import java .util .zip .Inflater ;
3433import java .util .Collections ;
3534import java .util .HashMap ;
3635import java .util .Map ;
3736import java .util .Objects ;
37+ import java .util .zip .CRC32 ;
38+ import java .util .zip .Deflater ;
39+ import java .util .zip .Inflater ;
3840import org .apache .hadoop .conf .Configuration ;
3941import org .apache .hadoop .io .compress .CodecPool ;
4042import org .apache .hadoop .io .compress .CompressionCodec ;
@@ -288,21 +290,23 @@ protected BytesCompressor createCompressor(CompressionCodecName codecName) {
288290 return new SnappyBytesCompressor ();
289291 case ZSTD :
290292 BufferPool zstdCompressPool = conf .getBoolean (
291- ZstandardCodec .PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED ,
292- ZstandardCodec .DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED )
293- ? RecyclingBufferPool .INSTANCE : NoPool .INSTANCE ;
293+ ZstandardCodec .PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED ,
294+ ZstandardCodec .DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED )
295+ ? RecyclingBufferPool .INSTANCE
296+ : NoPool .INSTANCE ;
294297 return new ZstdBytesCompressor (
295298 conf .getInt (
296- ZstandardCodec .PARQUET_COMPRESS_ZSTD_LEVEL , ZstandardCodec .DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL ),
299+ ZstandardCodec .PARQUET_COMPRESS_ZSTD_LEVEL ,
300+ ZstandardCodec .DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL ),
297301 conf .getInt (
298- ZstandardCodec .PARQUET_COMPRESS_ZSTD_WORKERS , ZstandardCodec .DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS ),
302+ ZstandardCodec .PARQUET_COMPRESS_ZSTD_WORKERS ,
303+ ZstandardCodec .DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS ),
299304 pageSize ,
300305 zstdCompressPool );
301306 case LZ4_RAW :
302307 return new Lz4RawBytesCompressor ();
303308 case GZIP :
304- int gzipLevel = conf .getInt (
305- "zlib.compress.level" , Deflater .DEFAULT_COMPRESSION );
309+ int gzipLevel = conf .getInt ("zlib.compress.level" , Deflater .DEFAULT_COMPRESSION );
306310 return new GzipBytesCompressor (gzipLevel , pageSize );
307311 default :
308312 CompressionCodec codec = getCodec (codecName );
@@ -318,9 +322,10 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
318322 return new SnappyBytesDecompressor ();
319323 case ZSTD :
320324 BufferPool zstdDecompressPool = conf .getBoolean (
321- ZstandardCodec .PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED ,
322- ZstandardCodec .DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED )
323- ? RecyclingBufferPool .INSTANCE : NoPool .INSTANCE ;
325+ ZstandardCodec .PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED ,
326+ ZstandardCodec .DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED )
327+ ? RecyclingBufferPool .INSTANCE
328+ : NoPool .INSTANCE ;
324329 return new ZstdBytesDecompressor (zstdDecompressPool );
325330 case LZ4_RAW :
326331 return new Lz4RawBytesDecompressor ();
@@ -539,8 +544,7 @@ static class ZstdBytesDecompressor extends BytesDecompressor {
539544
540545 @ Override
541546 public BytesInput decompress (BytesInput bytes , int decompressedSize ) throws IOException {
542- try (ZstdInputStreamNoFinalizer zis =
543- new ZstdInputStreamNoFinalizer (bytes .toInputStream (), bufferPool )) {
547+ try (ZstdInputStreamNoFinalizer zis = new ZstdInputStreamNoFinalizer (bytes .toInputStream (), bufferPool )) {
544548 byte [] output = new byte [decompressedSize ];
545549 int offset = 0 ;
546550 while (offset < decompressedSize ) {
@@ -561,8 +565,7 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output,
561565 byte [] inputBytes = new byte [compressedSize ];
562566 input .get (inputBytes );
563567 ByteArrayInputStream bais = new ByteArrayInputStream (inputBytes );
564- try (ZstdInputStreamNoFinalizer zis =
565- new ZstdInputStreamNoFinalizer (bais , bufferPool )) {
568+ try (ZstdInputStreamNoFinalizer zis = new ZstdInputStreamNoFinalizer (bais , bufferPool )) {
566569 byte [] outputBytes = new byte [decompressedSize ];
567570 int offset = 0 ;
568571 while (offset < decompressedSize ) {
@@ -591,8 +594,7 @@ public void release() {
591594 * buffer copies.
592595 */
593596 static class Lz4RawBytesCompressor extends BytesCompressor {
594- private final io .airlift .compress .lz4 .Lz4Compressor compressor =
595- new io .airlift .compress .lz4 .Lz4Compressor ();
597+ private final Lz4Compressor compressor = new Lz4Compressor ();
596598 private byte [] outputBuffer ;
597599
598600 @ Override
@@ -624,8 +626,7 @@ public void release() {
624626 * Decompresses using airlift's LZ4 decompressor directly with heap ByteBuffers.
625627 */
626628 static class Lz4RawBytesDecompressor extends BytesDecompressor {
627- private final io .airlift .compress .lz4 .Lz4Decompressor decompressor =
628- new io .airlift .compress .lz4 .Lz4Decompressor ();
629+ private final Lz4Decompressor decompressor = new Lz4Decompressor ();
629630
630631 @ Override
631632 public BytesInput decompress (BytesInput bytes , int decompressedSize ) throws IOException {
@@ -660,10 +661,14 @@ public void release() {}
660661
661662 /** Minimal 10-byte GZIP header: magic, method=8 (deflate), flags=0, mtime=0, xfl=0, os=0. */
662663 private static final byte [] GZIP_HEADER = {
663- 0x1f , (byte ) 0x8b , // magic
664+ 0x1f ,
665+ (byte ) 0x8b , // magic
664666 0x08 , // method: deflate
665667 0x00 , // flags: none
666- 0x00 , 0x00 , 0x00 , 0x00 , // mtime: not set
668+ 0x00 ,
669+ 0x00 ,
670+ 0x00 ,
671+ 0x00 , // mtime: not set
667672 0x00 , // extra flags
668673 0x00 // OS: FAT (matches Java's GZIPOutputStream default)
669674 };
@@ -748,28 +753,24 @@ static class GzipBytesDecompressor extends BytesDecompressor {
748753 private final CRC32 crc = new CRC32 ();
749754
750755 @ Override
751- public BytesInput decompress (BytesInput bytes , int decompressedSize )
752- throws IOException {
756+ public BytesInput decompress (BytesInput bytes , int decompressedSize ) throws IOException {
753757 byte [] compressed = bytes .toByteArray ();
754758 int headerLen = readGzipHeaderLength (compressed );
755759
756760 inflater .reset ();
757- inflater .setInput (
758- compressed , headerLen , compressed .length - headerLen - 8 );
761+ inflater .setInput (compressed , headerLen , compressed .length - headerLen - 8 );
759762
760763 byte [] output = new byte [decompressedSize ];
761764 try {
762765 int inflated = 0 ;
763766 while (inflated < decompressedSize ) {
764- int n = inflater .inflate (
765- output , inflated , decompressedSize - inflated );
767+ int n = inflater .inflate (output , inflated , decompressedSize - inflated );
766768 if (n == 0 && inflater .finished ()) {
767769 break ;
768770 }
769771 if (n == 0 && inflater .needsInput ()) {
770772 throw new IOException (
771- "Unexpected end of GZIP stream at offset "
772- + inflated + " of " + decompressedSize );
773+ "Unexpected end of GZIP stream at offset " + inflated + " of " + decompressedSize );
773774 }
774775 inflated += n ;
775776 }
@@ -795,13 +796,11 @@ public BytesInput decompress(BytesInput bytes, int decompressedSize)
795796 }
796797
797798 @ Override
798- public void decompress (
799- ByteBuffer input , int compressedSize ,
800- ByteBuffer output , int decompressedSize ) throws IOException {
799+ public void decompress (ByteBuffer input , int compressedSize , ByteBuffer output , int decompressedSize )
800+ throws IOException {
801801 byte [] inputBytes = new byte [compressedSize ];
802802 input .get (inputBytes );
803- BytesInput result = decompress (
804- BytesInput .from (inputBytes ), decompressedSize );
803+ BytesInput result = decompress (BytesInput .from (inputBytes ), decompressedSize );
805804 output .put (result .toByteArray ());
806805 }
807806
@@ -816,9 +815,7 @@ public void release() {
816815 * comment, and header CRC fields per RFC 1952.
817816 */
818817 private static int readGzipHeaderLength (byte [] data ) throws IOException {
819- if (data .length < 10
820- || (data [0 ] & 0xFF ) != 0x1f
821- || (data [1 ] & 0xFF ) != 0x8b ) {
818+ if (data .length < 10 || (data [0 ] & 0xFF ) != 0x1f || (data [1 ] & 0xFF ) != 0x8b ) {
822819 throw new IOException ("Not a GZIP stream" );
823820 }
824821 int flags = data [3 ] & 0xFF ;
@@ -828,8 +825,7 @@ private static int readGzipHeaderLength(byte[] data) throws IOException {
828825 if (offset + 2 > data .length ) {
829826 throw new IOException ("Truncated GZIP FEXTRA" );
830827 }
831- int extraLen = (data [offset ] & 0xFF )
832- | ((data [offset + 1 ] & 0xFF ) << 8 );
828+ int extraLen = (data [offset ] & 0xFF ) | ((data [offset + 1 ] & 0xFF ) << 8 );
833829 offset += 2 + extraLen ;
834830 }
835831 if ((flags & 0x08 ) != 0 ) { // FNAME
0 commit comments