@@ -354,35 +354,67 @@ public List<IndexSpi<V>> splitByTargetSize(long targetSerializedSize) {
354354 return List .of (this );
355355 }
356356
357- var result = new ArrayList <IndexSpi <V >>();
358- var currentElements = new ArrayList <InternalIndexElement <V >>();
359- long currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE ;
360-
361- for (var element : elements ) {
362- var elementSerializedSize =
363- addElementDiff (element , element .contentSerializedSize (serializer ));
364- if (!currentElements .isEmpty ()
365- && currentSerializedSize + elementSerializedSize > targetSerializedSize ) {
366- result .add (newSplitPart (currentElements , currentSerializedSize ));
367- currentElements = new ArrayList <>();
368- currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE ;
369- }
357+ final class SplitState {
358+ private final List <IndexSpi <V >> result = new ArrayList <>();
359+ private List <InternalIndexElement <V >> currentElements = new ArrayList <>();
360+ private long currentSerializedBodySize ;
361+ private ByteBuffer previousSplitKey ;
362+ }
370363
371- currentElements .add (element );
372- currentSerializedSize += elementSerializedSize ;
364+ var splitState = new SplitState ();
365+
366+ walkSerializedElements (
367+ (element , keyBuf , previousMaterializationKey ) -> {
368+ var valueSerializedSize = element .contentSerializedSize (serializer );
369+ var entrySerializedSize =
370+ serializedEntrySize (splitState .previousSplitKey , keyBuf , valueSerializedSize );
371+ var candidateSerializedSize =
372+ splitSerializedSize (
373+ splitState .currentElements .size () + 1 ,
374+ splitState .currentSerializedBodySize + entrySerializedSize );
375+
376+ if (!splitState .currentElements .isEmpty ()
377+ && candidateSerializedSize > targetSerializedSize ) {
378+ splitState .result .add (
379+ newSplitPart (
380+ splitState .currentElements ,
381+ splitSerializedSize (
382+ splitState .currentElements .size (), splitState .currentSerializedBodySize )));
383+ splitState .currentElements = new ArrayList <>();
384+ splitState .currentSerializedBodySize = 0L ;
385+ splitState .previousSplitKey = null ;
386+ entrySerializedSize = serializedEntrySize (null , keyBuf , valueSerializedSize );
387+ candidateSerializedSize = splitSerializedSize (1 , entrySerializedSize );
388+ }
373389
374- if (currentElements .size () == 1 && currentSerializedSize > targetSerializedSize ) {
375- result .add (newSplitPart (currentElements , currentSerializedSize ));
376- currentElements = new ArrayList <>();
377- currentSerializedSize = INDEX_SERIALIZATION_HEADER_SIZE ;
378- }
379- }
390+ splitState .currentElements .add (element );
391+ splitState .currentSerializedBodySize += entrySerializedSize ;
392+ splitState .previousSplitKey = copyKey (splitState .previousSplitKey , keyBuf );
393+
394+ if (splitState .currentElements .size () == 1
395+ && candidateSerializedSize > targetSerializedSize ) {
396+ splitState .result .add (
397+ newSplitPart (splitState .currentElements , candidateSerializedSize ));
398+ splitState .currentElements = new ArrayList <>();
399+ splitState .currentSerializedBodySize = 0L ;
400+ splitState .previousSplitKey = null ;
401+ }
402+
403+ return copyKey (previousMaterializationKey , keyBuf );
404+ });
380405
381- if (!currentElements .isEmpty ()) {
382- result .add (newSplitPart (currentElements , currentSerializedSize ));
406+ if (splitState .result .isEmpty ()) {
407+ return List .of (this );
408+ }
409+ if (!splitState .currentElements .isEmpty ()) {
410+ splitState .result .add (
411+ newSplitPart (
412+ splitState .currentElements ,
413+ splitSerializedSize (
414+ splitState .currentElements .size (), splitState .currentSerializedBodySize )));
383415 }
384416
385- return result ;
417+ return splitState . result ;
386418 }
387419
388420 private IndexSpi <V > newSplitPart (
@@ -396,6 +428,15 @@ private IndexSpi<V> newSplitPart(
396428 partElements , checkedCast (adjustedEstimatedSerializedSize ), serializer , true );
397429 }
398430
431+ private static long splitSerializedSize (int elementCount , long serializedBodySize ) {
432+ return 1L + varIntLen (elementCount ) + serializedBodySize ;
433+ }
434+
435+ private static long serializedEntrySize (
436+ @ Nullable ByteBuffer previousKey , ByteBuffer keyBuf , int valueSerializedSize ) {
437+ return (long ) serializedKeyDelta (previousKey , keyBuf ).serializedSize () + valueSerializedSize ;
438+ }
439+
399440 @ Override
400441 public List <IndexSpi <V >> stripes () {
401442 return List .of (this );
@@ -628,55 +669,14 @@ public int estimatedSerializedSize() {
628669 // Serialized segment index version
629670 target .put (CURRENT_STORE_INDEX_VERSION );
630671 putVarInt (target , elements .size ());
672+ var serializeTarget = target ;
631673
632- ByteBuffer previousKey = null ;
633-
634- var scratchKeyBuffer = acquireScratchKeyBuffer ();
635- try {
636- boolean onlyLazy ;
637- InternalIndexElement <V > previous = null ;
638- for (var el : elements ) {
639- ByteBuffer keyBuf = null ;
640- if (isLazyElementImpl (el )) {
641- var lazyEl = (LazyIndexElement <V >) el ;
642- // The purpose of this 'if'-branch is to determine whether it can serialize the
643- // 'IndexKey' by _not_ fully materializing the `IndexKey`. This is possible if (and
644- // only if!) the current and the previous element are `LazyStoreIndexElement`s, where
645- // the previous element is exactly the one that has been deserialized.
646- //noinspection RedundantIfStatement
647- if (lazyEl .prefixLen == 0 || lazyEl .previous == previous ) {
648- // Can use the optimized serialization in `LazyStoreIndexElement` if the current
649- // element has no prefix of if the previously serialized element was also a
650- // `LazyStoreIndexElement`. In other words, no intermediate `LazyStoreIndexElement`
651- // has been removed and no new element has been added.
652- onlyLazy = true ;
653- } else {
654- // This if-branch detects whether an element has been removed from the index. In that
655- // case, serialization has to materialize the `IndexKey` for serialization.
656- onlyLazy = false ;
657- }
658- if (onlyLazy ) {
659- // Key serialization via 'LazyStoreIndexElement' is much cheaper (CPU and heap) than
660- // having to first materialize and then serialize it.
661- keyBuf = lazyEl .serializeKey (scratchKeyBuffer , previousKey );
662- }
663- } else {
664- onlyLazy = false ;
665- }
666-
667- if (!onlyLazy ) {
668- // Either 'el' is not a 'LazyStoreIndexElement' or the previous element of a
669- // 'LazyStoreIndexElement' is not suitable (see above).
670- keyBuf = serializeIndexKeyString (el .key (), scratchKeyBuffer );
671- }
672-
673- previousKey = serializeKey (keyBuf , previousKey , target );
674- el .serializeContent (serializer , target );
675- previous = el ;
676- }
677- } finally {
678- releaseScratchKeyBuffer (scratchKeyBuffer );
679- }
674+ walkSerializedElements (
675+ (element , keyBuf , previousMaterializationKey ) -> {
676+ var previousKey = serializeKey (keyBuf , previousMaterializationKey , serializeTarget );
677+ element .serializeContent (serializer , serializeTarget );
678+ return previousKey ;
679+ });
680680
681681 target = target .flip ();
682682 } else {
@@ -690,26 +690,121 @@ private boolean isLazyElementImpl(InternalIndexElement<V> el) {
690690 return el .getClass () == LazyIndexElement .class ;
691691 }
692692
693- private ByteBuffer serializeKey (ByteBuffer keyBuf , ByteBuffer previousKey , ByteBuffer target ) {
694- var keyPos = keyBuf .position ();
695- if (previousKey != null ) {
696- var mismatch = previousKey .mismatch (keyBuf );
697- checkState (mismatch != -1 , "Previous and current keys must not be equal" );
698- var strip = previousKey .remaining () - mismatch ;
699- putVarInt (target , strip );
700- keyBuf .position (keyPos + mismatch );
693+ @ FunctionalInterface
694+ private interface SerializedElementVisitor <V > {
695+ ByteBuffer visit (
696+ InternalIndexElement <V > element ,
697+ ByteBuffer keyBuf ,
698+ @ Nullable ByteBuffer previousMaterializationKey );
699+ }
700+
701+ private void walkSerializedElements (SerializedElementVisitor <V > visitor ) {
702+ ByteBuffer previousMaterializationKey = null ;
703+
704+ var scratchKeyBuffer = acquireScratchKeyBuffer ();
705+ try {
706+ InternalIndexElement <V > previous = null ;
707+ for (var el : elements ) {
708+ var keyBuf =
709+ materializeSerializedKey (el , previous , previousMaterializationKey , scratchKeyBuffer );
710+ previousMaterializationKey = visitor .visit (el , keyBuf , previousMaterializationKey );
711+ previous = el ;
712+ }
713+ } finally {
714+ releaseScratchKeyBuffer (scratchKeyBuffer );
715+ }
716+ }
717+
718+ private ByteBuffer materializeSerializedKey (
719+ InternalIndexElement <V > el ,
720+ @ Nullable InternalIndexElement <V > previous ,
721+ @ Nullable ByteBuffer previousKey ,
722+ ByteBuffer scratchKeyBuffer ) {
723+ boolean onlyLazy ;
724+ ByteBuffer keyBuf = null ;
725+ if (isLazyElementImpl (el )) {
726+ var lazyEl = (LazyIndexElement <V >) el ;
727+ // The purpose of this 'if'-branch is to determine whether it can serialize the
728+ // 'IndexKey' by _not_ fully materializing the `IndexKey`. This is possible if (and
729+ // only if!) the current and the previous element are `LazyStoreIndexElement`s, where
730+ // the previous element is exactly the one that has been deserialized.
731+ //noinspection RedundantIfStatement
732+ if (lazyEl .prefixLen == 0 || lazyEl .previous == previous ) {
733+ // Can use the optimized serialization in `LazyStoreIndexElement` if the current
734+ // element has no prefix of if the previously serialized element was also a
735+ // `LazyStoreIndexElement`. In other words, no intermediate `LazyStoreIndexElement`
736+ // has been removed and no new element has been added.
737+ onlyLazy = true ;
738+ } else {
739+ // This if-branch detects whether an element has been removed from the index. In that
740+ // case, serialization has to materialize the `IndexKey` for serialization.
741+ onlyLazy = false ;
742+ }
743+ if (onlyLazy ) {
744+ // Key serialization via 'LazyStoreIndexElement' is much cheaper (CPU and heap) than
745+ // having to first materialize and then serialize it.
746+ keyBuf = lazyEl .serializeKey (scratchKeyBuffer , previousKey );
747+ }
701748 } else {
749+ onlyLazy = false ;
750+ }
751+
752+ if (!onlyLazy ) {
753+ // Either 'el' is not a 'LazyStoreIndexElement' or the previous element of a
754+ // 'LazyStoreIndexElement' is not suitable (see above).
755+ keyBuf = serializeIndexKeyString (el .key (), scratchKeyBuffer );
756+ }
757+
758+ return keyBuf ;
759+ }
760+
761+ private static ByteBuffer serializeKey (
762+ ByteBuffer keyBuf , ByteBuffer previousKey , ByteBuffer target ) {
763+ var keyDelta = serializedKeyDelta (previousKey , keyBuf );
764+ if (previousKey == null ) {
702765 previousKey = newKeyBuffer ();
766+ } else {
767+ putVarInt (target , keyDelta .strip ());
768+ keyBuf .position (keyDelta .suffixPosition ());
703769 }
704770 target .put (keyBuf );
705771
706772 previousKey .clear ();
707- keyBuf .position (keyPos );
773+ keyBuf .position (keyDelta . keyPosition () );
708774 previousKey .put (keyBuf ).flip ();
709775
710776 return previousKey ;
711777 }
712778
779+ private static KeyDelta serializedKeyDelta (@ Nullable ByteBuffer previousKey , ByteBuffer keyBuf ) {
780+ var keyPosition = keyBuf .position ();
781+ if (previousKey == null ) {
782+ return new KeyDelta (keyPosition , keyPosition , 0 , keyBuf .remaining ());
783+ }
784+
785+ var mismatch = previousKey .mismatch (keyBuf );
786+ checkState (mismatch != -1 , "Previous and current keys must not be equal" );
787+ var strip = previousKey .remaining () - mismatch ;
788+ return new KeyDelta (
789+ keyPosition ,
790+ keyPosition + mismatch ,
791+ strip ,
792+ (int ) (varIntLen (strip ) + (long ) keyBuf .remaining () - mismatch ));
793+ }
794+
795+ private static ByteBuffer copyKey (@ Nullable ByteBuffer previousKey , ByteBuffer keyBuf ) {
796+ if (previousKey == null ) {
797+ previousKey = newKeyBuffer ();
798+ }
799+ previousKey .clear ();
800+ var keyPosition = keyBuf .position ();
801+ previousKey .put (keyBuf ).flip ();
802+ keyBuf .position (keyPosition );
803+ return previousKey ;
804+ }
805+
806+ private record KeyDelta (int keyPosition , int suffixPosition , int strip , int serializedSize ) {}
807+
713808 static <V > IndexSpi <V > deserializeStoreIndex (ByteBuffer serialized , IndexValueSerializer <V > ser ) {
714809 return new IndexImpl <>(serialized , ser );
715810 }
@@ -750,7 +845,7 @@ private IndexImpl(ByteBuffer serialized, IndexValueSerializer<V> ser) {
750845 // It has no predecessor that would be needed to re-construct (aka materialize) the full key.
751846 var elementPredecessor = prefixLen > 0 ? predecessor : null ;
752847 var element =
753- new LazyIndexElement <V >(
848+ new LazyIndexElement <>(
754849 this , elementPredecessor , previous , keyOffset , prefixLen , valueOffset , endOffset );
755850 if (elementPredecessor == null ) {
756851 predecessor = element ;
0 commit comments