1919package org .apache .pulsar .broker .delayed .bucket ;
2020
2121import static com .google .common .base .Preconditions .checkArgument ;
22- import com .google .protobuf .ByteString ;
22+ import com .google .protobuf .UnsafeByteOperations ;
2323import java .nio .ByteBuffer ;
2424import java .util .ArrayList ;
2525import java .util .HashMap ;
@@ -74,6 +74,8 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
7474
7575 List <SnapshotSegment > bucketSnapshotSegments = new ArrayList <>();
7676 List <SnapshotSegmentMetadata > segmentMetadataList = new ArrayList <>();
77+ Map <Long , RoaringBitmap > immutableBucketBitMap = new HashMap <>();
78+
7779 Map <Long , RoaringBitmap > bitMap = new HashMap <>();
7880 SnapshotSegment snapshotSegment = new SnapshotSegment ();
7981 SnapshotSegmentMetadata .Builder segmentMetadataBuilder = SnapshotSegmentMetadata .newBuilder ();
@@ -82,18 +84,20 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
8284 long currentTimestampUpperLimit = 0 ;
8385 long currentFirstTimestamp = 0L ;
8486 while (!delayedIndexQueue .isEmpty ()) {
85- DelayedIndex delayedIndex = snapshotSegment .addIndexe ();
86- delayedIndexQueue .popToObject (delayedIndex );
87-
88- long timestamp = delayedIndex .getTimestamp ();
87+ final long timestamp = delayedIndexQueue .peekTimestamp ();
8988 if (currentTimestampUpperLimit == 0 ) {
9089 currentFirstTimestamp = timestamp ;
9190 firstScheduleTimestamps .add (currentFirstTimestamp );
9291 currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1 ;
9392 }
9493
95- long ledgerId = delayedIndex .getLedgerId ();
96- long entryId = delayedIndex .getEntryId ();
94+ DelayedIndex delayedIndex = snapshotSegment .addIndexe ();
95+ delayedIndexQueue .popToObject (delayedIndex );
96+
97+ final long ledgerId = delayedIndex .getLedgerId ();
98+ final long entryId = delayedIndex .getEntryId ();
99+
100+ removeIndexBit (ledgerId , entryId );
97101
98102 checkArgument (ledgerId >= startLedgerId && ledgerId <= endLedgerId );
99103
@@ -102,10 +106,10 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
102106 sharedQueue .add (timestamp , ledgerId , entryId );
103107 }
104108
105- numMessages ++;
106-
107109 bitMap .computeIfAbsent (ledgerId , k -> new RoaringBitmap ()).add (entryId , entryId + 1 );
108110
111+ numMessages ++;
112+
109113 if (delayedIndexQueue .isEmpty () || delayedIndexQueue .peekTimestamp () > currentTimestampUpperLimit
110114 || (maxIndexesPerBucketSnapshotSegment != -1
111115 && snapshotSegment .getIndexesCount () >= maxIndexesPerBucketSnapshotSegment )) {
@@ -119,9 +123,17 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
119123 final var lId = entry .getKey ();
120124 final var bm = entry .getValue ();
121125 bm .runOptimize ();
122- final var array = new byte [bm .serializedSizeInBytes ()];
123- bm .serialize (ByteBuffer .wrap (array ));
124- segmentMetadataBuilder .putDelayedIndexBitMap (lId , ByteString .copyFrom (array ));
126+ ByteBuffer byteBuffer = ByteBuffer .allocate (bm .serializedSizeInBytes ());
127+ bm .serialize (byteBuffer );
128+ byteBuffer .flip ();
129+ segmentMetadataBuilder .putDelayedIndexBitMap (lId , UnsafeByteOperations .unsafeWrap (byteBuffer ));
130+ immutableBucketBitMap .compute (lId , (__ , bm0 ) -> {
131+ if (bm0 == null ) {
132+ return bm ;
133+ }
134+ bm0 .or (bm );
135+ return bm0 ;
136+ });
125137 iterator .remove ();
126138 }
127139
@@ -133,6 +145,10 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
133145 }
134146 }
135147
148+ // optimize bm
149+ immutableBucketBitMap .values ().forEach (RoaringBitmap ::runOptimize );
150+ this .delayedIndexBitMap .values ().forEach (RoaringBitmap ::runOptimize );
151+
136152 SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata .newBuilder ()
137153 .addAllMetadataList (segmentMetadataList )
138154 .build ();
@@ -145,6 +161,7 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
145161 bucket .setNumberBucketDelayedMessages (numMessages );
146162 bucket .setLastSegmentEntryId (lastSegmentEntryId );
147163 bucket .setFirstScheduleTimestamps (firstScheduleTimestamps );
164+ bucket .setDelayedIndexBitMap (immutableBucketBitMap );
148165
149166 // Skip first segment, because it has already been loaded
150167 List <SnapshotSegment > snapshotSegments = bucketSnapshotSegments .subList (1 , bucketSnapshotSegments .size ());
0 commit comments