Skip to content

Commit 579d807

Browse files
oneby-wanglhotari
authored andcommitted
[fix][broker] Fix compaction cursor reset may lose mark-delete properties (#25862)
(cherry picked from commit 9b15504)
1 parent b3c7f39 commit 579d807

2 files changed

Lines changed: 85 additions & 1 deletion

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1679,7 +1679,7 @@ public void operationFailed(ManagedLedgerException exception) {
16791679

16801680
persistentMarkDeletePosition = null;
16811681
inProgressMarkDeletePersistPosition = null;
1682-
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
1682+
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? null : Collections.emptyMap(),
16831683
new MarkDeleteCallback() {
16841684
@Override
16851685
public void markDeleteComplete(Object ctx) {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
2424
import static org.mockito.ArgumentMatchers.anyBoolean;
2525
import static org.mockito.ArgumentMatchers.anyInt;
26+
import static org.mockito.ArgumentMatchers.nullable;
2627
import static org.mockito.Mockito.any;
2728
import static org.mockito.Mockito.doAnswer;
2829
import static org.mockito.Mockito.eq;
@@ -6026,6 +6027,89 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
60266027
assertEquals(properties.get(propertyKey), lastIndex - 1);
60276028
}
60286029

6030+
@Test
6031+
@SuppressWarnings("unchecked")
6032+
public void testCompactionCursorResetNeverLoseMarkDeleteProperties() throws Exception {
6033+
@Cleanup
6034+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(
6035+
"testCompactionCursorResetNeverLoseMarkDeleteProperties",
6036+
new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
6037+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("__compaction");
6038+
ManagedCursorImpl spyCursor = spy(cursor);
6039+
ledger.getCursors().removeCursor(cursor.getName());
6040+
ledger.getCursors().add(spyCursor, null);
6041+
6042+
ledger.addEntry("entry-1".getBytes(Encoding));
6043+
Position markDeletePosition = ledger.addEntry("entry-2".getBytes(Encoding));
6044+
6045+
String compactedLedgerProperty = "CompactedTopicLedger";
6046+
Map<String, Long> properties = Map.of(compactedLedgerProperty, 123456L);
6047+
6048+
CountDownLatch markDeleteEntered = new CountDownLatch(1);
6049+
CountDownLatch resetEntered = new CountDownLatch(1);
6050+
CountDownLatch markDeleteReturned = new CountDownLatch(1);
6051+
CountDownLatch markDeleteCompleted = new CountDownLatch(1);
6052+
CountDownLatch resetCompleted = new CountDownLatch(1);
6053+
6054+
doAnswer(invocation -> {
6055+
Map<String, Long> invocationProperties = invocation.getArgument(1);
6056+
if (invocationProperties != null && invocationProperties.containsKey(compactedLedgerProperty)) {
6057+
// Hold the compaction mark-delete after it enters internalAsyncMarkDelete, but before its
6058+
// properties can update lastMarkDeleteEntry.
6059+
markDeleteEntered.countDown();
6060+
assertTrue(resetEntered.await(5, TimeUnit.SECONDS));
6061+
try {
6062+
return invocation.callRealMethod();
6063+
} finally {
6064+
markDeleteReturned.countDown();
6065+
}
6066+
}
6067+
6068+
if (invocationProperties == null || invocationProperties.isEmpty()) {
6069+
// Let reset capture its properties argument first, then persist it only after the compaction
6070+
// mark-delete has completed the real internalAsyncMarkDelete call.
6071+
resetEntered.countDown();
6072+
assertTrue(markDeleteReturned.await(5, TimeUnit.SECONDS));
6073+
return invocation.callRealMethod();
6074+
}
6075+
6076+
return invocation.callRealMethod();
6077+
}).when(spyCursor).internalAsyncMarkDelete(any(Position.class), nullable(Map.class),
6078+
any(MarkDeleteCallback.class), nullable(Object.class), nullable(Runnable.class));
6079+
6080+
// Start compaction mark-delete from another thread because the spy intentionally blocks it.
6081+
CompletableFuture.runAsync(() -> spyCursor.asyncMarkDelete(
6082+
markDeletePosition, properties, new MarkDeleteCallback() {
6083+
@Override
6084+
public void markDeleteComplete(Object ctx) {
6085+
markDeleteCompleted.countDown();
6086+
}
6087+
6088+
@Override
6089+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
6090+
}
6091+
}, null));
6092+
6093+
assertTrue(markDeleteEntered.await(5, TimeUnit.SECONDS));
6094+
// Reset the compaction cursor while the previous mark-delete with properties is still in progress.
6095+
spyCursor.asyncResetCursor(markDeletePosition, false, new AsyncCallbacks.ResetCursorCallback() {
6096+
@Override
6097+
public void resetComplete(Object ctx) {
6098+
resetCompleted.countDown();
6099+
}
6100+
6101+
@Override
6102+
public void resetFailed(ManagedLedgerException exception, Object ctx) {
6103+
}
6104+
});
6105+
6106+
assertTrue(markDeleteCompleted.await(5, TimeUnit.SECONDS));
6107+
assertTrue(resetCompleted.await(5, TimeUnit.SECONDS));
6108+
6109+
assertEquals(spyCursor.getMarkDeletedPosition(), markDeletePosition);
6110+
assertEquals(spyCursor.getProperties(), properties);
6111+
}
6112+
60296113
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
60306114
Map<Long, Integer> ledgerErrors = new HashMap<>();
60316115

0 commit comments

Comments
 (0)