Skip to content

Commit f5eca38

Browse files
committed
[fix][broker] Check replication cluster before starting the replicator
Fixes apache#20010 ### Motivation `PersistentTopicTest.testCreateTopicWithZombieReplicatorCursor` is flaky because the cursor could still be created again in `startReplicator`, which could be called by: ``` onPoliciesUpdate checkReplicationAndRetryOnFailure checkReplication ``` ### Modifications - Call `checkReplicationCluster` before calling `startReplicator`. - Sleep for a while in the test to reduce the flakiness caused by the asynchronous update of the policies
1 parent 42a6969 commit f5eca38

2 files changed

Lines changed: 23 additions & 4 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,13 @@ public CompletableFuture<Void> checkReplication() {
15421542
continue;
15431543
}
15441544
if (!replicators.containsKey(cluster)) {
1545-
futures.add(startReplicator(cluster));
1545+
futures.add(checkReplicationCluster(cluster).thenCompose(clusterExists -> {
1546+
if (clusterExists) {
1547+
return startReplicator(cluster);
1548+
} else {
1549+
return CompletableFuture.completedFuture(null);
1550+
}
1551+
}));
15461552
}
15471553
}
15481554

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.io.ByteArrayOutputStream;
3939
import java.lang.reflect.Field;
4040
import java.nio.charset.StandardCharsets;
41+
import java.time.Duration;
4142
import java.util.ArrayList;
4243
import java.util.Collection;
4344
import java.util.Collections;
@@ -52,6 +53,7 @@
5253
import java.util.concurrent.atomic.AtomicBoolean;
5354
import java.util.function.Supplier;
5455
import lombok.Cleanup;
56+
import lombok.extern.slf4j.Slf4j;
5557
import org.apache.bookkeeper.client.LedgerHandle;
5658
import org.apache.bookkeeper.mledger.ManagedCursor;
5759
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -83,6 +85,7 @@
8385
import org.testng.annotations.DataProvider;
8486
import org.testng.annotations.Test;
8587

88+
@Slf4j
8689
@Test(groups = "broker")
8790
public class PersistentTopicTest extends BrokerTestBase {
8891

@@ -576,6 +579,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
576579
};
577580
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
578581

582+
// PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two
583+
// updates of the replication clusters. So here we sleep for a while to reduce the flakiness.
584+
Thread.sleep(100);
585+
579586
if (topicLevelPolicy) {
580587
admin.topics().setReplicationClusters(topicName, Collections.emptyList());
581588
} else {
@@ -584,8 +591,14 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
584591
admin.clusters().deleteCluster(remoteCluster);
585592
// Now the cluster and its related policy has been removed but the replicator cursor still exists
586593

587-
topic.initialize().get(3, TimeUnit.SECONDS);
588-
Awaitility.await().atMost(3, TimeUnit.SECONDS)
589-
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
594+
Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
595+
log.info("Before initialize...");
596+
try {
597+
topic.initialize().get(3, TimeUnit.SECONDS);
598+
} catch (ExecutionException e) {
599+
log.warn("Failed to initialize: {}", e.getCause().getMessage());
600+
}
601+
return !topic.getManagedLedger().getCursors().iterator().hasNext();
602+
});
590603
}
591604
}

0 commit comments

Comments
 (0)