diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java index 4fec2ced0a..ad32171554 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java @@ -68,7 +68,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw unsupported(); } diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java index effd87fd79..0e8a52750c 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java @@ -20,12 +20,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.primitives.Ints.checkedCast; import static java.util.Collections.binarySearch; import static java.util.Objects.requireNonNull; import static org.apache.polaris.persistence.nosql.api.index.IndexKey.deserializeKey; import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.indexElement; import static org.apache.polaris.persistence.varint.VarInt.putVarInt; import static org.apache.polaris.persistence.varint.VarInt.readVarInt; +import static org.apache.polaris.persistence.varint.VarInt.varIntLen; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.AbstractIterator; @@ -339,29 +341,61 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { - var elems = elements; - var size = elems.size(); + public List> splitByTargetSize(long targetSerializedSize) { checkArgument( - parts > 0 && parts <= size, - "Number of parts %s must be greater than 0 and less or equal to number of elements %s", - parts, - size); - var partSize = size / parts; - var serializedMax = originalSerializedSize + estimatedSerializedSizeDiff; - - var result = new ArrayList>(parts); - var index = 0; - for (var i = 0; i < parts; i++) { - var end = i < parts - 1 ? index + partSize : elems.size(); - var partElements = new ArrayList<>(elements.subList(index, end)); - var part = new IndexImpl<>(partElements, serializedMax, serializer, true); - result.add(part); - index = end; + targetSerializedSize > 0, + "Target serialized size %s must be greater than 0", + targetSerializedSize); + + if (elements.isEmpty()) { + return List.of(); + } + if (elements.size() == 1) { + return List.of(this); } + + var result = new ArrayList>(); + var currentElements = new ArrayList>(); + long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; + + for (var element : elements) { + var elementSerializedSize = + addElementDiff(element, element.contentSerializedSize(serializer)); + if (!currentElements.isEmpty() + && currentSerializedSize + elementSerializedSize > targetSerializedSize) { + result.add(newSplitPart(currentElements, currentSerializedSize)); + currentElements = new ArrayList<>(); + currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; + } + + currentElements.add(element); + currentSerializedSize += elementSerializedSize; + + if (currentElements.size() == 1 && currentSerializedSize > targetSerializedSize) { + result.add(newSplitPart(currentElements, currentSerializedSize)); + currentElements = new ArrayList<>(); + currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE; + } + } + + if (!currentElements.isEmpty()) { + result.add(newSplitPart(currentElements, currentSerializedSize)); + } + return result; } + private IndexSpi newSplitPart( + List> partElements, long estimatedSerializedSize) { + var adjustedEstimatedSerializedSize = + estimatedSerializedSize + - INDEX_SERIALIZATION_HEADER_SIZE + + 1L + + varIntLen(partElements.size()); + return new IndexImpl<>( + partElements, checkedCast(adjustedEstimatedSerializedSize), serializer, true); + } + @Override public List> stripes() { return List.of(this); diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java index 7b2dc1d6eb..b1fccc0bcf 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java @@ -152,7 +152,14 @@ default IndexSpi setObjId(ObjRef objRef) { boolean isMutable(); - List> divide(int parts); + /** + * Split this index into a variable number of stripes, trying to keep each resulting stripe near + * the target serialized size. + * + *

The target size is a goal, not a hard upper bound. A single oversized entry may still end up + * in its own stripe. + */ + List> splitByTargetSize(long targetSerializedSize); List> stripes(); diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java index 1b0793a219..1c4c3921bc 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java @@ -172,8 +172,8 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { - return loaded().divide(parts); + public List> splitByTargetSize(long targetSerializedSize) { + return loaded().splitByTargetSize(targetSerializedSize); } @Override diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java index 7036ac2694..f76256d41d 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java @@ -50,7 +50,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw unsupported(); } diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java index c8f5a6cc4d..fb11c891f2 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java @@ -143,7 +143,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw new UnsupportedOperationException("Striped indexes cannot be further divided"); } diff --git a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java index 50e0a8ee71..2c78a44cbb 100644 --- a/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java @@ -19,7 +19,6 @@ package org.apache.polaris.persistence.nosql.impl.indexes; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.primitives.Ints.checkedCast; import static java.util.Objects.requireNonNull; import static org.apache.polaris.persistence.nosql.api.index.IndexStripe.indexStripe; import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef; @@ -185,18 +184,16 @@ private List> collectSurvivingStripes(IndexSpi mutableReference) if (stripe.hasElements()) { var serSize = stripe.estimatedSerializedSize(); var maxStripeSize = params.maxIndexStripeSize().asLong(); - var desiredSplits = checkedCast((serSize - 1L) / maxStripeSize + 1L); - if (desiredSplits > 1) { - desiredSplits = Math.min(desiredSplits, stripe.asKeyList().size()); - } - if (desiredSplits > 1) { + if (serSize > maxStripeSize) { // The stripe became too big, needs to be split further + var splitStripes = stripe.splitByTargetSize(maxStripeSize); LOGGER.debug( - "Splitting index stripe {}, modified={}, into {} parts", + "Splitting index stripe {}, modified={}, into {} stripes for target size {}", stripe.getObjId(), stripe.isModified(), - desiredSplits); - survivingStripes.addAll(stripe.divide(desiredSplits)); + splitStripes.size(), + maxStripeSize); + survivingStripes.addAll(splitStripes); } else { LOGGER.debug( "Keeping index stripe {}, modified={}", stripe.getObjId(), stripe.isModified()); @@ -313,7 +310,7 @@ public boolean isMutable() { } @Override - public List> divide(int parts) { + public List> splitByTargetSize(long targetSerializedSize) { throw unsupported(); } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java index f922d931eb..3b2184b21d 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java @@ -222,7 +222,7 @@ public void stateRelated() { soft.assertThatThrownBy(layered::asMutableIndex) .isInstanceOf(UnsupportedOperationException.class); soft.assertThat(layered.isMutable()).isFalse(); - soft.assertThatThrownBy(() -> layered.divide(3)) + soft.assertThatThrownBy(() -> layered.splitByTargetSize(3)) .isInstanceOf(UnsupportedOperationException.class); } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java index b5d9dcbde0..8df2eb0787 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java @@ -67,7 +67,7 @@ public void stateRelated() { soft.assertThat(index.asMutableIndex()).isNotSameAs(index); soft.assertThat(index.isMutable()).isFalse(); - soft.assertThatThrownBy(() -> index.divide(3)) + soft.assertThatThrownBy(() -> index.splitByTargetSize(3)) .isInstanceOf(UnsupportedOperationException.class); } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java index 53977a42f7..feb9e4b725 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java @@ -37,6 +37,8 @@ import static org.apache.polaris.persistence.nosql.impl.indexes.ObjTestValue.objTestValueOfSize; import static org.apache.polaris.persistence.nosql.impl.indexes.Util.asHex; import static org.apache.polaris.persistence.nosql.impl.indexes.Util.randomObjId; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.groups.Tuple.tuple; import java.nio.ByteBuffer; @@ -747,35 +749,23 @@ public void updateAll() { } @Test - public void emptyIndexDivide() { - for (var i = -5; i < 5; i++) { - var parts = i; + public void invalidSplitByTargetSize() { + for (var i = -5; i <= 0; i++) { + var targetSize = i; soft.assertThatIllegalArgumentException() - .isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).divide(parts)) - .withMessageStartingWith("Number of parts ") - .withMessageContaining( - " must be greater than 0 and less or equal to number of elements "); + .isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).splitByTargetSize(targetSize)) + .withMessageStartingWith("Target serialized size ") + .withMessageContaining(" must be greater than 0"); } } - @Test - public void impossibleDivide() { - var indexTestSet = basicIndexTestSet(); - var index = indexTestSet.keyIndex(); - - soft.assertThatIllegalArgumentException() - .isThrownBy(() -> index.divide(index.asKeyList().size() + 1)) - .withMessageStartingWith("Number of parts ") - .withMessageContaining(" must be greater than 0 and less or equal to number of elements "); - } - @ParameterizedTest - @ValueSource(ints = {2, 3, 4, 5, 6}) - public void divide(int parts) { + @ValueSource(longs = {32, 48, 64, 96, 128}) + public void splitByTargetSize(long targetSize) { var indexTestSet = basicIndexTestSet(); var index = indexTestSet.keyIndex(); - var splits = index.divide(parts); + var splits = index.splitByTargetSize(targetSize); soft.assertThat(splits.stream().mapToInt(i -> i.asKeyList().size()).sum()) .isEqualTo(index.asKeyList().size()); @@ -786,6 +776,67 @@ public void divide(int parts) { .containsExactlyElementsOf(index); soft.assertThat(splits.getFirst().first()).isEqualTo(index.first()); soft.assertThat(splits.getLast().last()).isEqualTo(index.last()); + soft.assertThat(splits).allSatisfy(split -> assertSplitRespectsTargetSize(split, targetSize)); + } + + @Test + public void splitByTargetSizeIsolatesLargeEntries() { + var index = newStoreIndex(OBJ_TEST_SERIALIZER); + var firstKey = key("a"); + var largeKey = key("b"); + var lastKey = key("c"); + var firstValue = objTestValueFromString("cafe"); + var lastValue = objTestValueFromString("babe"); + var targetSize = 128L; + var largeValue = + objTestValueOfSize( + smallestValueSizeAboveActualSingleEntrySerializedSize(largeKey, targetSize)); + + index.put(firstKey, firstValue); + index.put(largeKey, largeValue); + index.put(lastKey, lastValue); + + var splits = index.splitByTargetSize(targetSize); + + soft.assertThat(splits).hasSize(3); + soft.assertThat(splits.get(0).asKeyList()).containsExactly(firstKey); + soft.assertThat(splits.get(1).asKeyList()).containsExactly(largeKey); + soft.assertThat(splits.get(2).asKeyList()).containsExactly(lastKey); + } + + @Test + public void splitByTargetSizeReturnsSameSingletonInstance() { + var key = key("singleton"); + var targetSize = 64L; + var value = + objTestValueOfSize(smallestValueSizeAboveActualSingleEntrySerializedSize(key, targetSize)); + + var modified = newStoreIndex(OBJ_TEST_SERIALIZER); + modified.put(key, value); + soft.assertThat(modified.splitByTargetSize(targetSize)).containsExactly(modified); + soft.assertThat(modified.isModified()).isTrue(); + + var deserialized = deserializeStoreIndex(modified.serialize(), OBJ_TEST_SERIALIZER); + soft.assertThat(deserialized.isModified()).isFalse(); + soft.assertThat(deserialized.splitByTargetSize(targetSize)).containsExactly(deserialized); + soft.assertThat(deserialized.isModified()).isFalse(); + } + + @ParameterizedTest + @ValueSource(ints = {127, 128, 16_383, 16_384}) + public void splitByTargetSizeElementCountVarIntBoundary(int entryCount) { + var index = newStoreIndex(OBJ_REF_SERIALIZER); + for (var i = 0; i < entryCount; i++) { + index.put(key(format("entry-%03d", i)), randomObjId()); + } + + var splits = index.splitByTargetSize(Long.MAX_VALUE); + + soft.assertThat(splits).hasSize(1); + soft.assertThat(splits.getFirst().asKeyList()).hasSize(entryCount); + assertThatCode(() -> splits.getFirst().serialize()).doesNotThrowAnyException(); + soft.assertThat(splits.getFirst().estimatedSerializedSize()) + .isGreaterThanOrEqualTo(splits.getFirst().serialize().remaining()); } @Test @@ -795,7 +846,27 @@ public void stateRelated() { soft.assertThat(index.asMutableIndex()).isSameAs(index); soft.assertThat(index.isMutable()).isTrue(); - soft.assertThatCode(() -> index.divide(3)).doesNotThrowAnyException(); + soft.assertThatCode(() -> index.splitByTargetSize(3)).doesNotThrowAnyException(); + } + + private static int smallestValueSizeAboveActualSingleEntrySerializedSize( + IndexKey key, long maxEntrySerializedSize) { + var valueSize = (int) Math.min(maxEntrySerializedSize, Integer.MAX_VALUE); + while (((long) IndexImpl.INDEX_SERIALIZATION_HEADER_SIZE + + key.serializedSize() + + OBJ_TEST_SERIALIZER.serializedSize(objTestValueOfSize(valueSize))) + <= maxEntrySerializedSize) { + valueSize++; + } + return valueSize; + } + + private static void assertSplitRespectsTargetSize(IndexSpi split, long targetSize) { + var estimatedSize = split.estimatedSerializedSize(); + if (estimatedSize <= targetSize) { + return; + } + assertThat(split.asKeyList()).hasSize(1); } /** diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java index a6f260e2d2..ecf184a853 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestLazyIndexImpl.java @@ -140,14 +140,15 @@ public void stateRelated() { } @ParameterizedTest - @ValueSource(ints = {2, 3, 4, 5, 6}) - public void divide(int parts) { + @ValueSource(longs = {32, 48, 64, 96, 128}) + public void splitByTargetSize(long targetSize) { var indexTestSet = basicIndexTestSet(); var base = indexTestSet.keyIndex(); + var expectedSplits = base.splitByTargetSize(targetSize); var index = lazyStoreIndex(() -> base, base.first(), base.last()); - var splits = index.divide(parts); + var splits = index.splitByTargetSize(targetSize); soft.assertThat(splits.stream().mapToInt(i -> i.asKeyList().size()).sum()) .isEqualTo(index.asKeyList().size()); @@ -158,6 +159,9 @@ public void divide(int parts) { .containsExactlyElementsOf(index); soft.assertThat(splits.getFirst().first()).isEqualTo(index.first()); soft.assertThat(splits.getLast().last()).isEqualTo(index.last()); + soft.assertThat(splits).hasSize(expectedSplits.size()); + soft.assertThat(splits.stream().map(IndexSpi::asKeyList)) + .containsExactlyElementsOf(expectedSplits.stream().map(IndexSpi::asKeyList).toList()); } @Test diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java index 902a4d0ec5..4dc4ec51eb 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestStripedIndexImpl.java @@ -55,7 +55,7 @@ public class TestStripedIndexImpl { public void isLoadedReflectedLazy() { IndexSpi reference = KeyIndexTestSet.basicIndexTestSet().keyIndex(); - var originalStripesList = reference.divide(5); + var originalStripesList = splitIntoStripeCountForTest(reference, 5); Supplier>> stripesSupplier = () -> originalStripesList.stream() @@ -120,7 +120,7 @@ public void isLoadedReflectedLazy() { public void isLoadedReflectedEager() { var reference = KeyIndexTestSet.basicIndexTestSet().keyIndex(); - var originalStripes = reference.divide(5); + var originalStripes = splitIntoStripeCountForTest(reference, 5); var firstLastKeys = originalStripes.stream() .flatMap(s -> Stream.of(s.first(), s.last())) @@ -140,7 +140,7 @@ public void isLoadedReflectedEager() { public void isModifiedReflected(boolean lazyStripes) { var reference = KeyIndexTestSet.basicIndexTestSet().keyIndex(); - var originalStripesList = reference.divide(5); + var originalStripesList = splitIntoStripeCountForTest(reference, 5); Supplier>> stripesSupplier = () -> originalStripesList.stream() @@ -218,7 +218,8 @@ private static Supplier> createStoreIndexSupplier( public void stripedLazy(int numStripes) { var indexTestSet = basicIndexTestSet(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(numStripes)); + var striped = + indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), numStripes)); var stripes = striped.stripes(); // Sanity checks @@ -396,7 +397,8 @@ public void striped(int numStripes) { var indexTestSet = basicIndexTestSet(); var source = indexTestSet.keyIndex(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(numStripes)); + var striped = + indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), numStripes)); // Sanity checks soft.assertThat(striped.stripes()).hasSize(numStripes); @@ -457,11 +459,11 @@ public void striped(int numStripes) { @Test public void stateRelated() { var indexTestSet = basicIndexTestSet(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(3)); + var striped = indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), 3)); soft.assertThat(striped.asMutableIndex()).isSameAs(striped); soft.assertThat(striped.isMutable()).isTrue(); - soft.assertThatThrownBy(() -> striped.divide(3)) + soft.assertThatThrownBy(() -> striped.splitByTargetSize(3)) .isInstanceOf(UnsupportedOperationException.class); } @@ -471,7 +473,7 @@ public void modifyingStripedRemoveIterative(boolean lazy) { var indexTestSet = basicIndexTestSet(); var source = indexTestSet.keyIndex(); - var striped = indexFromStripes(indexTestSet.keyIndex().divide(3)); + var striped = indexFromStripes(splitIntoStripeCountForTest(indexTestSet.keyIndex(), 3)); if (lazy) { var lazyStripes = striped.stripes().stream() @@ -514,7 +516,7 @@ public void modifyingStripedAdding(boolean lazy) { indexOdd.add(elements.get(i)); } - var striped = indexFromStripes(indexEven.divide(4)); + var striped = indexFromStripes(splitIntoStripeCountForTest(indexEven, 4)); if (lazy) { var lazyStripes = striped.stripes().stream() @@ -535,4 +537,21 @@ public void modifyingStripedAdding(boolean lazy) { .isEqualTo(source.asKeyList().size()) .isEqualTo(elements.size()); } + + private static List> splitIntoStripeCountForTest( + IndexSpi index, int parts) { + var elements = newArrayList(index.elementIterator()); + var size = elements.size(); + var partSize = size / parts; + var stripes = new ArrayList>(parts); + var from = 0; + for (var i = 0; i < parts; i++) { + var to = i < parts - 1 ? from + partSize : size; + var stripe = newStoreIndex(OBJ_REF_SERIALIZER); + elements.subList(from, to).forEach(stripe::add); + stripes.add(stripe); + from = to; + } + return stripes; + } } diff --git a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java index 42617f0051..aef140ace0 100644 --- a/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java +++ b/persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestUpdatableIndexImpl.java @@ -572,6 +572,86 @@ public void legacyOversizedReferenceStripeCanBeRewritten() { soft.assertThat(readIndex.get(forceSpillKey)).isEqualTo(forceSpillValue); } + @Test + public void legacyOversizedMultiElementReferenceStripeCanBeRepartitionedDuringLaterSpillOut() { + var firstKey = key("a"); + var largeKey = key("b"); + var lastLegacyKey = key("c"); + var secondStripeKey = key("y"); + var forceSpillKey = key("zz"); + var firstValue = objTestValueFromString("cafe"); + var lastLegacyValue = objTestValueFromString("babe"); + var secondStripeValue = objTestValueFromString("dead"); + var targetStripeSize = persistence.params().maxIndexStripeSize().asLong(); + var largeValue = + objTestValueOfSize( + smallestValueSizeAboveActualSingleEntrySerializedSize(largeKey, targetStripeSize)); + var forceSpillValue = + objTestValueOfSize( + valueSizeAroundUpperBound( + forceSpillKey, + persistence.params().maxEmbeddedIndexSize().asLong() + / UpdatableIndexImpl.FORCE_SPILL_MAX_EMBEDDED_ENTRY_DIVISOR, + 1)); + + var legacyOversizedStripe = newStoreIndex(OBJ_TEST_SERIALIZER); + legacyOversizedStripe.add(indexElement(firstKey, firstValue)); + legacyOversizedStripe.add(indexElement(largeKey, largeValue)); + legacyOversizedStripe.add(indexElement(lastLegacyKey, lastLegacyValue)); + + soft.assertThat((long) legacyOversizedStripe.serialize().remaining()) + .isGreaterThan(targetStripeSize); + + var legacySmallStripe = newStoreIndex(OBJ_TEST_SERIALIZER); + legacySmallStripe.add(indexElement(secondStripeKey, secondStripeValue)); + + var oversizedStripeObj = + persistence.write( + IndexStripeObj.indexStripeObj( + persistence.generateId(), legacyOversizedStripe.serialize()), + IndexStripeObj.class); + var smallStripeObj = + persistence.write( + IndexStripeObj.indexStripeObj(persistence.generateId(), legacySmallStripe.serialize()), + IndexStripeObj.class); + + var legacyIndexContainer = + ImmutableIndexContainer.builder() + .embedded(newStoreIndex(OBJ_TEST_SERIALIZER).serialize()) + .addStripe(IndexStripe.indexStripe(firstKey, lastLegacyKey, objRef(oversizedStripeObj))) + .addStripe( + IndexStripe.indexStripe(secondStripeKey, secondStripeKey, objRef(smallStripeObj))) + .build(); + + var updatable = + (UpdatableIndexImpl) + legacyIndexContainer.asUpdatableIndex(persistence, OBJ_TEST_SERIALIZER); + updatable.put(forceSpillKey, forceSpillValue); + + var toPersist = new ArrayList>(); + var indexed = updatable.toIndexed("idx-", (n, o) -> toPersist.add(Map.entry(n, o))); + + soft.assertThat(toPersist).hasSizeGreaterThan(1); + soft.assertThat(indexed.stripes()).hasSizeGreaterThan(2); + soft.assertThat(indexed.stripes()) + .anySatisfy( + stripe -> { + assertThat(stripe.firstKey()).isEqualTo(largeKey); + assertThat(stripe.lastKey()).isEqualTo(largeKey); + }); + + toPersist.stream().map(Map.Entry::getValue).forEach(o -> persistence.write(o, Obj.class)); + + var readIndex = indexed.indexForRead(persistence, OBJ_TEST_SERIALIZER); + soft.assertThat(Streams.stream(readIndex).map(Index.Element::key)) + .containsExactly(firstKey, largeKey, lastLegacyKey, secondStripeKey, forceSpillKey); + soft.assertThat(readIndex.get(firstKey)).isEqualTo(firstValue); + soft.assertThat(readIndex.get(largeKey)).isEqualTo(largeValue); + soft.assertThat(readIndex.get(lastLegacyKey)).isEqualTo(lastLegacyValue); + soft.assertThat(readIndex.get(secondStripeKey)).isEqualTo(secondStripeValue); + soft.assertThat(readIndex.get(forceSpillKey)).isEqualTo(forceSpillValue); + } + @Test public void allowEntryThatExceedsPersistedObjectSizeViaMultipartPersistence() { var updatable = updatableIndexForTest(Map.of(), Map.of(), OBJ_TEST_SERIALIZER);