Skip to content

Commit a8fa780

Browse files
author
gosonzhang
committed
[fix][broker] Missing assignment of the corresponding InFlightTask in PersistentReplicator.readEntriesFailed
1 parent a3ae705 commit a8fa780

2 files changed

Lines changed: 57 additions & 10 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
508508
return;
509509
}
510510

511+
InFlightTask inFlightTask = (InFlightTask) ctx;
512+
inFlightTask.setEntries(Collections.emptyList());
513+
511514
// Reduce read batch size to avoid flooding bookies with retries
512515
readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();
513516

@@ -989,4 +992,9 @@ protected boolean hasPendingRead() {
989992
String getReplicatorId() {
990993
return replicatorId;
991994
}
995+
996+
@VisibleForTesting
997+
public void incrementWaitForCursorRewindingRefCnf() {
998+
waitForCursorRewindingRefCnf++;
999+
}
9921000
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ public void testReplicatorClearBacklog() throws Exception {
660660
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
661661
PersistentReplicator replicator = (PersistentReplicator) spy(
662662
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
663-
replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
663+
replicator.incrementWaitForCursorRewindingRefCnf();
664664
replicator.clearBacklog().get();
665665
Thread.sleep(100);
666666
replicator.updateRates(); // for code-coverage
@@ -690,7 +690,7 @@ public void testReplicatorExpireMsgAsync() throws Exception {
690690
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
691691
PersistentReplicator replicator = (PersistentReplicator) spy(
692692
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
693-
replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
693+
replicator.incrementWaitForCursorRewindingRefCnf();
694694
replicator.clearBacklog().get();
695695
Thread.sleep(100);
696696
replicator.updateRates(); // for code-coverage
@@ -1653,7 +1653,7 @@ private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace,
16531653
@Test
16541654
public void testReplicatorWithFailedAck() throws Exception {
16551655

1656-
log.info("--- Starting ReplicatorTest::testReplication ---");
1656+
log.info("--- Starting ReplicatorTest::testReplicatorWithFailedAck ---");
16571657

16581658
String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
16591659
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
@@ -1686,14 +1686,28 @@ public void testReplicatorWithFailedAck() throws Exception {
16861686
}).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class),
16871687
Mockito.any());
16881688

1689+
// Mock the readEntriesFailed scenario:
1690+
// Use an AtomicBoolean to control whether reads fail; the test code sets it to true/false manually.
1691+
// Initialized to true to ensure the first readMoreEntries after replicator startup is intercepted.
1692+
AtomicBoolean isMakeReadFail = new AtomicBoolean(true);
1693+
doAnswer(invocation -> {
1694+
if (isMakeReadFail.get()) {
1695+
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(2);
1696+
Object ctx = invocation.getArgument(3);
1697+
log.info("asyncReadEntriesOrWait will be failed");
1698+
callback.readEntriesFailed(new ManagedLedgerException("Mocked read failure"), ctx);
1699+
return null;
1700+
} else {
1701+
log.info("asyncReadEntriesOrWait will proceed normally");
1702+
return invocation.callRealMethod();
1703+
}
1704+
}).when(spyCursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(),
1705+
Mockito.any(AsyncCallbacks.ReadEntriesCallback.class), Mockito.any(), Mockito.any(Position.class));
1706+
16891707
log.info("--- Starting producer --- " + url1);
16901708
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"), false);
1691-
// Produce from cluster1 and consume from the rest
1692-
producer1.produce(2);
1693-
1694-
MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
1695-
Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
16961709

1710+
// Wait for replicator to start
16971711
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
16981712
.ignoreExceptions()
16991713
.untilAsserted(() -> {
@@ -1703,9 +1717,34 @@ public void testReplicatorWithFailedAck() throws Exception {
17031717
replicator.getState());
17041718
});
17051719

1706-
// Make sure all the data has replicated to the remote cluster before close the cursor.
1707-
Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
1720+
// --- Test readEntriesFailed scenario ---
1721+
// isMakeReadFail is already true, so the replicator's readMoreEntries keeps failing
1722+
1723+
// Record current mark delete position
1724+
Position posBeforeReadFail = cursor.getMarkDeletedPosition();
1725+
1726+
// Produce messages; since reads keep failing, messages cannot be replicated
1727+
producer1.produce(2);
1728+
1729+
MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
1730+
Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
1731+
1732+
// During 2 seconds of continuous read failure, the mark-delete position should not advance
1733+
Awaitility.await()
1734+
.during(2, TimeUnit.SECONDS)
1735+
.atMost(5, TimeUnit.SECONDS)
1736+
.untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), posBeforeReadFail));
1737+
1738+
// Disable the read failure flag; replicator will read normally on retry, thus resuming replication
1739+
isMakeReadFail.set(false);
1740+
1741+
// Wait for replicator to recover from read failure and complete replication
1742+
// (mark delete catches up to the latest position)
1743+
Awaitility.await().timeout(30, TimeUnit.SECONDS).untilAsserted(() -> {
1744+
assertEquals(cursor.getMarkDeletedPosition(), lastPosition);
1745+
});
17081746

1747+
// --- Test DeleteCallback scenario ---
17091748
isMakeAckFail.set(true);
17101749

17111750
producer1.produce(10);

0 commit comments

Comments
 (0)