Skip to content

Commit 39fc18c

Browse files
committed
GH-3495: Optimize PlainValuesWriter with direct ByteBuffer slab writes
PlainValuesWriter previously wrote values through a two-layer abstraction: PlainValuesWriter -> LittleEndianDataOutputStream -> CapacityByteArrayOutputStream. Each writeInt() decomposed the int into 4 bytes in a temp writeBuffer[8] array, then dispatched through the OutputStream chain.

Since CapacityByteArrayOutputStream already uses ByteBuffer slabs internally, we can write directly to the slab with putInt()/putLong() using LITTLE_ENDIAN byte order -- a single JVM intrinsic on x86/ARM -- eliminating the byte decomposition, temp array, and virtual dispatch.

Changes:

- CapacityByteArrayOutputStream: set ByteOrder.LITTLE_ENDIAN on newly allocated slabs in addSlab(); add writeInt(int) and writeLong(long) methods that use currentSlab.putInt(v) / currentSlab.putLong(v) directly.
- PlainValuesWriter: remove the LittleEndianDataOutputStream field; route writeInteger/writeLong/writeFloat/writeDouble/writeBytes through the underlying CapacityByteArrayOutputStream directly. writeFloat and writeDouble use Float.floatToIntBits / Double.doubleToLongBits + the new writeInt/writeLong methods. getBytes() no longer needs to flush a buffering layer; close() no longer closes the defunct stream.

Benchmark (IntEncodingBenchmark.encodePlain, 100k INT32 values per invocation, JMH -wi 3 -i 5 -f 1):

| Pattern          | Before (ops/s) | After (ops/s) | Improvement   |
|------------------|----------------|---------------|---------------|
| SEQUENTIAL       | 26,817,451     | 52,953,193    | +97.5% (2.0x) |
| RANDOM           | 28,517,312     | 37,774,036    | +32.5%        |
| LOW_CARDINALITY  | 28,705,158     | 52,819,678    | +84.0%        |
| HIGH_CARDINALITY | 28,595,519     | 37,862,571    | +32.4%        |

The same code path also benefits writeLong, writeFloat, writeDouble, and the length prefix written by writeBytes(Binary).
1 parent 53d7842 commit 39fc18c

2 files changed

Lines changed: 37 additions & 36 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java

Lines changed: 7 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@
2323
import org.apache.parquet.bytes.ByteBufferAllocator;
2424
import org.apache.parquet.bytes.BytesInput;
2525
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
26-
import org.apache.parquet.bytes.LittleEndianDataOutputStream;
2726
import org.apache.parquet.column.Encoding;
2827
import org.apache.parquet.column.values.ValuesWriter;
2928
import org.apache.parquet.io.ParquetEncodingException;
@@ -41,66 +40,44 @@ public class PlainValuesWriter extends ValuesWriter {
4140
public static final Charset CHARSET = Charset.forName("UTF-8");
4241

4342
private CapacityByteArrayOutputStream arrayOut;
44-
private LittleEndianDataOutputStream out;
4543

4644
public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
4745
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
48-
out = new LittleEndianDataOutputStream(arrayOut);
4946
}
5047

5148
@Override
5249
public final void writeBytes(Binary v) {
5350
try {
54-
out.writeInt(v.length());
55-
v.writeTo(out);
51+
arrayOut.writeInt(v.length());
52+
v.writeTo(arrayOut);
5653
} catch (IOException e) {
5754
throw new ParquetEncodingException("could not write bytes", e);
5855
}
5956
}
6057

6158
@Override
6259
public final void writeInteger(int v) {
63-
try {
64-
out.writeInt(v);
65-
} catch (IOException e) {
66-
throw new ParquetEncodingException("could not write int", e);
67-
}
60+
arrayOut.writeInt(v);
6861
}
6962

7063
@Override
7164
public final void writeLong(long v) {
72-
try {
73-
out.writeLong(v);
74-
} catch (IOException e) {
75-
throw new ParquetEncodingException("could not write long", e);
76-
}
65+
arrayOut.writeLong(v);
7766
}
7867

7968
@Override
8069
public final void writeFloat(float v) {
81-
try {
82-
out.writeFloat(v);
83-
} catch (IOException e) {
84-
throw new ParquetEncodingException("could not write float", e);
85-
}
70+
arrayOut.writeInt(Float.floatToIntBits(v));
8671
}
8772

8873
@Override
8974
public final void writeDouble(double v) {
90-
try {
91-
out.writeDouble(v);
92-
} catch (IOException e) {
93-
throw new ParquetEncodingException("could not write double", e);
94-
}
75+
arrayOut.writeLong(Double.doubleToLongBits(v));
9576
}
9677

9778
@Override
9879
public void writeByte(int value) {
99-
try {
100-
out.write(value);
101-
} catch (IOException e) {
102-
throw new ParquetEncodingException("could not write byte", e);
103-
}
80+
arrayOut.write(value);
10481
}
10582

10683
@Override
@@ -110,11 +87,6 @@ public long getBufferedSize() {
11087

11188
@Override
11289
public BytesInput getBytes() {
113-
try {
114-
out.flush();
115-
} catch (IOException e) {
116-
throw new ParquetEncodingException("could not write page", e);
117-
}
11890
if (LOG.isDebugEnabled()) LOG.debug("writing a buffer of size {}", arrayOut.size());
11991
return BytesInput.from(arrayOut);
12092
}
@@ -127,7 +99,6 @@ public void reset() {
12799
@Override
128100
public void close() {
129101
arrayOut.close();
130-
out.close();
131102
}
132103

133104
@Override

parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import java.io.IOException;
2828
import java.io.OutputStream;
2929
import java.nio.ByteBuffer;
30+
import java.nio.ByteOrder;
3031
import java.util.ArrayList;
3132
import java.util.List;
3233
import org.apache.parquet.OutputStreamCloseException;
@@ -194,6 +195,7 @@ private void addSlab(int minimumSize) {
194195
LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize);
195196

196197
this.currentSlab = allocator.allocate(nextSlabSize);
198+
this.currentSlab.order(ByteOrder.LITTLE_ENDIAN);
197199
this.slabs.add(currentSlab);
198200
this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
199201
}
@@ -225,6 +227,34 @@ public void write(byte b[], int off, int len) {
225227
bytesUsed = Math.addExact(bytesUsed, len);
226228
}
227229

230+
/**
231+
* Writes an int in little-endian byte order directly to the underlying slab,
232+
* bypassing intermediate byte array decomposition. Slabs are set to
233+
* {@link ByteOrder#LITTLE_ENDIAN} order so {@code putInt} produces the correct encoding.
234+
*
235+
* @param v the int value to write
236+
*/
237+
public void writeInt(int v) {
238+
if (currentSlab.remaining() < 4) {
239+
addSlab(4);
240+
}
241+
currentSlab.putInt(v);
242+
bytesUsed = Math.addExact(bytesUsed, 4);
243+
}
244+
245+
/**
246+
* Writes a long in little-endian byte order directly to the underlying slab.
247+
*
248+
* @param v the long value to write
249+
*/
250+
public void writeLong(long v) {
251+
if (currentSlab.remaining() < 8) {
252+
addSlab(8);
253+
}
254+
currentSlab.putLong(v);
255+
bytesUsed = Math.addExact(bytesUsed, 8);
256+
}
257+
228258
private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
229259
if (buf.hasArray()) {
230260
out.write(buf.array(), buf.arrayOffset(), len);

0 commit comments

Comments
 (0)