Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,35 +354,67 @@ public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
return List.of(this);
}

var result = new ArrayList<IndexSpi<V>>();
var currentElements = new ArrayList<InternalIndexElement<V>>();
long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;

for (var element : elements) {
var elementSerializedSize =
addElementDiff(element, element.contentSerializedSize(serializer));
if (!currentElements.isEmpty()
&& currentSerializedSize + elementSerializedSize > targetSerializedSize) {
result.add(newSplitPart(currentElements, currentSerializedSize));
currentElements = new ArrayList<>();
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
}
final class SplitState {
private final List<IndexSpi<V>> result = new ArrayList<>();
private List<InternalIndexElement<V>> currentElements = new ArrayList<>();
private long currentSerializedBodySize;
private ByteBuffer previousSplitKey;
}

currentElements.add(element);
currentSerializedSize += elementSerializedSize;
var splitState = new SplitState();

walkSerializedElements(
(element, keyBuf, previousMaterializationKey) -> {
var valueSerializedSize = element.contentSerializedSize(serializer);
var entrySerializedSize =
serializedEntrySize(splitState.previousSplitKey, keyBuf, valueSerializedSize);
var candidateSerializedSize =
splitSerializedSize(
splitState.currentElements.size() + 1,
splitState.currentSerializedBodySize + entrySerializedSize);

if (!splitState.currentElements.isEmpty()
&& candidateSerializedSize > targetSerializedSize) {
splitState.result.add(
newSplitPart(
splitState.currentElements,
splitSerializedSize(
splitState.currentElements.size(), splitState.currentSerializedBodySize)));
splitState.currentElements = new ArrayList<>();
splitState.currentSerializedBodySize = 0L;
splitState.previousSplitKey = null;
entrySerializedSize = serializedEntrySize(null, keyBuf, valueSerializedSize);
candidateSerializedSize = splitSerializedSize(1, entrySerializedSize);
}

if (currentElements.size() == 1 && currentSerializedSize > targetSerializedSize) {
result.add(newSplitPart(currentElements, currentSerializedSize));
currentElements = new ArrayList<>();
currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE;
}
}
splitState.currentElements.add(element);
splitState.currentSerializedBodySize += entrySerializedSize;
splitState.previousSplitKey = copyKey(splitState.previousSplitKey, keyBuf);

if (splitState.currentElements.size() == 1
&& candidateSerializedSize > targetSerializedSize) {
splitState.result.add(
newSplitPart(splitState.currentElements, candidateSerializedSize));
splitState.currentElements = new ArrayList<>();
splitState.currentSerializedBodySize = 0L;
splitState.previousSplitKey = null;
}

return copyKey(previousMaterializationKey, keyBuf);
});

if (!currentElements.isEmpty()) {
result.add(newSplitPart(currentElements, currentSerializedSize));
if (splitState.result.isEmpty()) {
return List.of(this);
}
if (!splitState.currentElements.isEmpty()) {
splitState.result.add(
newSplitPart(
splitState.currentElements,
splitSerializedSize(
splitState.currentElements.size(), splitState.currentSerializedBodySize)));
}

return result;
return splitState.result;
}

private IndexSpi<V> newSplitPart(
Expand All @@ -396,6 +428,15 @@ private IndexSpi<V> newSplitPart(
partElements, checkedCast(adjustedEstimatedSerializedSize), serializer, true);
}

/**
 * Computes the total serialized size of a split part.
 *
 * <p>The result is the sum of one leading byte, the var-int encoded length of {@code
 * elementCount} and {@code serializedBodySize}.
 *
 * <p>NOTE(review): the single leading byte presumably accounts for the index version byte that
 * serialization writes before the element count - confirm.
 *
 * @param elementCount number of elements in the split part
 * @param serializedBodySize accumulated serialized size of all entries of the split part
 * @return header size plus body size, in bytes
 */
private static long splitSerializedSize(int elementCount, long serializedBodySize) {
  long headerSize = 1L + varIntLen(elementCount);
  return headerSize + serializedBodySize;
}

