Skip to content

Commit da0d116

Browse files
authored
[improve][ml] Optimize ledger opening by skipping fully acknowledged ledgers (apache#24655)
1 parent f2709bc commit da0d116

6 files changed

Lines changed: 150 additions & 7 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte
912912
// Skip deleted entries.
913913
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
914914
OpReadEntry op =
915-
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
915+
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition, true);
916916
ledger.asyncReadEntries(op);
917917
}
918918

@@ -1072,7 +1072,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re
10721072
// Skip deleted entries.
10731073
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
10741074
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
1075-
ctx, maxPosition, skipCondition);
1075+
ctx, maxPosition, skipCondition, true);
10761076
int opReadId = op.id;
10771077
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
10781078
op.recycle();

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2044,6 +2044,19 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
20442044
return;
20452045
}
20462046

2047+
// Optimization: Check if all entries in this ledger have been deleted (acknowledged)
2048+
// If so, skip opening the ledger and move to the next one
2049+
if (opReadEntry.cursor != null && opReadEntry.skipOpenLedgerFullyAcked
2050+
&& isLedgerFullyAcked(ledgerId, ledgerInfo, opReadEntry.cursor)) {
2051+
log.info("[{}] All entries in ledger {} have been acked, skipping ledger opening", name, ledgerId);
2052+
// Move to the next ledger
2053+
Long nextLedgerId = ledgers.ceilingKey(ledgerId + 1);
2054+
opReadEntry.updateReadPosition(
2055+
PositionFactory.create(Objects.requireNonNullElseGet(nextLedgerId, () -> ledgerId + 1), 0));
2056+
opReadEntry.checkReadCompletion();
2057+
return;
2058+
}
2059+
20472060
// Get a ledger handle to read from
20482061
getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex
20492062
-> {
@@ -2056,6 +2069,56 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
20562069
}
20572070
}
20582071

2072+
/**
2073+
* Check if all entries in the specified ledger have been acknowledged by the cursor.
2074+
* This optimization helps avoid opening ledgers that have no unacked entries.
2075+
*
2076+
* @param ledgerId the ledger ID to check
2077+
* @param ledgerInfo the ledger information
2078+
* @param cursor the cursor reading from this ledger
2079+
* @return true if all entries in the ledger have been acknowledged, false otherwise
2080+
*/
2081+
private boolean isLedgerFullyAcked(long ledgerId, LedgerInfo ledgerInfo, ManagedCursorImpl cursor) {
2082+
if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
2083+
return true;
2084+
}
2085+
2086+
// Get the cursor's mark delete position
2087+
Position markDeletedPosition = cursor.getMarkDeletedPosition();
2088+
if (markDeletedPosition == null) {
2089+
return false;
2090+
}
2091+
2092+
// If the mark delete position is in a later ledger, then this ledger is fully acknowledged
2093+
if (markDeletedPosition.getLedgerId() > ledgerId) {
2094+
return true;
2095+
}
2096+
2097+
// Check if all entries in this ledger are individually deleted
2098+
if (markDeletedPosition.getLedgerId() <= ledgerId) {
2099+
final long lastEntryInLedger = ledgerInfo.getEntries() - 1;
2100+
Position startPosition = PositionFactory.create(ledgerId, 0);
2101+
if (markDeletedPosition.getLedgerId() == ledgerId) {
2102+
// The mark delete position represents the last acknowledged entry
2103+
// If it points to the last entry in the ledger, then the ledger is fully acknowledged
2104+
if (markDeletedPosition.getEntryId() >= lastEntryInLedger) {
2105+
return true;
2106+
}
2107+
2108+
startPosition = markDeletedPosition;
2109+
}
2110+
2111+
Range<Position> scanRange =
2112+
Range.closed(startPosition, PositionFactory.create(ledgerId, lastEntryInLedger));
2113+
long unackMessages = cursor.getNumberOfEntries(scanRange);
2114+
// All entries are individually deleted
2115+
return unackMessages == 0;
2116+
}
2117+
2118+
// If mark delete position is in an earlier ledger, this ledger is not consumed
2119+
return false;
2120+
}
2121+
20592122
public CompletableFuture<LedgerMetadata> getLedgerMetadata(long ledgerId) {
20602123
LedgerHandle currentLedger = this.currentLedger;
20612124
if (currentLedger != null && ledgerId == currentLedger.getId()) {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ class OpReadEntry implements ReadEntriesCallback {
5555
Position maxPosition;
5656

5757
Predicate<Position> skipCondition;
58+
boolean skipOpenLedgerFullyAcked = false;
5859

5960
public static OpReadEntry create(ManagedCursorImpl cursor, Position readPositionRef, int count,
60-
ReadEntriesCallback callback, Object ctx, Position maxPosition, Predicate<Position> skipCondition) {
61+
ReadEntriesCallback callback, Object ctx, Position maxPosition,
62+
Predicate<Position> skipCondition,
63+
boolean skipOpenLedgerFullyAcked) {
6164
OpReadEntry op = RECYCLER.get();
6265
op.id = opReadIdGenerator.getAndIncrement();
6366
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
@@ -70,6 +73,7 @@ public static OpReadEntry create(ManagedCursorImpl cursor, Position readPosition
7073
}
7174
op.maxPosition = maxPosition;
7275
op.skipCondition = skipCondition;
76+
op.skipOpenLedgerFullyAcked = skipOpenLedgerFullyAcked;
7377
op.ctx = ctx;
7478
op.nextReadPosition = PositionFactory.create(op.readPosition);
7579
return op;
@@ -247,6 +251,7 @@ public void recycle() {
247251
nextReadPosition = null;
248252
maxPosition = null;
249253
skipCondition = null;
254+
skipOpenLedgerFullyAcked = false;
250255
recyclerHandle.recycle(this);
251256
}
252257

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void find() {
128128
}
129129
if (cursor.hasMoreEntries(searchPosition)) {
130130
OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition, batchSize,
131-
this, OpScan.this.ctx, null, null);
131+
this, OpScan.this.ctx, null, null, false);
132132
ledger.asyncReadEntries(opReadEntry);
133133
} else {
134134
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@
2020

2121
import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize;
2222
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
2324
import static org.mockito.ArgumentMatchers.anyInt;
2425
import static org.mockito.Mockito.any;
2526
import static org.mockito.Mockito.doAnswer;
2627
import static org.mockito.Mockito.eq;
2728
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.never;
30+
import static org.mockito.Mockito.spy;
31+
import static org.mockito.Mockito.verify;
2832
import static org.mockito.Mockito.when;
2933
import static org.testng.Assert.assertEquals;
3034
import static org.testng.Assert.assertFalse;
@@ -4391,7 +4395,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
43914395

43924396
// op readPosition is bigger than maxReadPosition
43934397
OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
4394-
null, PositionFactory.create(lastPosition.getLedgerId(), -1), null);
4398+
null, PositionFactory.create(lastPosition.getLedgerId(), -1), null, true);
43954399
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
43964400
field.setAccessible(true);
43974401
field.set(cursor, PositionFactory.EARLIEST);
@@ -4414,7 +4418,7 @@ public void testOpReadEntryRecycle() throws Exception {
44144418

44154419
@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class);
44164420
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(),
4417-
any(), any(), any())).thenAnswer(__ -> createOpReadEntry.get());
4421+
any(), any(), any(), anyBoolean())).thenAnswer(__ -> createOpReadEntry.get());
44184422

