Skip to content

Commit e046576

Browse files
committed
[fix][broker]V1 topic creation should fail if namespace does not exist
1 parent 313ae97 commit e046576

3 files changed

Lines changed: 46 additions & 2 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,12 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str
323323

324324
protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean authoritative,
325325
Map<String, String> properties) {
326-
CompletableFuture<Void> ret = validateNonPartitionTopicNameAsync(topicName.getLocalName());
326+
CompletableFuture<Void> ret = namespaceResources().namespaceExistsAsync(namespaceName)
327+
.thenAccept(exists -> {
328+
if (!exists) {
329+
throw new RestException(Status.NOT_FOUND, "V1 namespace [" + namespaceName + "] does not exist");
330+
}
331+
}). thenCompose(__ -> validateNonPartitionTopicNameAsync(topicName.getLocalName()));
327332
if (topicName.isGlobal()) {
328333
ret = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
329334
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1074,7 +1074,12 @@ && pulsar().getBrokerService().isAuthorizationEnabled()) {
10741074
}
10751075
});
10761076
}
1077-
return CompletableFuture.completedFuture(null);
1077+
return namespaceResources().namespaceExistsAsync(namespaceName)
1078+
.thenAccept(exists -> {
1079+
if (!exists) {
1080+
throw new RestException(Status.NOT_FOUND, "V1 namespace [" + namespaceName + "] does not exist");
1081+
}
1082+
});
10781083
}
10791084

10801085
public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy,

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.pulsar.broker.PulsarService;
3535
import org.apache.pulsar.broker.ServiceConfiguration;
3636
import org.apache.pulsar.client.admin.PulsarAdmin;
37+
import org.apache.pulsar.client.admin.PulsarAdminException;
3738
import org.apache.pulsar.client.api.PulsarClient;
3839
import org.apache.pulsar.client.api.PulsarClientException;
3940
import org.apache.pulsar.client.impl.ClientCnx;
@@ -635,4 +636,37 @@ public void testTenantNotExist(TopicDomain topicDomain) throws Exception {
635636
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
636637
});
637638
}
639+
640+
@Test
641+
public void testTopicNameContainsSpecialCharacters() throws Exception {
642+
// V1 topic create failed because v1 namespace does not exist.
643+
String topic = "persistent://public/default/tp/h";
644+
try {
645+
admin1.topics().createNonPartitionedTopic(topic);
646+
fail("Expected a namespace not found ex, since the v1 namespace does not exist");
647+
} catch (PulsarAdminException.NotFoundException ex) {
648+
assertTrue(ex.getMessage().contains("does not exist"));
649+
}
650+
try {
651+
admin1.topics().createPartitionedTopic(topic, 1);
652+
fail("Expected a namespace not found ex, since the v1 namespace does not exist");
653+
} catch (PulsarAdminException.NotFoundException ex) {
654+
assertTrue(ex.getMessage().contains("does not exist"));
655+
}
656+
657+
// The topic can be created after v1 namespace was created
658+
String v1Ns = "public/" + pulsar1.getConfig().getClusterName() + "/default";
659+
admin1.namespaces().createNamespace(v1Ns);
660+
String topic2 = "persistent://" + v1Ns + "/h";
661+
String topic3 = "persistent://" + v1Ns + "/h2";
662+
admin1.topics().createNonPartitionedTopic(topic2);
663+
admin1.topics().createPartitionedTopic(topic3, 1);
664+
List<String> v1Topics = admin1.topics().getList(v1Ns);
665+
assertTrue(v1Topics.contains(topic2));
666+
assertTrue(v1Topics.contains(topic3 + "-partition-0"));
667+
668+
// cleanup.
669+
admin1.topics().delete(topic2, false);
670+
admin1.topics().deletePartitionedTopic(topic3, false);
671+
}
638672
}

0 commit comments

Comments
 (0)