Skip to content

Commit d85f9e9

Browse files
oneby-wang authored and srinath-ctds committed
[fix][broker] Fix markDeletedPosition race condition in ManagedLedgerImpl.maybeUpdateCursorBeforeTrimmingConsumedLedger() method (apache#25110)
(cherry picked from commit 1617bb2) (cherry picked from commit 252050c)
1 parent 9c47464 commit d85f9e9

6 files changed

Lines changed: 285 additions & 72 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,14 +1249,17 @@ public boolean hasMoreEntries() {
12491249

12501250
@Override
12511251
public long getNumberOfEntries() {
1252-
if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
1252+
Position readPos = readPosition;
1253+
Position lastPosition = ledger.getLastPosition();
1254+
Position nextPosition = lastPosition.getNext();
1255+
if (readPos.compareTo(nextPosition) > 0) {
12531256
if (log.isDebugEnabled()) {
12541257
log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read",
1255-
ledger.getName(), name, readPosition, ledger.getLastPosition());
1258+
ledger.getName(), name, readPos, lastPosition);
12561259
}
12571260
return 0;
12581261
} else {
1259-
return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext()));
1262+
return getNumberOfEntries(Range.closedOpen(readPos, nextPosition));
12601263
}
12611264
}
12621265

@@ -2250,25 +2253,27 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
22502253
}
22512254

22522255
Position newPosition = ackBatchPosition(position);
2253-
if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
2256+
Position markDeletePos = markDeletePosition;
2257+
Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
2258+
if (lastConfirmedEntry.compareTo(newPosition) < 0) {
22542259
boolean shouldCursorMoveForward = false;
22552260
try {
2256-
long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
2257-
Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
2261+
long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
2262+
Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId());
22582263
shouldCursorMoveForward = nextValidLedger != null
2259-
&& (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
2264+
&& (markDeletePos.getEntryId() + 1 >= ledgerEntries)
22602265
&& (newPosition.getLedgerId() == nextValidLedger);
22612266
} catch (Exception e) {
22622267
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
22632268
}
22642269

22652270
if (shouldCursorMoveForward) {
22662271
log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
2267-
ledger.getName(), markDeletePosition, newPosition);
2272+
ledger.getName(), markDeletePos, newPosition);
22682273
} else {
22692274
if (log.isDebugEnabled()) {
22702275
log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
2271-
+ " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
2276+
+ " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name);
22722277
}
22732278
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
22742279
return;
@@ -2324,11 +2329,15 @@ protected void internalAsyncMarkDelete(final Position newPosition, Map<String, L
23242329
final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) {
23252330
ledger.mbean.addMarkDeleteOp();
23262331

2327-
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx,
2328-
alignAcknowledgeStatusAfterPersisted);
2329-
23302332
// We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available
23312333
synchronized (pendingMarkDeleteOps) {
2334+
// use the given properties; when missing, use those of the last pending mark-delete op, else the cursor's properties
2335+
MarkDeleteEntry last = pendingMarkDeleteOps.peekLast();
2336+
Map<String, Long> propertiesToUse =
2337+
properties != null ? properties : (last != null ? last.properties : getProperties());
2338+
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx,
2339+
alignAcknowledgeStatusAfterPersisted);
2340+
23322341
// The state might have changed while we were waiting on the queue mutex
23332342
switch (state) {
23342343
case Closed:
@@ -2696,17 +2705,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
26962705
// update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition
26972706
private void updateLastMarkDeleteEntryToLatest(final Position newPosition,
26982707
final Map<String, Long> properties) {
2699-
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
2700-
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
2701-
// keep current value, don't update
2702-
return last;
2703-
} else {
2704-
// use given properties or when missing, use the properties from the previous field value
2705-
Map<String, Long> propertiesToUse =
2706-
properties != null ? properties : (last != null ? last.properties : Collections.emptyMap());
2707-
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
2708-
}
2709-
});
2708+
synchronized (pendingMarkDeleteOps) {
2709+
// use given properties or when missing, use the properties from the previous field value
2710+
MarkDeleteEntry lastPending = pendingMarkDeleteOps.peekLast();
2711+
Map<String, Long> propertiesToUse =
2712+
properties != null ? properties : (lastPending != null ? lastPending.properties : getProperties());
2713+
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
2714+
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
2715+
// keep current value, don't update
2716+
return last;
2717+
} else {
2718+
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
2719+
}
2720+
});
2721+
}
27102722
}
27112723

