Skip to content

Commit f7fdee5

Browse files
committed
GH-3530: Optimize BYTE_STREAM_SPLIT encoding/decoding
Reader: replace generic ByteBuffer.get() transpose loop in decodeData() with specialized single-pass loops for element sizes 2/4/8/12/16 bytes plus a stream-oriented generic fallback. Bulk-access the backing array directly when available, falling back to a single bulk copy for direct buffers. Writer: replace per-value scatterBytes() (which allocates a temp byte[] and issues N single-byte stream writes) with batched scatter buffers. Int/Long values accumulate in int[]/long[] batches of 64 and flush as bulk write(byte[], off, len) calls -- one per stream. FLBA uses per-stream byte[][] scratch buffers with the same batching strategy. getBufferedSize() now accounts for unflushed batch values. Add JMH benchmarks for scalar encode/decode of all 5 BSS types (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY). Add TestDataFactory for deterministic FLBA benchmark data generation. Add unit tests for transpose specializations, batch-boundary crossing, getBufferedSize with partial batches, direct ByteBuffer decode paths, and close/reset with pending unflushed batches.
1 parent c7e7acb commit f7fdee5

6 files changed

Lines changed: 1226 additions & 23 deletions

File tree

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.benchmarks;
20+
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.util.Random;
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.parquet.bytes.ByteBufferInputStream;
26+
import org.apache.parquet.bytes.HeapByteBufferAllocator;
27+
import org.apache.parquet.column.values.ValuesWriter;
28+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader;
29+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble;
30+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA;
31+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFloat;
32+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForInteger;
33+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForLong;
34+
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter;
35+
import org.apache.parquet.io.api.Binary;
36+
import org.openjdk.jmh.annotations.Benchmark;
37+
import org.openjdk.jmh.annotations.BenchmarkMode;
38+
import org.openjdk.jmh.annotations.Fork;
39+
import org.openjdk.jmh.annotations.Level;
40+
import org.openjdk.jmh.annotations.Measurement;
41+
import org.openjdk.jmh.annotations.Mode;
42+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
43+
import org.openjdk.jmh.annotations.OutputTimeUnit;
44+
import org.openjdk.jmh.annotations.Param;
45+
import org.openjdk.jmh.annotations.Scope;
46+
import org.openjdk.jmh.annotations.Setup;
47+
import org.openjdk.jmh.annotations.State;
48+
import org.openjdk.jmh.annotations.Warmup;
49+
import org.openjdk.jmh.infra.Blackhole;
50+
51+
/**
52+
* Decoding-level micro-benchmarks for the BYTE_STREAM_SPLIT encoding across all
53+
* Parquet types that support it: {@code FLOAT}, {@code DOUBLE}, {@code INT32},
54+
* {@code INT64}, and {@code FIXED_LEN_BYTE_ARRAY}.
55+
*
56+
* <p>Fixed-width numeric types are benchmarked directly by top-level methods.
57+
* {@code FIXED_LEN_BYTE_ARRAY} uses an inner {@link FlbaState} parameterised by
58+
* {@code fixedLength} to avoid cross-product pollution with the numeric benchmarks.
59+
*
60+
* <p>Each invocation decodes {@value #VALUE_COUNT} values; throughput is reported
61+
* per-value via {@link OperationsPerInvocation}. The cost includes both
62+
* {@code initFromPage} (which eagerly transposes the entire page) and the per-value
63+
* read calls. Page transposition is the part this benchmark is primarily designed
64+
* to exercise.
65+
*/
66+
@BenchmarkMode(Mode.Throughput)
67+
@OutputTimeUnit(TimeUnit.SECONDS)
68+
@Fork(1)
69+
@Warmup(iterations = 3, time = 1)
70+
@Measurement(iterations = 5, time = 1)
71+
@State(Scope.Thread)
72+
public class ByteStreamSplitDecodingBenchmark {
73+
74+
static final int VALUE_COUNT = 100_000;
75+
private static final int INIT_SLAB_SIZE = 64 * 1024;
76+
private static final int PAGE_SIZE = 4 * 1024 * 1024;
77+
78+
private byte[] floatPage;
79+
private byte[] doublePage;
80+
private byte[] intPage;
81+
private byte[] longPage;
82+
83+
@Setup(Level.Trial)
84+
public void setup() throws IOException {
85+
Random random = new Random(42);
86+
int[] intData = new int[VALUE_COUNT];
87+
long[] longData = new long[VALUE_COUNT];
88+
float[] floatData = new float[VALUE_COUNT];
89+
double[] doubleData = new double[VALUE_COUNT];
90+
for (int i = 0; i < VALUE_COUNT; i++) {
91+
intData[i] = random.nextInt();
92+
longData[i] = random.nextLong();
93+
floatData[i] = random.nextFloat();
94+
doubleData[i] = random.nextDouble();
95+
}
96+
97+
{
98+
ValuesWriter w = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter(
99+
INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
100+
for (float v : floatData) {
101+
w.writeFloat(v);
102+
}
103+
floatPage = w.getBytes().toByteArray();
104+
w.close();
105+
}
106+
{
107+
ValuesWriter w = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter(
108+
INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
109+
for (double v : doubleData) {
110+
w.writeDouble(v);
111+
}
112+
doublePage = w.getBytes().toByteArray();
113+
w.close();
114+
}
115+
{
116+
ValuesWriter w = new ByteStreamSplitValuesWriter.IntegerByteStreamSplitValuesWriter(
117+
INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
118+
for (int v : intData) {
119+
w.writeInteger(v);
120+
}
121+
intPage = w.getBytes().toByteArray();
122+
w.close();
123+
}
124+
{
125+
ValuesWriter w = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter(
126+
INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
127+
for (long v : longData) {
128+
w.writeLong(v);
129+
}
130+
longPage = w.getBytes().toByteArray();
131+
w.close();
132+
}
133+
}
134+
135+
private static void init(ByteStreamSplitValuesReader r, byte[] page) throws IOException {
136+
r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(ByteBuffer.wrap(page)));
137+
}
138+
139+
private static void initDirect(ByteStreamSplitValuesReader r, ByteBuffer directPage) throws IOException {
140+
directPage.clear(); // reset position to 0 for each invocation
141+
r.initFromPage(VALUE_COUNT, ByteBufferInputStream.wrap(directPage));
142+
}
143+
144+
@Benchmark
145+
@OperationsPerInvocation(VALUE_COUNT)
146+
public void decodeFloat(Blackhole bh) throws IOException {
147+
ByteStreamSplitValuesReaderForFloat r = new ByteStreamSplitValuesReaderForFloat();
148+
init(r, floatPage);
149+
for (int i = 0; i < VALUE_COUNT; i++) {
150+
bh.consume(r.readFloat());
151+
}
152+
}
153+
154+
@Benchmark
155+
@OperationsPerInvocation(VALUE_COUNT)
156+
public void decodeDouble(Blackhole bh) throws IOException {
157+
ByteStreamSplitValuesReaderForDouble r = new ByteStreamSplitValuesReaderForDouble();
158+
init(r, doublePage);
159+
for (int i = 0; i < VALUE_COUNT; i++) {
160+
bh.consume(r.readDouble());
161+
}
162+
}
163+
164+
@Benchmark
165+
@OperationsPerInvocation(VALUE_COUNT)
166+
public void decodeInt(Blackhole bh) throws IOException {
167+
ByteStreamSplitValuesReaderForInteger r = new ByteStreamSplitValuesReaderForInteger();
168+
init(r, intPage);
169+
for (int i = 0; i < VALUE_COUNT; i++) {
170+
bh.consume(r.readInteger());
171+
}
172+
}
173+
174+
@Benchmark
175+
@OperationsPerInvocation(VALUE_COUNT)
176+
public void decodeLong(Blackhole bh) throws IOException {
177+
ByteStreamSplitValuesReaderForLong r = new ByteStreamSplitValuesReaderForLong();
178+
init(r, longPage);
179+
for (int i = 0; i < VALUE_COUNT; i++) {
180+
bh.consume(r.readLong());
181+
}
182+
}
183+
184+
// ---- FIXED_LEN_BYTE_ARRAY (parameterised by fixedLength) ----
185+
186+
@State(Scope.Thread)
187+
public static class FlbaState {
188+
@Param({"2", "7", "12", "16"})
189+
public int fixedLength;
190+
191+
byte[] flbaPage;
192+
193+
@Setup(Level.Trial)
194+
public void setup() throws IOException {
195+
Binary[] data = TestDataFactory.generateFixedLenByteArrays(
196+
VALUE_COUNT, fixedLength, 0, TestDataFactory.DEFAULT_SEED);
197+
ValuesWriter w = new ByteStreamSplitValuesWriter.FixedLenByteArrayByteStreamSplitValuesWriter(
198+
fixedLength, INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
199+
for (Binary v : data) {
200+
w.writeBytes(v);
201+
}
202+
flbaPage = w.getBytes().toByteArray();
203+
w.close();
204+
}
205+
}
206+
207+
@Benchmark
208+
@OperationsPerInvocation(VALUE_COUNT)
209+
public void decodeFlba(FlbaState state, Blackhole bh) throws IOException {
210+
ByteStreamSplitValuesReaderForFLBA r = new ByteStreamSplitValuesReaderForFLBA(state.fixedLength);
211+
init(r, state.flbaPage);
212+
for (int i = 0; i < VALUE_COUNT; i++) {
213+
bh.consume(r.readBytes());
214+
}
215+
}
216+
217+
// ---- Direct ByteBuffer decode (exercises the !hasArray() path in decodeData) ----
218+
219+
@State(Scope.Thread)
220+
public static class DirectBufferState {
221+
ByteBuffer directFloatPage;
222+
ByteBuffer directLongPage;
223+
224+
@Setup(Level.Trial)
225+
public void setup() throws IOException {
226+
Random random = new Random(42);
227+
{
228+
ValuesWriter w = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter(
229+
INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
230+
for (int i = 0; i < VALUE_COUNT; i++) {
231+
w.writeFloat(random.nextFloat());
232+
}
233+
byte[] page = w.getBytes().toByteArray();
234+
w.close();
235+
directFloatPage = ByteBuffer.allocateDirect(page.length);
236+
directFloatPage.put(page);
237+
directFloatPage.flip();
238+
}
239+
{
240+
ValuesWriter w = new ByteStreamSplitValuesWriter.LongByteStreamSplitValuesWriter(
241+
INIT_SLAB_SIZE, PAGE_SIZE, new HeapByteBufferAllocator());
242+
for (int i = 0; i < VALUE_COUNT; i++) {
243+
w.writeLong(random.nextLong());
244+
}
245+
byte[] page = w.getBytes().toByteArray();
246+
w.close();
247+
directLongPage = ByteBuffer.allocateDirect(page.length);
248+
directLongPage.put(page);
249+
directLongPage.flip();
250+
}
251+
}
252+
}
253+
254+
@Benchmark
255+
@OperationsPerInvocation(VALUE_COUNT)
256+
public void decodeFloatDirect(DirectBufferState state, Blackhole bh) throws IOException {
257+
ByteStreamSplitValuesReaderForFloat r = new ByteStreamSplitValuesReaderForFloat();
258+
initDirect(r, state.directFloatPage);
259+
for (int i = 0; i < VALUE_COUNT; i++) {
260+
bh.consume(r.readFloat());
261+
}
262+
}
263+
264+
@Benchmark
265+
@OperationsPerInvocation(VALUE_COUNT)
266+
public void decodeLongDirect(DirectBufferState state, Blackhole bh) throws IOException {
267+
ByteStreamSplitValuesReaderForLong r = new ByteStreamSplitValuesReaderForLong();
268+
initDirect(r, state.directLongPage);
269+
for (int i = 0; i < VALUE_COUNT; i++) {
270+
bh.consume(r.readLong());
271+
}
272+
}
273+
}

0 commit comments

Comments
 (0)