Skip to content

Commit feae589

Browse files
authored
[fix] [broker] Messages lost on the remote cluster when using topic level replication (#22890)
1 parent bacb162 commit feae589

4 files changed

Lines changed: 153 additions & 26 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -439,13 +439,6 @@ public CompletableFuture<Void> initialize() {
439439
this.createPersistentSubscriptions();
440440
}));
441441

442-
for (ManagedCursor cursor : ledger.getCursors()) {
443-
if (cursor.getName().startsWith(replicatorPrefix)) {
444-
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
445-
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
446-
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
447-
}
448-
}
449442
return FutureUtil.waitForAll(futures).thenCompose(__ ->
450443
brokerService.pulsar().getPulsarResources().getNamespaceResources()
451444
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
@@ -476,6 +469,7 @@ public CompletableFuture<Void> initialize() {
476469
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
477470
}, getOrderedExecutor())
478471
.thenCompose(ignore -> initTopicPolicy())
472+
.thenCompose(ignore -> removeOrphanReplicationCursors())
479473
.exceptionally(ex -> {
480474
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
481475
topic, ex.getMessage());
@@ -553,6 +547,21 @@ private void createPersistentSubscriptions() {
553547
checkReplicatedSubscriptionControllerState();
554548
}
555549

550+
private CompletableFuture<Void> removeOrphanReplicationCursors() {
551+
List<CompletableFuture<Void>> futures = new ArrayList<>();
552+
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
553+
for (ManagedCursor cursor : ledger.getCursors()) {
554+
if (cursor.getName().startsWith(replicatorPrefix)) {
555+
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
556+
if (!replicationClusters.contains(remoteCluster)) {
557+
log.warn("Remove the orphan replicator because the cluster '{}' does not exist", remoteCluster);
558+
futures.add(removeReplicator(remoteCluster));
559+
}
560+
}
561+
}
562+
return FutureUtil.waitForAll(futures);
563+
}
564+
556565
/**
557566
* Unload a subscriber.
558567
* @throws SubscriptionNotFoundException If the subscription is not found.
@@ -2055,30 +2064,18 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
20552064
return future;
20562065
}
20572066

2058-
/**
 * Checks whether {@code remoteCluster} is configured as a replication cluster for this topic.
 *
 * <p>Loads the namespace policies asynchronously and yields {@code true} when either the
 * namespace's {@code replication_clusters} or the list held by
 * {@code topicPolicies.getReplicationClusters()} contains the cluster. Missing namespace
 * policies are treated as an empty set.
 *
 * <p>NOTE(review): this method appears on the removed side of the diff ("-" gutter markers).
 *
 * @param remoteCluster name of the remote cluster to look up
 * @return future holding {@code true} if the cluster is configured at either level
 */
private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
2059-
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
2060-
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
2061-
.thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters)
2062-
.orElse(Collections.emptySet()).contains(remoteCluster)
2063-
|| topicPolicies.getReplicationClusters().get().contains(remoteCluster));
2064-
}
2065-
20662067
protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
20672068
String localCluster) {
20682069
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
2069-
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
2070-
.thenCompose(clusterExists -> {
2071-
if (!clusterExists) {
2072-
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
2073-
return removeReplicator(remoteCluster).thenApply(__ -> null);
2074-
}
2075-
return brokerService.pulsar().getPulsarResources().getClusterResources()
2076-
.getClusterAsync(remoteCluster)
2077-
.thenApply(clusterData ->
2078-
brokerService.getReplicationClient(remoteCluster, clusterData));
2079-
})
2070+
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
2071+
.getClusterAsync(remoteCluster)
2072+
.thenApply(clusterData ->
2073+
brokerService.getReplicationClient(remoteCluster, clusterData)))
20802074
.thenAccept(replicationClient -> {
20812075
if (replicationClient == null) {
2076+
log.error("[{}] Can not create replicator because the remote client can not be created."
2077+
+ " remote cluster: {}. State of transferring : {}",
2078+
topic, remoteCluster, transferring);
20822079
return;
20832080
}
20842081
lock.readLock().lock();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.atomic.AtomicInteger;
4747
import java.util.concurrent.atomic.AtomicReference;
4848
import java.util.function.BiFunction;
49+
import java.util.function.Supplier;
4950
import lombok.AllArgsConstructor;
5051
import lombok.Data;
5152
import lombok.SneakyThrows;
@@ -58,7 +59,10 @@
5859
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
5960
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
6061
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
62+
import org.apache.pulsar.client.admin.PulsarAdmin;
6163
import org.apache.pulsar.client.api.Consumer;
64+
import org.apache.pulsar.client.api.Message;
65+
import org.apache.pulsar.client.api.MessageId;
6266
import org.apache.pulsar.client.api.Producer;
6367
import org.apache.pulsar.client.api.ProducerBuilder;
6468
import org.apache.pulsar.client.api.PulsarClient;
@@ -78,6 +82,7 @@
7882
import org.testng.Assert;
7983
import org.testng.annotations.AfterClass;
8084
import org.testng.annotations.BeforeClass;
85+
import org.testng.annotations.DataProvider;
8186
import org.testng.annotations.Test;
8287

8388
@Slf4j
@@ -809,4 +814,102 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except
809814
admin2.topics().deletePartitionedTopic(topicName, false);
810815
});
811816
}
817+
818+
private String getTheLatestMessage(String topic, PulsarClient client, PulsarAdmin admin) throws Exception {
819+
String dummySubscription = "s_" + UUID.randomUUID().toString().replace("-", "");
820+
admin.topics().createSubscription(topic, dummySubscription, MessageId.earliest);
821+
Consumer<String> c = client.newConsumer(Schema.STRING).topic(topic).subscriptionName(dummySubscription)
822+
.subscribe();
823+
String lastMsgValue = null;
824+
while (true) {
825+
Message<String> msg = c.receive(2, TimeUnit.SECONDS);
826+
if (msg == null) {
827+
break;
828+
}
829+
lastMsgValue = msg.getValue();
830+
}
831+
c.unsubscribe();
832+
return lastMsgValue;
833+
}
834+
835+
/** Level at which geo-replication is configured for the topic used by a test. */
enum ReplicationLevel {
    /** The test sets the replication clusters on the topic itself. */
    TOPIC_LEVEL,
    /** The test creates the topic under the replicated namespace. */
    NAMESPACE_LEVEL
}
839+
840+
@DataProvider(name = "replicationLevels")
841+
public Object[][] replicationLevels() {
842+
return new Object[][]{
843+
{ReplicationLevel.TOPIC_LEVEL},
844+
{ReplicationLevel.NAMESPACE_LEVEL}
845+
};
846+
}
847+
848+
@Test(dataProvider = "replicationLevels")
849+
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
850+
final String topicName = ((Supplier<String>) () -> {
851+
if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) {
852+
return BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
853+
} else {
854+
return BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
855+
}
856+
}).get();
857+
admin1.topics().createNonPartitionedTopic(topicName);
858+
admin2.topics().createNonPartitionedTopic(topicName);
859+
admin2.topics().createSubscription(topicName, "s1", MessageId.earliest);
860+
if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) {
861+
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
862+
} else {
863+
pulsar1.getConfig().setTopicLevelPoliciesEnabled(false);
864+
}
865+
verifyReplicationWorks(topicName);
866+
867+
/**
868+
* Verify:
869+
* 1. Inject an error to make the replicator is not able to work.
870+
* 2. Send one message, since the replicator does not work anymore, this message will not be replicated.
871+
* 3. Unload topic, the replicator will be re-created.
872+
* 4. Verify: the message can be replicated to the remote cluster.
873+
*/
874+
// Step 1: Inject an error to make the replicator is not able to work.
875+
Replicator replicator = broker1.getTopic(topicName, false).join().get().getReplicators().get(cluster2);
876+
replicator.terminate();
877+
878+
// Step 2: Send one message, since the replicator does not work anymore, this message will not be replicated.
879+
String msg = UUID.randomUUID().toString();
880+
Producer p1 = client1.newProducer(Schema.STRING).topic(topicName).create();
881+
p1.send(msg);
882+
p1.close();
883+
// The result of "peek message" will be the messages generated, so it is not the same as the message just sent.
884+
Thread.sleep(3000);
885+
assertNotEquals(getTheLatestMessage(topicName, client2, admin2), msg);
886+
assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 1);
887+
888+
// Step 3: Unload topic, the replicator will be re-created.
889+
admin1.topics().unload(topicName);
890+
891+
// Step 4. Verify: the message can be replicated to the remote cluster.
892+
Awaitility.await().atMost(Duration.ofSeconds(300)).untilAsserted(() -> {
893+
log.info("replication backlog: {}",
894+
admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog());
895+
assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 0);
896+
assertEquals(getTheLatestMessage(topicName, client2, admin2), msg);
897+
});
898+
899+
// Cleanup.
900+
if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) {
901+
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
902+
Awaitility.await().untilAsserted(() -> {
903+
assertEquals(broker1.getTopic(topicName, false).join().get().getReplicators().size(), 0);
904+
});
905+
admin1.topics().delete(topicName, false);
906+
admin2.topics().delete(topicName, false);
907+
} else {
908+
pulsar1.getConfig().setTopicLevelPoliciesEnabled(true);
909+
cleanupTopics(() -> {
910+
admin1.topics().delete(topicName);
911+
admin2.topics().delete(topicName);
912+
});
913+
}
914+
}
812915
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,28 @@ protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception
350350
}
351351

352352
protected void verifyReplicationWorks(String topic) throws Exception {
353+
// Wait for replicator starting.
354+
Awaitility.await().until(() -> {
355+
try {
356+
PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService()
357+
.getTopic(topic, false).join().get();
358+
if (persistentTopic.getReplicators().size() > 0) {
359+
return true;
360+
}
361+
} catch (Exception ex) {}
362+
363+
try {
364+
String partition0 = TopicName.get(topic).getPartition(0).toString();
365+
PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService()
366+
.getTopic(partition0, false).join().get();
367+
if (persistentTopic.getReplicators().size() > 0) {
368+
return true;
369+
}
370+
} catch (Exception ex) {}
371+
372+
return false;
373+
});
374+
// Verify: pub & sub.
353375
final String subscription = "__subscribe_1";
354376
final String msgValue = "__msg1";
355377
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topic).create();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,9 @@ public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws
104104
public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
105105
super.testExpandTopicPartitionsOnNamespaceLevelReplication();
106106
}
107+
108+
@Test(enabled = false)
109+
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
110+
super.testReloadWithTopicLevelGeoReplication(replicationLevel);
111+
}
107112
}

0 commit comments

Comments
 (0)