Skip to content

Commit c114fce

Browse files
authored
NoSQL: Split oversized NoSQL index stripes by target size (#4566)
Replace the old count-oriented stripe split behavior with a target-size driven path at the UpdatableIndexImpl level. That keeps oversized stripes from dragging too much neighboring data along when we have to rewrite them, and it gives us better control over large entry distributions.
1 parent d6adc88 commit c114fce

13 files changed

Lines changed: 282 additions & 70 deletions

File tree

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ImmutableEmptyIndexImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public boolean isMutable() {
6868
}
6969

7070
@Override
71-
public List<IndexSpi<V>> divide(int parts) {
71+
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
7272
throw unsupported();
7373
}
7474

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexImpl.java

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static com.google.common.base.Preconditions.checkState;
23+
import static com.google.common.primitives.Ints.checkedCast;
2324
import static java.util.Collections.binarySearch;
2425
import static java.util.Objects.requireNonNull;
2526
import static org.apache.polaris.persistence.nosql.api.index.IndexKey.deserializeKey;
2627
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.indexElement;
2728
import static org.apache.polaris.persistence.varint.VarInt.putVarInt;
2829
import static org.apache.polaris.persistence.varint.VarInt.readVarInt;
30+
import static org.apache.polaris.persistence.varint.VarInt.varIntLen;
2931

3032
import com.google.common.annotations.VisibleForTesting;
3133
import com.google.common.collect.AbstractIterator;
@@ -339,29 +341,61 @@ public boolean isMutable() {
339341
}
340342

341343
@Override
342-
public List<IndexSpi<V>> divide(int parts) {
343-
var elems = elements;
344-
var size = elems.size();
344+
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
345345
checkArgument(
346-
parts > 0 && parts <= size,
347-
"Number of parts %s must be greater than 0 and less or equal to number of elements %s",
348-
parts,
349-
size);
350-
var partSize = size / parts;
351-
var serializedMax = originalSerializedSize + estimatedSerializedSizeDiff;
352-
353-
var result = new ArrayList<IndexSpi<V>>(parts);
354-
var index = 0;
355-
for (var i = 0; i < parts; i++) {
356-
var end = i < parts - 1 ? index + partSize : elems.size();
357-
var partElements = new ArrayList<>(elements.subList(index, end));
358-
var part = new IndexImpl<>(partElements, serializedMax, serializer, true);
359-
result.add(part);
360-
index = end;
346+
targetSerializedSize > 0,
347+
"Target serialized size %s must be greater than 0",
348+
targetSerializedSize);
349+
350+
if (elements.isEmpty()) {
351+
return List.of();
352+
}
353+
if (elements.size() == 1) {
354+
return List.of(this);
361355
}
356+
357+
var result = new ArrayList<IndexSpi<V>>();
358+
var currentElements = new ArrayList<InternalIndexElement<V>>();
359+
long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
360+
361+
for (var element : elements) {
362+
var elementSerializedSize =
363+
addElementDiff(element, element.contentSerializedSize(serializer));
364+
if (!currentElements.isEmpty()
365+
&& currentSerializedSize + elementSerializedSize > targetSerializedSize) {
366+
result.add(newSplitPart(currentElements, currentSerializedSize));
367+
currentElements = new ArrayList<>();
368+
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
369+
}
370+
371+
currentElements.add(element);
372+
currentSerializedSize += elementSerializedSize;
373+
374+
if (currentElements.size() == 1 && currentSerializedSize > targetSerializedSize) {
375+
result.add(newSplitPart(currentElements, currentSerializedSize));
376+
currentElements = new ArrayList<>();
377+
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
378+
}
379+
}
380+
381+
if (!currentElements.isEmpty()) {
382+
result.add(newSplitPart(currentElements, currentSerializedSize));
383+
}
384+
362385
return result;
363386
}
364387

388+
private IndexSpi<V> newSplitPart(
389+
List<InternalIndexElement<V>> partElements, long estimatedSerializedSize) {
390+
var adjustedEstimatedSerializedSize =
391+
estimatedSerializedSize
392+
- INDEX_SERIALIZATION_HEADER_SIZE
393+
+ 1L
394+
+ varIntLen(partElements.size());
395+
return new IndexImpl<>(
396+
partElements, checkedCast(adjustedEstimatedSerializedSize), serializer, true);
397+
}
398+
365399
@Override
366400
public List<IndexSpi<V>> stripes() {
367401
return List.of(this);

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/IndexSpi.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,14 @@ default IndexSpi<V> setObjId(ObjRef objRef) {
152152

153153
boolean isMutable();
154154

155-
List<IndexSpi<V>> divide(int parts);
155+
/**
156+
* Split this index into a variable number of stripes, trying to keep each resulting stripe near
157+
* the target serialized size.
158+
*
159+
* <p>The target size is a goal, not a hard upper bound. A single oversized entry may still end up
160+
* in its own stripe.
161+
*/
162+
List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize);
156163

157164
List<IndexSpi<V>> stripes();
158165

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/LazyIndexImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ public boolean isMutable() {
172172
}
173173

174174
@Override
175-
public List<IndexSpi<V>> divide(int parts) {
176-
return loaded().divide(parts);
175+
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
176+
return loaded().splitByTargetSize(targetSerializedSize);
177177
}
178178

179179
@Override

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/ReadOnlyLayeredIndexImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public boolean isMutable() {
5050
}
5151

5252
@Override
53-
public List<IndexSpi<V>> divide(int parts) {
53+
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
5454
throw unsupported();
5555
}
5656

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/StripedIndexImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public boolean isMutable() {
143143
}
144144

145145
@Override
146-
public List<IndexSpi<V>> divide(int parts) {
146+
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
147147
throw new UnsupportedOperationException("Striped indexes cannot be further divided");
148148
}
149149

persistence/nosql/persistence/impl/src/main/java/org/apache/polaris/persistence/nosql/impl/indexes/UpdatableIndexImpl.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.polaris.persistence.nosql.impl.indexes;
2020

2121
import static com.google.common.base.Preconditions.checkState;
22-
import static com.google.common.primitives.Ints.checkedCast;
2322
import static java.util.Objects.requireNonNull;
2423
import static org.apache.polaris.persistence.nosql.api.index.IndexStripe.indexStripe;
2524
import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
@@ -185,18 +184,16 @@ private List<IndexSpi<V>> collectSurvivingStripes(IndexSpi<V> mutableReference)
185184
if (stripe.hasElements()) {
186185
var serSize = stripe.estimatedSerializedSize();
187186
var maxStripeSize = params.maxIndexStripeSize().asLong();
188-
var desiredSplits = checkedCast((serSize - 1L) / maxStripeSize + 1L);
189-
if (desiredSplits > 1) {
190-
desiredSplits = Math.min(desiredSplits, stripe.asKeyList().size());
191-
}
192-
if (desiredSplits > 1) {
187+
if (serSize > maxStripeSize) {
193188
// The stripe became too big, needs to be split further
189+
var splitStripes = stripe.splitByTargetSize(maxStripeSize);
194190
LOGGER.debug(
195-
"Splitting index stripe {}, modified={}, into {} parts",
191+
"Splitting index stripe {}, modified={}, into {} stripes for target size {}",
196192
stripe.getObjId(),
197193
stripe.isModified(),
198-
desiredSplits);
199-
survivingStripes.addAll(stripe.divide(desiredSplits));
194+
splitStripes.size(),
195+
maxStripeSize);
196+
survivingStripes.addAll(splitStripes);
200197
} else {
201198
LOGGER.debug(
202199
"Keeping index stripe {}, modified={}", stripe.getObjId(), stripe.isModified());
@@ -313,7 +310,7 @@ public boolean isMutable() {
313310
}
314311

315312
@Override
316-
public List<IndexSpi<V>> divide(int parts) {
313+
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
317314
throw unsupported();
318315
}
319316

persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestAbstractLayeredIndexImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public void stateRelated() {
222222
soft.assertThatThrownBy(layered::asMutableIndex)
223223
.isInstanceOf(UnsupportedOperationException.class);
224224
soft.assertThat(layered.isMutable()).isFalse();
225-
soft.assertThatThrownBy(() -> layered.divide(3))
225+
soft.assertThatThrownBy(() -> layered.splitByTargetSize(3))
226226
.isInstanceOf(UnsupportedOperationException.class);
227227
}
228228

persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestImmutableEmptyIndexImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void stateRelated() {
6767

6868
soft.assertThat(index.asMutableIndex()).isNotSameAs(index);
6969
soft.assertThat(index.isMutable()).isFalse();
70-
soft.assertThatThrownBy(() -> index.divide(3))
70+
soft.assertThatThrownBy(() -> index.splitByTargetSize(3))
7171
.isInstanceOf(UnsupportedOperationException.class);
7272
}
7373

persistence/nosql/persistence/impl/src/test/java/org/apache/polaris/persistence/nosql/impl/indexes/TestIndexImpl.java

Lines changed: 93 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import static org.apache.polaris.persistence.nosql.impl.indexes.ObjTestValue.objTestValueOfSize;
3838
import static org.apache.polaris.persistence.nosql.impl.indexes.Util.asHex;
3939
import static org.apache.polaris.persistence.nosql.impl.indexes.Util.randomObjId;
40+
import static org.assertj.core.api.Assertions.assertThat;
41+
import static org.assertj.core.api.Assertions.assertThatCode;
4042
import static org.assertj.core.groups.Tuple.tuple;
4143

4244
import java.nio.ByteBuffer;
@@ -747,35 +749,23 @@ public void updateAll() {
747749
}
748750

749751
@Test
750-
public void emptyIndexDivide() {
751-
for (var i = -5; i < 5; i++) {
752-
var parts = i;
752+
public void invalidSplitByTargetSize() {
753+
for (var i = -5; i <= 0; i++) {
754+
var targetSize = i;
753755
soft.assertThatIllegalArgumentException()
754-
.isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).divide(parts))
755-
.withMessageStartingWith("Number of parts ")
756-
.withMessageContaining(
757-
" must be greater than 0 and less or equal to number of elements ");
756+
.isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).splitByTargetSize(targetSize))
757+
.withMessageStartingWith("Target serialized size ")
758+
.withMessageContaining(" must be greater than 0");
758759
}
759760
}
760761

761-
@Test
762-
public void impossibleDivide() {
763-
var indexTestSet = basicIndexTestSet();
764-
var index = indexTestSet.keyIndex();
765-
766-
soft.assertThatIllegalArgumentException()
767-
.isThrownBy(() -> index.divide(index.asKeyList().size() + 1))
768-
.withMessageStartingWith("Number of parts ")
769-
.withMessageContaining(" must be greater than 0 and less or equal to number of elements ");
770-
}
771-
772762
@ParameterizedTest
773-
@ValueSource(ints = {2, 3, 4, 5, 6})
774-
public void divide(int parts) {
763+
@ValueSource(longs = {32, 48, 64, 96, 128})
764+
public void splitByTargetSize(long targetSize) {
775765
var indexTestSet = basicIndexTestSet();
776766
var index = indexTestSet.keyIndex();
777767

778-
var splits = index.divide(parts);
768+
var splits = index.splitByTargetSize(targetSize);
779769

780770
soft.assertThat(splits.stream().mapToInt(i -> i.asKeyList().size()).sum())
781771
.isEqualTo(index.asKeyList().size());
@@ -786,6 +776,67 @@ public void divide(int parts) {
786776
.containsExactlyElementsOf(index);
787777
soft.assertThat(splits.getFirst().first()).isEqualTo(index.first());
788778
soft.assertThat(splits.getLast().last()).isEqualTo(index.last());
779+
soft.assertThat(splits).allSatisfy(split -> assertSplitRespectsTargetSize(split, targetSize));
780+
}
781+
782+
@Test
783+
public void splitByTargetSizeIsolatesLargeEntries() {
784+
var index = newStoreIndex(OBJ_TEST_SERIALIZER);
785+
var firstKey = key("a");
786+
var largeKey = key("b");
787+
var lastKey = key("c");
788+
var firstValue = objTestValueFromString("cafe");
789+
var lastValue = objTestValueFromString("babe");
790+
var targetSize = 128L;
791+
var largeValue =
792+
objTestValueOfSize(
793+
smallestValueSizeAboveActualSingleEntrySerializedSize(largeKey, targetSize));
794+
795+
index.put(firstKey, firstValue);
796+
index.put(largeKey, largeValue);
797+
index.put(lastKey, lastValue);
798+
799+
var splits = index.splitByTargetSize(targetSize);
800+
801+
soft.assertThat(splits).hasSize(3);
802+
soft.assertThat(splits.get(0).asKeyList()).containsExactly(firstKey);
803+
soft.assertThat(splits.get(1).asKeyList()).containsExactly(largeKey);
804+
soft.assertThat(splits.get(2).asKeyList()).containsExactly(lastKey);
805+
}
806+
807+
@Test
808+
public void splitByTargetSizeReturnsSameSingletonInstance() {
809+
var key = key("singleton");
810+
var targetSize = 64L;
811+
var value =
812+
objTestValueOfSize(smallestValueSizeAboveActualSingleEntrySerializedSize(key, targetSize));
813+
814+
var modified = newStoreIndex(OBJ_TEST_SERIALIZER);
815+
modified.put(key, value);
816+
soft.assertThat(modified.splitByTargetSize(targetSize)).containsExactly(modified);
817+
soft.assertThat(modified.isModified()).isTrue();
818+
819+
var deserialized = deserializeStoreIndex(modified.serialize(), OBJ_TEST_SERIALIZER);
820+
soft.assertThat(deserialized.isModified()).isFalse();
821+
soft.assertThat(deserialized.splitByTargetSize(targetSize)).containsExactly(deserialized);
822+
soft.assertThat(deserialized.isModified()).isFalse();
823+
}
824+
825+
@ParameterizedTest
826+
@ValueSource(ints = {127, 128, 16_383, 16_384})
827+
public void splitByTargetSizeElementCountVarIntBoundary(int entryCount) {
828+
var index = newStoreIndex(OBJ_REF_SERIALIZER);
829+
for (var i = 0; i < entryCount; i++) {
830+
index.put(key(format("entry-%03d", i)), randomObjId());
831+
}
832+
833+
var splits = index.splitByTargetSize(Long.MAX_VALUE);
834+
835+
soft.assertThat(splits).hasSize(1);
836+
soft.assertThat(splits.getFirst().asKeyList()).hasSize(entryCount);
837+
assertThatCode(() -> splits.getFirst().serialize()).doesNotThrowAnyException();
838+
soft.assertThat(splits.getFirst().estimatedSerializedSize())
839+
.isGreaterThanOrEqualTo(splits.getFirst().serialize().remaining());
789840
}
790841

791842
@Test
@@ -795,7 +846,27 @@ public void stateRelated() {
795846

796847
soft.assertThat(index.asMutableIndex()).isSameAs(index);
797848
soft.assertThat(index.isMutable()).isTrue();
798-
soft.assertThatCode(() -> index.divide(3)).doesNotThrowAnyException();
849+
soft.assertThatCode(() -> index.splitByTargetSize(3)).doesNotThrowAnyException();
850+
}
851+
852+
private static int smallestValueSizeAboveActualSingleEntrySerializedSize(
853+
IndexKey key, long maxEntrySerializedSize) {
854+
var valueSize = (int) Math.min(maxEntrySerializedSize, Integer.MAX_VALUE);
855+
while (((long) IndexImpl.INDEX_SERIALIZATION_HEADER_SIZE
856+
+ key.serializedSize()
857+
+ OBJ_TEST_SERIALIZER.serializedSize(objTestValueOfSize(valueSize)))
858+
<= maxEntrySerializedSize) {
859+
valueSize++;
860+
}
861+
return valueSize;
862+
}
863+
864+
private static void assertSplitRespectsTargetSize(IndexSpi<?> split, long targetSize) {
865+
var estimatedSize = split.estimatedSerializedSize();
866+
if (estimatedSize <= targetSize) {
867+
return;
868+
}
869+
assertThat(split.asKeyList()).hasSize(1);
799870
}
800871

801872
/**

0 commit comments

Comments
 (0)