Skip to content

Commit 610b72e

Browse files
authored
[fix][broker] The feature brokerDeleteInactivePartitionedTopicMetadataEnabled leaves orphan topic policies and topic schemas (#24150)
1 parent 9bcad7c commit 610b72e

7 files changed

Lines changed: 304 additions & 5 deletions

File tree

conf/broker.conf

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,11 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
222222
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
223223

224224
# Metadata of inactive partitioned topic will not be cleaned up automatically by default.
225-
# Note: If `allowAutoTopicCreation` and this option are enabled at the same time,
225+
# Note 1: If `allowAutoTopicCreation` and this option are enabled at the same time,
226226
# it may appear that a partitioned topic has just been deleted but is automatically created as a non-partitioned topic.
227+
# Note 2: Activating bidirectional geo-replication under global configuration ZooKeeper may lead to schema remnants and
228+
# abnormal topic-level policies.
229+
# Note 3: Activating bidirectional geo-replication under global configuration ZooKeeper may lead to a consumption issue.
227230
brokerDeleteInactivePartitionedTopicMetadataEnabled=false
228231

229232
# Max duration of topic inactivity in seconds, default is not present

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -694,9 +694,13 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
694694
category = CATEGORY_POLICIES,
695695
dynamic = true,
696696
doc = "Metadata of inactive partitioned topic will not be automatically cleaned up by default.\n"
697-
+ "Note: If `allowAutoTopicCreation` and this option are enabled at the same time,\n"
697+
+ "Note 1: If `allowAutoTopicCreation` and this option are enabled at the same time,\n"
698698
+ "it may appear that a partitioned topic has just been deleted but is automatically created as a "
699-
+ "non-partitioned topic."
699+
+ "non-partitioned topic.\n"
700+
+ "Note 2: Activating bidirectional geo-replication under global ZooKeeper configuration may lead to schema"
701+
+ " remnants and abnormal topic-level policies.\n"
702+
+ "Note 3: Activating bidirectional geo-replication under global configuration ZooKeeper may lead"
703+
+ " to a consumption issue."
700704
)
701705
private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled = false;
702706
@FieldContext(

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3245,8 +3245,14 @@ private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
32453245
String.format("Another partition exists for [%s].",
32463246
topicName));
32473247
} else {
3248-
return partitionedTopicResources
3249-
.deletePartitionedTopicAsync(topicName);
3248+
try {
3249+
return brokerService.getPulsar().getAdminClient().topics()
3250+
.deletePartitionedTopicAsync(topicName.toString());
3251+
} catch (PulsarServerException e) {
3252+
log.info("[{}] Delete topic metadata failed due to failed to"
3253+
+ " get internal admin client.", topicName, e);
3254+
return CompletableFuture.failedFuture(e);
3255+
}
32503256
}
32513257
});
32523258
}))

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3744,4 +3744,18 @@ public void testGrantAndRevokePermissions() throws Exception {
37443744
Assert.assertTrue(permissions11.isEmpty());
37453745
Assert.assertTrue(permissions22.isEmpty());
37463746
}
3747+
3748+
@Test
3749+
public void testDeletePatchyPartitionedTopic() throws Exception {
3750+
final String topic = BrokerTestUtil.newUniqueName(defaultNamespace + "/tp");
3751+
admin.topics().createPartitionedTopic(topic, 2);
3752+
Producer producer = pulsarClient.newProducer().topic(TopicName.get(topic).getPartition(0).toString())
3753+
.create();
3754+
// Mock a scenario that "-partition-1" has been removed due to topic GC.
3755+
pulsar.getBrokerService().getTopic(TopicName.get(topic).getPartition(1).toString(), false)
3756+
.get().get().delete().join();
3757+
// Verify: delete partitioned topic.
3758+
producer.close();
3759+
admin.topics().deletePartitionedTopicAsync(topic, false).get();
3760+
}
37473761
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,4 +480,21 @@ protected void deleteTopicAfterDisableTopicLevelReplication(String topic) throws
480480
admin2.topics().delete(topicName.toString());
481481
}
482482
}
483+
484+
protected void waitReplicatorStopped(PulsarService sourceCluster, PulsarService targetCluster, String topicName) {
485+
Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
486+
Optional<Topic> topicOptional2 = targetCluster.getBrokerService().getTopic(topicName, false).get();
487+
assertTrue(topicOptional2.isPresent());
488+
PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
489+
for (org.apache.pulsar.broker.service.Producer producer : persistentTopic2.getProducers().values()) {
490+
assertFalse(producer.getProducerName()
491+
.startsWith(targetCluster.getConfiguration().getReplicatorPrefix()));
492+
}
493+
Optional<Topic> topicOptional1 = sourceCluster.getBrokerService().getTopic(topicName, false).get();
494+
assertTrue(topicOptional1.isPresent());
495+
PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional1.get();
496+
assertTrue(persistentTopic1.getReplicators().isEmpty()
497+
|| !persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
498+
});
499+
}
483500
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.assertTrue;
25+
import java.util.Collections;
26+
import java.util.concurrent.TimeUnit;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.commons.collections4.CollectionUtils;
29+
import org.apache.pulsar.broker.BrokerTestUtil;
30+
import org.apache.pulsar.broker.ServiceConfiguration;
31+
import org.apache.pulsar.client.api.Message;
32+
import org.apache.pulsar.client.api.MessageId;
33+
import org.apache.pulsar.client.api.Producer;
34+
import org.apache.pulsar.client.api.Schema;
35+
import org.apache.pulsar.common.naming.TopicName;
36+
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
37+
import org.apache.pulsar.common.policies.data.PublishRate;
38+
import org.apache.pulsar.common.policies.data.TopicType;
39+
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
40+
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
41+
import org.awaitility.Awaitility;
42+
import org.testng.annotations.AfterClass;
43+
import org.testng.annotations.BeforeClass;
44+
import org.testng.annotations.DataProvider;
45+
import org.testng.annotations.Test;
46+
47+
@Slf4j
48+
@Test(groups = "broker")
49+
public class ReplicationTopicGcTest extends OneWayReplicatorTestBase {
50+
51+
@Override
52+
@BeforeClass(alwaysRun = true, timeOut = 300000)
53+
public void setup() throws Exception {
54+
super.setup();
55+
}
56+
57+
@Override
58+
@AfterClass(alwaysRun = true, timeOut = 300000)
59+
public void cleanup() throws Exception {
60+
super.cleanup();
61+
}
62+
63+
@DataProvider(name = "topicTypes")
64+
public Object[][] topicTypes() {
65+
return new Object[][]{
66+
{TopicType.NON_PARTITIONED},
67+
{TopicType.PARTITIONED}
68+
};
69+
}
70+
71+
protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
72+
LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
73+
super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
74+
config.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
75+
config.setBrokerDeleteInactiveTopicsEnabled(true);
76+
config.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
77+
config.setBrokerDeleteInactiveTopicsFrequencySeconds(5);
78+
config.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(5);
79+
config.setReplicationPolicyCheckDurationSeconds(1);
80+
}
81+
82+
@Test(dataProvider = "topicTypes")
83+
public void testTopicGC(TopicType topicType) throws Exception {
84+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
85+
final String schemaId = TopicName.get(topicName).getSchemaName();
86+
final String subTopic = topicType == TopicType.NON_PARTITIONED ? topicName
87+
: TopicName.get(topicName).getPartition(0).toString();
88+
if (topicType == TopicType.NON_PARTITIONED) {
89+
admin1.topics().createNonPartitionedTopic(topicName);
90+
} else {
91+
admin1.topics().createPartitionedTopic(topicName, 1);
92+
}
93+
94+
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
95+
// Wait for replicator started.
96+
waitReplicatorStarted(subTopic);
97+
98+
// Trigger a topic level policies.
99+
PublishRate publishRate = new PublishRate(1000, 1024 * 1024);
100+
admin1.topicPolicies().setPublishRate(topicName, publishRate);
101+
admin2.topicPolicies().setPublishRate(topicName, publishRate);
102+
// Write a schema.
103+
// Since there is a producer registered one the source cluster, skipped to write a schema.
104+
admin2.schemas().createSchema(topicName, Schema.STRING.getSchemaInfo());
105+
106+
// Trigger GC through close all clients.
107+
producer1.close();
108+
// Verify: all resources were deleted.
109+
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
110+
// sub topic.
111+
assertFalse(pulsar1.getPulsarResources().getTopicResources()
112+
.persistentTopicExists(TopicName.get(subTopic)).get());
113+
assertFalse(pulsar2.getPulsarResources().getTopicResources()
114+
.persistentTopicExists(TopicName.get(subTopic)).get());
115+
// partitioned topic.
116+
assertFalse(pulsar1.getPulsarResources().getNamespaceResources()
117+
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
118+
assertFalse(pulsar2.getPulsarResources().getNamespaceResources()
119+
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
120+
// topic policies.
121+
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
122+
TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
123+
assertTrue(pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
124+
TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
125+
// schema.
126+
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
127+
assertTrue(CollectionUtils.isEmpty(pulsar2.getSchemaStorage().getAll(schemaId).get()));
128+
});
129+
}
130+
131+
@Test(dataProvider = "topicTypes")
132+
public void testRemoteClusterStillConsumeAfterCurrentClusterGc(TopicType topicType) throws Exception {
133+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
134+
final String subscription = "s1";
135+
final String schemaId = TopicName.get(topicName).getSchemaName();
136+
final String subTopic = topicType == TopicType.NON_PARTITIONED ? topicName
137+
: TopicName.get(topicName).getPartition(0).toString();
138+
if (topicType == TopicType.NON_PARTITIONED) {
139+
admin1.topics().createNonPartitionedTopic(topicName);
140+
} else {
141+
admin1.topics().createPartitionedTopic(topicName, 1);
142+
}
143+
144+
// Wait for replicator started.
145+
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
146+
waitReplicatorStarted(subTopic);
147+
admin2.topics().createSubscription(topicName, subscription, MessageId.earliest);
148+
149+
if (usingGlobalZK) {
150+
admin2.topics().setReplicationClusters(topicName, Collections.singletonList(cluster2));
151+
waitReplicatorStopped(pulsar2, pulsar1, subTopic);
152+
}
153+
154+
// Send a message
155+
producer1.send("msg-1");
156+
Awaitility.await().untilAsserted(() -> {
157+
assertEquals(admin2.topics().getStats(subTopic).getSubscriptions().get(subscription).getMsgBacklog(), 1);
158+
});
159+
160+
// Trigger GC through close all clients.
161+
producer1.close();
162+
// Verify: the topic was removed on the source cluster.
163+
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
164+
// sub topic.
165+
assertFalse(pulsar1.getPulsarResources().getTopicResources()
166+
.persistentTopicExists(TopicName.get(subTopic)).get());
167+
// partitioned topic.
168+
assertFalse(pulsar1.getPulsarResources().getNamespaceResources()
169+
.getPartitionedTopicResources().partitionedTopicExists(TopicName.get(topicName)));
170+
// topic policies.
171+
assertTrue(pulsar1.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(topicName),
172+
TopicPoliciesService.GetType.DEFAULT).get().isEmpty());
173+
// schema.
174+
assertTrue(CollectionUtils.isEmpty(pulsar1.getSchemaStorage().getAll(schemaId).get()));
175+
});
176+
177+
// Verify: consumer still can consume messages from the remote cluster.
178+
org.apache.pulsar.client.api.Consumer<String> consumer =
179+
client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subscription).subscribe();
180+
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
181+
assertNotNull(msg);
182+
assertEquals(msg.getValue(), "msg-1");
183+
184+
// Cleanup.
185+
consumer.close();
186+
if (topicType == TopicType.NON_PARTITIONED) {
187+
admin2.topics().delete(topicName);
188+
} else {
189+
admin2.topics().deletePartitionedTopic(topicName);
190+
}
191+
}
192+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.pulsar.common.policies.data.TopicType;
23+
import org.testng.annotations.AfterClass;
24+
import org.testng.annotations.BeforeClass;
25+
import org.testng.annotations.Test;
26+
27+
@Slf4j
28+
@Test(groups = "broker")
29+
public class ReplicationTopicGcUsingGlobalZKTest extends ReplicationTopicGcTest {
30+
31+
@Override
32+
@BeforeClass(alwaysRun = true, timeOut = 300000)
33+
public void setup() throws Exception {
34+
super.usingGlobalZK = true;
35+
super.setup();
36+
}
37+
38+
@Override
39+
@AfterClass(alwaysRun = true, timeOut = 300000)
40+
public void cleanup() throws Exception {
41+
super.cleanup();
42+
}
43+
44+
@Test(dataProvider = "topicTypes")
45+
public void testTopicGC(TopicType topicType) throws Exception {
46+
if (topicType.equals(TopicType.PARTITIONED)) {
47+
// Pulsar does not support the feature "brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
48+
// Geo-Replication with Global ZK.
49+
return;
50+
}
51+
super.testTopicGC(topicType);
52+
}
53+
54+
@Test(dataProvider = "topicTypes")
55+
public void testRemoteClusterStillConsumeAfterCurrentClusterGc(TopicType topicType) throws Exception {
56+
if (topicType.equals(TopicType.PARTITIONED)) {
57+
// Pulsar does not support the feature "brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
58+
// Geo-Replication with Global ZK.
59+
return;
60+
}
61+
super.testRemoteClusterStillConsumeAfterCurrentClusterGc(topicType);
62+
}
63+
}

0 commit comments

Comments
 (0)