/**
 * Computes the serialized size of a single index entry: the size of its key, as reported by
 * {@code serializedKeyDelta} relative to {@code previousKey}, plus the size of its value.
 *
 * @param previousKey key of the preceding entry, or {@code null} if there is none
 * @param keyBuf buffer holding the key of the entry
 * @param valueSerializedSize serialized size of the entry's value
 * @return serialized key size plus {@code valueSerializedSize}, widened to {@code long}
 */
private static long serializedEntrySize(
    @Nullable ByteBuffer previousKey, ByteBuffer keyBuf, int valueSerializedSize) {
  var keyDelta = serializedKeyDelta(previousKey, keyBuf);
  long keySerializedSize = keyDelta.serializedSize();
  return keySerializedSize + valueSerializedSize;
}

@Override
public List<IndexSpi<V>> stripes() {
return List.of(this);
Expand Down Expand Up @@ -628,55 +669,14 @@ public int estimatedSerializedSize() {
// Serialized segment index version
target.put(CURRENT_STORE_INDEX_VERSION);
putVarInt(target, elements.size());
var serializeTarget = target;

ByteBuffer previousKey = null;

var scratchKeyBuffer = acquireScratchKeyBuffer();
try {
boolean onlyLazy;
InternalIndexElement<V> previous = null;
for (var el : elements) {
ByteBuffer keyBuf = null;
if (isLazyElementImpl(el)) {
var lazyEl = (LazyIndexElement<V>) el;
// The purpose of this 'if'-branch is to determine whether it can serialize the
// 'IndexKey' by _not_ fully materializing the `IndexKey`. This is possible if (and
// only if!) the current and the previous element are `LazyStoreIndexElement`s, where
// the previous element is exactly the one that has been deserialized.
//noinspection RedundantIfStatement
if (lazyEl.prefixLen == 0 || lazyEl.previous == previous) {
// Can use the optimized serialization in `LazyStoreIndexElement` if the current
// element has no prefix of if the previously serialized element was also a
// `LazyStoreIndexElement`. In other words, no intermediate `LazyStoreIndexElement`
// has been removed and no new element has been added.
onlyLazy = true;
} else {
// This if-branch detects whether an element has been removed from the index. In that
// case, serialization has to materialize the `IndexKey` for serialization.
onlyLazy = false;
}
if (onlyLazy) {
// Key serialization via 'LazyStoreIndexElement' is much cheaper (CPU and heap) than
// having to first materialize and then serialize it.
keyBuf = lazyEl.serializeKey(scratchKeyBuffer, previousKey);
}
} else {
onlyLazy = false;
}

if (!onlyLazy) {
// Either 'el' is not a 'LazyStoreIndexElement' or the previous element of a
// 'LazyStoreIndexElement' is not suitable (see above).
keyBuf = serializeIndexKeyString(el.key(), scratchKeyBuffer);
}

previousKey = serializeKey(keyBuf, previousKey, target);
el.serializeContent(serializer, target);
previous = el;
}
} finally {
releaseScratchKeyBuffer(scratchKeyBuffer);
}
walkSerializedElements(
(element, keyBuf, previousMaterializationKey) -> {
var previousKey = serializeKey(keyBuf, previousMaterializationKey, serializeTarget);
element.serializeContent(serializer, serializeTarget);
return previousKey;
});

target = target.flip();
} else {
Expand All @@ -690,26 +690,121 @@ private boolean isLazyElementImpl(InternalIndexElement<V> el) {
return el.getClass() == LazyIndexElement.class;
}

private ByteBuffer serializeKey(ByteBuffer keyBuf, ByteBuffer previousKey, ByteBuffer target) {
var keyPos = keyBuf.position();
if (previousKey != null) {
var mismatch = previousKey.mismatch(keyBuf);
checkState(mismatch != -1, "Previous and current keys must not be equal");
var strip = previousKey.remaining() - mismatch;
putVarInt(target, strip);
keyBuf.position(keyPos + mismatch);
/**
 * Callback invoked by {@code walkSerializedElements} once per index element, in iteration order.
 *
 * <p>{@code visit} receives the element, the buffer holding the element's key as produced by
 * {@code materializeSerializedKey}, and the value returned by the previous invocation ({@code
 * null} for the first element). The returned buffer is handed to the next invocation and is
 * also passed as the previous key when the next element's key is materialized.
 */
@FunctionalInterface
private interface SerializedElementVisitor<V> {
ByteBuffer visit(
InternalIndexElement<V> element,
ByteBuffer keyBuf,
@Nullable ByteBuffer previousMaterializationKey);
}

/**
 * Iterates over all elements, materializes each element's key into a scratch buffer and hands
 * element and key to the given visitor.
 *
 * <p>The buffer returned by the visitor is carried over to the next iteration: it is passed to
 * the visitor again and is used as the previous key when materializing the next element's key.
 * The scratch key buffer is released even if the visitor throws.
 *
 * @param visitor callback invoked once per element, in iteration order
 */
private void walkSerializedElements(SerializedElementVisitor<V> visitor) {
  var scratch = acquireScratchKeyBuffer();
  try {
    ByteBuffer carriedKey = null;
    InternalIndexElement<V> predecessor = null;
    for (var current : elements) {
      var currentKey = materializeSerializedKey(current, predecessor, carriedKey, scratch);
      carriedKey = visitor.visit(current, currentKey, carriedKey);
      predecessor = current;
    }
  } finally {
    releaseScratchKeyBuffer(scratch);
  }
}

/**
 * Produces the buffer holding the key of {@code el}, preferring the cheaper serialization path
 * offered by {@code LazyIndexElement} when it is applicable.
 *
 * @param el element whose key is needed
 * @param previous element visited immediately before {@code el}, or {@code null} for the first
 * @param previousKey key buffer carried over from the previous element, may be {@code null}
 * @param scratchKeyBuffer scratch buffer handed to the key serialization routines
 * @return buffer containing the key of {@code el}
 */
private ByteBuffer materializeSerializedKey(
    InternalIndexElement<V> el,
    @Nullable InternalIndexElement<V> previous,
    @Nullable ByteBuffer previousKey,
    ByteBuffer scratchKeyBuffer) {
  if (isLazyElementImpl(el)) {
    var lazy = (LazyIndexElement<V>) el;
    // The key can be taken from the lazy element without fully materializing the 'IndexKey'
    // if the element has no prefix, or if the element visited just before this one is exactly
    // the predecessor the lazy element refers to. The latter means that no intermediate
    // element has been removed and no new element has been added in between.
    var lazyPathApplicable = lazy.prefixLen == 0 || lazy.previous == previous;
    if (lazyPathApplicable) {
      // Key serialization via 'LazyIndexElement' is much cheaper (CPU and heap) than having
      // to first materialize and then serialize the key.
      return lazy.serializeKey(scratchKeyBuffer, previousKey);
    }
  }

  // Either 'el' is not a 'LazyIndexElement', or its predecessor is not suitable (see above):
  // the 'IndexKey' has to be materialized and serialized.
  return serializeIndexKeyString(el.key(), scratchKeyBuffer);
}

/**
 * Writes the key in {@code keyBuf} to {@code target}, delta-encoded against {@code previousKey},
 * and returns a buffer holding a full copy of the key for use as the next previous key.
 *
 * <p>Without a previous key, the remaining bytes of {@code keyBuf} are written as is. With a
 * previous key, the var-int encoded strip count is written first, followed by only those key
 * bytes that come after the prefix shared with the previous key.
 *
 * <p>On return, the position of {@code keyBuf} is at its limit.
 *
 * @param keyBuf buffer holding the key to serialize, read from its current position
 * @param previousKey previously serialized key, or {@code null} for the first key; if non-null
 *     it is overwritten with the current key and returned
 * @param target buffer receiving the serialized key
 * @return buffer containing the full current key, flipped for reading
 */
private static ByteBuffer serializeKey(
    ByteBuffer keyBuf, ByteBuffer previousKey, ByteBuffer target) {
  // Must be computed before 'keyBuf' is consumed or 'previousKey' is overwritten.
  var keyDelta = serializedKeyDelta(previousKey, keyBuf);
  if (previousKey == null) {
    previousKey = newKeyBuffer();
  } else {
    putVarInt(target, keyDelta.strip());
    // Skip the prefix shared with the previous key, only the suffix is written.
    keyBuf.position(keyDelta.suffixPosition());
  }
  target.put(keyBuf);

  // Rewind to the start of the key, so the full key (not just the suffix) is retained as the
  // previous key for the next element.
  previousKey.clear();
  keyBuf.position(keyDelta.keyPosition());
  previousKey.put(keyBuf).flip();

  return previousKey;
}

/**
 * Describes how the key in {@code keyBuf} is encoded relative to {@code previousKey}, without
 * writing anything and without changing the position of either buffer.
 *
 * <p>Without a previous key, the whole remaining key is the suffix and nothing is stripped.
 * Otherwise, the shared prefix is determined via {@link ByteBuffer#mismatch(ByteBuffer)}; the
 * serialized size is the var-int length of the strip count plus the number of key bytes after
 * the shared prefix.
 *
 * @param previousKey previously serialized key, or {@code null} if there is none
 * @param keyBuf buffer holding the current key
 * @return positions, strip count and serialized size for the current key
 * @throws IllegalStateException presumably raised by {@code checkState} if both keys are equal
 */
private static KeyDelta serializedKeyDelta(@Nullable ByteBuffer previousKey, ByteBuffer keyBuf) {
  var keyStart = keyBuf.position();
  var keyLength = keyBuf.remaining();
  if (previousKey == null) {
    return new KeyDelta(keyStart, keyStart, 0, keyLength);
  }

  var sharedPrefixLength = previousKey.mismatch(keyBuf);
  checkState(sharedPrefixLength != -1, "Previous and current keys must not be equal");
  // Number of trailing bytes of the previous key that are not part of the current key.
  var bytesToStrip = previousKey.remaining() - sharedPrefixLength;
  long suffixLength = (long) keyLength - sharedPrefixLength;
  var serializedSize = (int) (varIntLen(bytesToStrip) + suffixLength);
  return new KeyDelta(keyStart, keyStart + sharedPrefixLength, bytesToStrip, serializedSize);
}

/**
 * Copies the remaining bytes of {@code keyBuf} into {@code previousKey}, allocating a new key
 * buffer if {@code previousKey} is {@code null}.
 *
 * <p>The position of {@code keyBuf} is restored after the copy, so the caller can continue to
 * read the key from it.
 *
 * @param previousKey buffer to reuse as the copy destination, or {@code null}
 * @param keyBuf buffer holding the key to copy
 * @return the destination buffer, flipped for reading
 */
private static ByteBuffer copyKey(@Nullable ByteBuffer previousKey, ByteBuffer keyBuf) {
  var destination = previousKey != null ? previousKey : newKeyBuffer();
  var restorePosition = keyBuf.position();
  destination.clear();
  destination.put(keyBuf).flip();
  keyBuf.position(restorePosition);
  return destination;
}

/**
 * Result of comparing a key against the previously serialized key, as computed by {@code
 * serializedKeyDelta}.
 *
 * @param keyPosition position of the key buffer at the time the delta was computed
 * @param suffixPosition {@code keyPosition} plus the length of the prefix shared with the
 *     previous key; equal to {@code keyPosition} if there is no previous key
 * @param strip number of trailing bytes of the previous key that are not shared with the
 *     current key; {@code 0} if there is no previous key
 * @param serializedSize var-int length of {@code strip} plus the number of key bytes after the
 *     shared prefix; the full remaining key length if there is no previous key
 */
private record KeyDelta(int keyPosition, int suffixPosition, int strip, int serializedSize) {}

/**
 * Creates an index from the given serialized representation by delegating to the {@code
 * IndexImpl} constructor that takes the serialized buffer and the value serializer.
 *
 * @param serialized buffer holding the serialized index
 * @param ser serializer for the index values
 * @return the index instance constructed from {@code serialized}
 */
static <V> IndexSpi<V> deserializeStoreIndex(ByteBuffer serialized, IndexValueSerializer<V> ser) {
return new IndexImpl<>(serialized, ser);
}
Expand Down Expand Up @@ -750,7 +845,7 @@ private IndexImpl(ByteBuffer serialized, IndexValueSerializer<V> ser) {
// It has no predecessor that would be needed to re-construct (aka materialize) the full key.
var elementPredecessor = prefixLen > 0 ? predecessor : null;
var element =
new LazyIndexElement<V>(
new LazyIndexElement<>(
this, elementPredecessor, previous, keyOffset, prefixLen, valueOffset, endOffset);
if (elementPredecessor == null) {
predecessor = element;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,28 @@ public void splitByTargetSizeElementCountVarIntBoundary(int entryCount) {
.isGreaterThanOrEqualTo(splits.getFirst().serialize().remaining());
}

/**
 * An index whose serialized form fits exactly into the target size must be returned as the one
 * and only split, as the very same instance.
 */
@Test
public void splitByTargetSizeReturnsSameMultiElementInstanceWhenAlreadyWithinTarget() {
  var keyIndex = basicIndexTestSet().keyIndex();
  // The target is exactly the size of the serialized index.
  var exactSerializedSize = keyIndex.serialize().remaining();

  var splits = keyIndex.splitByTargetSize(exactSerializedSize);

  soft.assertThat(splits).containsExactly(keyIndex);
}

/**
 * Keys sharing a long common prefix serialize to fewer bytes than the estimate, so splitting by
 * the actual serialized size must yield the index itself.
 */
@Test
public void splitByTargetSizeUsesActualSerializedKeySizeForLongCommonPrefixes() {
  var sharedPrefix = "catalog/" + "shared-prefix/".repeat(32);
  var storeIndex = newStoreIndex(OBJ_REF_SERIALIZER);
  for (var n = 0; n < 64; n++) {
    var tableKey = key(sharedPrefix + format("table-%03d", n));
    storeIndex.put(tableKey, randomObjId());
  }

  var serializedSize = storeIndex.serialize().remaining();

  // The estimate is expected to be larger than the actual serialized size.
  soft.assertThat(storeIndex.estimatedSerializedSize()).isGreaterThan(serializedSize);
  // The actual serialized size is what matters for splitting.
  var splits = storeIndex.splitByTargetSize(serializedSize);
  soft.assertThat(splits).containsExactly(storeIndex);
}

@Test
public void stateRelated() {
var indexTestSet = basicIndexTestSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.stream.StreamSupport.stream;
import static org.apache.polaris.persistence.nosql.api.index.IndexKey.key;
import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.OBJ_REF_SERIALIZER;
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.deserializeStoreIndex;
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.indexElement;
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.lazyStoreIndex;
import static org.apache.polaris.persistence.nosql.impl.indexes.IndexesInternal.newStoreIndex;
Expand Down Expand Up @@ -164,6 +165,23 @@ public void splitByTargetSize(long targetSize) {
.containsExactlyElementsOf(expectedSplits.stream().map(IndexSpi::asKeyList).toList());
}

/**
 * A lazily loaded, deserialized index with keys sharing a long common prefix must not be split
 * when the target size equals the size of its serialized form.
 */
@Test
public void splitByTargetSizeUsesLazyCommonPrefixKeysWithoutSplitting() {
  var sharedPrefix = "catalog/" + "shared-prefix/".repeat(32);
  var source = newStoreIndex(OBJ_REF_SERIALIZER);
  for (var n = 0; n < 512; n++) {
    var tableKey = key(sharedPrefix + "table-%03d".formatted(n));
    source.put(tableKey, randomObjId());
  }

  // Round-trip through the serialized form.
  var serializedForm = source.serialize();
  var deserialized = deserializeStoreIndex(serializedForm, OBJ_REF_SERIALIZER);

  var lazyIndex = lazyStoreIndex(() -> deserialized, deserialized.first(), deserialized.last());

  var splits = lazyIndex.splitByTargetSize(serializedForm.remaining());
  soft.assertThat(splits).containsExactly(deserialized);
}

@Test
public void firstLastKeyDontLoad() {
var index = newStoreIndex(OBJ_REF_SERIALIZER);
Expand Down
Loading