Skip to content

Commit a197f46

Browse files
BewareMyPowerlhotari
authored andcommitted
[improve][broker] Improve the log when namespace bundle is not available (#24434)
(cherry picked from commit 19b4a05)
1 parent 84c56ed commit a197f46

6 files changed

Lines changed: 24 additions & 16 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,13 +1283,16 @@ private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
12831283
}
12841284

12851285
public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
1286-
// TODO: Add unit tests cover it.
1286+
return getBundleAsync(topicName).thenCompose(bundle -> checkBundleOwnership(topicName, bundle));
1287+
}
1288+
1289+
public CompletableFuture<Boolean> checkBundleOwnership(TopicName topicName, NamespaceBundle bundle) {
12871290
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
1288-
return getBundleAsync(topicName)
1289-
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
1291+
// TODO: Add unit tests cover it.
1292+
return loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle);
1293+
} else {
1294+
return ownershipCache.checkOwnershipAsync(bundle);
12901295
}
1291-
return getBundleAsync(topicName)
1292-
.thenCompose(ownershipCache::checkOwnershipAsync);
12931296
}
12941297

12951298
public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2297,19 +2297,19 @@ public CompletableFuture<Boolean> isTopicNsOwnedByBrokerAsync(TopicName topicNam
22972297

22982298
public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
22992299
TopicName topicName = TopicName.get(topic);
2300+
final var namespaceService = pulsar.getNamespaceService();
23002301

2301-
return pulsar.getNamespaceService().checkTopicOwnership(topicName)
2302-
.thenCompose(ownedByThisInstance -> {
2302+
return namespaceService.getBundleAsync(topicName).thenCompose(bundle ->
2303+
namespaceService.checkBundleOwnership(topicName, bundle).thenCompose(ownedByThisInstance -> {
23032304
if (ownedByThisInstance) {
23042305
return CompletableFuture.completedFuture(null);
23052306
} else {
2306-
String msg = String.format("Namespace bundle for topic (%s) not served by this instance:%s. "
2307-
+ "Please redo the lookup. Request is denied: namespace=%s",
2308-
topic, pulsar.getBrokerId(), topicName.getNamespace());
2307+
String msg = String.format("Namespace bundle (%s) for topic (%s) not served by this instance:"
2308+
+ "%s. Please redo the lookup.", bundle, topic, pulsar.getBrokerId());
23092309
log.warn(msg);
23102310
return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg));
23112311
}
2312-
});
2312+
}));
23132313
}
23142314

23152315
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ public void setup() throws Exception {
161161
NamespaceService nsSvc = pulsarTestContext.getPulsarService().getNamespaceService();
162162
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
163163
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
164-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
164+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
165+
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());
165166

166167
setupMLAsyncCallbackMocks();
167168

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public void setup(Method m) throws Exception {
105105
doReturn(nsSvc).when(pulsar).getNamespaceService();
106106
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
107107
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
108-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
108+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
109+
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());
109110

110111
final List<Position> addedEntries = new ArrayList<>();
111112

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ public void setup() throws Exception {
220220
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
221221
doReturn(true).when(nsSvc).isServiceUnitActive(any());
222222
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
223-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any());
223+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
224+
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any());
224225

225226
setupMLAsyncCallbackMocks();
226227
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import org.apache.pulsar.common.api.proto.ServerError;
143143
import org.apache.pulsar.common.api.proto.Subscription;
144144
import org.apache.pulsar.common.api.proto.TxnAction;
145+
import org.apache.pulsar.common.naming.NamespaceBundle;
145146
import org.apache.pulsar.common.naming.NamespaceName;
146147
import org.apache.pulsar.common.naming.TopicName;
147148
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -223,11 +224,12 @@ public void setup() throws Exception {
223224
brokerService = pulsarTestContext.getBrokerService();
224225

225226
namespaceService = pulsar.getNamespaceService();
226-
doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
227+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(namespaceService)
228+
.getBundleAsync(any());
229+
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(), any());
227230
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
228231
doReturn(true).when(namespaceService).isServiceUnitActive(any());
229232
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
230-
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
231233
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
232234
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
233235
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(

0 commit comments

Comments
 (0)