Skip to content

Commit 9033223

Browse files
authored
[fix][broker]Namespaces can be created with may empty replication_clusters policy (#25551)
1 parent a1613bc commit 9033223

3 files changed

Lines changed: 43 additions & 8 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ private CompletableFuture<Policies> precheckWhenDeleteNamespace(NamespaceName ns
491491
// There are still more than one clusters configured for the global namespace
492492
throw new RestException(Status.PRECONDITION_FAILED,
493493
"Cannot delete the global namespace " + nsName + ". There are still more than "
494-
+ "one replication clusters configured.");
494+
+ "one replication clusters configured or replication clusters is empty.");
495495
}
496496
if (!cluster.equals(config().getClusterName())) {
497497
// the only replication cluster is other cluster, redirect
@@ -2392,7 +2392,7 @@ private CompletableFuture<Void> validatePoliciesAsync(NamespaceName ns, Policies
23922392
log.info(msg);
23932393
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, msg));
23942394
}
2395-
pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns, policies);
2395+
pulsar().getBrokerService().setCurrentClusterAllowedWhenCreating(ns, policies);
23962396

23972397
// Validate cluster names and permissions
23982398
return Stream.concat(policies.replication_clusters.stream(), policies.allowed_clusters.stream())

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4153,14 +4153,14 @@ public boolean isCurrentClusterAllowed(NamespaceName nsName, Policies nsPolicies
41534153
return nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName());
41544154
}
41554155

4156-
public void setCurrentClusterAllowedIfNoClusterIsAllowed(NamespaceName nsName, Policies nsPolicies) {
4157-
if (nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName())
4158-
|| nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName())) {
4159-
return;
4160-
}
4156+
public void setCurrentClusterAllowedWhenCreating(NamespaceName nsName, Policies nsPolicies) {
41614157
if (nsPolicies.replication_clusters.isEmpty()) {
41624158
nsPolicies.replication_clusters.add(pulsar.getConfig().getClusterName());
4163-
} else {
4159+
}
4160+
if (nsPolicies.allowed_clusters.isEmpty()) {
4161+
return;
4162+
}
4163+
if (!nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName())) {
41644164
nsPolicies.allowed_clusters.add(pulsar.getConfig().getClusterName());
41654165
}
41664166
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@
4141
import com.google.common.collect.Lists;
4242
import com.google.common.collect.Sets;
4343
import java.lang.reflect.Field;
44+
import java.net.URI;
4445
import java.net.URL;
46+
import java.net.http.HttpClient;
47+
import java.net.http.HttpRequest;
48+
import java.net.http.HttpResponse;
4549
import java.nio.charset.StandardCharsets;
4650
import java.time.Clock;
4751
import java.util.ArrayList;
@@ -137,6 +141,7 @@
137141
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
138142
import org.apache.pulsar.common.policies.data.PersistencePolicies;
139143
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
144+
import org.apache.pulsar.common.policies.data.Policies;
140145
import org.apache.pulsar.common.policies.data.RetentionPolicies;
141146
import org.apache.pulsar.common.policies.data.SubscriptionStats;
142147
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1739,6 +1744,36 @@ public void testCreateNamespaceWithNoClusters() throws PulsarAdminException {
17391744
Collections.singletonList(localCluster));
17401745
}
17411746

1747+
@Test
1748+
public void testCreateNamespaceWithEmptyReplicationClustersByHttp() throws Exception {
1749+
String localCluster = pulsar.getConfiguration().getClusterName();
1750+
String namespacePart = newUniqueName("ns");
1751+
String namespace = defaultTenant + "/" + namespacePart;
1752+
1753+
// Create namespace with "allowed_cluster", and the param "replication_clusters" is empty.
1754+
HttpClient httpClient = HttpClient.newHttpClient();
1755+
URI adminV2Uri = URI.create(brokerUrl.toString()).resolve("/admin/v2/");
1756+
String namespaceRequestBody = "{\"allowed_clusters\": [\"" + localCluster + "\"]}";
1757+
HttpRequest createNamespaceRequest =
1758+
HttpRequest.newBuilder(adminV2Uri.resolve("namespaces/" + namespace))
1759+
.header("Content-Type", "application/json")
1760+
.PUT(HttpRequest.BodyPublishers.ofString(namespaceRequestBody))
1761+
.build();
1762+
HttpResponse<String> createNamespaceResponse = httpClient.send(createNamespaceRequest,
1763+
HttpResponse.BodyHandlers.ofString());
1764+
assertEquals(createNamespaceResponse.statusCode(), Status.NO_CONTENT.getStatusCode(),
1765+
"Failed to create namespace by HTTP: " + createNamespaceResponse.body());
1766+
1767+
// Verify: replication_clusters is not empty.
1768+
Awaitility.await().untilAsserted(() -> {
1769+
Policies policies = admin.namespaces().getPolicies(namespace);
1770+
assertEquals(policies.replication_clusters.size(), 1);
1771+
assertEquals(policies.allowed_clusters.size(), 1);
1772+
assertTrue(policies.replication_clusters.contains(localCluster));
1773+
assertTrue(policies.allowed_clusters.contains(localCluster));
1774+
});
1775+
}
1776+
17421777
@Test(timeOut = 30000)
17431778
public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException,
17441779
InterruptedException {

0 commit comments

Comments
 (0)