Skip to content

Commit e0fe818

Browse files
authored
[fix][broker] Filter system topic when getting topic list by binary proto. (#19667)
1 parent e75def8 commit e0fe818

3 files changed

Lines changed: 58 additions & 4 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2182,8 +2182,8 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
21822182
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
21832183
.thenAccept(topics -> {
21842184
boolean filterTopics = false;
2185-
// filter transaction internal topic
2186-
List<String> filteredTopics = TopicList.filterTransactionInternalName(topics);
2185+
// filter system topic
2186+
List<String> filteredTopics = TopicList.filterSystemTopic(topics);
21872187

21882188
if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
21892189
if (topicsPattern.get().length() <= maxSubscriptionPatternLength) {

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
6363
public void setup() throws Exception {
6464
// set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
6565
isTcpLookup = true;
66+
// enabled transaction, to test pattern consumers not subscribe to transaction system topic.
67+
conf.setTransactionCoordinatorEnabled(true);
6668
super.internalSetup();
6769
super.producerBaseSetup();
6870
}
@@ -251,6 +253,58 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
251253
producer4.close();
252254
}
253255

256+
@Test(timeOut = testTimeout)
257+
public void testBinaryProtoSubscribeAllTopicOfNamespace() throws Exception {
258+
String key = "testBinaryProtoSubscribeAllTopicOfNamespace";
259+
String subscriptionName = "my-ex-subscription-" + key;
260+
String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
261+
String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
262+
String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
263+
Pattern pattern = Pattern.compile("my-property/my-ns/.*");
264+
265+
// 1. create partition
266+
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
267+
admin.tenants().createTenant("prop", tenantInfo);
268+
admin.topics().createPartitionedTopic(topicName1, 1);
269+
admin.topics().createPartitionedTopic(topicName2, 2);
270+
admin.topics().createPartitionedTopic(topicName3, 3);
271+
272+
// 2. create producer to trigger create system topic.
273+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName1)
274+
.enableBatching(false)
275+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
276+
.create();
277+
278+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
279+
.topicsPattern(pattern)
280+
.patternAutoDiscoveryPeriod(2)
281+
.subscriptionName(subscriptionName)
282+
.subscriptionType(SubscriptionType.Shared)
283+
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
284+
.receiverQueueSize(4)
285+
.subscribe();
286+
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
287+
288+
// 4. verify consumer get methods, to get right number of partitions and topics.
289+
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
290+
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
291+
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
292+
293+
assertEquals(topics.size(), 6);
294+
assertEquals(consumers.size(), 6);
295+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 3);
296+
297+
topics.forEach(topic -> log.info("topic: {}", topic));
298+
consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
299+
300+
IntStream.range(0, topics.size()).forEach(index ->
301+
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
302+
303+
consumer.unsubscribe();
304+
producer.close();
305+
consumer.close();
306+
}
307+
254308
@Test(timeOut = testTimeout)
255309
public void testPubRateOnNonPersistent() throws Exception {
256310
cleanup();

pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ public static List<String> filterTopics(List<String> original, Pattern topicsPat
5757
.collect(Collectors.toList());
5858
}
5959

60-
public static List<String> filterTransactionInternalName(List<String> original) {
60+
public static List<String> filterSystemTopic(List<String> original) {
6161
return original.stream()
62-
.filter(topic -> !SystemTopicNames.isTransactionInternalName(TopicName.get(topic)))
62+
.filter(topic -> !SystemTopicNames.isSystemTopic(TopicName.get(topic)))
6363
.collect(Collectors.toList());
6464
}
6565

0 commit comments

Comments
 (0)