Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
return new NullIntIterator();
}
return new RLEIntIterator(new RunLengthBitPackingHybridDecoder(
BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toInputStream()));
BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toByteBuffer()));
} catch (IOException e) {
throw new ParquetDecodingException("could not read levels in page for col " + path, e);
}
Expand Down Expand Up @@ -832,11 +832,7 @@ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {

@Override
int nextInt() {
try {
return delegate.readInt();
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return delegate.readInt();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.column.values.dictionary;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.Dictionary;
Expand Down Expand Up @@ -52,77 +53,50 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO
LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
LOG.debug("bit width {}", bitWidth);
decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
ByteBuffer buf = in.slice(in.available());
decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf);
} else {
decoder = new RunLengthBitPackingHybridDecoder(1, in) {
decoder = new RunLengthBitPackingHybridDecoder(1, ByteBuffer.allocate(0)) {
@Override
public int readInt() throws IOException {
throw new IOException("Attempt to read from empty page");
public int readInt() {
throw new ParquetDecodingException("Attempt to read from empty page");
}
};
}
}

@Override
public int readValueDictionaryId() {
try {
return decoder.readInt();
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return decoder.readInt();
}

@Override
public Binary readBytes() {
try {
return dictionary.decodeToBinary(decoder.readInt());
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return dictionary.decodeToBinary(decoder.readInt());
}

@Override
public float readFloat() {
try {
return dictionary.decodeToFloat(decoder.readInt());
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return dictionary.decodeToFloat(decoder.readInt());
}

@Override
public double readDouble() {
try {
return dictionary.decodeToDouble(decoder.readInt());
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return dictionary.decodeToDouble(decoder.readInt());
}

@Override
public int readInteger() {
try {
return dictionary.decodeToInt(decoder.readInt());
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return dictionary.decodeToInt(decoder.readInt());
}

@Override
public long readLong() {
try {
return dictionary.decodeToLong(decoder.readInt());
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return dictionary.decodeToLong(decoder.readInt());
}

@Override
public void skip() {
try {
decoder.readInt(); // Type does not matter as we are just skipping dictionary keys
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
decoder.readInt(); // Type does not matter as we are just skipping dictionary keys
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
*/
package org.apache.parquet.column.values.rle;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
Expand All @@ -42,23 +41,34 @@ private static enum MODE {

private final int bitWidth;
private final BytePacker packer;
private final InputStream in;
private final ByteBuffer buffer;

private MODE mode;
private int currentCount;
private int currentValue;
private int[] currentBuffer;
private int currentBufferLength;

public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) {
// Reusable buffers to avoid per-run allocation in PACKED mode
private int[] packedValuesBuffer = new int[0];
private byte[] packedBytesBuffer = new byte[0];

public RunLengthBitPackingHybridDecoder(int bitWidth, ByteBuffer buffer) {
LOG.debug("decoding bitWidth {}", bitWidth);

Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
this.bitWidth = bitWidth;
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
this.in = in;
this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN);
}

public int readInt() throws IOException {
/**
* Reads the next int value from the RLE/Bit-Packing hybrid stream.
*
* @return the next decoded integer value
* @throws ParquetDecodingException if a decoding error occurs
*/
public int readInt() {
if (currentCount == 0) {
readNext();
}
Expand All @@ -69,38 +79,53 @@ public int readInt() throws IOException {
result = currentValue;
break;
case PACKED:
result = currentBuffer[currentBuffer.length - 1 - currentCount];
result = currentBuffer[currentBufferLength - 1 - currentCount];
break;
default:
throw new ParquetDecodingException("not a valid mode " + mode);
}
return result;
}

private void readNext() throws IOException {
Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
final int header = BytesUtils.readUnsignedVarInt(in);
private void readNext() {
Preconditions.checkArgument(buffer.hasRemaining(), "Reading past RLE/BitPacking stream.");
final int header = BytesUtils.readUnsignedVarInt(buffer);
mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
case RLE:
currentCount = header >>> 1;
LOG.debug("reading {} values RLE", currentCount);
currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth);
currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(buffer, bitWidth);
break;
case PACKED:
int numGroups = header >>> 1;
currentCount = numGroups * 8;
currentBufferLength = currentCount;
LOG.debug("reading {} values BIT PACKED", currentCount);
currentBuffer = new int[currentCount]; // TODO: reuse a buffer
byte[] bytes = new byte[numGroups * bitWidth];
if (packedValuesBuffer.length < currentCount) {
packedValuesBuffer = new int[currentCount];
}
currentBuffer = packedValuesBuffer;
int bytesRequired = numGroups * bitWidth;
if (packedBytesBuffer.length < bytesRequired) {
packedBytesBuffer = new byte[bytesRequired];
}
// At the end of the RLE data stream, though, there might not be that many bytes left.
int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
bytesToRead = Math.min(bytesToRead, in.available());
new DataInputStream(in).readFully(bytes, 0, bytesToRead);
for (int valueIndex = 0, byteIndex = 0;
valueIndex < currentCount;
valueIndex += 8, byteIndex += bitWidth) {
packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
bytesToRead = Math.min(bytesToRead, buffer.remaining());
buffer.get(packedBytesBuffer, 0, bytesToRead);
// Unpack 32 values (4 groups) at a time when possible — symmetric to the encoder's
// pack32Values fast path. Falls back to unpack8Values for any residual groups.
int groupIdx = 0;
int byteIndex = 0;
final int step32 = bitWidth * 4;
while (groupIdx + 4 <= numGroups) {
packer.unpack32Values(packedBytesBuffer, byteIndex, currentBuffer, groupIdx * 8);
groupIdx += 4;
byteIndex += step32;
}
for (; groupIdx < numGroups; groupIdx++, byteIndex += bitWidth) {
packer.unpack8Values(packedBytesBuffer, byteIndex, currentBuffer, groupIdx * 8);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable {
*/
private final byte[] packBuffer;

/**
* Buffer four 8-value groups so we can use the packer's 32-value fast path.
*/
private final int[] bitPackedValuesBuffer;

/**
* Previous value written, used to detect repeated values
*/
Expand All @@ -98,6 +103,8 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable {
*/
private int bitPackedGroupCount;

private int numBitPackedValues;

/**
* A "pointer" to a single byte in baos,
* which we use as our bit-packed-header. It's really
Expand Down Expand Up @@ -125,7 +132,8 @@ public RunLengthBitPackingHybridEncoder(

this.bitWidth = bitWidth;
this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator);
this.packBuffer = new byte[bitWidth];
this.packBuffer = new byte[bitWidth * 4];
this.bitPackedValuesBuffer = new int[32];
this.bufferedValues = new int[8];
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
reset(false);
Expand All @@ -139,6 +147,7 @@ private void reset(boolean resetBaos) {
this.numBufferedValues = 0;
this.repeatCount = 0;
this.bitPackedGroupCount = 0;
this.numBitPackedValues = 0;
this.bitPackedRunHeaderPointer = -1;
this.toBytesCalled = false;
}
Expand Down Expand Up @@ -196,8 +205,9 @@ private void writeOrAppendBitPackedRun() throws IOException {
bitPackedRunHeaderPointer = baos.getCurrentIndex();
}

packer.pack8Values(bufferedValues, 0, packBuffer, 0);
baos.write(packBuffer);
System.arraycopy(bufferedValues, 0, bitPackedValuesBuffer, numBitPackedValues, 8);
numBitPackedValues += 8;
flushBitPackedValuesIfFull();

// empty the buffer, they've all been written
numBufferedValues = 0;
Expand All @@ -209,6 +219,34 @@ private void writeOrAppendBitPackedRun() throws IOException {
++bitPackedGroupCount;
}

private void flushBitPackedValuesIfFull() {
  // Only emit once all 32 slots (4 groups of 8 values) are occupied; partial
  // groups are handled by flushBitPackedValues() when the run ends.
  if (numBitPackedValues != bitPackedValuesBuffer.length) {
    return;
  }
  // Full buffer: take the packer's 32-value fast path in a single call.
  packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0);
  baos.write(packBuffer, 0, bitWidth * 4);
  numBitPackedValues = 0;
}

private void flushBitPackedValues() {
  // Nothing buffered — nothing to write out.
  if (numBitPackedValues == 0) {
    return;
  }

  if (numBitPackedValues == bitPackedValuesBuffer.length) {
    // Exactly 32 buffered values: use the 4-group fast path.
    packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0);
    baos.write(packBuffer, 0, bitWidth * 4);
  } else {
    // Partial buffer: pack the remaining groups one 8-value group at a time.
    int consumed = 0;
    int written = 0;
    while (consumed < numBitPackedValues) {
      packer.pack8Values(bitPackedValuesBuffer, consumed, packBuffer, written);
      consumed += 8;
      written += bitWidth;
    }
    baos.write(packBuffer, 0, written);
  }

  numBitPackedValues = 0;
}

/**
* If we are currently writing a bit-packed-run, update the
* bit-packed-header and consider this run to be over
Expand All @@ -221,6 +259,8 @@ private void endPreviousBitPackedRun() {
return;
}

flushBitPackedValues();

// create bit-packed-header, which needs to fit in 1 byte
byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.parquet.column.values.rle;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;

/**
* This ValuesReader does all the reading in {@link #initFromPage}
Expand All @@ -39,19 +39,16 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) {
@Override
public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException {
int length = BytesUtils.readIntLittleEndian(stream);
this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, stream.sliceStream(length));
ByteBuffer buf = stream.slice(length);
this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf);

// 4 is for the length which is stored as 4 bytes little endian
updateNextOffset(length + 4);
}

@Override
public int readInteger() {
try {
return decoder.readInt();
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
return decoder.readInt();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.junit.Test;

Expand Down Expand Up @@ -57,9 +56,8 @@ private void doIntegrationTest(int bitWidth) throws Exception {
encoder.writeInt((int) (17 % modValue));
}
ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes);

RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, encodedBytes);

for (int i = 0; i < 100; i++) {
assertEquals(i % modValue, decoder.readInt());
Expand Down
Loading
Loading