44194423
final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
44204424
ledgerConfig.setNewEntriesCheckDelayInMillis(10);
@@ -5468,6 +5472,77 @@ public void close() {
54685472
}).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" + i).toList());
54695473
}
54705474

5475+
@Test
5476+
public void testSkipOpenLedgerFullyAcked() throws Exception {
5477+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
5478+
managedLedgerConfig.setMaxEntriesPerLedger(10);
5479+
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
5480+
ManagedLedger ledger = factory.open("testSkipOpenLedgerFullyAcked", managedLedgerConfig);
5481+
ManagedCursor cursor = ledger.openCursor("cursor");
5482+
5483+
List<Position> positions = new ArrayList<>();
5484+
for (int i = 0; i < 10; i++) {
5485+
Position pos = ledger.addEntry(("entry-" + i).getBytes());
5486+
positions.add(pos);
5487+
}
5488+
5489+
((ManagedLedgerImpl) ledger).rollCurrentLedgerIfFull();
5490+
5491+
for (int i = 10; i < 20; i++) {
5492+
Position pos = ledger.addEntry(("entry-" + i).getBytes());
5493+
positions.add(pos);
5494+
}
5495+
5496+
ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
5497+
ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor;
5498+
5499+
for (int i = 0; i < 5; i++) {
5500+
cursor.markDelete(positions.get(i));
5501+
}
5502+
5503+
for (int i = 5; i < 10; i++) {
5504+
cursor.delete(positions.get(i));
5505+
}
5506+
5507+
long firstLedgerId = positions.get(0).getLedgerId();
5508+
long secondLedgerId = positions.get(10).getLedgerId();
5509+
log.info("First ledger id is {}, Second ledger id is {}", firstLedgerId, secondLedgerId);
5510+
5511+
ManagedLedgerImpl spyLedger = spy(ledgerImpl);
5512+
5513+
Position readPosition = PositionFactory.create(firstLedgerId, 0);
5514+
CountDownLatch readLatch = new CountDownLatch(1);
5515+
5516+
ReadEntriesCallback callback = new ReadEntriesCallback() {
5517+
@Override
5518+
public void readEntriesComplete(List<Entry> entries, Object ctx) {
5519+
try {
5520+
if (!entries.isEmpty()) {
5521+
entries.forEach(Entry::release);
5522+
}
5523+
} finally {
5524+
readLatch.countDown();
5525+
}
5526+
}
5527+
5528+
@Override
5529+
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
5530+
log.error("Read failed", exception);
5531+
readLatch.countDown();
5532+
}
5533+
};
5534+
5535+
OpReadEntry opReadEntry = OpReadEntry.create(cursorImpl, readPosition, 5, callback, null, null, null, true);
5536+
5537+
spyLedger.asyncReadEntries(opReadEntry);
5538+
5539+
assertTrue(readLatch.await(10, TimeUnit.SECONDS));
5540+
5541+
verify(spyLedger, never()).getLedgerHandle(firstLedgerId);
5542+
5543+
ledger.close();
5544+
}
5545+
54715546
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
54725547
Map<Long, Integer> ledgerErrors = new HashMap<>();
54735548

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
692692
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
693693

694694
}
695-
}, null, maxPosition, null);
695+
}, null, maxPosition, null, false);
696696
Assert.assertEquals(opReadEntry.readPosition, position);
697697
}
698698

0 commit comments

Comments
 (0)