27122724
/**

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -424,14 +424,15 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
424424
public void initializeComplete() {
425425
log.info("[{}] Successfully initialize managed ledger", name);
426426
pendingInitializeLedgers.remove(name, pendingLedger);
427-
future.complete(newledger);
428-
429-
// May need to update the cursor position
430-
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
431-
// May need to trigger offloading
432-
if (config.isTriggerOffloadOnTopicLoad()) {
433-
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
434-
}
427+
// May need to update the cursor positions; wait for those updates to finish
428+
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> {
429+
// ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger
430+
future.complete(newledger);
431+
// May need to trigger offloading
432+
if (config.isTriggerOffloadOnTopicLoad()) {
433+
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
434+
}
435+
});
435436
}
436437

437438
@Override

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,11 +1652,10 @@ public void operationComplete(Void v, Stat stat) {
16521652
updateLedgersIdsComplete(originalCurrentLedger);
16531653
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
16541654
- lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
1655+
// May need to update the cursor position
1656+
maybeUpdateCursorBeforeTrimmingConsumedLedger();
16551657
}
16561658
metadataMutex.unlock();
1657-
1658-
// May need to update the cursor position
1659-
maybeUpdateCursorBeforeTrimmingConsumedLedger();
16601659
}
16611660

16621661
@Override
@@ -2584,18 +2583,23 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
25842583
this.waitingEntryCallBacks.add(cb);
25852584
}
25862585

2587-
public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
2586+
public CompletableFuture<Void> maybeUpdateCursorBeforeTrimmingConsumedLedger() {
2587+
List<CompletableFuture<Void>> cursorMarkDeleteFutures = new ArrayList<>();
25882588
for (ManagedCursor cursor : cursors) {
2589-
Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null
2590-
? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition();
2591-
LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
2589+
CompletableFuture<Void> future = new CompletableFuture<>();
2590+
cursorMarkDeleteFutures.add(future);
2591+
2592+
// Snapshot the position into local variables to avoid a race condition.
2593+
Position markDeletedPosition = cursor.getMarkDeletedPosition();
2594+
Position lastAckedPosition = markDeletedPosition;
2595+
LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
25922596
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
25932597
.map(Map.Entry::getValue).orElse(null);
25942598

2595-
if (currPointedLedger != null) {
2599+
if (curPointedLedger != null) {
25962600
if (nextPointedLedger != null) {
25972601
if (lastAckedPosition.getEntryId() != -1
2598-
&& lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
2602+
&& lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) {
25992603
lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
26002604
}
26012605
} else {
@@ -2605,25 +2609,37 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
26052609
log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
26062610
}
26072611

2608-
if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
2612+
int compareResult = lastAckedPosition.compareTo(markDeletedPosition);
2613+
if (compareResult > 0) {
26092614
Position finalPosition = lastAckedPosition;
2610-
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
2611-
cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(),
2612-
new MarkDeleteCallback() {
2613-
@Override
2614-
public void markDeleteComplete(Object ctx) {
2615-
log.info("Successfully persisted cursor position for cursor:{} to {}",
2616-
cursor, finalPosition);
2617-
}
2615+
log.info("Mark deleting cursor:{} from {} to {} since ledger consumed completely.", cursor,
2616+
markDeletedPosition, lastAckedPosition);
2617+
cursor.asyncMarkDelete(lastAckedPosition, null, new MarkDeleteCallback() {
2618+
@Override
2619+
public void markDeleteComplete(Object ctx) {
2620+
log.info("Successfully persisted cursor position for cursor:{} to {}", cursor, finalPosition);
2621+
future.complete(null);
2622+
}
26182623

2619-
@Override
2620-
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
2621-
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
2622-
cursor, cursor.getMarkDeletedPosition(), finalPosition, exception);
2623-
}
2624-
}, null);
2624+
@Override
2625+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
2626+
log.warn("Failed to mark delete: {} from {} to {}. ", cursor, cursor.getMarkDeletedPosition(),
2627+
finalPosition, exception);
2628+
future.completeExceptionally(exception);
2629+
}
2630+
}, null);
2631+
} else if (compareResult == 0) {
2632+
log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.",
2633+
cursor, markDeletedPosition);
2634+
future.complete(null);
2635+
} else {
2636+
// Should not happen
2637+
log.warn("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:"
2638+
+ " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition);
2639+
future.complete(null);
26252640
}
26262641
}
2642+
return FutureUtil.waitForAll(cursorMarkDeleteFutures);
26272643
}
26282644

26292645
private void trimConsumedLedgersInBackground() {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,13 @@ void recover(final VoidCallback callback) {
102102
protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties,
103103
final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) {
104104
// Bypass persistence of mark-delete position and individually deleted messages info
105-
106-
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx,
107-
alignAcknowledgeStatusAfterPersisted);
105+
MarkDeleteEntry mdEntry;
108106
lock.writeLock().lock();
109107
try {
108+
// use the given properties or, when missing, fall back to the cursor's current properties
109+
Map<String, Long> propertiesToUse = properties != null ? properties : getProperties();
110+
mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx,
111+
alignAcknowledgeStatusAfterPersisted);
110112
lastMarkDeleteEntry = mdEntry;
111113
mdEntry.alignAcknowledgeStatus();
112114
} finally {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,8 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
405405
ml.close();
406406
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
407407
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
408-
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
408+
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
409+
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry);
409410

410411
// cleanup.
411412
ml.delete();
@@ -496,12 +497,18 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
496497
assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
497498
assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
498499
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
500+
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
501+
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
499502

500503
// Verify the mark delete position can be recovered properly.
501504
ml.close();
502505
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
503506
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
504-
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
507+
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
508+
// If previous ledger is trimmed, Cursor: ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0}
509+
// does not exist in the managed-ledger. Recovered cursor's position will not be moved forward.
510+
// TODO should be handled in ledger trim process.
511+
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
505512

506513
// cleanup.
507514
ml.delete();
@@ -4431,7 +4438,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
44314438
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
44324439
ManagedCursor c2 = ledger2.openCursor("c");
44334440

4434-
assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
4441+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
44354442
});
44364443
}
44374444

@@ -4490,7 +4497,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
44904497
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config);
44914498
ManagedCursor c2 = ledger2.openCursor("c");
44924499

4493-
assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
4500+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
44944501
});
44954502
}
44964503

@@ -4542,7 +4549,7 @@ public void testFlushCursorAfterError() throws Exception {
45424549
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
45434550
ManagedCursor c2 = ledger2.openCursor("c");
45444551

4545-
assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
4552+
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
45464553
});
45474554
}
45484555

@@ -4805,7 +4812,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
48054812
}
48064813

48074814
@Test
4808-
public void testLazyCursorLedgerCreation() throws Exception {
4815+
public void testEagerCursorLedgerCreation() throws Exception {
48094816
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
48104817
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
48114818
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
@@ -4830,8 +4837,8 @@ public void testLazyCursorLedgerCreation() throws Exception {
48304837
ledger = (ManagedLedgerImpl) factory
48314838
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
48324839
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
4833-
assertEquals(cursor1.getState(), "NoLedger");
4834-
assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);
4840+
assertEquals(cursor1.getState(), "Open");
4841+
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition);
48354842

48364843
// Verify the recovered cursor can work with new mark delete.
48374844
lastPosition = null;

0 commit comments

Comments
 (0)