Skip to content

Commit d1fc732

Browse files
[fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)
1 parent 5ef3a21 commit d1fc732

2 files changed

Lines changed: 87 additions & 6 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,14 +1696,32 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
16961696
return future;
16971697
}
16981698

1699+
/**
 * Checks whether {@code remoteCluster} is listed as a replication cluster for this topic.
 *
 * <p>Asynchronously loads the policies of this topic's namespace and completes with {@code true}
 * when either the namespace policies' {@code replication_clusters} contain the cluster, or the
 * value held by {@code topicPolicies.getReplicationClusters()} contains it. When no namespace
 * policies are found, the namespace side is treated as an empty set.
 *
 * @param remoteCluster name of the remote cluster to look up
 * @return a future completing with {@code true} if the cluster is found in either source,
 *         {@code false} otherwise
 */
private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
1700+
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
1701+
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
1702+
.thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters)
1703+
.orElse(Collections.emptySet()).contains(remoteCluster)
1704+
// Short-circuit OR: topicPolicies is only consulted when the namespace policies do not list the cluster.
|| topicPolicies.getReplicationClusters().get().contains(remoteCluster));
1705+
}
1706+
16991707
protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
17001708
String localCluster) {
17011709
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
1702-
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
1703-
.getClusterAsync(remoteCluster)
1704-
.thenApply(clusterData ->
1705-
brokerService.getReplicationClient(remoteCluster, clusterData)))
1710+
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
1711+
.thenCompose(clusterExists -> {
1712+
if (!clusterExists) {
1713+
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
1714+
return removeReplicator(remoteCluster).thenApply(__ -> null);
1715+
}
1716+
return brokerService.pulsar().getPulsarResources().getClusterResources()
1717+
.getClusterAsync(remoteCluster)
1718+
.thenApply(clusterData ->
1719+
brokerService.getReplicationClient(remoteCluster, clusterData));
1720+
})
17061721
.thenAccept(replicationClient -> {
1722+
if (replicationClient == null) {
1723+
return;
1724+
}
17071725
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
17081726
try {
17091727
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
@@ -1727,8 +1745,8 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {
17271745

17281746
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
17291747

1730-
replicators.get(remoteCluster).disconnect().thenRun(() -> {
1731-
1748+
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
1749+
.orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
17321750
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
17331751
@Override
17341752
public void deleteCursorComplete(Object ctx) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,20 @@
4040
import java.nio.charset.StandardCharsets;
4141
import java.util.ArrayList;
4242
import java.util.Collection;
43+
import java.util.Collections;
44+
import java.util.HashSet;
4345
import java.util.List;
46+
import java.util.Set;
4447
import java.util.UUID;
4548
import java.util.concurrent.CompletableFuture;
4649
import java.util.concurrent.CountDownLatch;
4750
import java.util.concurrent.ExecutionException;
4851
import java.util.concurrent.TimeUnit;
4952
import java.util.concurrent.atomic.AtomicBoolean;
53+
import java.util.function.Supplier;
5054
import lombok.Cleanup;
5155
import org.apache.bookkeeper.client.LedgerHandle;
56+
import org.apache.bookkeeper.mledger.ManagedCursor;
5257
import org.apache.bookkeeper.mledger.ManagedLedger;
5358
import org.apache.pulsar.broker.service.BrokerService;
5459
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -57,6 +62,7 @@
5762
import org.apache.pulsar.client.admin.PulsarAdminException;
5863
import org.apache.pulsar.client.api.Consumer;
5964
import org.apache.pulsar.client.api.Message;
65+
import org.apache.pulsar.client.api.MessageId;
6066
import org.apache.pulsar.client.api.MessageListener;
6167
import org.apache.pulsar.client.api.MessageRoutingMode;
6268
import org.apache.pulsar.client.api.Producer;
@@ -66,7 +72,9 @@
6672
import org.apache.pulsar.client.api.SubscriptionType;
6773
import org.apache.pulsar.common.naming.NamespaceBundle;
6874
import org.apache.pulsar.common.naming.TopicName;
75+
import org.apache.pulsar.common.policies.data.ClusterData;
6976
import org.apache.pulsar.common.policies.data.Policies;
77+
import org.apache.pulsar.common.policies.data.TenantInfo;
7078
import org.apache.pulsar.common.policies.data.TopicStats;
7179
import org.awaitility.Awaitility;
7280
import org.junit.Assert;
@@ -525,4 +533,59 @@ public void testDeleteTopicFail() throws Exception {
525533
makeDeletedFailed.set(false);
526534
persistentTopic.delete().get();
527535
}
536+
537+
/**
 * TestNG data provider named {@code "topicLevelPolicy"} that supplies a single boolean argument,
 * once as {@code true} and once as {@code false}.
 *
 * @return two one-element rows: {@code {true}} and {@code {false}}
 */
@DataProvider(name = "topicLevelPolicy")
538+
public static Object[][] topicLevelPolicy() {
539+
return new Object[][] { { true }, { false } };
540+
}
541+
542+
/**
 * Verifies that a leftover replicator cursor is removed when the topic is initialized after its
 * remote cluster and replication policy are gone.
 *
 * <p>Steps: create a non-partitioned topic with a subscription named
 * {@code <replicatorPrefix>.<remoteCluster>}; create the remote cluster, allow it for tenant
 * {@code prop} and configure it as a replication cluster; check that the topic's managed ledger
 * holds exactly that one cursor; clear the replication clusters and delete the cluster; call
 * {@code topic.initialize()} and wait until the managed ledger has no cursors left.
 *
 * @param topicLevelPolicy when {@code true} the replication clusters are set and cleared through
 *                         the topic-level admin API, otherwise through the namespace-level one
 * @throws Exception if any admin call, future, or await fails or times out
 */
@Test(dataProvider = "topicLevelPolicy")
543+
public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) throws Exception {
544+
final String namespace = "prop/ns-abc";
545+
final String topicName = "persistent://" + namespace
546+
+ "/testCreateTopicWithZombieReplicatorCursor" + topicLevelPolicy;
547+
final String remoteCluster = "remote";
548+
admin.topics().createNonPartitionedTopic(topicName);
549+
// The subscription name follows the "<replicatorPrefix>.<cluster>" pattern checked in the cursor assertion below.
admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster,
550+
MessageId.earliest, true);
551+
552+
// Register the remote cluster and allow it for the tenant before configuring replication.
admin.clusters().createCluster(remoteCluster, ClusterData.builder()
553+
.serviceUrl("http://localhost:11112")
554+
.brokerServiceUrl("pulsar://localhost:11111")
555+
.build());
556+
TenantInfo tenantInfo = admin.tenants().getTenantInfo("prop");
557+
tenantInfo.getAllowedClusters().add(remoteCluster);
558+
admin.tenants().updateTenant("prop", tenantInfo);
559+
560+
if (topicLevelPolicy) {
561+
admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
562+
} else {
563+
admin.namespaces().setNamespaceReplicationClustersAsync(
564+
namespace, Collections.singleton(remoteCluster)).get();
565+
}
566+
567+
final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
568+
.get(3, TimeUnit.SECONDS).orElse(null);
569+
assertNotNull(topic);
570+
571+
// Collects the names of all cursors currently present on the topic's managed ledger.
final Supplier<Set<String>> getCursors = () -> {
572+
final Set<String> cursors = new HashSet<>();
573+
final Iterable<ManagedCursor> iterable = topic.getManagedLedger().getCursors();
574+
iterable.forEach(c -> cursors.add(c.getName()));
575+
return cursors;
576+
};
577+
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
578+
579+
if (topicLevelPolicy) {
580+
admin.topics().setReplicationClusters(topicName, Collections.emptyList());
581+
} else {
582+
admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
583+
}
584+
admin.clusters().deleteCluster(remoteCluster);
585+
// Now the cluster and its related policy have been removed, but the replicator cursor still exists
586+
587+
// Re-run topic initialization and wait for the managed ledger to end up with no cursors.
topic.initialize().get(3, TimeUnit.SECONDS);
588+
Awaitility.await().atMost(3, TimeUnit.SECONDS)
589+
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
590+
}
528591
}

0 commit comments

Comments
 (0)