Skip to content

Commit 3cc7380

Browse files
committed
NoSQL: Use actual key serialization to split NoSQL index stripes
Base stripe splitting on the actual serialized key shape, including prefix compression, instead of a loose full-key estimate. The estimate was safe but too pessimistic and could over-split perfectly reasonable stripes. Reuse the real serialization walk so the split logic and persisted bytes stay aligned.
1 parent 91923ee commit 3cc7380

4 files changed

Lines changed: 297 additions & 82 deletions

File tree

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java

Lines changed: 177 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -354,35 +354,67 @@ public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
354354
return List.of(this);
355355
}
356356

357-
var result = new ArrayList<IndexSpi<V>>();
358-
var currentElements = new ArrayList<InternalIndexElement<V>>();
359-
long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
360-
361-
for (var element : elements) {
362-
var elementSerializedSize =
363-
addElementDiff(element, element.contentSerializedSize(serializer));
364-
if (!currentElements.isEmpty()
365-
&& currentSerializedSize + elementSerializedSize > targetSerializedSize) {
366-
result.add(newSplitPart(currentElements, currentSerializedSize));
367-
currentElements = new ArrayList<>();
368-
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
369-
}
357+
final class SplitState {
358+
private final List<IndexSpi<V>> result = new ArrayList<>();
359+
private List<InternalIndexElement<V>> currentElements = new ArrayList<>();
360+
private long currentSerializedBodySize;
361+
private ByteBuffer previousSplitKey;
362+
}
370363

371-
currentElements.add(element);
372-
currentSerializedSize += elementSerializedSize;
364+
var splitState = new SplitState();
365+
366+
walkSerializedElements(
367+
(element, keyBuf, previousMaterializationKey) -> {
368+
var valueSerializedSize = element.contentSerializedSize(serializer);
369+
var entrySerializedSize =
370+
serializedEntrySize(splitState.previousSplitKey, keyBuf, valueSerializedSize);
371+
var candidateSerializedSize =
372+
splitSerializedSize(
373+
splitState.currentElements.size() + 1,
374+
splitState.currentSerializedBodySize + entrySerializedSize);
375+
376+
if (!splitState.currentElements.isEmpty()
377+
&& candidateSerializedSize > targetSerializedSize) {
378+
splitState.result.add(
379+
newSplitPart(
380+
splitState.currentElements,
381+
splitSerializedSize(
382+
splitState.currentElements.size(), splitState.currentSerializedBodySize)));
383+
splitState.currentElements = new ArrayList<>();
384+
splitState.currentSerializedBodySize = 0L;
385+
splitState.previousSplitKey = null;
386+
entrySerializedSize = serializedEntrySize(null, keyBuf, valueSerializedSize);
387+
candidateSerializedSize = splitSerializedSize(1, entrySerializedSize);
388+
}
373389

374-
if (currentElements.size() == 1 && currentSerializedSize > targetSerializedSize) {
375-
result.add(newSplitPart(currentElements, currentSerializedSize));
376-
currentElements = new ArrayList<>();
377-
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
378-
}
379-
}
390+
splitState.currentElements.add(element);
391+
splitState.currentSerializedBodySize += entrySerializedSize;
392+
splitState.previousSplitKey = copyKey(splitState.previousSplitKey, keyBuf);
393+
394+
if (splitState.currentElements.size() == 1
395+
&& candidateSerializedSize > targetSerializedSize) {
396+
splitState.result.add(
397+
newSplitPart(splitState.currentElements, candidateSerializedSize));
398+
splitState.currentElements = new ArrayList<>();
399+
splitState.currentSerializedBodySize = 0L;
400+
splitState.previousSplitKey = null;
401+
}
402+
403+
return copyKey(previousMaterializationKey, keyBuf);
404+
});
380405

381-
if (!currentElements.isEmpty()) {
382-
result.add(newSplitPart(currentElements, currentSerializedSize));
406+
if (splitState.result.isEmpty()) {
407+
return List.of(this);
408+
}
409+
if (!splitState.currentElements.isEmpty()) {
410+
splitState.result.add(
411+
newSplitPart(
412+
splitState.currentElements,
413+
splitSerializedSize(
414+
splitState.currentElements.size(), splitState.currentSerializedBodySize)));
383415
}
384416

385-
return result;
417+
return splitState.result;
386418
}
387419

