diff --git a/conf/broker.conf b/conf/broker.conf index 04f38c958e1ad..80d722076d151 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -179,6 +179,11 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 +# Whether to check if a topic exists when querying partitions of the topic. +# It's enabled by default, when a topic is not created, querying the partitions of this topic will throw an exception. +# Otherwise, the queried result is 0. Disable this option to be compatible with some old clients. +checkTopicExistsWhenQueryPartitions=true + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/conf/standalone.conf b/conf/standalone.conf index b1b3a068e35c2..3b471ddd3b968 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1113,6 +1113,11 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 +# Whether to check if a topic exists when querying partitions of the topic. +# It's enabled by default, when a topic is not created, querying the partitions of this topic will throw an exception. +# Otherwise, the queried result is 0. Disable this option to be compatible with some old clients. +checkTopicExistsWhenQueryPartitions=true + ### --- Transaction config variables --- ### # Enable transaction coordinator in broker transactionCoordinatorEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d7617ef277444..363c21aa9486b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1376,6 +1376,15 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se ) private Set messagingProtocols = new TreeSet<>(); + @FieldContext( + category = CATEGORY_PROTOCOLS, + doc = "Whether to check if a topic exists when querying partitions of the topic. " + + "It's enabled by default, when a topic is not created, querying the partitions of this topic will " + + "throw an exception. Otherwise, the queried result is 0. " + + "Disable this option to be compatible with some old clients." + ) + private boolean checkTopicExistsWhenQueryPartitions = true; + @FieldContext( category = CATEGORY_SERVER, doc = "Enable or disable system topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index ad141c5884fc1..b7572765c5ff3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -492,7 +492,7 @@ protected CompletableFuture getPartitionedTopicMetadat return pulsar().getBrokerService() .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName); } else { - return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName); + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName, true); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index dfd1d4eaa0b8d..1996691e34714 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -582,7 +582,9 @@ protected CompletableFuture internalGetPartitionedMeta } else { ret = CompletableFuture.completedFuture(null); } - return ret.thenApply(__ -> metadata); + return ret.thenApply(__ -> (metadata.partitions < 0) + ? new PartitionedTopicMetadata(0, metadata.properties) + : metadata); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2015128b9e800..08c7c0fb22b3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2893,6 +2893,11 @@ private void createPendingLoadTopic() { public CompletableFuture fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync( TopicName topicName) { + return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, false); + } + + public CompletableFuture fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync( + TopicName topicName, boolean checkTopicExists) { if (pulsar.getNamespaceService() == null) { return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); } @@ -2946,7 +2951,13 @@ public CompletableFuture fetchPartitionedTopicMetadata return null; }); } else { - future.complete(metadata); + if (checkTopicExists + && metadata.partitions == 0 + && !topicExists) { + future.complete(new PartitionedTopicMetadata(-1, metadata.properties)); + } else { + future.complete(metadata); + } } }); @@ -2980,6 +2991,16 @@ private CompletableFuture createDefaultPartitionedTopi } public CompletableFuture fetchPartitionedTopicMetadataAsync(TopicName topicName) { + return fetchPartitionedTopicMetadataAsync(topicName, false); + } + + public CompletableFuture fetchPartitionedTopicMetadataAsync( + TopicName topicName, boolean mightCheckAllowAutoCreation) { + if (mightCheckAllowAutoCreation && !pulsar.getConfig().isCheckTopicExistsWhenQueryPartitions()) { + // Some old clients might not add the "checkAllowAutoCreation=true" query param. If this option is enabled, + // use the same behavior with that query param. + return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, true); + } // gets the number of partitions from the configuration cache return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() .getPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HttpPartitionMetadataLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HttpPartitionMetadataLookupTest.java new file mode 100644 index 0000000000000..856164035bfec --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HttpPartitionMetadataLookupTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.util.concurrent.ExecutionException; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import org.testng.collections.Sets; + +@Test(groups = "broker-impl") +public class HttpPartitionMetadataLookupTest extends MockedPulsarServiceBaseTest { + + private final EventLoopGroup eventExecutors = new NioEventLoopGroup(); + + @DataProvider + public Object[][] legacyLookup() { + return new Object[][] { {true}, {false} }; + } + + @BeforeMethod + @Override + protected void setup() throws Exception { + // No ops + } + + private void internalSetup(boolean legacy) throws Exception { + if (legacy) { + conf.setCheckTopicExistsWhenQueryPartitions(false); + } + super.internalSetup(); + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + admin.tenants().createTenant("prop", + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("prop/ns-abc"); + admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test")); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 45000, dataProvider = "legacyLookup") + public void testLegacyLookup(boolean legacy) throws Exception { + internalSetup(legacy); + BinaryProtoLookupService binaryLookup = (BinaryProtoLookupService) + ((PulsarClientImpl) pulsar.getClient()).getLookup(); + @Cleanup HttpLookupService lookup = new HttpLookupService(newConf(pulsar), eventExecutors); + @Cleanup LegacyHttpLookupService legacyLookup = new LegacyHttpLookupService(pulsar, eventExecutors); + String topic = "persistent://prop/ns-abc/nonexistent-topic"; + try { + assertEquals(legacyLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0); + } catch (ExecutionException e) { + assertFalse(legacy); + assertTrue(e.getCause() instanceof PulsarClientException.NotFoundException); + } + try { + assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 0); + } catch (PulsarAdminException e) { + assertFalse(legacy); + assertTrue(e instanceof PulsarAdminException.NotFoundException); + } + assertEquals(lookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0); + assertEquals(binaryLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0); + + topic = "persistent://prop/ns-abc/non-partitioned-topic"; + admin.topics().createNonPartitionedTopic(topic); + assertEquals(legacyLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0); + assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 0); + assertEquals(lookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0); + assertEquals(binaryLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0); + + topic = "persistent://prop/ns-abc/partitioned-topic"; + admin.topics().createPartitionedTopic(topic, 1); + assertEquals(legacyLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 1); + assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 1); + assertEquals(lookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 1); + assertEquals(binaryLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 1); + } + + private static ClientConfigurationData newConf(PulsarService pulsar) { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl(pulsar.getWebServiceAddress()); + return conf; + } + + private static class LegacyHttpLookupService extends HttpLookupService { + + public LegacyHttpLookupService(PulsarService pulsar, EventLoopGroup eventLoopGroup) + throws PulsarClientException { + super(newConf(pulsar), eventLoopGroup); + } + + @Override + public boolean checkAllowTopicCreation() { + return false; + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 7969ce402363f..11ba328f7fb5f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.annotations.VisibleForTesting; import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.net.InetSocketAddress; @@ -111,10 +112,16 @@ public CompletableFuture> getBroker(T }); } + @VisibleForTesting + protected boolean checkAllowTopicCreation() { + return true; + } + @Override public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; - return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true", + return httpClient.get(String.format(format, topicName.getLookupName()) + + (checkAllowTopicCreation() ? "?checkAllowAutoCreation=true" : ""), PartitionedTopicMetadata.class); }