From 43c468aaa3fa0c6ba723ae551492ecc1910a5968 Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Tue, 12 May 2026 15:44:00 +0200 Subject: [PATCH] NoSQL: Split oversized NoSQL index stripes by target size Replace the old count-oriented stripe split behavior with a target-size driven path at the UpdatableIndexImpl level. That keeps oversized stripes from dragging too much neighboring data along when we have to rewrite them, and it gives us better control over large entry distributions. --- .../impl/indexes/ImmutableEmptyIndexImpl.java | 2 +- .../nosql/impl/indexes/IndexImpl.java | 70 ++++++++--- .../nosql/impl/indexes/IndexSpi.java | 9 +- .../nosql/impl/indexes/LazyIndexImpl.java | 4 +- .../indexes/ReadOnlyLayeredIndexImpl.java | 2 +- .../nosql/impl/indexes/StripedIndexImpl.java | 2 +- .../impl/indexes/UpdatableIndexImpl.java | 17 ++- .../indexes/TestAbstractLayeredIndexImpl.java | 2 +- .../indexes/TestImmutableEmptyIndexImpl.java | 2 +- .../nosql/impl/indexes/TestIndexImpl.java | 115 ++++++++++++++---- .../nosql/impl/indexes/TestLazyIndexImpl.java | 10 +- .../impl/indexes/TestStripedIndexImpl.java | 37 ++++-- .../impl/indexes/TestUpdatableIndexImpl.java | 80 ++++++++++++ 13 files changed, 282 insertions(+), 70 deletions(-) diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java index 4fec2ced0a..ad32171554 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java @@ -68,7 +68,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw unsupported(); } diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java index effd87fd79..0e8a52750c 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java @@ -20,12 +20,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.primitives.Ints.checkedCast; import static java.util.Collections.binarySearch; import static java.util.Objects.requireNonNull; import static org.apache.polaris.persistence.nosql.api.index.IndexKey.deserializeKey; import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.indexElement; import static org.apache.polaris.persistence.varint.VarInt.putVarInt; import static org.apache.polaris.persistence.varint.VarInt.readVarInt; +import static org.apache.polaris.persistence.varint.VarInt.varIntLen; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.AbstractIterator; @@ -339,29 +341,61 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { - var elems = elements; - var size = elems.size(); + public List> splitByTargetSize(long targetSerializedSize) { checkArgument( - parts > 0 && parts <= size, - "Number of parts %s must be greater than 0 and less or equal to number of elements %s", - parts, - size); - var partSize = size / parts; - var serializedMax = originalSerializedSize + estimatedSerializedSizeDiff; - - var result = new ArrayList>(parts); - var index = 0; - for (var i = 0; i < parts; i++) { - var end = i < parts - 1 ? index + partSize : elems.size(); - var partElements = new ArrayList<>(elements.subList(index, end)); - var part = new IndexImpl<>(partElements, serializedMax, serializer, true); - result.add(part); - index = end; + targetSerializedSize > 0, + "Target serialized size %s must be greater than 0", + targetSerializedSize); + + if (elements.isEmpty()) { + return List.of(); + } + if (elements.size() == 1) { + return List.of(this); } + + var result = new ArrayList>(); + var currentElements = new ArrayList>(); + long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; + + for (var element : elements) { + var elementSerializedSize = + addElementDiff(element, element.contentSerializedSize(serializer)); + if (!currentElements.isEmpty() + && currentSerializedSize + elementSerializedSize > targetSerializedSize) { + result.add(newSplitPart(currentElements, currentSerializedSize)); + currentElements = new ArrayList<>(); + currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; + } + + currentElements.add(element); + currentSerializedSize += elementSerializedSize; + + if (currentElements.size() == 1 && currentSerializedSize > targetSerializedSize) { + result.add(newSplitPart(currentElements, currentSerializedSize)); + currentElements = new ArrayList<>(); + currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; + } + } + + if (!currentElements.isEmpty()) { + result.add(newSplitPart(currentElements, currentSerializedSize)); + } + return result; } + private IndexSpi newSplitPart( + List> partElements, long estimatedSerializedSize) { + var adjustedEstimatedSerializedSize = + estimatedSerializedSize + - INDEX_SERIALIZATION_HEADER_SIZE + + 1L + + varIntLen(partElements.size()); + return new IndexImpl<>( + partElements, checkedCast(adjustedEstimatedSerializedSize), serializer, true); + } + @Override public List> stripes() { return List.of(this); diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java index 7b2dc1d6eb..b1fccc0bcf 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java @@ -152,7 +152,14 @@ default IndexSpi setObjId(ObjRef objRef) { boolean isMutable(); - List> divide(int parts); + /** + * Split this index into a variable number of stripes, trying to keep each resulting stripe near + * the target serialized size. + * + *

The target size is a goal, not a hard upper bound. A single oversized entry may still end up + * in its own stripe. + */ + List> splitByTargetSize(long targetSerializedSize); List> stripes(); diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java index 1b0793a219..1c4c3921bc 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java @@ -172,8 +172,8 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { - return loaded().divide(parts); + public List> splitByTargetSize(long targetSerializedSize) { + return loaded().splitByTargetSize(targetSerializedSize); } @Override diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java index 7036ac2694..f76256d41d 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java @@ -50,7 +50,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw unsupported(); } diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java index c8f5a6cc4d..fb11c891f2 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java @@ -143,7 +143,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw new UnsupportedOperationException("Striped indexes cannot be further divided"); } diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java index 50e0a8ee71..2c78a44cbb 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java @@ -19,7 +19,6 @@ package org.apache.polaris.persistence.nosql.impl.indexes; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.primitives.Ints.checkedCast; import static java.util.Objects.requireNonNull; import static org.apache.polaris.persistence.nosql.api.index.IndexStripe.indexStripe; import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; @@ -185,18 +184,16 @@ private List> collectSurvivingStripes(IndexSpi mutableReference) if (stripe.hasElements()) { var serSize = stripe.estimatedSerializedSize(); var maxStripeSize = params.maxIndexStripeSize().asLong(); - var desiredSplits = checkedCast((serSize - 1L) / maxStripeSize + 1L); - if (desiredSplits > 1) { - desiredSplits = Math.min(desiredSplits, stripe.asKeyList().size()); - } - if (desiredSplits > 1) { + if (serSize > maxStripeSize) { // The stripe became too big, needs to be split further + var splitStripes = stripe.splitByTargetSize(maxStripeSize); LOGGER.debug( - "Splitting index stripe {}, modified={}, into {} parts", + "Splitting index stripe {}, modified={}, into {} stripes for target size {}", stripe.getObjId(), stripe.isModified(), - desiredSplits); - survivingStripes.addAll(stripe.divide(desiredSplits)); + splitStripes.size(), + maxStripeSize); + survivingStripes.addAll(splitStripes); } else { LOGGER.debug( "Keeping index stripe {}, modified={}", stripe.getObjId(), stripe.isModified()); @@ -313,7 +310,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw unsupported(); } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java index f922d931eb..3b2184b21d 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java @@ -222,7 +222,7 @@ public void stateRelated() { soft.assertThatThrownBy(layered::asMutableIndex) .isInstanceOf(UnsupportedOperationException.class); soft.assertThat(layered.isMutable()).isFalse(); - soft.assertThatThrownBy(() -> layered.divide(3)) + soft.assertThatThrownBy(() -> layered.splitByTargetSize(3)) .isInstanceOf(UnsupportedOperationException.class); } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java index b5d9dcbde0..8df2eb0787 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java @@ -67,7 +67,7 @@ public void stateRelated() { soft.assertThat(index.asMutableIndex()).isNotSameAs(index); soft.assertThat(index.isMutable()).isFalse(); - soft.assertThatThrownBy(() -> index.divide(3)) + soft.assertThatThrownBy(() -> index.splitByTargetSize(3)) .isInstanceOf(UnsupportedOperationException.class); } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java index 53977a42f7..feb9e4b725 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java @@ -37,6 +37,8 @@ import static org.apache.polaris.persistence.nosql.impl.indexes.ObjTestValue.objTestValueOfSize; import static org.apache.polaris.persistence.nosql.impl.indexes.Util.asHex; import static org.apache.polaris.persistence.nosql.impl.indexes.Util.randomObjId; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.groups.Tuple.tuple; import java.nio.ByteBuffer; @@ -747,35 +749,23 @@ public void updateAll() { } @Test - public void emptyIndexDivide() { - for (var i = -5; i < 5; i++) { - var parts = i; + public void invalidSplitByTargetSize() { + for (var i = -5; i <= 0; i++) { + var targetSize = i; soft.assertThatIllegalArgumentException() - .isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).divide(parts)) - .withMessageStartingWith("Number of parts ") - .withMessageContaining( - " must be greater than 0 and less or equal to number of elements "); + .isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).splitByTargetSize(targetSize)) + .withMessageStartingWith("Target serialized size ") + .withMessageContaining(" must be greater than 0"); } } - @Test - public void impossibleDivide() { - var indexTestSet = basicIndexTestSet(); - var index = indexTestSet.keyIndex(); - - soft.assertThatIllegalArgumentException() - .isThrownBy(() -> index.divide(index.asKeyList().size() + 1)) - .withMessageStartingWith("Number of parts ") - .withMessageContaining(" must be greater than 0 and less or equal to number of elements "); - } - @ParameterizedTest - @ValueSource(ints = {2, 3, 4, 5, 6}) - public void divide(int parts) { + @ValueSource(longs = {32, 48, 64, 96, 128}) + public void splitByTargetSize(long targetSize) { var indexTestSet = basicIndexTestSet(); var index = indexTestSet.keyIndex(); - var splits = index.divide(parts); + var splits = index.splitByTargetSize(targetSize); soft.assertThat(splits.stream().mapToInt(i -> i.asKeyList().size()).sum()) .isEqualTo(index.asKeyList().size()); @@ -786,6 +776,67 @@ public void divide(int parts) { .containsExactlyElementsOf(index); soft.assertThat(splits.getFirst().first()).isEqualTo(index.first()); soft.assertThat(splits.getLast().last()).isEqualTo(index.last()); + soft.assertThat(splits).allSatisfy(split -> assertSplitRespectsTargetSize(split, targetSize)); + } + + @Test + public void splitByTargetSizeIsolatesLargeEntries() { + var index = newStoreIndex(OBJ_TEST_SERIALIZER); + var firstKey = key("a"); + var largeKey = key("b"); + var lastKey = key("c"); + var firstValue = objTestValueFromString("cafe"); + var lastValue = objTestValueFromString("babe"); + var targetSize = 128L; + var largeValue = + objTestValueOfSize( + smallestValueSizeAboveActualSingleEntrySerializedSize(largeKey, targetSize)); + + index.put(firstKey, firstValue); + index.put(largeKey, largeValue); + index.put(lastKey, lastValue); + + var splits = index.splitByTargetSize(targetSize); + + soft.assertThat(splits).hasSize(3); + soft.assertThat(splits.get(0).asKeyList()).containsExactly(firstKey); + soft.assertThat(splits.get(1).asKeyList()).containsExactly(largeKey); + soft.assertThat(splits.get(2).asKeyList()).containsExactly(lastKey); + } + + @Test + public void splitByTargetSizeReturnsSameSingletonInstance() { + var key = key("singleton"); + var targetSize = 64L; + var value = + objTestValueOfSize(smallestValueSizeAboveActualSingleEntrySerializedSize(key, targetSize)); + + var modified = newStoreIndex(OBJ_TEST_SERIALIZER); + modified.put(key, value); + soft.assertThat(modified.splitByTargetSize(targetSize)).containsExactly(modified); + soft.assertThat(modified.isModified()).isTrue(); + + var deserialized = deserializeStoreIndex(modified.serialize(), OBJ_TEST_SERIALIZER); + soft.assertThat(deserialized.isModified()).isFalse(); + soft.assertThat(deserialized.splitByTargetSize(targetSize)).containsExactly(deserialized); + soft.assertThat(deserialized.isModified()).isFalse(); + } + + @ParameterizedTest + @ValueSource(ints = {127, 128, 16_383, 16_384}) + public void splitByTargetSizeElementCountVarIntBoundary(int entryCount) { + var index = newStoreIndex(OBJ_REF_SERIALIZER); + for (var i = 0; i < entryCount; i++) { + index.put(key(format("entry-%03d", i)), randomObjId()); + } + + var splits = index.splitByTargetSize(Long.MAX_VALUE); + + soft.assertThat(splits).hasSize(1); + soft.assertThat(splits.getFirst().asKeyList()).hasSize(entryCount); + assertThatCode(() -> splits.getFirst().serialize()).doesNotThrowAnyException(); + soft.assertThat(splits.getFirst().estimatedSerializedSize()) + .isGreaterThanOrEqualTo(splits.getFirst().serialize().remaining()); } @Test @@ -795,7 +846,27 @@ public void stateRelated() { soft.assertThat(index.asMutableIndex()).isSameAs(index); soft.assertThat(index.isMutable()).isTrue(); - soft.assertThatCode(() -> index.divide(3)).doesNotThrowAnyException(); + soft.assertThatCode(() -> index.splitByTargetSize(3)).doesNotThrowAnyException(); + } + + private static int smallestValueSizeAboveActualSingleEntrySerializedSize( + IndexKey key, long maxEntrySerializedSize) { + var valueSize = (int) Math.min(maxEntrySerializedSize, Integer.MAX_VALUE); + while (((long) IndexImpl.INDEX_SERIALIZATION_HEADER_SIZE + + key.serializedSize() + + OBJ_TEST_SERIALIZER.serializedSize(objTestValueOfSize(valueSize))) + <= maxEntrySerializedSize) { + valueSize++; + } + return valueSize; + } + + private static void assertSplitRespectsTargetSize(IndexSpi split, long targetSize) { + var estimatedSize = split.estimatedSerializedSize(); + if (estimatedSize <= targetSize) { + return; + } + assertThat(split.asKeyList()).hasSize(1); } /** diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java index a6f260e2d2..ecf184a853 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java @@ -140,14 +140,15 @@ public void stateRelated() { } @ParameterizedTest - @ValueSource(ints = {2, 3, 4, 5, 6}) - public void divide(int parts) { + @ValueSource(longs = {32, 48, 64, 96, 128}) + public void splitByTargetSize(long targetSize) { var indexTestSet = basicIndexTestSet(); var base = indexTestSet.keyIndex(); + var expectedSplits = base.splitByTargetSize(targetSize); var index = lazyStoreIndex(() -> base, base.first(), base.last()); - var splits = index.divide(parts); + var splits = index.splitByTargetSize(targetSize); soft.assertThat(splits.stream().mapToInt(i -> i.asKeyList().size()).sum()) .isEqualTo(index.asKeyList().size()); @@ -158,6 +159,9 @@ public void divide(int parts) { .containsExactlyElementsOf(index); soft.assertThat(splits.getFirst().first()).isEqualTo(index.first()); soft.assertThat(splits.getLast().last()).isEqualTo(index.last()); + soft.assertThat(splits).hasSize(expectedSplits.size()); + soft.assertThat(splits.stream().map(IndexSpi::asKeyList)) + .containsExactlyElementsOf(expectedSplits.stream().map(IndexSpi::asKeyList).toList()); } @Test diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java index 902a4d0ec5..4dc4ec51eb 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java @@ -55,7 +55,7 @@ public class TestStripedIndexImpl { public void isLoadedReflectedLazy() { IndexSpi reference = KeyIndexTestSet.basicIndexTestSet().keyIndex(); - var originalStripesList = reference.divide(5); + var originalStripesList = splitIntoStripeCountForTest(reference, 5); Supplier>> stripesSupplier = () -> originalStripesList.stream() @@ -120,7 +120,7 @@ public void isLoadedReflectedLazy() { public void isLoadedReflectedEager() { var reference = KeyIndexTestSet.basicIndexTestSet().keyIndex(); - var originalStripes = reference.divide(5); + var originalStripes = splitIntoStripeCountForTest(reference, 5); var firstLastKeys = originalStripes.stream() .flatMap(s -> Stream.of(s.first(), s.last())) @@ -140,7 +140,7 @@ public void isLoadedReflectedEager() { public void isModifiedReflected(boolean lazyStripes) { var reference = KeyIndexTestSet.basicIndexTestSet().keyIndex(); - var originalStripesList = reference.divide(5); + var originalStripesList = splitIntoStripeCountForTest(reference, 5); Supplier>> stripesSupplier = () -> originalStripesList.stream() @@ -218,7 +218,8 @@ private static Supplier> createStoreIndexSupplier( public void stripedLazy(int numStripes) { var indexTestSet = basicIndexTestSet(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(numStripes)); + var striped = + indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), numStripes)); var stripes = striped.stripes(); // Sanity checks @@ -396,7 +397,8 @@ public void striped(int numStripes) { var indexTestSet = basicIndexTestSet(); var source = indexTestSet.keyIndex(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(numStripes)); + var striped = + indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), numStripes)); // Sanity checks soft.assertThat(striped.stripes()).hasSize(numStripes); @@ -457,11 +459,11 @@ public void striped(int numStripes) { @Test public void stateRelated() { var indexTestSet = basicIndexTestSet(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(3)); + var striped = indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), 3)); soft.assertThat(striped.asMutableIndex()).isSameAs(striped); soft.assertThat(striped.isMutable()).isTrue(); - soft.assertThatThrownBy(() -> striped.divide(3)) + soft.assertThatThrownBy(() -> striped.splitByTargetSize(3)) .isInstanceOf(UnsupportedOperationException.class); } @@ -471,7 +473,7 @@ public void modifyingStripedRemoveIterative(boolean lazy) { var indexTestSet = basicIndexTestSet(); var source = indexTestSet.keyIndex(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(3)); + var striped = indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), 3)); if (lazy) { var lazyStripes = striped.stripes().stream() @@ -514,7 +516,7 @@ public void modifyingStripedAdding(boolean lazy) { indexOdd.add(elements.get(i)); } - var striped = indexFromStripes(indexEven.divide(4)); + var striped = indexFromStripes(splitIntoStripeCountForTest(indexEven, 4)); if (lazy) { var lazyStripes = striped.stripes().stream() @@ -535,4 +537,21 @@ public void modifyingStripedAdding(boolean lazy) { .isEqualTo(source.asKeyList().size()) .isEqualTo(elements.size()); } + + private static List> splitIntoStripeCountForTest( + IndexSpi index, int parts) { + var elements = newArrayList(index.elementIterator()); + var size = elements.size(); + var partSize = size / parts; + var stripes = new ArrayList>(parts); + var from = 0; + for (var i = 0; i < parts; i++) { + var to = i < parts - 1 ? from + partSize : size; + var stripe = newStoreIndex(OBJ_REF_SERIALIZER); + elements.subList(from, to).forEach(stripe::add); + stripes.add(stripe); + from = to; + } + return stripes; + } } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java index 42617f0051..aef140ace0 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java @@ -572,6 +572,86 @@ public void legacyOversizedReferenceStripeCanBeRewritten() { soft.assertThat(readIndex.get(forceSpillKey)).isEqualTo(forceSpillValue); } + @Test + public void legacyOversizedMultiElementReferenceStripeCanBeRepartitionedDuringLaterSpillOut() { + var firstKey = key("a"); + var largeKey = key("b"); + var lastLegacyKey = key("c"); + var secondStripeKey = key("y"); + var forceSpillKey = key("zz"); + var firstValue = objTestValueFromString("cafe"); + var lastLegacyValue = objTestValueFromString("babe"); + var secondStripeValue = objTestValueFromString("dead"); + var targetStripeSize = persistence.params().maxIndexStripeSize().asLong(); + var largeValue = + objTestValueOfSize( + smallestValueSizeAboveActualSingleEntrySerializedSize(largeKey, targetStripeSize)); + var forceSpillValue = + objTestValueOfSize( + valueSizeAroundUpperBound( + forceSpillKey, + persistence.params().maxEmbeddedIndexSize().asLong() + / UpdatableIndexImpl.FORCE_SPILL_MAX_EMBEDDED_ENTRY_DIVISOR, + 1)); + + var legacyOversizedStripe = newStoreIndex(OBJ_TEST_SERIALIZER); + legacyOversizedStripe.add(indexElement(firstKey, firstValue)); + legacyOversizedStripe.add(indexElement(largeKey, largeValue)); + legacyOversizedStripe.add(indexElement(lastLegacyKey, lastLegacyValue)); + + soft.assertThat((long) legacyOversizedStripe.serialize().remaining()) + .isGreaterThan(targetStripeSize); + + var legacySmallStripe = newStoreIndex(OBJ_TEST_SERIALIZER); + legacySmallStripe.add(indexElement(secondStripeKey, secondStripeValue)); + + var oversizedStripeObj = + persistence.write( + IndexStripeObj.indexStripeObj( + persistence.generateId(), legacyOversizedStripe.serialize()), + IndexStripeObj.class); + var smallStripeObj = + persistence.write( + IndexStripeObj.indexStripeObj(persistence.generateId(), legacySmallStripe.serialize()), + IndexStripeObj.class); + + var legacyIndexContainer = + ImmutableIndexContainer.builder() + .embedded(newStoreIndex(OBJ_TEST_SERIALIZER).serialize()) + .addStripe(IndexStripe.indexStripe(firstKey, lastLegacyKey, objRef(oversizedStripeObj))) + .addStripe( + IndexStripe.indexStripe(secondStripeKey, secondStripeKey, objRef(smallStripeObj))) + .build(); + + var updatable = + (UpdatableIndexImpl) + legacyIndexContainer.asUpdatableIndex(persistence, OBJ_TEST_SERIALIZER); + updatable.put(forceSpillKey, forceSpillValue); + + var toPersist = new ArrayList>(); + var indexed = updatable.toIndexed("idx-", (n, o) -> toPersist.add(Map.entry(n, o))); + + soft.assertThat(toPersist).hasSizeGreaterThan(1); + soft.assertThat(indexed.stripes()).hasSizeGreaterThan(2); + soft.assertThat(indexed.stripes()) + .anySatisfy( + stripe -> { + assertThat(stripe.firstKey()).isEqualTo(largeKey); + assertThat(stripe.lastKey()).isEqualTo(largeKey); + }); + + toPersist.stream().map(Map.Entry::getValue).forEach(o -> persistence.write(o, Obj.class)); + + var readIndex = indexed.indexForRead(persistence, OBJ_TEST_SERIALIZER); + soft.assertThat(Streams.stream(readIndex).map(Index.Element::key)) + .containsExactly(firstKey, largeKey, lastLegacyKey, secondStripeKey, forceSpillKey); + soft.assertThat(readIndex.get(firstKey)).isEqualTo(firstValue); + soft.assertThat(readIndex.get(largeKey)).isEqualTo(largeValue); + soft.assertThat(readIndex.get(lastLegacyKey)).isEqualTo(lastLegacyValue); + soft.assertThat(readIndex.get(secondStripeKey)).isEqualTo(secondStripeValue); + soft.assertThat(readIndex.get(forceSpillKey)).isEqualTo(forceSpillValue); + } + @Test public void allowEntryThatExceedsPersistedObjectSizeViaMultipartPersistence() { var updatable = updatableIndexForTest(Map.of(), Map.of(), OBJ_TEST_SERIALIZER);