Skip to content

Commit 7be05b4

Browse files
Optimizing Variant read path with lazy caching (#3481)
* Fix thread-safety in Variant lazy caches and add comments

  Co-authored-by: Steve Loughran <stevel@cloudera.com>
* Remove unnecessary volatile fields and fix PR comments
* Add readUnsignedLittleEndian for bulk ByteBuffer reads and concurrency javadoc
* Address PR comments

---------

Co-authored-by: Steve Loughran <stevel@cloudera.com>
1 parent 9428487 commit 7be05b4

5 files changed

Lines changed: 212 additions & 76 deletions

File tree

parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java

Lines changed: 138 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020

2121
import java.math.BigDecimal;
2222
import java.nio.ByteBuffer;
23+
import java.nio.ByteOrder;
2324
import java.util.UUID;
25+
import org.apache.parquet.Preconditions;
2426

2527
/**
2628
* This Variant class holds the Variant-encoded value and metadata binary values.
29+
*
30+
* <p>Concurrency: the byte buffers are read-only and all lazy caches are idempotent,
31+
* so concurrent reads are safe - the worst outcome is a redundant decode. The metadata
32+
* dictionary cache is {@code volatile} for safe publication to child Variants.
2733
*/
2834
public final class Variant {
2935
/**
@@ -36,6 +42,26 @@ public final class Variant {
3642
*/
3743
final ByteBuffer metadata;
3844

45+
/**
46+
* Pre-computed metadata dictionary size.
47+
*/
48+
private final int dictSize;
49+
50+
/**
51+
* Lazy cache for metadata dictionary strings, shared with child Variants.
52+
*/
53+
private volatile String[] metadataCache;
54+
55+
/**
56+
* Lazy cache for the parsed object header.
57+
*/
58+
private VariantUtil.ObjectInfo cachedObjectInfo;
59+
60+
/**
61+
* Lazy cache for the parsed array header.
62+
*/
63+
private VariantUtil.ArrayInfo cachedArrayInfo;
64+
3965
/**
4066
* The threshold to switch from linear search to binary search when looking up a field by key in
4167
* an object. This is a performance optimization to avoid the overhead of binary search for a
@@ -56,17 +82,42 @@ public Variant(byte[] value, int valuePos, int valueLength, byte[] metadata, int
5682
}
5783

5884
public Variant(ByteBuffer value, ByteBuffer metadata) {
59-
// The buffers are read a single-byte at a time, so the endianness of the input buffers
60-
// is not important.
61-
this.value = value.asReadOnlyBuffer();
62-
this.metadata = metadata.asReadOnlyBuffer();
85+
this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
86+
this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
6387

6488
// There is currently only one allowed version.
6589
if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) != VariantUtil.VERSION) {
6690
throw new UnsupportedOperationException(String.format(
6791
"Unsupported variant metadata version: %d",
6892
metadata.get(metadata.position()) & VariantUtil.VERSION_MASK));
6993
}
94+
95+
// Pre-compute dictionary size for lazy metadata cache allocation.
96+
int pos = this.metadata.position();
97+
int metaOffsetSize = ((this.metadata.get(pos) >> 6) & 0x3) + 1;
98+
if (this.metadata.remaining() > 1) {
99+
Preconditions.checkArgument(
100+
this.metadata.remaining() >= 1 + metaOffsetSize,
101+
"variant metadata truncated: offsetSize=" + metaOffsetSize);
102+
this.dictSize = VariantUtil.readUnsignedLittleEndian(this.metadata, pos + 1, metaOffsetSize);
103+
long dictTableEnd = 1L + metaOffsetSize + ((long) this.dictSize + 1) * metaOffsetSize;
104+
Preconditions.checkArgument(
105+
dictTableEnd <= this.metadata.remaining(),
106+
"variant metadata dictionary extends past buffer: dictSize=" + this.dictSize);
107+
} else {
108+
this.dictSize = 0;
109+
}
110+
this.metadataCache = null;
111+
}
112+
113+
/**
114+
* Package-private constructor that shares pre-parsed metadata state from a parent Variant.
115+
*/
116+
Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) {
117+
this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
118+
this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
119+
this.metadataCache = metadataCache;
120+
this.dictSize = dictSize;
70121
}
71122

72123
public ByteBuffer getValueBuffer() {
@@ -194,7 +245,7 @@ public Type getType() {
194245
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
195246
*/
196247
public int numObjectElements() {
197-
return VariantUtil.getObjectInfo(value).numElements;
248+
return objectInfo().numElements;
198249
}
199250

200251
/**
@@ -206,22 +257,19 @@ public int numObjectElements() {
206257
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
207258
*/
208259
public Variant getFieldByKey(String key) {
209-
VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value);
210-
// Use linear search for a short list. Switch to binary search when the length reaches
211-
// `BINARY_SEARCH_THRESHOLD`.
260+
VariantUtil.ObjectInfo info = objectInfo();
261+
int idStart = value.position() + info.idStartOffset;
262+
int offsetStart = value.position() + info.offsetStartOffset;
263+
int dataStart = value.position() + info.dataStartOffset;
264+
212265
if (info.numElements < BINARY_SEARCH_THRESHOLD) {
213266
for (int i = 0; i < info.numElements; ++i) {
214-
ObjectField field = getFieldAtIndex(
215-
i,
216-
value,
217-
metadata,
218-
info.idSize,
219-
info.offsetSize,
220-
value.position() + info.idStartOffset,
221-
value.position() + info.offsetStartOffset,
222-
value.position() + info.dataStartOffset);
223-
if (field.key.equals(key)) {
224-
return field.value;
267+
int id = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * i, info.idSize);
268+
String fieldKey = getMetadataKeyCached(id);
269+
if (fieldKey.equals(key)) {
270+
int offset = VariantUtil.readUnsignedLittleEndian(
271+
value, offsetStart + info.offsetSize * i, info.offsetSize);
272+
return childVariant(VariantUtil.slice(value, dataStart + offset));
225273
}
226274
}
227275
} else {
@@ -232,22 +280,17 @@ public Variant getFieldByKey(String key) {
232280
// performance optimization, because it can properly handle the case where `low + high`
233281
// overflows int.
234282
int mid = (low + high) >>> 1;
235-
ObjectField field = getFieldAtIndex(
236-
mid,
237-
value,
238-
metadata,
239-
info.idSize,
240-
info.offsetSize,
241-
value.position() + info.idStartOffset,
242-
value.position() + info.offsetStartOffset,
243-
value.position() + info.dataStartOffset);
244-
int cmp = field.key.compareTo(key);
283+
int midId = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * mid, info.idSize);
284+
String midKey = getMetadataKeyCached(midId);
285+
int cmp = midKey.compareTo(key);
245286
if (cmp < 0) {
246287
low = mid + 1;
247288
} else if (cmp > 0) {
248289
high = mid - 1;
249290
} else {
250-
return field.value;
291+
int offset = VariantUtil.readUnsignedLittleEndian(
292+
value, offsetStart + info.offsetSize * mid, info.offsetSize);
293+
return childVariant(VariantUtil.slice(value, dataStart + offset));
251294
}
252295
}
253296
}
@@ -275,35 +318,14 @@ public ObjectField(String key, Variant value) {
275318
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
276319
*/
277320
public ObjectField getFieldAtIndex(int idx) {
278-
VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value);
279-
// Use linear search for a short list. Switch to binary search when the length reaches
280-
// `BINARY_SEARCH_THRESHOLD`.
281-
ObjectField field = getFieldAtIndex(
282-
idx,
283-
value,
284-
metadata,
285-
info.idSize,
286-
info.offsetSize,
287-
value.position() + info.idStartOffset,
288-
value.position() + info.offsetStartOffset,
289-
value.position() + info.dataStartOffset);
290-
return field;
291-
}
292-
293-
static ObjectField getFieldAtIndex(
294-
int index,
295-
ByteBuffer value,
296-
ByteBuffer metadata,
297-
int idSize,
298-
int offsetSize,
299-
int idStart,
300-
int offsetStart,
301-
int dataStart) {
302-
// idStart, offsetStart, and dataStart are absolute positions in the `value` buffer.
303-
int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize);
304-
int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
305-
String key = VariantUtil.getMetadataKey(metadata, id);
306-
Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
321+
VariantUtil.ObjectInfo info = objectInfo();
322+
int idStart = value.position() + info.idStartOffset;
323+
int offsetStart = value.position() + info.offsetStartOffset;
324+
int dataStart = value.position() + info.dataStartOffset;
325+
int id = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * idx, info.idSize);
326+
int offset = VariantUtil.readUnsignedLittleEndian(value, offsetStart + info.offsetSize * idx, info.offsetSize);
327+
String key = getMetadataKeyCached(id);
328+
Variant v = childVariant(VariantUtil.slice(value, dataStart + offset));
307329
return new ObjectField(key, v);
308330
}
309331

