Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,10 @@ public synchronized void asyncTerminate(TerminateCallback callback, Object ctx)
}

log.info("Terminating managed ledger");
State previousState = state;
state = State.Terminated;
clearPendingAddEntriesAfterTerminate(previousState,
new ManagedLedgerTerminatedException("Managed ledger was already terminated"));

LedgerHandle lh = currentLedger;
log.debug().attr("ledgerId", lh.getId()).log("Closing current writing ledger");
Expand Down Expand Up @@ -1706,7 +1709,30 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) {

@Override
public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
if (STATE_UPDATER.get(this) == State.Closed) {
log.debug().attr("rc", rc).attr("ledgerId", lh != null ? lh.getId() : -1).log("createComplete");

// The create callback carries a future used by the timeout checker. Complete it before any terminal-state
// return; otherwise a late callback after terminate/close can leave the timeout task and create-op metric
// unbalanced. A true return means this callback is stale because the future was already completed, and the
// helper has already deleted the late-created ledger if needed.
if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}

mbean.endDataLedgerCreateOp();
State state = STATE_UPDATER.get(this);
if (state == State.Terminated) {
if (lh != null) {
log.warn().attr("rc", rc)
.attr("ledgerId", lh != null ? lh.getId() : -1)
.attr("state", state)
.log("Ledger create completed after the managed ledger is terminated,"
+ " so close and delete this ledger handle");
closeAndDeleteCreatedLedger(lh);
}
clearPendingAddEntries(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
} else if (state == State.Closed) {
if (lh != null) {
log.warn().attr("rc", rc)
.attr("ledgerId", lh != null ? lh.getId() : -1)
Expand All @@ -1716,14 +1742,6 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
}
return;
}

log.debug().attr("rc", rc).attr("ledgerId", lh != null ? lh.getId() : -1).log("createComplete");

if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}

mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error().attr("rc", rc).attr("message", BKException.getMessage(rc)).log("Error creating ledger");
ManagedLedgerException status = createManagedLedgerException(rc);
Expand All @@ -1749,8 +1767,11 @@ public void operationComplete(Void v, Stat stat) {
synchronized (ManagedLedgerImpl.this) {
try {
State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
if (state == State.Closed || state.isFenced()) {
log.debug().log("skip ledger update after create complete ledger is closed or fenced");
if (state == State.Closed || state == State.Terminated || state.isFenced()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this branch still leaves one terminate-vs-rollover race unresolved. By the time this callback reaches operationComplete, updateLedgersListAfterRollover has already successfully written metadata that includes newLedger. If terminate() is concurrently writing the terminated metadata, the two store.asyncUpdateLedgerIds(...) calls can still race because asyncTerminate is not serialized with this metadataMutex path.

There are two problematic outcomes: if the rollover metadata update wins first, this branch only closes lh and relies on a later terminate metadata update to remove the new ledger from metadata, but the unused BookKeeper ledger is not deleted; if terminate wins the metadata version race first, this rollover callback can go through operationFailed(BadVersionException) and call handleBadVersion, fencing a ledger that should remain terminated. The TODO here is therefore part of the correctness fix, not just cleanup.

Can we make the in-flight rollover metadata update terminal-state aware as well, e.g. serialize terminate with the same metadata update path, or handle state == Terminated in both the success and BadVersion failure callbacks as a stale rollover completion, while ensuring the unused ledger is removed from metadata and deleted if it was already written?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed explanation. I’ve also identified this remaining
terminate-vs-rollover metadata race.

The ManagedLedger code path is quite complex, so I think it would be clearer
and safer to split the related fixes into smaller PRs instead of carrying all
of them in this one. This PR focuses on the terminated-state transition and
pending-add handling.

After this PR is merged, I’ll submit a follow-up PR to address the metadata
update race, including the stale rollover success path, the BadVersion path,
and cleanup of any unused ledger that may have been written to metadata.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unresolving this comment since there's very useful observations and analysis so far and it's useful to keep the thread visible until we agree on how to address the problems in short term and long term.

I agree that the current ManagedLedgerImpl code path is quite complex. It's hard to reason about the correctness since it's a mixture of locks, synchronization and different executors.

Just wondering if terminate should be implemented in a way where the state is set to "Terminating" which would be used to prevent any new operations starting, but existing inflight operations would first be completed before the ManagedLedgerImpl could be transitioned into "Terminated" state? The transitioning from Terminating to Terminated would be handled after the metadata update has been successfully completed. If there's a crash in the metadata update, the ledger will continue to be operational. The client requesting the termination can retry if it doesn't receive a successful response to the termination request.
WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed review. I agree the terminate-vs-rollover metadata race is important.

For this PR, I would like to keep the scope focused on the immediate terminated-state and pending-add behavior: once termination has taken effect, late callbacks should not move the managed ledger back to a writable state, and pending add callbacks should complete consistently.

The metadata race is a different path and the fix is more involved. A focused follow-up PR would need to handle the terminate metadata update together with the metadataMutex-serialized metadata update path, including:

  • stale rollover metadata success after termination, where we may need metadata cleanup and unused BookKeeper ledger cleanup;
  • stale rollover metadata failure after termination wins, where BadVersionException should not fence a ledger that is already terminating/terminated.

Mixing that into this PR would make the review much harder because it combines state/callback behavior with metadata serialization and ledger cleanup. My preference is to keep this PR focused, then send a separate PR for the metadataMutex / stale rollover metadata path.

I agree that a separate Terminating state would be a cleaner long-term model.

The distinction I have in mind is:

  • Terminating: termination has started on this broker. New writes and new rollovers should be rejected, late callbacks should not move the ledger back to writable, and existing in-flight adds should either drain or fail deterministically.
  • Terminated: the terminate metadata update has succeeded and terminatedPosition is durable.

With this model, Terminating does not need to be the durable state. If the broker crashes before the metadata update succeeds, recovery can treat the ledger as non-terminated and the caller can retry termination. If the metadata update succeeds, recovery observes terminatedPosition and restores Terminated.

This would make the state machine easier to reason about, but it still needs a clear design for how it interacts with rollover callbacks, pending add entries, metadata updates, and BadVersionException handling. It also would not replace the need to fix the metadataMutex race; it mainly gives us a clearer lifecycle boundary so those cases can be handled consistently.

I think this is worth doing as a follow-up design/change.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this PR, I would like to keep the scope focused on the immediate terminated-state and pending-add behavior: once termination has taken effect, late callbacks should not move the managed ledger back to a writable state, and pending add callbacks should complete consistently.

It just feels that this approach isn't correct. The problem itself is real.

I think this is worth doing as a follow-up design/change.

Yes, it most likely makes sense to start with the analysis and design. In Pulsar, it's most natural to document the problem, analysis and design into a PIP before implementation (prototypes and experimentation are obviously recommended and necessary when coming up with the design).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that makes sense to me.

I’ll take a deeper look at the overall ManagedLedger termination flow and how these issues should be handled together, including the state transition model, the metadata update/locking path, and stale rollover callbacks.

After that, I’ll try to prepare a proper PIP or design proposal so we can discuss the problem, analysis, and proposed approach more clearly with the community.

log.debug().attr("state", state)
.log("skip ledger update after create complete ledger is not writable");
// TODO: if this path is hit after the new ledger was already written into metadata,
// delete the unused ledger together with removing it from the metadata.
Comment on lines +1773 to +1774
lh.closeAsync().exceptionally(e -> {
if (e != null) {
log.error()
Expand All @@ -1760,6 +1781,10 @@ public void operationComplete(Void v, Stat stat) {
}
return null;
});
if (state == State.Terminated) {
clearPendingAddEntries(new ManagedLedgerTerminatedException(
"Managed ledger was already terminated"));
}
} else {
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
Expand Down Expand Up @@ -1837,6 +1862,23 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, Le
store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
}

private void closeAndDeleteCreatedLedger(LedgerHandle lh) {
long ledgerId = lh.getId();
lh.closeAsync().whenComplete((ignore, closeException) -> {
if (closeException != null) {
log.warn().attr("ledgerId", ledgerId)
.attr("error", closeException.getMessage())
.log("Failed to close late-created ledger before deletion");
}
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES).exceptionally(deleteException -> {
log.warn().attr("ledgerId", ledgerId)
.attr("error", deleteException.getMessage())
.log("Failed to delete late-created ledger");
return null;
});
});
}

@VisibleForTesting
void createNewOpAddEntryForNewLedger() {
// Avoid use same OpAddEntry between different ledger handle
Expand All @@ -1861,6 +1903,14 @@ void createNewOpAddEntryForNewLedger() {
}

protected synchronized void updateLedgersIdsComplete(@Nullable LedgerHandle originalCurrentLedger) {
State state = STATE_UPDATER.get(this);
if (state == State.Terminated) {
log.debug().attr("state", state)
.attr("pendingAddEntries", pendingAddEntries.size())
.log("Skip completing ledger switch because managed ledger is terminated");
clearPendingAddEntries(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
}
STATE_UPDATER.set(this, State.LedgerOpened);
// Delete original "currentLedger" if it has been removed from "ledgers".
if (originalCurrentLedger != null && !ledgers.containsKey(originalCurrentLedger.getId())){
Expand Down Expand Up @@ -1996,7 +2046,7 @@ synchronized void createLedgerAfterClosed() {

boolean isNeededCreateNewLedgerAfterCloseLedger() {
final State state = STATE_UPDATER.get(this);
if (state != State.CreatingLedger && state != State.LedgerOpened) {
if (state != State.CreatingLedger && state != State.LedgerOpened && state != State.Terminated) {
return true;
}
return false;
Expand Down Expand Up @@ -2080,6 +2130,41 @@ synchronized void clearPendingAddEntries(ManagedLedgerException e) {
}
}

synchronized boolean failAddIfTerminated(OpAddEntry op) {
if (STATE_UPDATER.get(this) != State.Terminated) {
return false;
}

// This is only for a failed add callback that arrives after terminate has taken ownership of the ledger.
// terminate closes the current ledger at the current BK LAC; any outstanding add drained by
// LedgerHandle.close() is outside the terminated position. If this callback falls through to the normal
// write-failure path, ledgerClosed() will return in Terminated state without completing this add, leaving the
// client callback hanging. Fail it explicitly instead.
pendingAddEntries.remove(op);
op.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return true;
}

synchronized void clearPendingAddEntriesAfterTerminate(State previousState, ManagedLedgerException e) {
// In these states, there is no current-ledger write path left. Every pending op is waiting for a future ledger
// or a replay onto that future ledger, which terminate must not create.
if (previousState == State.CreatingLedger || previousState == State.ClosedLedger
|| previousState == State.WriteFailed) {
clearPendingAddEntries(e);
return;
}

// In LedgerOpened/ClosingLedger, this queue can also contain writes already sent to the current ledger.
// Those writes are decided by the BK callback path: they either complete before close advances LAC, or
// LedgerHandle.close() drains them and OpAddEntry.handleAddFailure completes them as terminated. Only fail
// entries that have not been initiated yet, since they are only waiting for a future ledger.
for (OpAddEntry op : pendingAddEntries) {
if (op.getState() == OpAddEntry.State.OPEN && pendingAddEntries.remove(op)) {
op.failed(e);
}
}
}

void asyncReadEntries(OpReadEntry opReadEntry) {
final State state = STATE_UPDATER.get(this);
if (state.isFenced() || state == State.Closed) {
Expand Down Expand Up @@ -4638,8 +4723,11 @@ public Clock getClock() {
}

/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
* @return
* Complete the ledger-create future used by the timeout checker.
*
* @return true when this callback is stale because another callback or the timeout task already completed the
* future. In that case the caller must stop processing this callback. If a ledger was created by the stale
* callback, this method schedules it for deletion.
*/
@SuppressWarnings("unchecked")
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ void handleAddFailure(final LedgerHandle lh, Integer rc) {
finalMl.mbean.recordAddEntryError();

finalMl.getExecutor().execute(() -> {
if (finalMl.failAddIfTerminated(this)) {
return;
}
// Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
// from a BK callback.
// If we received a "MetadataVersionException" or a "LedgerFencedException", we should tell the ML that
Expand Down
Loading
Loading