Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public boolean isMutable() {
}

@Override
public List<IndexSpi<V>> divide(int parts) {
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
throw unsupported();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.primitives.Ints.checkedCast;
import static java.util.Collections.binarySearch;
import static java.util.Objects.requireNonNull;
import static org.apache.polaris.persistence.nosql.api.index.IndexKey.deserializeKey;
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.indexElement;
import static org.apache.polaris.persistence.varint.VarInt.putVarInt;
import static org.apache.polaris.persistence.varint.VarInt.readVarInt;
import static org.apache.polaris.persistence.varint.VarInt.varIntLen;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
Expand Down Expand Up @@ -339,29 +341,61 @@ public boolean isMutable() {
}

@Override
public List<IndexSpi<V>> divide(int parts) {
var elems = elements;
var size = elems.size();
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
checkArgument(
parts > 0 && parts <= size,
"Number of parts %s must be greater than 0 and less or equal to number of elements %s",
parts,
size);
var partSize = size / parts;
var serializedMax = originalSerializedSize + estimatedSerializedSizeDiff;

var result = new ArrayList<IndexSpi<V>>(parts);
var index = 0;
for (var i = 0; i < parts; i++) {
var end = i < parts - 1 ? index + partSize : elems.size();
var partElements = new ArrayList<>(elements.subList(index, end));
var part = new IndexImpl<>(partElements, serializedMax, serializer, true);
result.add(part);
index = end;
targetSerializedSize > 0,
"Target serialized size %s must be greater than 0",
targetSerializedSize);

if (elements.isEmpty()) {
return List.of();
}
if (elements.size() == 1) {
return List.of(this);
}

var result = new ArrayList<IndexSpi<V>>();
var currentElements = new ArrayList<InternalIndexElement<V>>();
long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;

for (var element : elements) {
var elementSerializedSize =
addElementDiff(element, element.contentSerializedSize(serializer));
if (!currentElements.isEmpty()
&& currentSerializedSize + elementSerializedSize > targetSerializedSize) {
result.add(newSplitPart(currentElements, currentSerializedSize));
currentElements = new ArrayList<>();
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
}

currentElements.add(element);
currentSerializedSize += elementSerializedSize;

if (currentElements.size() == 1 && currentSerializedSize > targetSerializedSize) {
result.add(newSplitPart(currentElements, currentSerializedSize));
currentElements = new ArrayList<>();
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
}
}

if (!currentElements.isEmpty()) {
result.add(newSplitPart(currentElements, currentSerializedSize));
}

return result;
}

private IndexSpi<V> newSplitPart(
List<InternalIndexElement<V>> partElements, long estimatedSerializedSize) {
var adjustedEstimatedSerializedSize =
estimatedSerializedSize
- INDEX_SERIALIZATION_HEADER_SIZE
+ 1L
+ varIntLen(partElements.size());
return new IndexImpl<>(
partElements, checkedCast(adjustedEstimatedSerializedSize), serializer, true);
}

@Override
public List<IndexSpi<V>> stripes() {
return List.of(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,14 @@ default IndexSpi<V> setObjId(ObjRef objRef) {

boolean isMutable();

List<IndexSpi<V>> divide(int parts);
/**
* Split this index into a variable number of stripes, trying to keep each resulting stripe near
* the target serialized size.
*
* <p>The target size is a goal, not a hard upper bound. A single oversized entry may still end up
* in its own stripe.
*/
List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize);

List<IndexSpi<V>> stripes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public boolean isMutable() {
}

@Override
public List<IndexSpi<V>> divide(int parts) {
return loaded().divide(parts);
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
return loaded().splitByTargetSize(targetSerializedSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public boolean isMutable() {
}

@Override
public List<IndexSpi<V>> divide(int parts) {
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
throw unsupported();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public boolean isMutable() {
}

@Override
public List<IndexSpi<V>> divide(int parts) {
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
throw new UnsupportedOperationException("Striped indexes cannot be further divided");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.polaris.persistence.nosql.impl.indexes;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.primitives.Ints.checkedCast;
import static java.util.Objects.requireNonNull;
import static org.apache.polaris.persistence.nosql.api.index.IndexStripe.indexStripe;
import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
Expand Down Expand Up @@ -185,18 +184,16 @@ private List<IndexSpi<V>> collectSurvivingStripes(IndexSpi<V> mutableReference)
if (stripe.hasElements()) {
var serSize = stripe.estimatedSerializedSize();
var maxStripeSize = params.maxIndexStripeSize().asLong();
var desiredSplits = checkedCast((serSize - 1L) / maxStripeSize + 1L);
if (desiredSplits > 1) {
desiredSplits = Math.min(desiredSplits, stripe.asKeyList().size());
}
if (desiredSplits > 1) {
if (serSize > maxStripeSize) {
// The stripe became too big, needs to be split further
var splitStripes = stripe.splitByTargetSize(maxStripeSize);
LOGGER.debug(
"Splitting index stripe {}, modified={}, into {} parts",
"Splitting index stripe {}, modified={}, into {} stripes for target size {}",
stripe.getObjId(),
stripe.isModified(),
desiredSplits);
survivingStripes.addAll(stripe.divide(desiredSplits));
splitStripes.size(),
maxStripeSize);
survivingStripes.addAll(splitStripes);
} else {
LOGGER.debug(
"Keeping index stripe {}, modified={}", stripe.getObjId(), stripe.isModified());
Expand Down Expand Up @@ -313,7 +310,7 @@ public boolean isMutable() {
}

@Override
public List<IndexSpi<V>> divide(int parts) {
public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
throw unsupported();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void stateRelated() {
soft.assertThatThrownBy(layered::asMutableIndex)
.isInstanceOf(UnsupportedOperationException.class);
soft.assertThat(layered.isMutable()).isFalse();
soft.assertThatThrownBy(() -> layered.divide(3))
soft.assertThatThrownBy(() -> layered.splitByTargetSize(3))
.isInstanceOf(UnsupportedOperationException.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void stateRelated() {

soft.assertThat(index.asMutableIndex()).isNotSameAs(index);
soft.assertThat(index.isMutable()).isFalse();
soft.assertThatThrownBy(() -> index.divide(3))
soft.assertThatThrownBy(() -> index.splitByTargetSize(3))
.isInstanceOf(UnsupportedOperationException.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import static org.apache.polaris.persistence.nosql.impl.indexes.ObjTestValue.objTestValueOfSize;
import static org.apache.polaris.persistence.nosql.impl.indexes.Util.asHex;
import static org.apache.polaris.persistence.nosql.impl.indexes.Util.randomObjId;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.groups.Tuple.tuple;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -747,35 +749,23 @@ public void updateAll() {
}

@Test
public void emptyIndexDivide() {
for (var i = -5; i < 5; i++) {
var parts = i;
public void invalidSplitByTargetSize() {
for (var i = -5; i <= 0; i++) {
var targetSize = i;
soft.assertThatIllegalArgumentException()
.isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).divide(parts))
.withMessageStartingWith("Number of parts ")
.withMessageContaining(
" must be greater than 0 and less or equal to number of elements ");
.isThrownBy(() -> newStoreIndex(OBJ_REF_SERIALIZER).splitByTargetSize(targetSize))
.withMessageStartingWith("Target serialized size ")
.withMessageContaining(" must be greater than 0");
}
}

@Test
public void impossibleDivide() {
var indexTestSet = basicIndexTestSet();
var index = indexTestSet.keyIndex();

soft.assertThatIllegalArgumentException()
.isThrownBy(() -> index.divide(index.asKeyList().size() + 1))
.withMessageStartingWith("Number of parts ")
.withMessageContaining(" must be greater than 0 and less or equal to number of elements ");
}

@ParameterizedTest
@ValueSource(ints = {2, 3, 4, 5, 6})
public void divide(int parts) {
@ValueSource(longs = {32, 48, 64, 96, 128})
public void splitByTargetSize(long targetSize) {
var indexTestSet = basicIndexTestSet();
var index = indexTestSet.keyIndex();

var splits = index.divide(parts);
var splits = index.splitByTargetSize(targetSize);

soft.assertThat(splits.stream().mapToInt(i -> i.asKeyList().size()).sum())
.isEqualTo(index.asKeyList().size());
Expand All @@ -786,6 +776,67 @@ public void divide(int parts) {
.containsExactlyElementsOf(index);
soft.assertThat(splits.getFirst().first()).isEqualTo(index.first());
soft.assertThat(splits.getLast().last()).isEqualTo(index.last());
soft.assertThat(splits).allSatisfy(split -> assertSplitRespectsTargetSize(split, targetSize));
}

@Test
public void splitByTargetSizeIsolatesLargeEntries() {
var index = newStoreIndex(OBJ_TEST_SERIALIZER);
var firstKey = key("a");
var largeKey = key("b");
var lastKey = key("c");
var firstValue = objTestValueFromString("cafe");
var lastValue = objTestValueFromString("babe");
var targetSize = 128L;
var largeValue =
objTestValueOfSize(
smallestValueSizeAboveActualSingleEntrySerializedSize(largeKey, targetSize));

index.put(firstKey, firstValue);
index.put(largeKey, largeValue);
index.put(lastKey, lastValue);

var splits = index.splitByTargetSize(targetSize);

soft.assertThat(splits).hasSize(3);
soft.assertThat(splits.get(0).asKeyList()).containsExactly(firstKey);
soft.assertThat(splits.get(1).asKeyList()).containsExactly(largeKey);
soft.assertThat(splits.get(2).asKeyList()).containsExactly(lastKey);
}

@Test
public void splitByTargetSizeReturnsSameSingletonInstance() {
var key = key("singleton");
var targetSize = 64L;
var value =
objTestValueOfSize(smallestValueSizeAboveActualSingleEntrySerializedSize(key, targetSize));

var modified = newStoreIndex(OBJ_TEST_SERIALIZER);
modified.put(key, value);
soft.assertThat(modified.splitByTargetSize(targetSize)).containsExactly(modified);
soft.assertThat(modified.isModified()).isTrue();

var deserialized = deserializeStoreIndex(modified.serialize(), OBJ_TEST_SERIALIZER);
soft.assertThat(deserialized.isModified()).isFalse();
soft.assertThat(deserialized.splitByTargetSize(targetSize)).containsExactly(deserialized);
soft.assertThat(deserialized.isModified()).isFalse();
}

@ParameterizedTest
@ValueSource(ints = {127, 128, 16_383, 16_384})
public void splitByTargetSizeElementCountVarIntBoundary(int entryCount) {
var index = newStoreIndex(OBJ_REF_SERIALIZER);
for (var i = 0; i < entryCount; i++) {
index.put(key(format("entry-%03d", i)), randomObjId());
}

var splits = index.splitByTargetSize(Long.MAX_VALUE);

soft.assertThat(splits).hasSize(1);
soft.assertThat(splits.getFirst().asKeyList()).hasSize(entryCount);
assertThatCode(() -> splits.getFirst().serialize()).doesNotThrowAnyException();
soft.assertThat(splits.getFirst().estimatedSerializedSize())
.isGreaterThanOrEqualTo(splits.getFirst().serialize().remaining());
}

@Test
Expand All @@ -795,7 +846,27 @@ public void stateRelated() {

soft.assertThat(index.asMutableIndex()).isSameAs(index);
soft.assertThat(index.isMutable()).isTrue();
soft.assertThatCode(() -> index.divide(3)).doesNotThrowAnyException();
soft.assertThatCode(() -> index.splitByTargetSize(3)).doesNotThrowAnyException();
}

private static int smallestValueSizeAboveActualSingleEntrySerializedSize(
IndexKey key, long maxEntrySerializedSize) {
var valueSize = (int) Math.min(maxEntrySerializedSize, Integer.MAX_VALUE);
while (((long) IndexImpl.INDEX_SERIALIZATION_HEADER_SIZE
+ key.serializedSize()
+ OBJ_TEST_SERIALIZER.serializedSize(objTestValueOfSize(valueSize)))
<= maxEntrySerializedSize) {
valueSize++;
}
return valueSize;
}

private static void assertSplitRespectsTargetSize(IndexSpi<?> split, long targetSize) {
var estimatedSize = split.estimatedSerializedSize();
if (estimatedSize <= targetSize) {
return;
}
assertThat(split.asKeyList()).hasSize(1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@ public void stateRelated() {
}

@ParameterizedTest
@ValueSource(ints = {2, 3, 4, 5, 6})
public void divide(int parts) {
@ValueSource(longs = {32, 48, 64, 96, 128})
public void splitByTargetSize(long targetSize) {
var indexTestSet = basicIndexTestSet();
var base = indexTestSet.keyIndex();
var expectedSplits = base.splitByTargetSize(targetSize);

var index = lazyStoreIndex(() -> base, base.first(), base.last());

var splits = index.divide(parts);
var splits = index.splitByTargetSize(targetSize);

soft.assertThat(splits.stream().mapToInt(i -> i.asKeyList().size()).sum())
.isEqualTo(index.asKeyList().size());
Expand All @@ -158,6 +159,9 @@ public void divide(int parts) {
.containsExactlyElementsOf(index);
soft.assertThat(splits.getFirst().first()).isEqualTo(index.first());
soft.assertThat(splits.getLast().last()).isEqualTo(index.last());
soft.assertThat(splits).hasSize(expectedSplits.size());
soft.assertThat(splits.stream().map(IndexSpi::asKeyList))
.containsExactlyElementsOf(expectedSplits.stream().map(IndexSpi::asKeyList).toList());
}

@Test
Expand Down
Loading