From cea8a6d81102e6cddd9dbe8651c3c288bd1deb26 Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Tue, 12 May 2026 16:20:27 +0200 Subject: [PATCH] NoSQL: Use actual key serialization to split NoSQL index stripes Base stripe splitting on the actual serialized key shape, including prefix compression, instead of a loose full-key estimate. The estimate was safe but too pessimistic and could over-split perfectly reasonable stripes. Reuse the real serialization walk so the split logic and persisted bytes stay aligned. --- .../nosql/impl/indexes/IndexImpl.java | 259 ++++++++++++------ .../nosql/impl/indexes/TestIndexImpl.java | 22 ++ .../nosql/impl/indexes/TestLazyIndexImpl.java | 18 ++ .../impl/indexes/TestUpdatableIndexImpl.java | 80 ++++++ 4 files changed, 297 insertions(+), 82 deletions(-) diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java index 0e8a52750c..3bf7555ee1 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java @@ -354,35 +354,67 @@ public List> splitByTargetSize(long targetSerializedSize) { return List.of(this); } - var result = new ArrayList>(); - var currentElements = new ArrayList>(); - long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; - - for (var element : elements) { - var elementSerializedSize = - addElementDiff(element, element.contentSerializedSize(serializer)); - if (!currentElements.isEmpty() - && currentSerializedSize + elementSerializedSize > targetSerializedSize) { - result.add(newSplitPart(currentElements, currentSerializedSize)); - currentElements = new ArrayList<>(); - currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; - } + final class SplitState 
{ + private final List> result = new ArrayList<>(); + private List> currentElements = new ArrayList<>(); + private long currentSerializedBodySize; + private ByteBuffer previousSplitKey; + } - currentElements.add(element); - currentSerializedSize += elementSerializedSize; + var splitState = new SplitState(); + + walkSerializedElements( + (element, keyBuf, previousMaterializationKey) -> { + var valueSerializedSize = element.contentSerializedSize(serializer); + var entrySerializedSize = + serializedEntrySize(splitState.previousSplitKey, keyBuf, valueSerializedSize); + var candidateSerializedSize = + splitSerializedSize( + splitState.currentElements.size() + 1, + splitState.currentSerializedBodySize + entrySerializedSize); + + if (!splitState.currentElements.isEmpty() + && candidateSerializedSize > targetSerializedSize) { + splitState.result.add( + newSplitPart( + splitState.currentElements, + splitSerializedSize( + splitState.currentElements.size(), splitState.currentSerializedBodySize))); + splitState.currentElements = new ArrayList<>(); + splitState.currentSerializedBodySize = 0L; + splitState.previousSplitKey = null; + entrySerializedSize = serializedEntrySize(null, keyBuf, valueSerializedSize); + candidateSerializedSize = splitSerializedSize(1, entrySerializedSize); + } - if (currentElements.size() == 1 && currentSerializedSize > targetSerializedSize) { - result.add(newSplitPart(currentElements, currentSerializedSize)); - currentElements = new ArrayList<>(); - currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; - } - } + splitState.currentElements.add(element); + splitState.currentSerializedBodySize += entrySerializedSize; + splitState.previousSplitKey = copyKey(splitState.previousSplitKey, keyBuf); + + if (splitState.currentElements.size() == 1 + && candidateSerializedSize > targetSerializedSize) { + splitState.result.add( + newSplitPart(splitState.currentElements, candidateSerializedSize)); + splitState.currentElements = new ArrayList<>(); + 
splitState.currentSerializedBodySize = 0L; + splitState.previousSplitKey = null; + } + + return copyKey(previousMaterializationKey, keyBuf); + }); - if (!currentElements.isEmpty()) { - result.add(newSplitPart(currentElements, currentSerializedSize)); + if (splitState.result.isEmpty()) { + return List.of(this); + } + if (!splitState.currentElements.isEmpty()) { + splitState.result.add( + newSplitPart( + splitState.currentElements, + splitSerializedSize( + splitState.currentElements.size(), splitState.currentSerializedBodySize))); } - return result; + return splitState.result; } private IndexSpi newSplitPart( @@ -396,6 +428,15 @@ private IndexSpi newSplitPart( partElements, checkedCast(adjustedEstimatedSerializedSize), serializer, true); } + private static long splitSerializedSize(int elementCount, long serializedBodySize) { + return 1L + varIntLen(elementCount) + serializedBodySize; + } + + private static long serializedEntrySize( + @Nullable ByteBuffer previousKey, ByteBuffer keyBuf, int valueSerializedSize) { + return (long) serializedKeyDelta(previousKey, keyBuf).serializedSize() + valueSerializedSize; + } + @Override public List> stripes() { return List.of(this); @@ -628,55 +669,14 @@ public int estimatedSerializedSize() { // Serialized segment index version target.put(CURRENT_STORE_INDEX_VERSION); putVarInt(target, elements.size()); + var serializeTarget = target; - ByteBuffer previousKey = null; - - var scratchKeyBuffer = acquireScratchKeyBuffer(); - try { - boolean onlyLazy; - InternalIndexElement previous = null; - for (var el : elements) { - ByteBuffer keyBuf = null; - if (isLazyElementImpl(el)) { - var lazyEl = (LazyIndexElement) el; - // The purpose of this 'if'-branch is to determine whether it can serialize the - // 'IndexKey' by _not_ fully materializing the `IndexKey`. This is possible if (and - // only if!) the current and the previous element are `LazyStoreIndexElement`s, where - // the previous element is exactly the one that has been deserialized. 
- //noinspection RedundantIfStatement - if (lazyEl.prefixLen == 0 || lazyEl.previous == previous) { - // Can use the optimized serialization in `LazyStoreIndexElement` if the current - // element has no prefix of if the previously serialized element was also a - // `LazyStoreIndexElement`. In other words, no intermediate `LazyStoreIndexElement` - // has been removed and no new element has been added. - onlyLazy = true; - } else { - // This if-branch detects whether an element has been removed from the index. In that - // case, serialization has to materialize the `IndexKey` for serialization. - onlyLazy = false; - } - if (onlyLazy) { - // Key serialization via 'LazyStoreIndexElement' is much cheaper (CPU and heap) than - // having to first materialize and then serialize it. - keyBuf = lazyEl.serializeKey(scratchKeyBuffer, previousKey); - } - } else { - onlyLazy = false; - } - - if (!onlyLazy) { - // Either 'el' is not a 'LazyStoreIndexElement' or the previous element of a - // 'LazyStoreIndexElement' is not suitable (see above). 
- keyBuf = serializeIndexKeyString(el.key(), scratchKeyBuffer); - } - - previousKey = serializeKey(keyBuf, previousKey, target); - el.serializeContent(serializer, target); - previous = el; - } - } finally { - releaseScratchKeyBuffer(scratchKeyBuffer); - } + walkSerializedElements( + (element, keyBuf, previousMaterializationKey) -> { + var previousKey = serializeKey(keyBuf, previousMaterializationKey, serializeTarget); + element.serializeContent(serializer, serializeTarget); + return previousKey; + }); target = target.flip(); } else { @@ -690,26 +690,121 @@ private boolean isLazyElementImpl(InternalIndexElement el) { return el.getClass() == LazyIndexElement.class; } - private ByteBuffer serializeKey(ByteBuffer keyBuf, ByteBuffer previousKey, ByteBuffer target) { - var keyPos = keyBuf.position(); - if (previousKey != null) { - var mismatch = previousKey.mismatch(keyBuf); - checkState(mismatch != -1, "Previous and current keys must not be equal"); - var strip = previousKey.remaining() - mismatch; - putVarInt(target, strip); - keyBuf.position(keyPos + mismatch); + @FunctionalInterface + private interface SerializedElementVisitor { + ByteBuffer visit( + InternalIndexElement element, + ByteBuffer keyBuf, + @Nullable ByteBuffer previousMaterializationKey); + } + + private void walkSerializedElements(SerializedElementVisitor visitor) { + ByteBuffer previousMaterializationKey = null; + + var scratchKeyBuffer = acquireScratchKeyBuffer(); + try { + InternalIndexElement previous = null; + for (var el : elements) { + var keyBuf = + materializeSerializedKey(el, previous, previousMaterializationKey, scratchKeyBuffer); + previousMaterializationKey = visitor.visit(el, keyBuf, previousMaterializationKey); + previous = el; + } + } finally { + releaseScratchKeyBuffer(scratchKeyBuffer); + } + } + + private ByteBuffer materializeSerializedKey( + InternalIndexElement el, + @Nullable InternalIndexElement previous, + @Nullable ByteBuffer previousKey, + ByteBuffer scratchKeyBuffer) { + 
boolean onlyLazy; + ByteBuffer keyBuf = null; + if (isLazyElementImpl(el)) { + var lazyEl = (LazyIndexElement) el; + // The purpose of this 'if'-branch is to determine whether it can serialize the + // 'IndexKey' by _not_ fully materializing the `IndexKey`. This is possible if (and + // only if!) the current and the previous element are `LazyStoreIndexElement`s, where + // the previous element is exactly the one that has been deserialized. + //noinspection RedundantIfStatement + if (lazyEl.prefixLen == 0 || lazyEl.previous == previous) { + // Can use the optimized serialization in `LazyStoreIndexElement` if the current + // element has no prefix or if the previously serialized element was also a + // `LazyStoreIndexElement`. In other words, no intermediate `LazyStoreIndexElement` + // has been removed and no new element has been added. + onlyLazy = true; + } else { + // This if-branch detects whether an element has been removed from the index. In that + // case, serialization has to materialize the `IndexKey` for serialization. + onlyLazy = false; + } + if (onlyLazy) { + // Key serialization via 'LazyStoreIndexElement' is much cheaper (CPU and heap) than + // having to first materialize and then serialize it. + keyBuf = lazyEl.serializeKey(scratchKeyBuffer, previousKey); + } } else { + onlyLazy = false; + } + + if (!onlyLazy) { + // Either 'el' is not a 'LazyStoreIndexElement' or the previous element of a + // 'LazyStoreIndexElement' is not suitable (see above). 
+ keyBuf = serializeIndexKeyString(el.key(), scratchKeyBuffer); + } + + return keyBuf; + } + + private static ByteBuffer serializeKey( + ByteBuffer keyBuf, ByteBuffer previousKey, ByteBuffer target) { + var keyDelta = serializedKeyDelta(previousKey, keyBuf); + if (previousKey == null) { previousKey = newKeyBuffer(); + } else { + putVarInt(target, keyDelta.strip()); + keyBuf.position(keyDelta.suffixPosition()); } target.put(keyBuf); previousKey.clear(); - keyBuf.position(keyPos); + keyBuf.position(keyDelta.keyPosition()); previousKey.put(keyBuf).flip(); return previousKey; } + private static KeyDelta serializedKeyDelta(@Nullable ByteBuffer previousKey, ByteBuffer keyBuf) { + var keyPosition = keyBuf.position(); + if (previousKey == null) { + return new KeyDelta(keyPosition, keyPosition, 0, keyBuf.remaining()); + } + + var mismatch = previousKey.mismatch(keyBuf); + checkState(mismatch != -1, "Previous and current keys must not be equal"); + var strip = previousKey.remaining() - mismatch; + return new KeyDelta( + keyPosition, + keyPosition + mismatch, + strip, + (int) (varIntLen(strip) + (long) keyBuf.remaining() - mismatch)); + } + + private static ByteBuffer copyKey(@Nullable ByteBuffer previousKey, ByteBuffer keyBuf) { + if (previousKey == null) { + previousKey = newKeyBuffer(); + } + previousKey.clear(); + var keyPosition = keyBuf.position(); + previousKey.put(keyBuf).flip(); + keyBuf.position(keyPosition); + return previousKey; + } + + private record KeyDelta(int keyPosition, int suffixPosition, int strip, int serializedSize) {} + static IndexSpi deserializeStoreIndex(ByteBuffer serialized, IndexValueSerializer ser) { return new IndexImpl<>(serialized, ser); } @@ -750,7 +845,7 @@ private IndexImpl(ByteBuffer serialized, IndexValueSerializer ser) { // It has no predecessor that would be needed to re-construct (aka materialize) the full key. var elementPredecessor = prefixLen > 0 ? 
predecessor : null; var element = - new LazyIndexElement( + new LazyIndexElement<>( this, elementPredecessor, previous, keyOffset, prefixLen, valueOffset, endOffset); if (elementPredecessor == null) { predecessor = element; diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java index feb9e4b725..0a931be217 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java @@ -839,6 +839,28 @@ public void splitByTargetSizeElementCountVarIntBoundary(int entryCount) { .isGreaterThanOrEqualTo(splits.getFirst().serialize().remaining()); } + @Test + public void splitByTargetSizeReturnsSameMultiElementInstanceWhenAlreadyWithinTarget() { + var index = basicIndexTestSet().keyIndex(); + var targetSize = index.serialize().remaining(); + + soft.assertThat(index.splitByTargetSize(targetSize)).containsExactly(index); + } + + @Test + public void splitByTargetSizeUsesActualSerializedKeySizeForLongCommonPrefixes() { + var index = newStoreIndex(OBJ_REF_SERIALIZER); + var keyPrefix = "catalog/" + "shared-prefix/".repeat(32); + for (var i = 0; i < 64; i++) { + index.put(key(keyPrefix + format("table-%03d", i)), randomObjId()); + } + + var actualSerializedSize = index.serialize().remaining(); + + soft.assertThat(index.estimatedSerializedSize()).isGreaterThan(actualSerializedSize); + soft.assertThat(index.splitByTargetSize(actualSerializedSize)).containsExactly(index); + } + @Test public void stateRelated() { var indexTestSet = basicIndexTestSet(); diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java 
b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java index ecf184a853..3cdb9efb4b 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java @@ -22,6 +22,7 @@ import static java.util.stream.StreamSupport.stream; import static org.apache.polaris.persistence.nosql.api.index.IndexKey.key; import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.OBJ_REF_SERIALIZER; +import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.deserializeStoreIndex; import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.indexElement; import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.lazyStoreIndex; import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.newStoreIndex; @@ -164,6 +165,23 @@ public void splitByTargetSize(long targetSize) { .containsExactlyElementsOf(expectedSplits.stream().map(IndexSpi::asKeyList).toList()); } + @Test + public void splitByTargetSizeUsesLazyCommonPrefixKeysWithoutSplitting() { + var base = newStoreIndex(OBJ_REF_SERIALIZER); + var keyPrefix = "catalog/" + "shared-prefix/".repeat(32); + for (var i = 0; i < 512; i++) { + base.put(key(keyPrefix + "table-%03d".formatted(i)), randomObjId()); + } + + var serialized = base.serialize(); + var deserialized = deserializeStoreIndex(serialized, OBJ_REF_SERIALIZER); + + var lazyIndex = lazyStoreIndex(() -> deserialized, deserialized.first(), deserialized.last()); + + soft.assertThat(lazyIndex.splitByTargetSize(serialized.remaining())) + .containsExactly(deserialized); + } + @Test public void firstLastKeyDontLoad() { var index = newStoreIndex(OBJ_REF_SERIALIZER); diff --git 
a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java index aef140ace0..ddc1c36d9c 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java @@ -572,6 +572,86 @@ public void legacyOversizedReferenceStripeCanBeRewritten() { soft.assertThat(readIndex.get(forceSpillKey)).isEqualTo(forceSpillValue); } + @Test + public void persistedCommonPrefixStripeStaysUnsplitDuringLaterSpillOut() { + var targetStripeSize = persistence.params().maxIndexStripeSize().asLong(); + var sharedValue = objTestValueFromString("cafe"); + var keyPrefix = "catalog/" + "shared-prefix/".repeat(32); + var prefixStripe = newStoreIndex(OBJ_TEST_SERIALIZER); + IndexKey middlePrefixKey = null; + for (var i = 0; i < 512; i++) { + var key = key(keyPrefix + "table-%03d".formatted(i)); + prefixStripe.add(indexElement(key, sharedValue)); + if (i == 256) { + middlePrefixKey = key; + } + } + var firstPrefixKey = requireNonNull(prefixStripe.first()); + var lastPrefixKey = requireNonNull(prefixStripe.last()); + + soft.assertThat((long) prefixStripe.serialize().remaining()) + .isLessThanOrEqualTo(targetStripeSize); + soft.assertThat((long) prefixStripe.estimatedSerializedSize()).isGreaterThan(targetStripeSize); + + var secondStripeKey = key("zzz-base"); + var secondStripeValue = objTestValueFromString("dead"); + var forceSpillKey = key("zzzz-trigger"); + var forceSpillValue = + objTestValueOfSize( + valueSizeAroundUpperBound( + forceSpillKey, + persistence.params().maxEmbeddedIndexSize().asLong() + / UpdatableIndexImpl.FORCE_SPILL_MAX_EMBEDDED_ENTRY_DIVISOR, + 1)); + var secondStripe = 
newStoreIndex(OBJ_TEST_SERIALIZER); + secondStripe.add(indexElement(secondStripeKey, secondStripeValue)); + + var prefixStripeObj = + persistence.write( + IndexStripeObj.indexStripeObj(persistence.generateId(), prefixStripe.serialize()), + IndexStripeObj.class); + var secondStripeObj = + persistence.write( + IndexStripeObj.indexStripeObj(persistence.generateId(), secondStripe.serialize()), + IndexStripeObj.class); + + var legacyIndexContainer = + ImmutableIndexContainer.builder() + .embedded(newStoreIndex(OBJ_TEST_SERIALIZER).serialize()) + .addStripe( + IndexStripe.indexStripe(firstPrefixKey, lastPrefixKey, objRef(prefixStripeObj))) + .addStripe( + IndexStripe.indexStripe(secondStripeKey, secondStripeKey, objRef(secondStripeObj))) + .build(); + + var updatable = + (UpdatableIndexImpl) + legacyIndexContainer.asUpdatableIndex(persistence, OBJ_TEST_SERIALIZER); + updatable.put(forceSpillKey, forceSpillValue); + + var toPersist = new ArrayList>(); + var indexed = updatable.toIndexed("idx-", (n, o) -> toPersist.add(Map.entry(n, o))); + + soft.assertThat(toPersist).hasSize(1); + soft.assertThat(indexed.stripes()).hasSize(2); + soft.assertThat(indexed.stripes()) + .anySatisfy( + stripe -> { + assertThat(stripe.firstKey()).isEqualTo(firstPrefixKey); + assertThat(stripe.lastKey()).isEqualTo(lastPrefixKey); + assertThat(stripe.segment()).isEqualTo(objRef(prefixStripeObj)); + }); + + toPersist.stream().map(Map.Entry::getValue).forEach(o -> persistence.write(o, Obj.class)); + + var readIndex = indexed.indexForRead(persistence, OBJ_TEST_SERIALIZER); + soft.assertThat(readIndex.get(firstPrefixKey)).isEqualTo(sharedValue); + soft.assertThat(readIndex.get(requireNonNull(middlePrefixKey))).isEqualTo(sharedValue); + soft.assertThat(readIndex.get(lastPrefixKey)).isEqualTo(sharedValue); + soft.assertThat(readIndex.get(secondStripeKey)).isEqualTo(secondStripeValue); + soft.assertThat(readIndex.get(forceSpillKey)).isEqualTo(forceSpillValue); + } + @Test public void 
legacyOversizedMultiElementReferenceStripeCanBeRepartitionedDuringLaterSpillOut() { var firstKey = key("a");