Skip to content

Commit 2f47446

Browse files
oneby-wanglhotari
authored andcommitted
[fix][broker] Fix ManagedCursorImpl.asyncDelete() method may lose previous async mark delete properties in race condition (#25165)
(cherry picked from commit bea6f8a)
1 parent ab27ec9 commit 2f47446

2 files changed

Lines changed: 101 additions & 4 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2681,10 +2681,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
26812681
}
26822682

26832683
try {
2684-
Map<String, Long> properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties
2685-
: Collections.emptyMap();
2686-
2687-
internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
2684+
internalAsyncMarkDelete(newMarkDeletePosition, null, new MarkDeleteCallback() {
26882685
@Override
26892686
public void markDeleteComplete(Object ctx) {
26902687
callback.deleteComplete(ctx);

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5926,6 +5926,106 @@ public void testConcurrentRead() throws Exception {
59265926
assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes());
59275927
}
59285928

5929+
@Test(timeOut = 20000)
5930+
public void testAsyncMarkDeleteNeverLoseProperties() throws Exception {
5931+
ManagedLedgerConfig config = new ManagedLedgerConfig();
5932+
config.setMaxEntriesPerLedger(3);
5933+
config.setRetentionTime(20, TimeUnit.SECONDS);
5934+
config.setRetentionSizeInMB(5);
5935+
5936+
@Cleanup ManagedLedgerImpl ledger =
5937+
(ManagedLedgerImpl) factory.open("testAsyncMarkDeleteNeverLoseProperties", config);
5938+
@Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
5939+
5940+
int numMessages = 20;
5941+
List<Position> positions = new ArrayList<>();
5942+
for (int i = 0; i < numMessages; i++) {
5943+
Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
5944+
positions.add(pos);
5945+
}
5946+
5947+
String propertyKey = "test-property";
5948+
CountDownLatch latch = new CountDownLatch(numMessages);
5949+
for (int i = 0; i < numMessages; i++) {
5950+
Map<String, Long> properties = new HashMap<>();
5951+
properties.put(propertyKey, (long) i);
5952+
cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
5953+
@Override
5954+
public void markDeleteComplete(Object ctx) {
5955+
latch.countDown();
5956+
}
5957+
5958+
@Override
5959+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
5960+
fail("Mark delete should succeed");
5961+
}
5962+
}, null);
5963+
}
5964+
5965+
latch.await();
5966+
5967+
int lastIndex = numMessages - 1;
5968+
assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
5969+
Map<String, Long> properties = cursor.getProperties();
5970+
assertEquals(properties.size(), 1);
5971+
assertEquals(properties.get(propertyKey), lastIndex);
5972+
}
5973+
5974+
@Test(timeOut = 20000)
5975+
public void testAsyncDeleteNeverLoseMarkDeleteProperties() throws Exception {
5976+
ManagedLedgerConfig config = new ManagedLedgerConfig();
5977+
config.setMaxEntriesPerLedger(11);
5978+
5979+
@Cleanup ManagedLedgerImpl ledger =
5980+
(ManagedLedgerImpl) factory.open("testAsyncDeleteNeverLoseMarkDeleteProperty", config);
5981+
@Cleanup ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
5982+
5983+
int numMessages = 10;
5984+
List<Position> positions = new ArrayList<>();
5985+
for (int i = 0; i < numMessages; i++) {
5986+
Position pos = ledger.addEntry("entry-1".getBytes(Encoding));
5987+
positions.add(pos);
5988+
}
5989+
5990+
String propertyKey = "test-property";
5991+
CountDownLatch latch = new CountDownLatch(numMessages);
5992+
for (int i = 0; i < numMessages - 1; i++) {
5993+
Map<String, Long> properties = new HashMap<>();
5994+
properties.put(propertyKey, (long) i);
5995+
cursor.asyncMarkDelete(positions.get(i), properties, new MarkDeleteCallback() {
5996+
@Override
5997+
public void markDeleteComplete(Object ctx) {
5998+
latch.countDown();
5999+
}
6000+
6001+
@Override
6002+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
6003+
fail("Mark delete should succeed");
6004+
}
6005+
}, null);
6006+
}
6007+
6008+
int lastIndex = numMessages - 1;
6009+
cursor.asyncDelete(positions.get(lastIndex), new DeleteCallback() {
6010+
@Override
6011+
public void deleteComplete(Object ctx) {
6012+
latch.countDown();
6013+
}
6014+
6015+
@Override
6016+
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
6017+
fail("Delete should succeed");
6018+
}
6019+
}, null);
6020+
6021+
latch.await();
6022+
6023+
assertEquals(cursor.getMarkDeletedPosition(), positions.get(lastIndex));
6024+
Map<String, Long> properties = cursor.getProperties();
6025+
assertEquals(properties.size(), 1);
6026+
assertEquals(properties.get(propertyKey), lastIndex - 1);
6027+
}
6028+
59296029
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
59306030
Map<Long, Integer> ledgerErrors = new HashMap<>();
59316031

0 commit comments

Comments
 (0)