@@ -312,7 +334,7 @@ static ObjectField getFieldAtIndex(
312334
* @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY`
313335
*/
314336
public int numArrayElements() {
315-
return VariantUtil.getArrayInfo(value).numElements;
337+
return arrayInfo().numElements;
316338
}
317339

318340
/**
@@ -324,23 +346,66 @@ public int numArrayElements() {
324346
* @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY`
325347
*/
326348
public Variant getElementAtIndex(int index) {
327-
VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(value);
349+
VariantUtil.ArrayInfo info = arrayInfo();
328350
if (index < 0 || index >= info.numElements) {
329351
return null;
330352
}
331-
return getElementAtIndex(
332-
index,
333-
value,
334-
metadata,
335-
info.offsetSize,
336-
value.position() + info.offsetStartOffset,
337-
value.position() + info.dataStartOffset);
353+
int offsetStart = value.position() + info.offsetStartOffset;
354+
int dataStart = value.position() + info.dataStartOffset;
355+
int offset =
356+
VariantUtil.readUnsignedLittleEndian(value, offsetStart + info.offsetSize * index, info.offsetSize);
357+
return childVariant(VariantUtil.slice(value, dataStart + offset));
358+
}
359+
360+
/**
361+
* Creates a child Variant that shares this instance's metadata cache.
362+
*/
363+
private Variant childVariant(ByteBuffer childValue) {
364+
return new Variant(childValue, metadata, metadataCache, dictSize);
338365
}
339366

340-
private static Variant getElementAtIndex(
341-
int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int offsetStart, int dataStart) {
342-
// offsetStart and dataStart are absolute positions in the `value` buffer.
343-
int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
344-
return new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
367+
/**
368+
* Returns the metadata dictionary string for the given ID, caching the result.
369+
*/
370+
String getMetadataKeyCached(int id) {
371+
if (id < 0 || id >= dictSize) {
372+
return VariantUtil.getMetadataKey(metadata, id);
373+
}
374+
// Demand-create shared dictionary cache.
375+
String[] cache = metadataCache;
376+
if (cache == null) {
377+
cache = new String[dictSize];
378+
metadataCache = cache;
379+
}
380+
String key = cache[id];
381+
if (key == null) {
382+
key = VariantUtil.getMetadataKey(metadata, id);
383+
cache[id] = key;
384+
}
385+
return key;
386+
}
387+
388+
/**
389+
* Returns the cached object header, parsing it on first access.
390+
*/
391+
private VariantUtil.ObjectInfo objectInfo() {
392+
VariantUtil.ObjectInfo info = cachedObjectInfo;
393+
if (info == null) {
394+
info = VariantUtil.getObjectInfo(value);
395+
cachedObjectInfo = info;
396+
}
397+
return info;
398+
}
399+
400+
/**
401+
* Returns the cached array header, parsing it on first access.
402+
*/
403+
private VariantUtil.ArrayInfo arrayInfo() {
404+
VariantUtil.ArrayInfo info = cachedArrayInfo;
405+
if (info == null) {
406+
info = VariantUtil.getArrayInfo(value);
407+
cachedArrayInfo = info;
408+
}
409+
return info;
345410
}
346411
}

parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Collections;
2626
import java.util.Set;
27+
import org.apache.parquet.io.api.Binary;
2728

2829
/**
2930
* Builder for creating Variant value and metadata.
@@ -109,7 +110,14 @@ public void appendEncodedValue(ByteBuffer value) {
109110
*/
110111
public void appendString(String str) {
111112
onAppend();
112-
byte[] data = str.getBytes(StandardCharsets.UTF_8);
113+
writeUTF8bytes(str.getBytes(StandardCharsets.UTF_8));
114+
}
115+
116+
/**
117+
* Write bytes that are already UTF-8 encoded as a string value.
118+
* @param data data to write; this is not modified.
119+
*/
120+
private void writeUTF8bytes(final byte[] data) {
113121
boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE;
114122
checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + data.length);
115123
if (longStr) {
@@ -125,6 +133,16 @@ public void appendString(String str) {
125133
writePos += data.length;
126134
}
127135

136+
/**
137+
* Given a Binary, append it to the variant as a string.
138+
* Avoids intermediate String creation when unmarshalling from shredded string columns.
139+
* @param binary source data.
140+
*/
141+
void appendAsString(Binary binary) {
142+
onAppend();
143+
writeUTF8bytes(binary.getBytesUnsafe());
144+
}
145+
128146
/**
129147
* Appends a null value to the Variant builder.
130148
*/

parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,14 @@ static class PartiallyShreddedFieldsConverter extends GroupConverter {
233233
PartiallyShreddedFieldsConverter(GroupType fieldsType, ParentConverter<VariantBuilder> parent) {
234234
this.converters = new Converter[fieldsType.getFieldCount()];
235235
this.parent = parent;
236+
ParentConverter<VariantObjectBuilder> newParent = converter -> converter.accept(objectBuilder);
236237

237238
for (int index = 0; index < fieldsType.getFieldCount(); index += 1) {
238239
Type field = fieldsType.getType(index);
239240
Preconditions.checkArgument(!field.isPrimitive(), "Invalid field group: " + field);
240241

241242
String name = field.getName();
242243
shreddedFieldNames.add(name);
243-
ParentConverter<VariantObjectBuilder> newParent = converter -> converter.accept(objectBuilder);
244244
converters[index] = new FieldValueConverter(name, field.asGroupType(), newParent);
245245
}
246246
}
@@ -501,7 +501,7 @@ static class VariantStringConverter extends ShreddedScalarConverter {
501501

502502
@Override
503503
public void addBinary(Binary value) {
504-
parent.build(builder -> builder.appendString(value.toStringUsingUTF8()));
504+
parent.build(builder -> builder.appendAsString(value));
505505
}
506506
}
507507

parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.ByteOrder;
2323
import java.util.Arrays;
2424
import java.util.HashMap;
25+
import org.apache.parquet.Preconditions;
2526

2627
/**
2728
* This class defines constants related to the Variant format and provides functions for
@@ -299,6 +300,28 @@ static int readUnsigned(ByteBuffer bytes, int pos, int numBytes) {
299300
return result;
300301
}
301302

303+
/**
304+
* Fast little-endian unsigned read using bulk ByteBuffer operations.
305+
* Requires the buffer to have {@link java.nio.ByteOrder#LITTLE_ENDIAN} byte order.
306+
* Adapted from Apache Iceberg's VariantUtil.readLittleEndianUnsigned.
307+
*/
308+
static int readUnsignedLittleEndian(ByteBuffer buffer, int pos, int numBytes) {
309+
switch (numBytes) {
310+
case 1:
311+
return buffer.get(pos) & U8_MAX;
312+
case 2:
313+
return buffer.getShort(pos) & U16_MAX;
314+
case 3:
315+
return (buffer.getShort(pos) & U16_MAX) | ((buffer.get(pos + 2) & U8_MAX) << 16);
316+
case 4:
317+
int v = buffer.getInt(pos);
318+
Preconditions.checkArgument(v >= 0, "Failed to read unsigned int. numBytes: " + numBytes);
319+
return v;
320+
default:
321+
throw new IllegalArgumentException(String.format("Invalid numBytes: %d", numBytes));
322+
}
323+
}
324+
302325
/**
303326
* Returns the value type of Variant value `value[pos...]`. It is only legal to call `get*` if
304327
* `getType` returns the corresponding type. For example, it is only legal to call

0 commit comments

Comments
 (0)