Skip to content

Commit ecd1881

Browse files
committed
GH-3522: Add batch read APIs to ValuesReader hierarchy
Add readIntegers(), readLongs(), readFloats(), readDoubles() batch methods to ValuesReader with default loop-based implementations. Override in: - RunLengthBitPackingHybridDecoder.readInts(): batch across RLE runs and packed groups using Arrays.fill/System.arraycopy - DictionaryValuesReader: batch-decode dictionary IDs first, then batch-lookup values (eliminates per-value IOException try/catch) - DeltaBinaryPackingValuesReader: System.arraycopy from pre-decoded buffer - PlainValuesReader (all types): loop over LittleEndianDataInputStream - ByteStreamSplitValuesReader (all types): indexed ByteBuffer bulk read These APIs enable callers to amortize per-value overhead (virtual dispatch, bounds checks, mode switches) across batches. On the perf branch where the RLE decoder uses ByteBuffer, this yielded +148% RLE throughput and +67% dictionary decode throughput.
1 parent 28593d5 commit ecd1881

10 files changed

Lines changed: 248 additions & 0 deletions

parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,62 @@ public long readLong() {
185185
throw new UnsupportedOperationException();
186186
}
187187

188+
// ---- Batch read methods ----
189+
// Default implementations loop over the per-value methods.
190+
// Subclasses should override with bulk/memcpy-style implementations.
191+
192+
/**
193+
* Reads {@code count} integers into {@code dest} starting at {@code offset}.
194+
*
195+
* @param dest destination array
196+
* @param offset start index in dest
197+
* @param count number of values to read
198+
*/
199+
public void readIntegers(int[] dest, int offset, int count) {
200+
for (int i = 0; i < count; i++) {
201+
dest[offset + i] = readInteger();
202+
}
203+
}
204+
205+
/**
206+
* Reads {@code count} longs into {@code dest} starting at {@code offset}.
207+
*
208+
* @param dest destination array
209+
* @param offset start index in dest
210+
* @param count number of values to read
211+
*/
212+
public void readLongs(long[] dest, int offset, int count) {
213+
for (int i = 0; i < count; i++) {
214+
dest[offset + i] = readLong();
215+
}
216+
}
217+
218+
/**
219+
* Reads {@code count} floats into {@code dest} starting at {@code offset}.
220+
*
221+
* @param dest destination array
222+
* @param offset start index in dest
223+
* @param count number of values to read
224+
*/
225+
public void readFloats(float[] dest, int offset, int count) {
226+
for (int i = 0; i < count; i++) {
227+
dest[offset + i] = readFloat();
228+
}
229+
}
230+
231+
/**
232+
* Reads {@code count} doubles into {@code dest} starting at {@code offset}.
233+
*
234+
* @param dest destination array
235+
* @param offset start index in dest
236+
* @param count number of values to read
237+
*/
238+
public void readDoubles(double[] dest, int offset, int count) {
239+
for (int i = 0; i < count; i++) {
240+
dest[offset + i] = readDouble();
241+
}
242+
}
243+
188244
/**
189245
* Skips the next value in the page
190246
*/

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,19 @@ protected int nextElementByteOffset() {
4949
return offset;
5050
}
5151

