Skip to content

Commit c13b393

Browse files
oneby-wanglhotari
authored andcommitted
[fix][broker] Fix PersistentMessageExpiryMonitor findEntryComplete() method may lose mark-delete properties in race condition (#25803)
(cherry picked from commit 47eec87)
1 parent 25af7cd commit c13b393

2 files changed

Lines changed: 59 additions & 2 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,7 @@ public void findEntryComplete(Position position, Object ctx) {
240240
}
241241
log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position);
242242
Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
243-
cursor.asyncMarkDelete(position, cursor.getProperties(), markDeleteCallback,
244-
cursor.getNumberOfEntriesInBacklog(false));
243+
cursor.asyncMarkDelete(position, null, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
245244
if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) {
246245
subscription.updateLastMarkDeleteAdvancedTimestamp();
247246
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.nullable;
2223
import static org.mockito.Mockito.doAnswer;
2324
import static org.mockito.Mockito.doReturn;
2425
import static org.mockito.Mockito.mock;
@@ -34,11 +35,14 @@
3435
import io.netty.buffer.UnpooledByteBufAllocator;
3536
import java.lang.reflect.Field;
3637
import java.util.ArrayList;
38+
import java.util.HashMap;
3739
import java.util.HashSet;
3840
import java.util.List;
41+
import java.util.Map;
3942
import java.util.Optional;
4043
import java.util.Set;
4144
import java.util.concurrent.CompletableFuture;
45+
import java.util.concurrent.CountDownLatch;
4246
import java.util.concurrent.TimeUnit;
4347
import java.util.concurrent.atomic.AtomicBoolean;
4448
import java.util.concurrent.atomic.AtomicInteger;
@@ -1099,4 +1103,58 @@ public void testGetFindPositionRange_SingleClosedLedger() {
10991103
assertNull(range.getRight());
11001104
assertEquals(range.getLeft(), PositionFactory.create(1, 9));
11011105
}
1106+
1107+
@Test
1108+
@SuppressWarnings("unchecked")
1109+
void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception {
1110+
final String ledgerAndCursorName = "testExpireMessagesNeverLoseMarkDeleteProperties";
1111+
1112+
ManagedLedgerConfig config = new ManagedLedgerConfig();
1113+
config.setRetentionSizeInMB(10);
1114+
config.setRetentionTime(1, TimeUnit.HOURS);
1115+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
1116+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
1117+
ManagedCursorImpl spyCursor = spy(cursor);
1118+
1119+
Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1"));
1120+
Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2"));
1121+
1122+
CountDownLatch expiryMarkDeleteEnteredLatch = new CountDownLatch(1);
1123+
CountDownLatch cursorMarkDeleteCompletedLatch = new CountDownLatch(1);
1124+
CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1);
1125+
1126+
doAnswer(invocation -> {
1127+
Map<String, Long> invocationProperties = invocation.getArgument(1);
1128+
// Pause the expiry-triggered mark-delete so the user markDelete() can complete first.
1129+
if (invocationProperties == null || invocationProperties.isEmpty()) {
1130+
expiryMarkDeleteEnteredLatch.countDown();
1131+
assertTrue(cursorMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS));
1132+
try {
1133+
return invocation.callRealMethod();
1134+
} finally {
1135+
expiryMarkDeleteCompletedLatch.countDown();
1136+
}
1137+
}
1138+
1139+
return invocation.callRealMethod();
1140+
}).when(spyCursor)
1141+
.asyncMarkDelete(any(Position.class), nullable(Map.class), any(AsyncCallbacks.MarkDeleteCallback.class),
1142+
nullable(Object.class));
1143+
1144+
PersistentTopic topic = mockPersistentTopic("topicname");
1145+
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic,
1146+
spyCursor.getName(), spyCursor, null);
1147+
1148+
CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null));
1149+
assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS));
1150+
1151+
Map<String, Long> properties = new HashMap<>();
1152+
properties.put("test-property", 1L);
1153+
spyCursor.markDelete(pos1, properties);
1154+
cursorMarkDeleteCompletedLatch.countDown();
1155+
1156+
assertTrue(expiryMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS));
1157+
assertEquals(spyCursor.getMarkDeletedPosition(), pos2);
1158+
assertEquals(spyCursor.getProperties(), properties);
1159+
}
11021160
}

0 commit comments

Comments
 (0)