3838import java .io .ByteArrayOutputStream ;
3939import java .lang .reflect .Field ;
4040import java .nio .charset .StandardCharsets ;
41+ import java .time .Duration ;
4142import java .util .ArrayList ;
43+ import java .util .Arrays ;
4244import java .util .Collection ;
4345import java .util .Collections ;
4446import java .util .HashSet ;
5254import java .util .concurrent .atomic .AtomicBoolean ;
5355import java .util .function .Supplier ;
5456import lombok .Cleanup ;
57+ import lombok .extern .slf4j .Slf4j ;
5558import org .apache .bookkeeper .client .LedgerHandle ;
5659import org .apache .bookkeeper .mledger .ManagedCursor ;
5760import org .apache .bookkeeper .mledger .ManagedLedger ;
8386import org .testng .annotations .DataProvider ;
8487import org .testng .annotations .Test ;
8588
89+ @ Slf4j
8690@ Test (groups = "broker" )
8791public class PersistentTopicTest extends BrokerTestBase {
8892
@@ -558,10 +562,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
558562 admin .tenants ().updateTenant ("prop" , tenantInfo );
559563
560564 if (topicLevelPolicy ) {
561- admin .topics ().setReplicationClusters (topicName , Collections . singletonList ( remoteCluster ));
565+ admin .topics ().setReplicationClusters (topicName , Arrays . asList ( "test" , remoteCluster ));
562566 } else {
563567 admin .namespaces ().setNamespaceReplicationClustersAsync (
564- namespace , Collections . singleton ( remoteCluster )).get ();
568+ namespace , Sets . newHashSet ( "test" , remoteCluster )).get ();
565569 }
566570
567571 final PersistentTopic topic = (PersistentTopic ) pulsar .getBrokerService ().getTopic (topicName , false )
@@ -576,16 +580,27 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
576580 };
577581 assertEquals (getCursors .get (), Collections .singleton (conf .getReplicatorPrefix () + "." + remoteCluster ));
578582
583+ // PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two
584+ // updates of the replication clusters. So here we sleep for a while to reduce the flakiness.
585+ Thread .sleep (100 );
586+
587+ // Configure the local cluster to avoid the topic being deleted in PersistentTopics#checkReplication
579588 if (topicLevelPolicy ) {
580- admin .topics ().setReplicationClusters (topicName , Collections .emptyList ( ));
589+ admin .topics ().setReplicationClusters (topicName , Collections .singletonList ( "test" ));
581590 } else {
582- admin .namespaces ().setNamespaceReplicationClustersAsync (namespace , Collections .emptySet ( )).get ();
591+ admin .namespaces ().setNamespaceReplicationClustersAsync (namespace , Collections .singleton ( "test" )).get ();
583592 }
584593 admin .clusters ().deleteCluster (remoteCluster );
585594 // Now the cluster and its related policy has been removed but the replicator cursor still exists
586595
587- topic .initialize ().get (3 , TimeUnit .SECONDS );
588- Awaitility .await ().atMost (3 , TimeUnit .SECONDS )
589- .until (() -> !topic .getManagedLedger ().getCursors ().iterator ().hasNext ());
596+ Awaitility .await ().atMost (Duration .ofSeconds (10 )).until (() -> {
597+ log .info ("Before initialize..." );
598+ try {
599+ topic .initialize ().get (3 , TimeUnit .SECONDS );
600+ } catch (ExecutionException e ) {
601+ log .warn ("Failed to initialize: {}" , e .getCause ().getMessage ());
602+ }
603+ return !topic .getManagedLedger ().getCursors ().iterator ().hasNext ();
604+ });
590605 }
591606}
0 commit comments