Skip to content

Commit 39bb675

Browse files
authored
[fix][broker] Fix wrong behaviour when using namespace.allowed_clusters, such as namespace deletion and namespace policies updating (#24860)
1 parent f55d45a commit 39bb675

13 files changed

Lines changed: 343 additions & 83 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,9 @@ protected Policies getNamespacePolicies(String tenant, String cluster, String na
514514
return getNamespacePolicies(ns);
515515
}
516516

517+
/**
518+
* Directly get the replication clusters for a namespace, without checking allowed clusters.
519+
*/
517520
protected CompletableFuture<Set<String>> getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
518521
return namespaceResources().getPoliciesAsync(namespaceName)
519522
.thenApply(policies -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.pulsar.client.admin.PulsarAdmin;
6060
import org.apache.pulsar.common.naming.Constants;
6161
import org.apache.pulsar.common.naming.NamedEntity;
62+
import org.apache.pulsar.common.naming.NamespaceName;
6263
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
6364
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
6465
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -792,9 +793,11 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
792793
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
793794
List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream()
794795
.map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName)
795-
.thenApply(policies -> policies.replication_clusters.contains(cluster)
796-
? namespaceName : null))
797-
.collect(Collectors.toList());
796+
.thenApply(policies -> {
797+
boolean allowed = pulsar().getBrokerService()
798+
.isCurrentClusterAllowed(NamespaceName.get(namespaceName), policies);
799+
return allowed ? namespaceName : null;
800+
})).collect(Collectors.toList());
798801
return FutureUtil.waitForAll(namespaceNamesInCluster).thenApply(
799802
__ -> namespaceNamesInCluster.stream()
800803
.map(CompletableFuture::join)

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.net.URI;
2828
import java.net.URL;
2929
import java.util.ArrayList;
30-
import java.util.Collections;
3130
import java.util.HashMap;
3231
import java.util.HashSet;
3332
import java.util.List;
@@ -221,7 +220,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
221220
precheckWhenDeleteNamespace(namespaceName, force)
222221
.thenCompose(policies -> {
223222
final CompletableFuture<List<String>> topicsFuture;
224-
if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){
223+
if (policies == null || !pulsar().getBrokerService()
224+
.isCurrentClusterAllowed(namespaceName, policies)) {
225225
topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
226226
} else {
227227
topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
@@ -418,22 +418,25 @@ private CompletableFuture<Policies> precheckWhenDeleteNamespace(NamespaceName ns
418418
return CompletableFuture.completedFuture(null);
419419
}
420420
Policies policies = policiesOpt.get();
421-
Set<String> replicationClusters = policies.replication_clusters;
422-
if (replicationClusters.size() > 1) {
421+
// Just keep the behavior of V1 namespace being the same as before.
422+
if (!nsName.isV2() && policies.replication_clusters.isEmpty()
423+
&& policies.allowed_clusters.isEmpty()) {
424+
return CompletableFuture.completedFuture(policies);
425+
}
426+
String cluster = policies.getClusterThatCanDeleteNamespace();
427+
if (cluster == null) {
423428
// There are still more than one clusters configured for the global namespace
424429
throw new RestException(Status.PRECONDITION_FAILED,
425-
"Cannot delete the global namespace " + nsName + ". There are still more than "
426-
+ "one replication clusters configured.");
430+
"Cannot delete the global namespace " + nsName + ". There are still more than "
431+
+ "one replication clusters configured.");
427432
}
428-
if (replicationClusters.size() == 1
429-
&& !policies.replication_clusters.contains(config().getClusterName())) {
433+
if (!cluster.equals(config().getClusterName())) {
430434
// the only replication cluster is other cluster, redirect
431-
String replCluster = new ArrayList<>(policies.replication_clusters).get(0);
432-
return clusterResources().getClusterAsync(replCluster)
435+
return clusterResources().getClusterAsync(cluster)
433436
.thenCompose(replClusterDataOpt -> {
434437
ClusterData replClusterData = replClusterDataOpt
435438
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
436-
"Cluster " + replCluster + " does not exist"));
439+
"Cluster " + cluster + " does not exist"));
437440
URL replClusterUrl;
438441
try {
439442
if (!replClusterData.isBrokerClientTlsEnabled()) {
@@ -453,7 +456,7 @@ private CompletableFuture<Policies> precheckWhenDeleteNamespace(NamespaceName ns
453456
.replaceQueryParam("authoritative", false).build();
454457
if (log.isDebugEnabled()) {
455458
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
456-
clientAppId(), redirect, replCluster);
459+
clientAppId(), redirect, cluster);
457460
}
458461
throw new WebApplicationException(
459462
Response.temporaryRedirect(redirect).build());
@@ -503,22 +506,25 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
503506
.thenCompose(policies -> {
504507
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
505508
if (namespaceName.isGlobal()) {
506-
507-
if (policies.replication_clusters.size() > 1) {
509+
// Just keep the behavior of V1 namespace being the same as before.
510+
if (!namespaceName.isV2() && policies.replication_clusters.isEmpty()
511+
&& policies.allowed_clusters.isEmpty()) {
512+
return CompletableFuture.completedFuture(null);
513+
}
514+
String cluster = policies.getClusterThatCanDeleteNamespace();
515+
if (cluster == null) {
508516
// There are still more than one clusters configured for the global namespace
509517
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
510518
+ namespaceName
511519
+ ". There are still more than one replication clusters configured.");
512520
}
513-
if (policies.replication_clusters.size() == 1
514-
&& !policies.replication_clusters.contains(config().getClusterName())) {
521+
if (!cluster.equals(config().getClusterName())) { // No need to change.
515522
// the only replication cluster is other cluster, redirect
516-
String replCluster = new ArrayList<>(policies.replication_clusters).get(0);
517-
future = clusterResources().getClusterAsync(replCluster)
523+
future = clusterResources().getClusterAsync(cluster)
518524
.thenCompose(clusterData -> {
519525
if (clusterData.isEmpty()) {
520526
throw new RestException(Status.NOT_FOUND,
521-
"Cluster " + replCluster + " does not exist");
527+
"Cluster " + cluster + " does not exist");
522528
}
523529
ClusterData replClusterData = clusterData.get();
524530
URL replClusterUrl;
@@ -542,7 +548,7 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
542548
.replaceQueryParam("authoritative", false).build();
543549
if (log.isDebugEnabled()) {
544550
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
545-
clientAppId(), redirect, replCluster);
551+
clientAppId(), redirect, cluster);
546552
}
547553
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
548554
});
@@ -739,6 +745,9 @@ protected CompletableFuture<Void> internalRevokePermissionsOnSubscriptionAsync(S
739745
subscriptionName, role, null/* additional auth-data json */));
740746
}
741747

748+
/**
749+
* Directly get the replication clusters for a namespace, without checking allowed clusters.
750+
*/
742751
protected CompletableFuture<Set<String>> internalGetNamespaceReplicationClustersAsync() {
743752
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ)
744753
.thenAccept(__ -> {
@@ -778,21 +787,19 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
778787
"Invalid cluster id: " + clusterId);
779788
}
780789
return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
781-
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
782-
.thenCompose(nsPolicies -> {
783-
if (nsPolicies.allowed_clusters.isEmpty()) {
784-
return validateClusterForTenantAsync(
785-
namespaceName.getTenant(), clusterId);
786-
}
787-
if (!nsPolicies.allowed_clusters.contains(clusterId)) {
788-
String msg = String.format("Cluster [%s] is not in the "
789-
+ "list of allowed clusters list for namespace "
790-
+ "[%s]", clusterId, namespaceName.toString());
791-
log.info(msg);
792-
throw new RestException(Status.FORBIDDEN, msg);
793-
}
794-
return CompletableFuture.completedFuture(null);
795-
}));
790+
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
791+
.thenCompose(nsPolicies -> {
792+
if (!Policies.checkNewReplicationClusters(nsPolicies,
793+
replicationClusterSet)) {
794+
String msg = String.format("Cluster [%s] is not in the "
795+
+ "list of allowed clusters list for namespace "
796+
+ "[%s]", clusterId, namespaceName.toString());
797+
log.info(msg);
798+
throw new RestException(Status.BAD_REQUEST, msg);
799+
}
800+
return validateClusterForTenantAsync(
801+
namespaceName.getTenant(), clusterId);
802+
}));
796803
}).collect(Collectors.toList());
797804
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
798805
}))
@@ -1967,13 +1974,17 @@ protected BundlesData validateBundlesData(BundlesData initialBundles) {
19671974
}
19681975

