Skip to content

Commit 4332a44

Browse files
oneby-wanglhotari
authored andcommitted
[fix][broker] Fix ManagedCursorImpl.asyncDelete() method may lose previous async mark delete properties in race condition (#25165)
(cherry picked from commit bea6f8a)
1 parent a8eac91 commit 4332a44

2 files changed

Lines changed: 101 additions & 4 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2676,10 +2676,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
26762676
}
26772677

26782678
try {
2679-
Map<String, Long> properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties
2680-
: Collections.emptyMap();
2681-
2682-
internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
2679+
internalAsyncMarkDelete(newMarkDeletePosition, null, new MarkDeleteCallback() {
26832680
@Override
26842681
public void markDeleteComplete(Object ctx) {
26852682
callback.deleteComplete(ctx);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5916,6 +5916,106 @@ public void testConcurrentRead() throws Exception {
59165916
assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes());
59175917
}
59185918

5919+
@Test(timeOut = 20000)
5920+
public void testAsyncMarkDeleteNeverLoseProperties() throws Exception {
5921+
ManagedLedgerConfig config = new ManagedLedgerConfig();
5922+
config.setMaxEntriesPerLedger(3);
5923+
config.setRetentionTime(20, TimeUnit.SECONDS);
5924+
config.setRetentionSizeInMB(5);
5925+
5926+
@Cleanup ManagedLedgerImpl ledger =
5927+
(ManagedLedgerImpl) factory.open("testAsyncMarkDeleteNeverLoseProperties", config);
5928+
@Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
5929+
5930+
int numMessages = 20;
5931+
List<Position> positions = new ArrayList<>();
5932+
for (int i = 0; i < numMessages; i++) {
5933+
Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
5934+
positions.add(pos);
5935+
}
5936+
5937+
String propertyKey = "test-property";
5938+
CountDownLatch latch = new CountDownLatch(numMessages);
5939+
for (int i = 0; i < numMessages; i++) {
5940+
Map<String, Long> properties = new HashMap<>();
5941+
properties.put(propertyKey, (long) i);
5942+
cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
5943+
@Override
5944+
public void markDeleteComplete(Object ctx) {
5945+
latch.countDown();
5946+
}
5947+
5948+
@Override
5949+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
5950+
fail("Mark delete should succeed");
5951+
}
5952+
}, null);
5953+
}
5954+
5955+
latch.await();
5956+
5957+
int lastIndex = numMessages - 1;
5958+
assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
5959+
Map<String, Long> properties = cursor.getProperties();
5960+
assertEquals(properties.size(), 1);
5961+
assertEquals(properties.get(propertyKey), lastIndex);
5962+
}
5963+
5964+
@Test(timeOut = 20000)
5965+
public void testAsyncDeleteNeverLoseMarkDeleteProperties() throws Exception {
5966+
ManagedLedgerConfig config = new ManagedLedgerConfig();
5967+
config.setMaxEntriesPerLedger(11);
5968+
5969+
@Cleanup ManagedLedgerImpl ledger =
5970+
(ManagedLedgerImpl) factory.open("testAsyncDeleteNeverLoseMarkDeleteProperty", config);
5971+
@Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
5972+
5973+
int numMessages = 10;
5974+
List<Position> positions = new ArrayList<>();
5975+
for (int i = 0; i < numMessages; i++) {
5976+
Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
5977+
positions.add(pos);
5978+
}
5979+
5980+
String propertyKey = "test-property";
5981+
CountDownLatch latch = new CountDownLatch(numMessages);
5982+
for (int i = 0; i < numMessages - 1; i++) {
5983+
Map<String, Long> properties = new HashMap<>();
5984+
properties.put(propertyKey, (long) i);
5985+
cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
5986+
@Override
5987+
public void markDeleteComplete(Object ctx) {
5988+
latch.countDown();
5989+
}
5990+
5991+
@Override
5992+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
5993+
fail("Mark delete should succeed");
5994+
}
5995+
}, null);
5996+
}
5997+
5998+
int lastIndex = numMessages - 1;
5999+
cursor.asyncDelete(positions.get(lastIndex), new DeleteCallback() {
6000+
@Override
6001+
public void deleteComplete(Object ctx) {
6002+
latch.countDown();
6003+
}
6004+
6005+
@Override
6006+
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
6007+
fail("Delete should succeed");
6008+
}
6009+
}, null);
6010+
6011+
latch.await();
6012+
6013+
assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
6014+
Map<String, Long> properties = cursor.getProperties();
6015+
assertEquals(properties.size(), 1);
6016+
assertEquals(properties.get(propertyKey), lastIndex - 1);
6017+
}
6018+
59196019
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
59206020
Map<Long, Integer> ledgerErrors = new HashMap<>();
59216021

0 commit comments

Comments
 (0)