Skip to content

Commit 2e3e78d

Browse files
authored
[cleanup] PIP-457: Remove NamespaceName.isGlobal() and TopicName.isGlobal() (#25319)
1 parent 5067d0d commit 2e3e78d

11 files changed

Lines changed: 169 additions & 356 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
597597
.thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
598598
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
599599
.thenRun(() -> {
600-
if (!createLocalTopicOnly && topicName.isGlobal()
600+
if (!createLocalTopicOnly
601601
&& pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
602602
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
603603
log.info("[{}] Successfully created partitioned for topic {} for the remote clusters",

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 96 additions & 253 deletions
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -239,14 +239,12 @@ public void getPartitionedStats(
239239
throw new RestException(Response.Status.PRECONDITION_FAILED,
240240
"Partitioned Topic Name should not contain '-partition-'");
241241
}
242-
if (topicName.isGlobal()) {
243-
try {
244-
validateGlobalNamespaceOwnership(namespaceName);
245-
} catch (Exception e) {
246-
log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, e);
247-
resumeAsyncResponseExceptionally(asyncResponse, e);
248-
return;
249-
}
242+
try {
243+
validateGlobalNamespaceOwnership(namespaceName);
244+
} catch (Exception e) {
245+
log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, e);
246+
resumeAsyncResponseExceptionally(asyncResponse, e);
247+
return;
250248
}
251249
getPartitionedTopicMetadataAsync(topicName,
252250
authoritative, false).thenAccept(partitionMetadata -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2791,9 +2791,6 @@ private void configValueChanged(String configKey, String newValueStr) {
27912791
* @param namespace
27922792
*/
27932793
private void unloadDeletedReplNamespace(Policies data, NamespaceName namespace) {
2794-
if (!namespace.isGlobal()) {
2795-
return;
2796-
}
27972794
final String localCluster = this.pulsar.getConfiguration().getClusterName();
27982795
if (pulsar.getBrokerService().isCurrentClusterAllowed(namespace, data)) {
27992796
return;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ public CompletableFuture<Void> stopReplProducers() {
583583
@Override
584584
public CompletableFuture<Void> checkReplication() {
585585
TopicName name = TopicName.get(topic);
586-
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
586+
if (NamespaceService.isHeartbeatNamespace(name)
587587
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
588588
return CompletableFuture.completedFuture(null);
589589
}
@@ -990,11 +990,8 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
990990
}
991991

992992
public boolean isActive() {
993-
if (TopicName.get(topic).isGlobal()) {
994-
// No local consumers and no local producers
995-
return !subscriptions.isEmpty() || hasLocalProducers();
996-
}
997-
return currentUsageCount() != 0 || !subscriptions.isEmpty();
993+
// No local consumers and no local producers
994+
return !subscriptions.isEmpty() || hasLocalProducers();
998995
}
999996

1000997
@Override
@@ -1050,34 +1047,31 @@ public void checkGC() {
10501047
} else {
10511048
if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
10521049

1053-
if (TopicName.get(topic).isGlobal()) {
1054-
// For global namespace, close repl producers first.
1055-
// Once all repl producers are closed, we can delete the topic,
1056-
// provided no remote producers connected to the broker.
1057-
if (log.isDebugEnabled()) {
1058-
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
1059-
maxInactiveDurationInSec);
1060-
}
1050+
// Close repl producers first.
1051+
// Once all repl producers are closed, we can delete the topic,
1052+
// provided no remote producers connected to the broker.
1053+
if (log.isDebugEnabled()) {
1054+
log.debug("[{}] Topic inactive for {} seconds, closing repl producers.", topic,
1055+
maxInactiveDurationInSec);
1056+
}
10611057

1062-
stopReplProducers().thenCompose(v -> delete(true, false))
1063-
.thenCompose(__ -> tryToDeletePartitionedMetadata())
1064-
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
1065-
.exceptionally(e -> {
1066-
Throwable throwable = e.getCause();
1067-
if (throwable instanceof TopicBusyException) {
1068-
// topic became active again
1069-
if (log.isDebugEnabled()) {
1070-
log.debug("[{}] Did not delete busy topic: {}", topic,
1071-
throwable.getMessage());
1072-
}
1073-
replicators.forEach((region, replicator) -> replicator.startProducer());
1074-
} else {
1075-
log.warn("[{}] Inactive topic deletion failed", topic, e);
1058+
stopReplProducers().thenCompose(v -> delete(true, false))
1059+
.thenCompose(__ -> tryToDeletePartitionedMetadata())
1060+
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
1061+
.exceptionally(e -> {
1062+
Throwable throwable = e.getCause();
1063+
if (throwable instanceof TopicBusyException) {
1064+
// topic became active again
1065+
if (log.isDebugEnabled()) {
1066+
log.debug("[{}] Did not delete busy topic: {}", topic,
1067+
throwable.getMessage());
10761068
}
1077-
return null;
1078-
});
1079-
1080-
}
1069+
replicators.forEach((region, replicator) -> replicator.startProducer());
1070+
} else {
1071+
log.warn("[{}] Inactive topic deletion failed", topic, e);
1072+
}
1073+
return null;
1074+
});
10811075
}
10821076
}
10831077
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 38 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1942,7 +1942,7 @@ CompletableFuture<Void> checkPersistencePolicies() {
19421942
@Override
19431943
public CompletableFuture<Void> checkReplication() {
19441944
TopicName name = TopicName.get(topic);
1945-
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
1945+
if (NamespaceService.isHeartbeatNamespace(name)
19461946
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
19471947
return CompletableFuture.completedFuture(null);
19481948
}
@@ -2112,7 +2112,7 @@ protected CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
21122112
if (nsPolicies.isPresent()) {
21132113
allowedClusters = nsPolicies.get().allowed_clusters;
21142114
}
2115-
if (TopicName.get(topic).isGlobal() && !topicRepls.contains(localCluster)
2115+
if (!topicRepls.contains(localCluster)
21162116
&& !allowedClusters.contains(localCluster)) {
21172117
log.warn("Local cluster {} is not part of global namespace repl list {} and allowed list {}",
21182118
localCluster, topicRepls, allowedClusters);
@@ -3197,12 +3197,8 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) {
31973197
}
31983198
break;
31993199
}
3200-
if (TopicName.get(topic).isGlobal()) {
3201-
// no local producers
3202-
return hasLocalProducers();
3203-
} else {
3204-
return currentUsageCount() != 0;
3205-
}
3200+
// no local producers
3201+
return hasLocalProducers();
32063202
}
32073203

32083204
private boolean hasBacklogs(boolean getPreciseBacklog) {
@@ -3410,46 +3406,42 @@ public void checkGC() {
34103406
} else {
34113407
CompletableFuture<Void> replCloseFuture = new CompletableFuture<>();
34123408

3413-
if (TopicName.get(topic).isGlobal()) {
3414-
// For global namespace, close repl producers first.
3415-
// Once all repl producers are closed, we can delete the topic,
3416-
// provided no remote producers connected to the broker.
3417-
if (log.isDebugEnabled()) {
3418-
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
3419-
maxInactiveDurationInSec);
3420-
}
3421-
/**
3422-
* There is a race condition that may cause a NPE:
3423-
* - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication.
3424-
* - task 2: "closeReplProducersIfNoBacklog" called by current thread will make the variable
3425-
* "replicator.producer" to a null value.
3426-
* Race condition: task 1 will get a NPE when it tries to send messages using the variable
3427-
* "replicator.producer", because task 2 will set this variable to "null".
3428-
* TODO Create a seperated PR to fix it.
3429-
*/
3430-
closeReplProducersIfNoBacklog().thenRun(() -> {
3431-
if (hasRemoteProducers()) {
3432-
if (log.isDebugEnabled()) {
3433-
log.debug("[{}] Global topic has connected remote producers. Not a candidate for GC",
3434-
topic);
3435-
}
3436-
replCloseFuture
3437-
.completeExceptionally(new TopicBusyException("Topic has connected remote producers"));
3438-
} else {
3439-
log.info("[{}] Global topic inactive for {} seconds, closed repl producers", topic,
3440-
maxInactiveDurationInSec);
3441-
replCloseFuture.complete(null);
3442-
}
3443-
}).exceptionally(e -> {
3409+
// Close repl producers first.
3410+
// Once all repl producers are closed, we can delete the topic,
3411+
// provided no remote producers connected to the broker.
3412+
if (log.isDebugEnabled()) {
3413+
log.debug("[{}] Topic inactive for {} seconds, closing repl producers.", topic,
3414+
maxInactiveDurationInSec);
3415+
}
3416+
/**
3417+
* There is a race condition that may cause a NPE:
3418+
* - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication.
3419+
* - task 2: "closeReplProducersIfNoBacklog" called by current thread will make the variable
3420+
* "replicator.producer" to a null value.
3421+
* Race condition: task 1 will get a NPE when it tries to send messages using the variable
3422+
* "replicator.producer", because task 2 will set this variable to "null".
3423+
* TODO Create a seperated PR to fix it.
3424+
*/
3425+
closeReplProducersIfNoBacklog().thenRun(() -> {
3426+
if (hasRemoteProducers()) {
34443427
if (log.isDebugEnabled()) {
3445-
log.debug("[{}] Global topic has replication backlog. Not a candidate for GC", topic);
3428+
log.debug("[{}] Topic has connected remote producers. Not a candidate for GC",
3429+
topic);
34463430
}
3447-
replCloseFuture.completeExceptionally(e.getCause());
3448-
return null;
3449-
});
3450-
} else {
3451-
replCloseFuture.complete(null);
3452-
}
3431+
replCloseFuture
3432+
.completeExceptionally(new TopicBusyException("Topic has connected remote producers"));
3433+
} else {
3434+
log.info("[{}] Topic inactive for {} seconds, closed repl producers", topic,
3435+
maxInactiveDurationInSec);
3436+
replCloseFuture.complete(null);
3437+
}
3438+
}).exceptionally(e -> {
3439+
if (log.isDebugEnabled()) {
3440+
log.debug("[{}] Topic has replication backlog. Not a candidate for GC", topic);
3441+
}
3442+
replCloseFuture.completeExceptionally(e.getCause());
3443+
return null;
3444+
});
34533445

34543446
replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions,
34553447
deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false))

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
872872
public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
873873
NamespaceName namespace,
874874
boolean allowDeletedNamespace) {
875-
if (!namespace.isGlobal() || NamespaceService.isSLAOrHeartbeatNamespace(namespace.toString())) {
875+
if (NamespaceService.isSLAOrHeartbeatNamespace(namespace.toString())) {
876876
return CompletableFuture.completedFuture(null);
877877
}
878878

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,7 @@ private void initAndStartBroker() throws Exception {
223223
admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
224224
admin.clusters().createCluster("usw", ClusterData.builder().serviceUrl("http://127.0.0.2:8082").build());
225225
admin.clusters().createCluster("usc", ClusterData.builder().serviceUrl("http://127.0.0.3:8083").build());
226-
// After V1 removal, all namespaces go through the peer-cluster redirect path
227-
// (NamespaceName.isGlobal() always returns true), so peer clusters must be configured.
226+
// All namespaces go through the peer-cluster redirect path, so peer clusters must be configured.
228227
// Only "usc" is a peer because peer clusters cannot also be replication clusters.
229228
admin.clusters().updatePeerClusterNames("use",
230229
new LinkedHashSet<>(List.of("usc")));

pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,6 @@ public String getLocalName() {
119119
return localName;
120120
}
121121

122-
public boolean isGlobal() {
123-
return true;
124-
}
125-
126122
public String getPersistentTopicName(String localTopic) {
127123
return getTopicName(TopicDomain.persistent, localTopic);
128124
}

pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -352,10 +352,6 @@ public String getLookupName() {
352352
return String.format("%s/%s/%s/%s", domain, tenant, namespacePortion, getEncodedLocalName());
353353
}
354354

355-
public boolean isGlobal() {
356-
return namespaceName.isGlobal();
357-
}
358-
359355
public String getSchemaName() {
360356
return getTenant()
361357
+ "/" + getNamespacePortion()

0 commit comments

Comments
 (0)