19691976
private CompletableFuture<Void> validatePoliciesAsync(NamespaceName ns, Policies policies) {
1970-
if (ns.isV2() && policies.replication_clusters.isEmpty()) {
1971-
// Default to local cluster
1972-
policies.replication_clusters = Collections.singleton(config().getClusterName());
1977+
if (!policies.checkAllowedAndReplicationClusters()) {
1978+
String msg = String.format("[%s] All replication clusters should be included in allowed clusters."
1979+
+ " Repl clusters: %s, allowed clusters: %s",
1980+
ns.toString(), policies.replication_clusters, policies.allowed_clusters);
1981+
log.info(msg);
1982+
throw new RestException(Status.BAD_REQUEST, msg);
19731983
}
1984+
pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns, policies);
19741985

19751986
// Validate cluster names and permissions
1976-
return policies.replication_clusters.stream()
1987+
return Stream.concat(policies.replication_clusters.stream(), policies.allowed_clusters.stream())
19771988
.map(cluster -> validateClusterForTenantAsync(ns.getTenant(), cluster))
19781989
.reduce(CompletableFuture.completedFuture(null), (a, b) -> a.thenCompose(ignore -> b))
19791990
.thenAccept(__ -> {
@@ -2864,16 +2875,15 @@ protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<Strin
28642875
throw new RestException(Status.PRECONDITION_FAILED,
28652876
"Cannot specify global in the list of allowed clusters");
28662877
}
2867-
return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
2868-
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
2869-
if (!clusterIds.contains(replicationCluster)) {
2870-
throw new RestException(Status.BAD_REQUEST,
2871-
String.format("Allowed clusters do not contain the replication cluster %s. "
2872-
+ "Please remove the replication cluster if the cluster is not allowed "
2873-
+ "for this namespace", replicationCluster));
2874-
}
2875-
});
2876-
return Sets.newHashSet(clusterIds);
2878+
return getNamespacePoliciesAsync(this.namespaceName).thenApply(nsPolicies -> {
2879+
Set<String> clusterSet = Sets.newHashSet(clusterIds);
2880+
if (!Policies.checkNewAllowedClusters(nsPolicies, clusterSet)){
2881+
throw new RestException(Status.BAD_REQUEST,
2882+
String.format("Allowed clusters do not contain the replication cluster %s. "
2883+
+ "Please remove the replication cluster if the cluster is not allowed "
2884+
+ "for this namespace", nsPolicies.replication_clusters));
2885+
}
2886+
return clusterSet;
28772887
});
28782888
})
28792889
// Verify the allowed clusters are valid and they do not contain the peer clusters.
@@ -2896,6 +2906,9 @@ protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<Strin
28962906
}));
28972907
}
28982908

2909+
/**
2910+
* Directly get the allowed clusters for a namespace, without checking replication clusters.
2911+
*/
28992912
protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsync() {
29002913
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
29012914
.thenAccept(__ -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
514514
return CompletableFuture.completedFuture(null);
515515
}
516516
// Query the topic-level policies only if the namespace-level policies exist.
517-
// Global policies does not affet Replication.
517+
// Global policies does not affect Replication.
518518
final var namespacePolicies = optionalPolicies.get();
519519
return pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName,
520520
TopicPoliciesService.GetType.LOCAL_ONLY

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
9797
protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
9898

9999
protected final String topic;
100+
protected final NamespaceName namespace;
100101

101102
// Reference to the CompletableFuture returned when creating this topic in BrokerService.
102103
// Used to safely remove the topic from BrokerService's cache by ensuring we remove the exact
@@ -183,6 +184,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
183184

184185
public AbstractTopic(String topic, BrokerService brokerService) {
185186
this.topic = topic;
187+
this.namespace = TopicName.get(topic).getNamespaceObject();
186188
this.clock = brokerService.getClock();
187189
this.brokerService = brokerService;
188190
this.producers = new ConcurrentHashMap<>();

0 commit comments

Comments
 (0)