52+
/**
53+
* Advances the stream position by {@code count} elements and returns the byte offset
54+
* of the first element. Used by batch read methods in subclasses.
55+
*/
56+
protected int advanceByteOffset(int count) {
57+
if (indexInStream + count > valuesCount) {
58+
throw new ParquetDecodingException("Byte-stream data was already exhausted.");
59+
}
60+
int offset = indexInStream * elementSizeInBytes;
61+
indexInStream += count;
62+
return offset;
63+
}
64+
5265
// Decode an entire data page
5366
private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
5467
assert encoded.limit() == valuesCount * elementSizeInBytes;

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForDouble.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForDouble() {
2727
public double readDouble() {
2828
return decodedDataBuffer.getDouble(nextElementByteOffset());
2929
}
30+
31+
@Override
32+
public void readDoubles(double[] dest, int offset, int count) {
33+
int byteOffset = advanceByteOffset(count);
34+
for (int i = 0; i < count; i++) {
35+
dest[offset + i] = decodedDataBuffer.getDouble(byteOffset + i * 8);
36+
}
37+
}
3038
}

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForFloat.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForFloat() {
2727
public float readFloat() {
2828
return decodedDataBuffer.getFloat(nextElementByteOffset());
2929
}
30+
31+
@Override
32+
public void readFloats(float[] dest, int offset, int count) {
33+
int byteOffset = advanceByteOffset(count);
34+
for (int i = 0; i < count; i++) {
35+
dest[offset + i] = decodedDataBuffer.getFloat(byteOffset + i * 4);
36+
}
37+
}
3038
}

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForInteger.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForInteger() {
2727
public int readInteger() {
2828
return decodedDataBuffer.getInt(nextElementByteOffset());
2929
}
30+
31+
@Override
32+
public void readIntegers(int[] dest, int offset, int count) {
33+
int byteOffset = advanceByteOffset(count);
34+
for (int i = 0; i < count; i++) {
35+
dest[offset + i] = decodedDataBuffer.getInt(byteOffset + i * 4);
36+
}
37+
}
3038
}

parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReaderForLong.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForLong() {
2727
public long readLong() {
2828
return decodedDataBuffer.getLong(nextElementByteOffset());
2929
}
30+
31+
@Override
32+
public void readLongs(long[] dest, int offset, int count) {
33+
int byteOffset = advanceByteOffset(count);
34+
for (int i = 0; i < count; i++) {
35+
dest[offset + i] = decodedDataBuffer.getLong(byteOffset + i * 8);
36+
}
37+
}
3038
}

parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,22 @@ public long readLong() {
112112
return valuesBuffer[valuesRead++];
113113
}
114114

115+
@Override
116+
public void readIntegers(int[] dest, int offset, int count) {
117+
checkRead();
118+
for (int i = 0; i < count; i++) {
119+
dest[offset + i] = (int) valuesBuffer[valuesRead + i];
120+
}
121+
valuesRead += count;
122+
}
123+
124+
@Override
125+
public void readLongs(long[] dest, int offset, int count) {
126+
checkRead();
127+
System.arraycopy(valuesBuffer, valuesRead, dest, offset, count);
128+
valuesRead += count;
129+
}
130+
115131
private void checkRead() {
116132
if (valuesRead >= totalValueCount) {
117133
throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount);

parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,59 @@ public long readLong() {
117117
}
118118
}
119119

120+
@Override
121+
public void readIntegers(int[] dest, int offset, int count) {
122+
try {
123+
// Batch-decode dictionary IDs, then batch-lookup
124+
int[] ids = new int[count];
125+
decoder.readInts(ids, 0, count);
126+
for (int i = 0; i < count; i++) {
127+
dest[offset + i] = dictionary.decodeToInt(ids[i]);
128+
}
129+
} catch (IOException e) {
130+
throw new ParquetDecodingException(e);
131+
}
132+
}
133+
134+
@Override
135+
public void readLongs(long[] dest, int offset, int count) {
136+
try {
137+
int[] ids = new int[count];
138+
decoder.readInts(ids, 0, count);
139+
for (int i = 0; i < count; i++) {
140+
dest[offset + i] = dictionary.decodeToLong(ids[i]);
141+
}
142+
} catch (IOException e) {
143+
throw new ParquetDecodingException(e);
144+
}
145+
}
146+
147+
@Override
148+
public void readFloats(float[] dest, int offset, int count) {
149+
try {
150+
int[] ids = new int[count];
151+
decoder.readInts(ids, 0, count);
152+
for (int i = 0; i < count; i++) {
153+
dest[offset + i] = dictionary.decodeToFloat(ids[i]);
154+
}
155+
} catch (IOException e) {
156+
throw new ParquetDecodingException(e);
157+
}
158+
}
159+
160+
@Override
161+
public void readDoubles(double[] dest, int offset, int count) {
162+
try {
163+
int[] ids = new int[count];
164+
decoder.readInts(ids, 0, count);
165+
for (int i = 0; i < count; i++) {
166+
dest[offset + i] = dictionary.decodeToDouble(ids[i]);
167+
}
168+
} catch (IOException e) {
169+
throw new ParquetDecodingException(e);
170+
}
171+
}
172+
120173
@Override
121174
public void skip() {
122175
try {

parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ public double readDouble() {
7171
throw new ParquetDecodingException("could not read double", e);
7272
}
7373
}
74+
75+
@Override
76+
public void readDoubles(double[] dest, int offset, int count) {
77+
try {
78+
for (int i = 0; i < count; i++) {
79+
dest[offset + i] = in.readDouble();
80+
}
81+
} catch (IOException e) {
82+
throw new ParquetDecodingException("could not read doubles", e);
83+
}
84+
}
7485
}
7586

