Skip to content

Commit 0081ccf

Browse files
committed
GH-3466 Improve RunLengthBitPackingHybridDecoder.readNext to avoid per-call buffer allocation and DataInputStream wrapping
1 parent 4c8f4d4 commit 0081ccf

2 files changed

Lines changed: 62 additions & 9 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
*/
1919
package org.apache.parquet.column.values.rle;
2020

21-
import java.io.DataInputStream;
2221
import java.io.IOException;
2322
import java.io.InputStream;
23+
import java.util.Arrays;
2424
import org.apache.parquet.Preconditions;
2525
import org.apache.parquet.bytes.BytesUtils;
2626
import org.apache.parquet.column.values.bitpacking.BytePacker;
@@ -48,6 +48,8 @@ private static enum MODE {
4848
private int currentCount;
4949
private int currentValue;
5050
private int[] currentBuffer;
51+
private int currentBufferLength;
52+
private byte[] packedBytes;
5153

5254
public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) {
5355
LOG.debug("decoding bitWidth {}", bitWidth);
@@ -69,7 +71,7 @@ public int readInt() throws IOException {
6971
result = currentValue;
7072
break;
7173
case PACKED:
72-
result = currentBuffer[currentBuffer.length - 1 - currentCount];
74+
result = currentBuffer[currentBufferLength - 1 - currentCount];
7375
break;
7476
default:
7577
throw new ParquetDecodingException("not a valid mode " + mode);
@@ -90,17 +92,23 @@ private void readNext() throws IOException {
9092
case PACKED:
9193
int numGroups = header >>> 1;
9294
currentCount = numGroups * 8;
95+
currentBufferLength = currentCount;
9396
LOG.debug("reading {} values BIT PACKED", currentCount);
94-
currentBuffer = new int[currentCount]; // TODO: reuse a buffer
95-
byte[] bytes = new byte[numGroups * bitWidth];
96-
// At the end of the file RLE data though, there might not be that many bytes left.
97-
int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
98-
bytesToRead = Math.min(bytesToRead, in.available());
99-
new DataInputStream(in).readFully(bytes, 0, bytesToRead);
97+
if (currentBuffer == null || currentBuffer.length < currentCount) {
98+
currentBuffer = new int[currentCount];
99+
}
100+
int bytesNeeded = numGroups * bitWidth;
101+
if (packedBytes == null || packedBytes.length < bytesNeeded) {
102+
packedBytes = new byte[bytesNeeded];
103+
}
104+
int bytesRead = in.readNBytes(packedBytes, 0, bytesNeeded);
105+
if (bytesRead < bytesNeeded) {
106+
Arrays.fill(packedBytes, bytesRead, bytesNeeded, (byte) 0);
107+
}
100108
for (int valueIndex = 0, byteIndex = 0;
101109
valueIndex < currentCount;
102110
valueIndex += 8, byteIndex += bitWidth) {
103-
packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
111+
packer.unpack8Values(packedBytes, byteIndex, currentBuffer, valueIndex);
104112
}
105113
break;
106114
default:

parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
*/
1919
package org.apache.parquet.column.values.rle;
2020

21+
import static org.junit.Assert.assertArrayEquals;
2122
import static org.junit.Assert.assertEquals;
2223

2324
import java.io.ByteArrayInputStream;
2425
import java.util.ArrayList;
26+
import java.util.Arrays;
2527
import java.util.List;
2628
import org.apache.parquet.bytes.BytesUtils;
2729
import org.apache.parquet.bytes.DirectByteBufferAllocator;
@@ -298,6 +300,49 @@ public void testGroupBoundary() throws Exception {
298300
assertEquals(stream.available(), 0);
299301
}
300302

303+
@Test
304+
public void testTruncatedPackedRunAfterFullPackedRunDoesNotReuseStaleBytes() throws Exception {
305+
int bitWidth = 3;
306+
BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
307+
308+
int[] firstRunValues = new int[8];
309+
Arrays.fill(firstRunValues, 7);
310+
byte[] firstRunPacked = new byte[bitWidth];
311+
packer.pack8Values(firstRunValues, 0, firstRunPacked, 0);
312+
313+
int[] secondRunValues = {1, 2, 3, 4, 5, 6, 7, 0};
314+
byte[] secondRunPacked = new byte[bitWidth];
315+
packer.pack8Values(secondRunValues, 0, secondRunPacked, 0);
316+
317+
byte[] encoded = {
318+
(byte) ((1 << 1) | 1),
319+
firstRunPacked[0],
320+
firstRunPacked[1],
321+
firstRunPacked[2],
322+
(byte) ((1 << 1) | 1),
323+
secondRunPacked[0]
324+
};
325+
326+
RunLengthBitPackingHybridDecoder decoder =
327+
new RunLengthBitPackingHybridDecoder(bitWidth, new ByteArrayInputStream(encoded));
328+
329+
for (int ignored = 0; ignored < 8; ignored++) {
330+
assertEquals(7, decoder.readInt());
331+
}
332+
333+
int[] actualSecondRun = new int[8];
334+
for (int i = 0; i < 8; i++) {
335+
actualSecondRun[i] = decoder.readInt();
336+
}
337+
338+
byte[] expectedSecondPacked = new byte[bitWidth];
339+
expectedSecondPacked[0] = secondRunPacked[0];
340+
int[] expectedSecondRun = new int[8];
341+
packer.unpack8Values(expectedSecondPacked, 0, expectedSecondRun, 0);
342+
343+
assertArrayEquals(expectedSecondRun, actualSecondRun);
344+
}
345+
301346
private static List<Integer> unpack(int bitWidth, int numValues, ByteArrayInputStream is) throws Exception {
302347

303348
BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);

0 commit comments

Comments
 (0)