388420
private IndexSpi<V> newSplitPart(
@@ -396,6 +428,15 @@ private IndexSpi<V> newSplitPart(
396428
partElements, checkedCast(adjustedEstimatedSerializedSize), serializer, true);
397429
}
398430

431+
private static long splitSerializedSize(int elementCount, long serializedBodySize) {
432+
return 1L + varIntLen(elementCount) + serializedBodySize;
433+
}
434+
435+
private static long serializedEntrySize(
436+
@Nullable ByteBuffer previousKey, ByteBuffer keyBuf, int valueSerializedSize) {
437+
return (long) serializedKeyDelta(previousKey, keyBuf).serializedSize() + valueSerializedSize;
438+
}
439+
399440
@Override
400441
public List<IndexSpi<V>> stripes() {
401442
return List.of(this);
@@ -628,55 +669,14 @@ public int estimatedSerializedSize() {
628669
// Serialized segment index version
629670
target.put(CURRENT_STORE_INDEX_VERSION);
630671
putVarInt(target, elements.size());
672+
var serializeTarget = target;
631673

632-
ByteBuffer previousKey = null;
633-
634-
var scratchKeyBuffer = acquireScratchKeyBuffer();
635-
try {
636-
boolean onlyLazy;
637-
InternalIndexElement<V> previous = null;
638-
for (var el : elements) {
639-
ByteBuffer keyBuf = null;
640-
if (isLazyElementImpl(el)) {
641-
var lazyEl = (LazyIndexElement<V>) el;
642-
// The purpose of this 'if'-branch is to determine whether it can serialize the
643-
// 'IndexKey' by _not_ fully materializing the `IndexKey`. This is possible if (and
644-
// only if!) the current and the previous element are `LazyStoreIndexElement`s, where
645-
// the previous element is exactly the one that has been deserialized.
646-
//noinspection RedundantIfStatement
647-
if (lazyEl.prefixLen == 0 || lazyEl.previous == previous) {
648-
// Can use the optimized serialization in `LazyStoreIndexElement` if the current
649-
// element has no prefix of if the previously serialized element was also a
650-
// `LazyStoreIndexElement`. In other words, no intermediate `LazyStoreIndexElement`
651-
// has been removed and no new element has been added.
652-
onlyLazy = true;
653-
} else {
654-
// This if-branch detects whether an element has been removed from the index. In that
655-
// case, serialization has to materialize the `IndexKey` for serialization.
656-
onlyLazy = false;
657-
}
658-
if (onlyLazy) {
659-
// Key serialization via 'LazyStoreIndexElement' is much cheaper (CPU and heap) than
660-
// having to first materialize and then serialize it.
661-
keyBuf = lazyEl.serializeKey(scratchKeyBuffer, previousKey);
662-
}
663-
} else {
664-
onlyLazy = false;
665-
}
666-
667-
if (!onlyLazy) {
668-
// Either 'el' is not a 'LazyStoreIndexElement' or the previous element of a
669-
// 'LazyStoreIndexElement' is not suitable (see above).
670-
keyBuf = serializeIndexKeyString(el.key(), scratchKeyBuffer);
671-
}
672-
673-
previousKey = serializeKey(keyBuf, previousKey, target);
674-
el.serializeContent(serializer, target);
675-
previous = el;
676-
}
677-
} finally {
678-
releaseScratchKeyBuffer(scratchKeyBuffer);
679-
}
674+
walkSerializedElements(
675+
(element, keyBuf, previousMaterializationKey) -> {
676+
var previousKey = serializeKey(keyBuf, previousMaterializationKey, serializeTarget);
677+
element.serializeContent(serializer, serializeTarget);
678+
return previousKey;
679+
});
680680

681681
target = target.flip();
682682
} else {
@@ -690,26 +690,121 @@ private boolean isLazyElementImpl(InternalIndexElement<V> el) {
690690
return el.getClass() == LazyIndexElement.class;
691691
}
692692

693-
private ByteBuffer serializeKey(ByteBuffer keyBuf, ByteBuffer previousKey, ByteBuffer target) {
694-
var keyPos = keyBuf.position();
695-
if (previousKey != null) {
696-
var mismatch = previousKey.mismatch(keyBuf);
697-
checkState(mismatch != -1, "Previous and current keys must not be equal");
698-
var strip = previousKey.remaining() - mismatch;
699-
putVarInt(target, strip);
700-
keyBuf.position(keyPos + mismatch);
693+
@FunctionalInterface
694+
private interface SerializedElementVisitor<V> {
695+
ByteBuffer visit(
696+
InternalIndexElement<V> element,
697+
ByteBuffer keyBuf,
698+
@Nullable ByteBuffer previousMaterializationKey);
699+
}
700+
701+
private void walkSerializedElements(SerializedElementVisitor<V> visitor) {
702+
ByteBuffer previousMaterializationKey = null;
703+
704+
var scratchKeyBuffer = acquireScratchKeyBuffer();
705+
try {
706+
InternalIndexElement<V> previous = null;
707+
for (var el : elements) {
708+
var keyBuf =
709+
materializeSerializedKey(el, previous, previousMaterializationKey, scratchKeyBuffer);
710+
previousMaterializationKey = visitor.visit(el, keyBuf, previousMaterializationKey);
711+
previous = el;
712+
}
713+
} finally {
714+
releaseScratchKeyBuffer(scratchKeyBuffer);
715+
}
716+
}
717+
718+
private ByteBuffer materializeSerializedKey(
719+
InternalIndexElement<V> el,
720+
@Nullable InternalIndexElement<V> previous,
721+
@Nullable ByteBuffer previousKey,
722+
ByteBuffer scratchKeyBuffer) {
723+
boolean onlyLazy;
724+
ByteBuffer keyBuf = null;
725+
if (isLazyElementImpl(el)) {
726+
var lazyEl = (LazyIndexElement<V>) el;
727+
// The purpose of this 'if'-branch is to determine whether it can serialize the
728+
// 'IndexKey' by _not_ fully materializing the `IndexKey`. This is possible if (and
729+
// only if!) the current and the previous element are `LazyStoreIndexElement`s, where
730+
// the previous element is exactly the one that has been deserialized.
731+
//noinspection RedundantIfStatement
732+
if (lazyEl.prefixLen == 0 || lazyEl.previous == previous) {
733+
// Can use the optimized serialization in `LazyStoreIndexElement` if the current
734+
// element has no prefix of if the previously serialized element was also a
735+
// `LazyStoreIndexElement`. In other words, no intermediate `LazyStoreIndexElement`
736+
// has been removed and no new element has been added.
737+
onlyLazy = true;
738+
} else {
739+
// This if-branch detects whether an element has been removed from the index. In that
740+
// case, serialization has to materialize the `IndexKey` for serialization.
741+
onlyLazy = false;
742+
}
743+
if (onlyLazy) {
744+
// Key serialization via 'LazyStoreIndexElement' is much cheaper (CPU and heap) than
745+
// having to first materialize and then serialize it.
746+
keyBuf = lazyEl.serializeKey(scratchKeyBuffer, previousKey);
747+
}
701748
} else {
749+
onlyLazy = false;
750+
}
751+
752+
if (!onlyLazy) {
753+
// Either 'el' is not a 'LazyStoreIndexElement' or the previous element of a
754+
// 'LazyStoreIndexElement' is not suitable (see above).
755+
keyBuf = serializeIndexKeyString(el.key(), scratchKeyBuffer);
756+
}
757+
758+
return keyBuf;
759+
}
760+
761+
private static ByteBuffer serializeKey(
762+
ByteBuffer keyBuf, ByteBuffer previousKey, ByteBuffer target) {
763+
var keyDelta = serializedKeyDelta(previousKey, keyBuf);
764+
if (previousKey == null) {
702765
previousKey = newKeyBuffer();
766+
} else {
767+
putVarInt(target, keyDelta.strip());
768+
keyBuf.position(keyDelta.suffixPosition());
703769
}
704770
target.put(keyBuf);
705771

706772
previousKey.clear();
707-
keyBuf.position(keyPos);
773+
keyBuf.position(keyDelta.keyPosition());
708774
previousKey.put(keyBuf).flip();
709775

710776
return previousKey;
711777
}
712778

779+
private static KeyDelta serializedKeyDelta(@Nullable ByteBuffer previousKey, ByteBuffer keyBuf) {
780+
var keyPosition = keyBuf.position();
781+
if (previousKey == null) {
782+
return new KeyDelta(keyPosition, keyPosition, 0, keyBuf.remaining());
783+
}
784+
785+
var mismatch = previousKey.mismatch(keyBuf);
786+
checkState(mismatch != -1, "Previous and current keys must not be equal");
787+
var strip = previousKey.remaining() - mismatch;
788+
return new KeyDelta(
789+
keyPosition,
790+
keyPosition + mismatch,
791+
strip,
792+
(int) (varIntLen(strip) + (long) keyBuf.remaining() - mismatch));
793+
}
794+
795+
private static ByteBuffer copyKey(@Nullable ByteBuffer previousKey, ByteBuffer keyBuf) {
796+
if (previousKey == null) {
797+
previousKey = newKeyBuffer();
798+
}
799+
previousKey.clear();
800+
var keyPosition = keyBuf.position();
801+
previousKey.put(keyBuf).flip();
802+
keyBuf.position(keyPosition);
803+
return previousKey;
804+
}
805+
806+
private record KeyDelta(int keyPosition, int suffixPosition, int strip, int serializedSize) {}
807+
713808
static <V> IndexSpi<V> deserializeStoreIndex(ByteBuffer serialized, IndexValueSerializer<V> ser) {
714809
return new IndexImpl<>(serialized, ser);
715810
}
@@ -750,7 +845,7 @@ private IndexImpl(ByteBuffer serialized, IndexValueSerializer<V> ser) {
750845
// It has no predecessor that would be needed to re-construct (aka materialize) the full key.
751846
var elementPredecessor = prefixLen > 0 ? predecessor : null;
752847
var element =
753-
new LazyIndexElement<V>(
848+
new LazyIndexElement<>(
754849
this, elementPredecessor, previous, keyOffset, prefixLen, valueOffset, endOffset);
755850
if (elementPredecessor == null) {
756851
predecessor = element;

persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,28 @@ public void splitByTargetSizeElementCountVarIntBoundary(int entryCount) {
839839
.isGreaterThanOrEqualTo(splits.getFirst().serialize().remaining());
840840
}
841841

842+
@Test
843+
public void splitByTargetSizeReturnsSameMultiElementInstanceWhenAlreadyWithinTarget() {
844+
var index = basicIndexTestSet().keyIndex();
845+
var targetSize = index.serialize().remaining();
846+
847+
soft.assertThat(index.splitByTargetSize(targetSize)).containsExactly(index);
848+
}
849+
850+
@Test
851+
public void splitByTargetSizeUsesActualSerializedKeySizeForLongCommonPrefixes() {
852+
var index = newStoreIndex(OBJ_REF_SERIALIZER);
853+
var keyPrefix = "catalog/" + "shared-prefix/".repeat(32);
854+
for (var i = 0; i < 64; i++) {
855+
index.put(key(keyPrefix + format("table-%03d", i)), randomObjId());
856+
}
857+
858+
var actualSerializedSize = index.serialize().remaining();
859+
860+
soft.assertThat(index.estimatedSerializedSize()).isGreaterThan(actualSerializedSize);
861+
soft.assertThat(index.splitByTargetSize(actualSerializedSize)).containsExactly(index);
862+
}
863+
842864
@Test
843865
public void stateRelated() {
844866
var indexTestSet = basicIndexTestSet();

persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.util.stream.StreamSupport.stream;
2323
import static org.apache.polaris.persistence.nosql.api.index.IndexKey.key;
2424
import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.OBJ_REF_SERIALIZER;
25+
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.deserializeStoreIndex;
2526
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.indexElement;
2627
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.lazyStoreIndex;
2728
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.newStoreIndex;
@@ -164,6 +165,23 @@ public void splitByTargetSize(long targetSize) {
164165
.containsExactlyElementsOf(expectedSplits.stream().map(IndexSpi::asKeyList).toList());
165166
}
166167

168+
@Test
169+
public void splitByTargetSizeUsesLazyCommonPrefixKeysWithoutSplitting() {
170+
var base = newStoreIndex(OBJ_REF_SERIALIZER);
171+
var keyPrefix = "catalog/" + "shared-prefix/".repeat(32);
172+
for (var i = 0; i < 512; i++) {
173+
base.put(key(keyPrefix + "table-%03d".formatted(i)), randomObjId());
174+
}
175+
176+
var serialized = base.serialize();
177+
var deserialized = deserializeStoreIndex(serialized, OBJ_REF_SERIALIZER);
178+
179+
var lazyIndex = lazyStoreIndex(() -> deserialized, deserialized.first(), deserialized.last());
180+
181+
soft.assertThat(lazyIndex.splitByTargetSize(serialized.remaining()))
182+
.containsExactly(deserialized);
183+
}
184+
167185
@Test
168186
public void firstLastKeyDontLoad() {
169187
var index = newStoreIndex(OBJ_REF_SERIALIZER);

0 commit comments

Comments
 (0)