Skip to content

Commit bb254f1

Browse files
committed
[fix][test] Fix flaky testCreateTopicWithZombieReplicatorCursor
Fixes #20010 ### Motivation `PersistentTopicTest.testCreateTopicWithZombieReplicatorCursor` is flaky because `onPoliciesUpdate` is asynchronous, while `testCreateTopicWithZombieReplicatorCursor` updates the namespace policy nearly the same time, so there is a race with the order of updating `AbstractTopic#topicPolicies`. Sometimes the policies update might fail because the topic might be deleted in `PersistentTopic#checkReplication`: > Deleting topic [xxx] because local cluster is not part of global namespace repl list [remote] ### Modifications - Sleep 100ms between two calls of updating the replication clusters - Add the local cluster to the replication cluster list - Add the retry logic for `initialize`
1 parent 42a6969 commit bb254f1

1 file changed

Lines changed: 22 additions & 7 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
import java.io.ByteArrayOutputStream;
3939
import java.lang.reflect.Field;
4040
import java.nio.charset.StandardCharsets;
41+
import java.time.Duration;
4142
import java.util.ArrayList;
43+
import java.util.Arrays;
4244
import java.util.Collection;
4345
import java.util.Collections;
4446
import java.util.HashSet;
@@ -52,6 +54,7 @@
5254
import java.util.concurrent.atomic.AtomicBoolean;
5355
import java.util.function.Supplier;
5456
import lombok.Cleanup;
57+
import lombok.extern.slf4j.Slf4j;
5558
import org.apache.bookkeeper.client.LedgerHandle;
5659
import org.apache.bookkeeper.mledger.ManagedCursor;
5760
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -83,6 +86,7 @@
8386
import org.testng.annotations.DataProvider;
8487
import org.testng.annotations.Test;
8588

89+
@Slf4j
8690
@Test(groups = "broker")
8791
public class PersistentTopicTest extends BrokerTestBase {
8892

@@ -558,10 +562,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
558562
admin.tenants().updateTenant("prop", tenantInfo);
559563

560564
if (topicLevelPolicy) {
561-
admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
565+
admin.topics().setReplicationClusters(topicName, Arrays.asList("test", remoteCluster));
562566
} else {
563567
admin.namespaces().setNamespaceReplicationClustersAsync(
564-
namespace, Collections.singleton(remoteCluster)).get();
568+
namespace, Sets.newHashSet("test", remoteCluster)).get();
565569
}
566570

567571
final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
@@ -576,16 +580,27 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
576580
};
577581
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
578582

583+
// PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two
584+
// updates of the replication clusters. So here we sleep for a while to reduce the flakiness.
585+
Thread.sleep(100);
586+
587+
// Configure the local cluster to avoid the topic being deleted in PersistentTopics#checkReplication
579588
if (topicLevelPolicy) {
580-
admin.topics().setReplicationClusters(topicName, Collections.emptyList());
589+
admin.topics().setReplicationClusters(topicName, Collections.singletonList("test"));
581590
} else {
582-
admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
591+
admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.singleton("test")).get();
583592
}
584593
admin.clusters().deleteCluster(remoteCluster);
585594
// Now the cluster and its related policy has been removed but the replicator cursor still exists
586595

587-
topic.initialize().get(3, TimeUnit.SECONDS);
588-
Awaitility.await().atMost(3, TimeUnit.SECONDS)
589-
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
596+
Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
597+
log.info("Before initialize...");
598+
try {
599+
topic.initialize().get(3, TimeUnit.SECONDS);
600+
} catch (ExecutionException e) {
601+
log.warn("Failed to initialize: {}", e.getCause().getMessage());
602+
}
603+
return !topic.getManagedLedger().getCursors().iterator().hasNext();
604+
});
590605
}
591606
}

0 commit comments

Comments
 (0)