3535import java .io .ByteArrayOutputStream ;
3636import java .lang .reflect .Field ;
3737import java .nio .charset .StandardCharsets ;
38+ import java .time .Duration ;
3839import java .util .ArrayList ;
40+ import java .util .Arrays ;
3941import java .util .Collection ;
4042import java .util .Collections ;
4143import java .util .HashSet ;
5153import java .util .concurrent .atomic .AtomicBoolean ;
5254import java .util .function .Supplier ;
5355import lombok .Cleanup ;
56+ import lombok .extern .slf4j .Slf4j ;
5457import org .apache .bookkeeper .client .LedgerHandle ;
5558import org .apache .bookkeeper .mledger .ManagedCursor ;
5659import org .apache .bookkeeper .mledger .ManagedLedger ;
8184import org .testng .annotations .DataProvider ;
8285import org .testng .annotations .Test ;
8386
87+ @ Slf4j
8488@ Test (groups = "broker" )
8589public class PersistentTopicTest extends BrokerTestBase {
8690
@@ -443,10 +447,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
443447 admin .tenants ().updateTenant ("prop" , tenantInfo );
444448
445449 if (topicLevelPolicy ) {
446- admin .topics ().setReplicationClusters (topicName , Collections . singletonList ( remoteCluster ));
450+ admin .topics ().setReplicationClusters (topicName , Arrays . asList ( "test" , remoteCluster ));
447451 } else {
448452 admin .namespaces ().setNamespaceReplicationClustersAsync (
449- namespace , Collections . singleton ( remoteCluster )).get ();
453+ namespace , Sets . newHashSet ( "test" , remoteCluster )).get ();
450454 }
451455
452456 final PersistentTopic topic = (PersistentTopic ) pulsar .getBrokerService ().getTopic (topicName , false )
@@ -461,16 +465,27 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
461465 };
462466 assertEquals (getCursors .get (), Collections .singleton (conf .getReplicatorPrefix () + "." + remoteCluster ));
463467
468+ // PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two
469+ // updates of the replication clusters. So here we sleep for a while to reduce the flakiness.
470+ Thread .sleep (100 );
471+
472+ // Configure the local cluster to avoid the topic being deleted in PersistentTopics#checkReplication
464473 if (topicLevelPolicy ) {
465- admin .topics ().setReplicationClusters (topicName , Collections .emptyList ( ));
474+ admin .topics ().setReplicationClusters (topicName , Collections .singletonList ( "test" ));
466475 } else {
467- admin .namespaces ().setNamespaceReplicationClustersAsync (namespace , Collections .emptySet ( )).get ();
476+ admin .namespaces ().setNamespaceReplicationClustersAsync (namespace , Collections .singleton ( "test" )).get ();
468477 }
469478 admin .clusters ().deleteCluster (remoteCluster );
470479 // Now the cluster and its related policy has been removed but the replicator cursor still exists
471480
472- topic .initialize ().get (3 , TimeUnit .SECONDS );
473- Awaitility .await ().atMost (3 , TimeUnit .SECONDS )
474- .until (() -> !topic .getManagedLedger ().getCursors ().iterator ().hasNext ());
481+ Awaitility .await ().atMost (Duration .ofSeconds (10 )).until (() -> {
482+ log .info ("Before initialize..." );
483+ try {
484+ topic .initialize ().get (3 , TimeUnit .SECONDS );
485+ } catch (ExecutionException e ) {
486+ log .warn ("Failed to initialize: {}" , e .getCause ().getMessage ());
487+ }
488+ return !topic .getManagedLedger ().getCursors ().iterator ().hasNext ();
489+ });
475490 }
476491}
0 commit comments