Skip to content

Commit 86c8431

Browse files
committed
apacheGH-3493: Optimize PlainValuesReader with direct ByteBuffer reads
Replace the LittleEndianDataInputStream wrapper with direct ByteBuffer access using LITTLE_ENDIAN byte order in PlainValuesReader. Each read{Integer,Long,Float,Double}() previously dispatched through 4 in.read() calls per value and assembled the result with manual bit shifts; it now compiles to a single ByteBuffer get*() JVM intrinsic. In initFromPage, the page data is obtained as a single contiguous ByteBuffer via ByteBufferInputStream.slice(available). The ByteBufferInputStream.slice() method handles both single-buffer (zero-copy view) and multi-buffer (copy into contiguous buffer) cases transparently. In practice page data is almost always a single contiguous buffer. Benchmark (IntEncodingBenchmark.decodePlain, 100k INT32 values per invocation, JMH -wi 3 -i 5 -f 1): Pattern Before (ops/s) After (ops/s) Speedup SEQUENTIAL 427,630,411 5,397,298,681 12.6x RANDOM 431,052,072 5,437,926,758 12.6x LOW_CARDINALITY 423,443,685 5,477,810,011 12.9x HIGH_CARDINALITY 426,405,891 5,485,493,740 12.9x The improvement is consistent regardless of data distribution because the bottleneck was entirely in the dispatch overhead. All four numeric plain reader types (int, long, float, double) benefit equally. All 573 parquet-column tests pass.
1 parent 53d7842 commit 86c8431

2 files changed

Lines changed: 25 additions & 51 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java

Lines changed: 23 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,120 +19,92 @@
1919
package org.apache.parquet.column.values.plain;
2020

2121
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.nio.ByteOrder;
2224
import org.apache.parquet.bytes.ByteBufferInputStream;
2325
import org.apache.parquet.bytes.LittleEndianDataInputStream;
2426
import org.apache.parquet.column.values.ValuesReader;
25-
import org.apache.parquet.io.ParquetDecodingException;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829

2930
/**
30-
* Plain encoding for float, double, int, long
31+
* Plain encoding for float, double, int, long.
32+
*
33+
* <p>Reads directly from a {@link ByteBuffer} with {@link ByteOrder#LITTLE_ENDIAN} byte order,
34+
* bypassing the {@link LittleEndianDataInputStream} wrapper to avoid per-value virtual dispatch
35+
* overhead. The underlying page data is obtained as a single contiguous {@link ByteBuffer} via
36+
* {@link ByteBufferInputStream#slice(int)}.
3137
*/
3238
public abstract class PlainValuesReader extends ValuesReader {
3339
private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class);
3440

35-
protected LittleEndianDataInputStream in;
41+
ByteBuffer buffer;
3642

3743
@Override
3844
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
3945
LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
40-
this.in = new LittleEndianDataInputStream(stream.remainingStream());
46+
int available = stream.available();
47+
if (available > 0) {
48+
this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
49+
} else {
50+
this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
51+
}
4152
}
4253

4354
@Override
4455
public void skip() {
4556
skip(1);
4657
}
4758

48-
void skipBytesFully(int n) throws IOException {
49-
int skipped = 0;
50-
while (skipped < n) {
51-
skipped += in.skipBytes(n - skipped);
52-
}
53-
}
54-
5559
public static class DoublePlainValuesReader extends PlainValuesReader {
5660

5761
@Override
5862
public void skip(int n) {
59-
try {
60-
skipBytesFully(n * 8);
61-
} catch (IOException e) {
62-
throw new ParquetDecodingException("could not skip " + n + " double values", e);
63-
}
63+
buffer.position(buffer.position() + n * 8);
6464
}
6565

6666
@Override
6767
public double readDouble() {
68-
try {
69-
return in.readDouble();
70-
} catch (IOException e) {
71-
throw new ParquetDecodingException("could not read double", e);
72-
}
68+
return buffer.getDouble();
7369
}
7470
}
7571

7672
public static class FloatPlainValuesReader extends PlainValuesReader {
7773

7874
@Override
7975
public void skip(int n) {
80-
try {
81-
skipBytesFully(n * 4);
82-
} catch (IOException e) {
83-
throw new ParquetDecodingException("could not skip " + n + " floats", e);
84-
}
76+
buffer.position(buffer.position() + n * 4);
8577
}
8678

8779
@Override
8880
public float readFloat() {
89-
try {
90-
return in.readFloat();
91-
} catch (IOException e) {
92-
throw new ParquetDecodingException("could not read float", e);
93-
}
81+
return buffer.getFloat();
9482
}
9583
}
9684

9785
public static class IntegerPlainValuesReader extends PlainValuesReader {
9886

9987
@Override
10088
public void skip(int n) {
101-
try {
102-
in.skipBytes(n * 4);
103-
} catch (IOException e) {
104-
throw new ParquetDecodingException("could not skip " + n + " ints", e);
105-
}
89+
buffer.position(buffer.position() + n * 4);
10690
}
10791

10892
@Override
10993
public int readInteger() {
110-
try {
111-
return in.readInt();
112-
} catch (IOException e) {
113-
throw new ParquetDecodingException("could not read int", e);
114-
}
94+
return buffer.getInt();
11595
}
11696
}
11797

11898
public static class LongPlainValuesReader extends PlainValuesReader {
11999

120100
@Override
121101
public void skip(int n) {
122-
try {
123-
in.skipBytes(n * 8);
124-
} catch (IOException e) {
125-
throw new ParquetDecodingException("could not skip " + n + " longs", e);
126-
}
102+
buffer.position(buffer.position() + n * 8);
127103
}
128104

129105
@Override
130106
public long readLong() {
131-
try {
132-
return in.readLong();
133-
} catch (IOException e) {
134-
throw new ParquetDecodingException("could not read long", e);
135-
}
107+
return buffer.getLong();
136108
}
137109
}
138110
}

pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,8 @@
594594
<exclude>org.apache.parquet.internal.column.columnindex.IndexIterator</exclude>
595595
<!-- Removal of a protected method in a class that's not supposed to be subclassed by third-party code -->
596596
<exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
597+
<!-- Removal of a protected internal field that should not have been part of the public API -->
598+
<exclude>org.apache.parquet.column.values.plain.PlainValuesReader#in</exclude>
597599
<!-- Due to the removal of deprecated methods -->
598600
<exclude>org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)</exclude>
599601
<!-- Intentional removal of deprecated Pig support from parquet-thrift -->

0 commit comments

Comments
 (0)