@@ -2257,37 +2257,51 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
22572257 log .debug ("[{}] Mark delete cursor {} up to position: {}" , ledger .getName (), name , position );
22582258 }
22592259
2260+ // Snapshot all positions into local variables to avoid race condition.
22602261 Position newPosition = ackBatchPosition (position );
2262+ Position moveForwardPosition = newPosition ;
22612263 Position markDeletePos = markDeletePosition ;
22622264 Position lastConfirmedEntry = ledger .getLastConfirmedEntry ();
2263- if (lastConfirmedEntry .compareTo (newPosition ) < 0 ) {
2264- boolean shouldCursorMoveForward = false ;
2265- try {
2266- long ledgerEntries = ledger .getLedgerInfo (markDeletePos .getLedgerId ()).get ().getEntries ();
2265+ boolean shouldCursorMoveForward = false ;
2266+ try {
2267+ if (lastConfirmedEntry .getLedgerId () >= newPosition .getLedgerId ()) {
2268+ LedgerInfo curMarkDeleteLedgerInfo = ledger .getLedgerInfo (newPosition .getLedgerId ()).get ();
2269+ Long nextValidLedger = ledger .getNextValidLedger (newPosition .getLedgerId ());
2270+ shouldCursorMoveForward = (nextValidLedger != null )
2271+ && (curMarkDeleteLedgerInfo != null
2272+ && newPosition .getEntryId () + 1 >= curMarkDeleteLedgerInfo .getEntries ());
2273+ if (shouldCursorMoveForward ) {
2274+ moveForwardPosition = PositionFactory .create (nextValidLedger , -1 );
2275+ }
2276+ } else {
22672277 Long nextValidLedger = ledger .getNextValidLedger (lastConfirmedEntry .getLedgerId ());
2268- shouldCursorMoveForward = nextValidLedger != null
2269- && (markDeletePos .getEntryId () + 1 >= ledgerEntries )
2270- && (newPosition .getLedgerId () == nextValidLedger );
2271- } catch (Exception e ) {
2272- log .warn ("Failed to get ledger entries while setting mark-delete-position" , e );
2278+ shouldCursorMoveForward = (nextValidLedger != null )
2279+ && (newPosition .getLedgerId () == nextValidLedger )
2280+ && (newPosition .getEntryId () == -1 );
22732281 }
2282+ } catch (Exception e ) {
2283+ log .warn ("Failed to get ledger entries while setting mark-delete-position" , e );
2284+ }
22742285
2275- if (shouldCursorMoveForward ) {
2276- log .info ("[{}] move mark-delete-position from {} to {} since all the entries have been consumed" ,
2277- ledger .getName (), markDeletePos , newPosition );
2278- } else {
2279- if (log .isDebugEnabled ()) {
2280- log .debug ("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
2281- + " for cursor [{}]" , ledger .getName (), position , lastConfirmedEntry , name );
2282- }
2283- callback .markDeleteFailed (new ManagedLedgerException ("Invalid mark deleted position" ), ctx );
2284- return ;
2286+ if (shouldCursorMoveForward ) {
2287+ log .info ("[{}] move cursor {} mark-delete-position from {} to {} since all the entries have been consumed,"
2288+ + " last-confirmed-entry: {}, attempt position: {}, ack-batch position: {}" , ledger .getName (),
2289+ name , markDeletePos , moveForwardPosition , lastConfirmedEntry , position , newPosition );
2290+ } else if (lastConfirmedEntry .compareTo (newPosition ) < 0 ) {
2291+ // If newPosition>lastConfirmedEntry and current ledger entries are not fully consumed,
2292+ // in which case the newPosition is an invalid mark delete position.
2293+ if (log .isDebugEnabled ()) {
2294+ log .debug ("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
2295+ + " for cursor [{}], attempt position: {}" , ledger .getName (), newPosition , lastConfirmedEntry ,
2296+ name , position );
22852297 }
2298+ callback .markDeleteFailed (new ManagedLedgerException ("Invalid mark deleted position" ), ctx );
2299+ return ;
22862300 }
22872301
22882302 lock .writeLock ().lock ();
22892303 try {
2290- newPosition = setAcknowledgedPosition (newPosition );
2304+ moveForwardPosition = setAcknowledgedPosition (moveForwardPosition );
22912305 } catch (IllegalArgumentException e ) {
22922306 callback .markDeleteFailed (getManagedLedgerException (e ), ctx );
22932307 return ;
@@ -2298,11 +2312,11 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
22982312 // Apply rate limiting to mark-delete operations
22992313 if (markDeleteLimiter != null && !markDeleteLimiter .tryAcquire ()) {
23002314 isDirty = true ;
2301- updateLastMarkDeleteEntryToLatest (newPosition , properties );
2315+ updateLastMarkDeleteEntryToLatest (moveForwardPosition , properties );
23022316 callback .markDeleteComplete (ctx );
23032317 return ;
23042318 }
2305- internalAsyncMarkDelete (newPosition , properties , callback , ctx , null );
2319+ internalAsyncMarkDelete (moveForwardPosition , properties , callback , ctx , null );
23062320 }
23072321
23082322 private Position ackBatchPosition (Position position ) {
0 commit comments