3434import static org .testng .Assert .assertNull ;
3535import com .google .common .collect .Sets ;
3636import java .lang .reflect .Field ;
37+ import java .time .Duration ;
3738import java .util .ArrayList ;
39+ import java .util .Arrays ;
3840import java .util .Collections ;
3941import java .util .HashSet ;
4042import java .util .List ;
4648import java .util .concurrent .ExecutionException ;
4749import java .util .concurrent .atomic .AtomicBoolean ;
4850import java .util .function .Supplier ;
51+ import lombok .extern .slf4j .Slf4j ;
4952import org .apache .bookkeeper .client .LedgerHandle ;
5053import org .apache .bookkeeper .mledger .ManagedCursor ;
5154import org .apache .bookkeeper .mledger .ManagedLedger ;
7275import org .testng .annotations .DataProvider ;
7376import org .testng .annotations .Test ;
7477
78+ @ Slf4j
7579@ Test (groups = "broker" )
7680public class PersistentTopicTest extends BrokerTestBase {
7781
@@ -428,10 +432,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
428432 admin .tenants ().updateTenant ("prop" , tenantInfo );
429433
430434 if (topicLevelPolicy ) {
431- admin .topics ().setReplicationClusters (topicName , Collections . singletonList ( remoteCluster ));
435+ admin .topics ().setReplicationClusters (topicName , Arrays . asList ( "test" , remoteCluster ));
432436 } else {
433437 admin .namespaces ().setNamespaceReplicationClustersAsync (
434- namespace , Collections . singleton ( remoteCluster )).get ();
438+ namespace , Sets . newHashSet ( "test" , remoteCluster )).get ();
435439 }
436440
437441 final PersistentTopic topic = (PersistentTopic ) pulsar .getBrokerService ().getTopic (topicName , false )
@@ -446,16 +450,27 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
446450 };
447451 assertEquals (getCursors .get (), Collections .singleton (conf .getReplicatorPrefix () + "." + remoteCluster ));
448452
453+ // PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two
454+ // updates of the replication clusters. So here we sleep for a while to reduce the flakiness.
455+ Thread .sleep (100 );
456+
457+ // Configure the local cluster to avoid the topic being deleted in PersistentTopics#checkReplication
449458 if (topicLevelPolicy ) {
450- admin .topics ().setReplicationClusters (topicName , Collections .emptyList ( ));
459+ admin .topics ().setReplicationClusters (topicName , Collections .singletonList ( "test" ));
451460 } else {
452- admin .namespaces ().setNamespaceReplicationClustersAsync (namespace , Collections .emptySet ( )).get ();
461+ admin .namespaces ().setNamespaceReplicationClustersAsync (namespace , Collections .singleton ( "test" )).get ();
453462 }
454463 admin .clusters ().deleteCluster (remoteCluster );
455464 // Now the cluster and its related policy has been removed but the replicator cursor still exists
456465
457- topic .initialize ().get (3 , TimeUnit .SECONDS );
458- Awaitility .await ().atMost (3 , TimeUnit .SECONDS )
459- .until (() -> !topic .getManagedLedger ().getCursors ().iterator ().hasNext ());
466+ Awaitility .await ().atMost (Duration .ofSeconds (10 )).until (() -> {
467+ log .info ("Before initialize..." );
468+ try {
469+ topic .initialize ().get (3 , TimeUnit .SECONDS );
470+ } catch (ExecutionException e ) {
471+ log .warn ("Failed to initialize: {}" , e .getCause ().getMessage ());
472+ }
473+ return !topic .getManagedLedger ().getCursors ().iterator ().hasNext ();
474+ });
460475 }
461476}
0 commit comments