7687
public static class FloatPlainValuesReader extends PlainValuesReader {
@@ -92,6 +103,17 @@ public float readFloat() {
92103
throw new ParquetDecodingException("could not read float", e);
93104
}
94105
}
106+
107+
@Override
108+
public void readFloats(float[] dest, int offset, int count) {
109+
try {
110+
for (int i = 0; i < count; i++) {
111+
dest[offset + i] = in.readFloat();
112+
}
113+
} catch (IOException e) {
114+
throw new ParquetDecodingException("could not read floats", e);
115+
}
116+
}
95117
}
96118

97119
public static class IntegerPlainValuesReader extends PlainValuesReader {
@@ -113,6 +135,17 @@ public int readInteger() {
113135
throw new ParquetDecodingException("could not read int", e);
114136
}
115137
}
138+
139+
@Override
140+
public void readIntegers(int[] dest, int offset, int count) {
141+
try {
142+
for (int i = 0; i < count; i++) {
143+
dest[offset + i] = in.readInt();
144+
}
145+
} catch (IOException e) {
146+
throw new ParquetDecodingException("could not read ints", e);
147+
}
148+
}
116149
}
117150

118151
public static class LongPlainValuesReader extends PlainValuesReader {
@@ -134,5 +167,16 @@ public long readLong() {
134167
throw new ParquetDecodingException("could not read long", e);
135168
}
136169
}
170+
171+
@Override
172+
public void readLongs(long[] dest, int offset, int count) {
173+
try {
174+
for (int i = 0; i < count; i++) {
175+
dest[offset + i] = in.readLong();
176+
}
177+
} catch (IOException e) {
178+
throw new ParquetDecodingException("could not read longs", e);
179+
}
180+
}
137181
}
138182
}

parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,40 @@ public int readInt() throws IOException {
7777
return result;
7878
}
7979

80+
/**
81+
* Reads {@code count} int values into {@code dest} starting at {@code offset}.
82+
* This avoids per-value virtual dispatch overhead by batching across RLE runs
83+
* and packed groups.
84+
*
85+
* @param dest destination array
86+
* @param offset start index in dest
87+
* @param count number of values to read
88+
*/
89+
public void readInts(int[] dest, int offset, int count) throws IOException {
90+
int remaining = count;
91+
int pos = offset;
92+
while (remaining > 0) {
93+
if (currentCount == 0) {
94+
readNext();
95+
}
96+
int batchSize = Math.min(remaining, currentCount);
97+
switch (mode) {
98+
case RLE:
99+
java.util.Arrays.fill(dest, pos, pos + batchSize, currentValue);
100+
break;
101+
case PACKED:
102+
int startIdx = currentBuffer.length - currentCount;
103+
System.arraycopy(currentBuffer, startIdx, dest, pos, batchSize);
104+
break;
105+
default:
106+
throw new ParquetDecodingException("not a valid mode " + mode);
107+
}
108+
currentCount -= batchSize;
109+
remaining -= batchSize;
110+
pos += batchSize;
111+
}
112+
}
113+
80114
private void readNext() throws IOException {
81115
Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
82116
final int header = BytesUtils.readUnsignedVarInt(in);

0 commit comments

Comments
 (0)