Skip to content

Commit ded1a47

Browse files
committed
apacheGH-3522: Use pack32Values and cache packers in delta writers
Delta binary packing writers encoded each 32-value mini block with four pack8Values calls despite the generated packers exposing pack32Values. Switch both int and long writers to the 32-value entry point so each mini block is packed in a single call. Also cache BytePacker/BytePackerForLong instances in per-instance arrays indexed by bit width, avoiding the factory dispatch per miniblock flush. This resolves the TODO at DeltaBinaryPackingValuesWriterForLong.java:119. Combined benchmark for both par12 commits (decode batch unpack32 + encode pack32Values + cached packers): IntEncodingBenchmark (100k INT32 values, JMH -wi 3 -i 5 -f 1): Benchmark Pattern Before (ops/s) After (ops/s) Improvement decodeDelta SEQUENTIAL 903,892,292 1,096,895,285 +21% (1.21x) decodeDelta RANDOM 364,659,977 410,632,530 +13% (1.13x) decodeDelta LOW_CARDINALITY 581,649,861 676,449,008 +16% (1.16x) decodeDelta HIGH_CARDINALITY 370,718,831 506,116,434 +37% (1.37x) encodeDelta SEQUENTIAL 556,155,088 558,868,426 flat encodeDelta RANDOM 360,327,834 376,594,239 +5% encodeDelta LOW_CARDINALITY 412,396,181 434,569,306 +5% encodeDelta HIGH_CARDINALITY 335,702,852 345,528,410 +3% The decode path shows the larger gains because it eliminates per-miniblock ByteBuffer.slice() allocations and uses the faster byte[] unpack path. Encode gains are modest because pack32Values is structurally similar to 4x pack8Values at this optimization level. All 573 parquet-column tests pass.
1 parent 47bcad3 commit ded1a47

2 files changed

Lines changed: 30 additions & 24 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public class DeltaBinaryPackingValuesWriterForInteger extends DeltaBinaryPacking
6060
*/
6161
private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
6262

63+
/**
64+
* Cache of BytePacker instances indexed by bit width [0, 32].
65+
* Avoids a factory dispatch + array load per miniblock flush.
66+
*/
67+
private final BytePacker[] packerCache = new BytePacker[MAX_BITWIDTH + 1];
68+
6369
public DeltaBinaryPackingValuesWriterForInteger(int slabSize, int pageSize, ByteBufferAllocator allocator) {
6470
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
6571
}
@@ -116,18 +122,16 @@ private void flushBlockBuffer() {
116122
for (int i = 0; i < miniBlocksToFlush; i++) {
117123
// writing i th miniblock
118124
int currentBitWidth = bitWidths[i];
119-
int blockOffset = 0;
120-
BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
121-
int miniBlockStart = i * config.miniBlockSizeInValues;
122-
for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) { // 8 values per pack
123-
// mini block is atomic in terms of flushing
124-
// This may write more values when reach to the end of data writing to last mini block,
125-
// since it may not be aligned to miniblock,
126-
// but doesn't matter. The reader uses total count to see if reached the end.
127-
packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
128-
blockOffset += currentBitWidth;
125+
BytePacker packer = packerCache[currentBitWidth];
126+
if (packer == null) {
127+
packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
128+
packerCache[currentBitWidth] = packer;
129129
}
130-
baos.write(miniBlockByteBuffer, 0, blockOffset);
130+
int miniBlockStart = i * config.miniBlockSizeInValues;
131+
// Mini blocks are always flushed as full 32-value groups in the current format.
132+
// Use the packer's 32-value entry point to avoid four pack8Values calls per miniblock.
133+
packer.pack32Values(deltaBlockBuffer, miniBlockStart, miniBlockByteBuffer, 0);
134+
baos.write(miniBlockByteBuffer, 0, currentBitWidth * 4);
131135
}
132136

133137
minDeltaInCurrentBlock = Integer.MAX_VALUE;

parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public class DeltaBinaryPackingValuesWriterForLong extends DeltaBinaryPackingVal
6060
*/
6161
private long minDeltaInCurrentBlock = Long.MAX_VALUE;
6262

63+
/**
64+
* Cache of BytePackerForLong instances indexed by bit width [0, 64].
65+
* Avoids a factory dispatch + array load per miniblock flush.
66+
*/
67+
private final BytePackerForLong[] packerCache = new BytePackerForLong[MAX_BITWIDTH + 1];
68+
6369
public DeltaBinaryPackingValuesWriterForLong(int slabSize, int pageSize, ByteBufferAllocator allocator) {
6470
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
6571
}
@@ -116,20 +122,16 @@ private void flushBlockBuffer() {
116122
for (int i = 0; i < miniBlocksToFlush; i++) {
117123
// writing i th miniblock
118124
int currentBitWidth = bitWidths[i];
119-
int blockOffset = 0;
120-
// TODO: should this cache the packer?
121-
BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth);
122-
int miniBlockStart = i * config.miniBlockSizeInValues;
123-
// pack values into the miniblock buffer, 8 at a time to get exactly currentBitWidth bytes
124-
for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {
125-
// mini block is atomic in terms of flushing
126-
// This may write more values when reach to the end of data writing to last mini block,
127-
// since it may not be aligned to miniblock,
128-
// but doesn't matter. The reader uses total count to see if reached the end.
129-
packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
130-
blockOffset += currentBitWidth;
125+
BytePackerForLong packer = packerCache[currentBitWidth];
126+
if (packer == null) {
127+
packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth);
128+
packerCache[currentBitWidth] = packer;
131129
}
132-
baos.write(miniBlockByteBuffer, 0, blockOffset);
130+
int miniBlockStart = i * config.miniBlockSizeInValues;
131+
// Mini blocks are always flushed as full 32-value groups in the current format.
132+
// Use the packer's 32-value entry point to avoid four pack8Values calls per miniblock.
133+
packer.pack32Values(deltaBlockBuffer, miniBlockStart, miniBlockByteBuffer, 0);
134+
baos.write(miniBlockByteBuffer, 0, currentBitWidth * 4);
133135
}
134136

135137
minDeltaInCurrentBlock = Long.MAX_VALUE;

0 commit comments

Comments
 (0)