diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 16798ef18bc8e..1aab7844d54ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -750,34 +750,47 @@ public CompletableFuture splitNamespaceBundleAsync(ServiceUnitId bundle, final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle.toString()); NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); - return pulsar.getNamespaceService().getSplitBoundary(namespaceBundle, splitAlgorithm, boundaries) - .thenCompose(splitBundlesPair -> { - if (splitBundlesPair == null) { - String msg = format("Bundle %s not found under namespace", namespaceBundle); - log.error(msg); - return FutureUtil.failedFuture(new IllegalStateException(msg)); + return pulsar.getNamespaceService().getSplitBoundary(namespaceBundle, boundaries, splitAlgorithm) + .thenCompose(splitBoundaries -> { + if (splitBoundaries == null || splitBoundaries.isEmpty()) { + log.info("[{}] No valid boundary found in {} to split bundle {}", + namespaceBundle.getNamespaceObject(), boundaries, namespaceBundle.getBundleRange()); + return CompletableFuture.completedFuture(null); } - - return getOwnershipAsync(Optional.empty(), bundle) - .thenCompose(brokerOpt -> { - if (brokerOpt.isEmpty()) { - String msg = String.format("Namespace bundle: %s is not owned by any broker.", - bundle); - log.warn(msg); - throw new IllegalStateException(msg); + return pulsar.getNamespaceService().getNamespaceBundleFactory() + .splitBundles(namespaceBundle, splitBoundaries.size() + 1, splitBoundaries) + .thenCompose(splitBundlesPair -> { + if (splitBundlesPair == null) { + String msg = format("Bundle %s not found under namespace", namespaceBundle); + log.error(msg); + return FutureUtil.failedFuture(new IllegalStateException(msg)); } - String sourceBroker = brokerOpt.get(); - SplitDecision splitDecision = new SplitDecision(); - List splitBundles = splitBundlesPair.getRight(); - Map> splitServiceUnitToDestBroker = new HashMap<>(); - splitBundles.forEach(splitBundle -> splitServiceUnitToDestBroker - .put(splitBundle.getBundleRange(), Optional.empty())); - splitDecision.setSplit( - new Split(bundle.toString(), sourceBroker, splitServiceUnitToDestBroker)); - splitDecision.setLabel(Success); - splitDecision.setReason(Admin); - return splitAsync(splitDecision, - conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS); + + return getOwnershipAsync(Optional.empty(), bundle) + .thenCompose(brokerOpt -> { + if (brokerOpt.isEmpty()) { + String msg = String.format( + "Namespace bundle: %s is not owned by any broker.", + bundle); + log.warn(msg); + throw new IllegalStateException(msg); + } + String sourceBroker = brokerOpt.get(); + SplitDecision splitDecision = new SplitDecision(); + List splitBundles = splitBundlesPair.getRight(); + Map> splitServiceUnitToDestBroker = + new HashMap<>(); + splitBundles.forEach(splitBundle -> splitServiceUnitToDestBroker + .put(splitBundle.getBundleRange(), Optional.empty())); + splitDecision.setSplit( + new Split(bundle.toString(), sourceBroker, + splitServiceUnitToDestBroker)); + splitDecision.setLabel(Success); + splitDecision.setReason(Admin); + return splitAsync(splitDecision, + conf.getNamespaceBundleUnloadingTimeoutMs(), + TimeUnit.MILLISECONDS); + }); }); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d9e076a00022e..cfb5f2fd93fc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -88,6 +88,7 @@ import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.BundleSplitOption; +import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitAlgorithm; import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -950,7 +951,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, CompletableFuture completionFuture, NamespaceBundleSplitAlgorithm splitAlgorithm, List boundaries) { - BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config); + BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config, splitAlgorithm); splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> { CompletableFuture> updateFuture = new CompletableFuture<>(); @@ -1093,16 +1094,16 @@ public CompletableFuture>> getSplit public CompletableFuture> getSplitBoundary( NamespaceBundle bundle, List boundaries, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) { - BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config); + BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config, nsBundleSplitAlgorithm); return nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption); } private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle, List boundaries, - ServiceConfiguration config) { + ServiceConfiguration config, + NamespaceBundleSplitAlgorithm splitAlgorithm) { BundleSplitOption bundleSplitOption; - if (config.getDefaultNamespaceBundleSplitAlgorithm() - .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { + if (splitAlgorithm instanceof FlowOrQpsEquallyDivideBundleSplitAlgorithm) { Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, topicStatsMap, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c7cbb63dc54eb..53a7f92e122f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1722,6 +1722,24 @@ public void testNamespaceSplitBundleWithTopicCountEquallyDivideAlgorithm() throw } } + @Test + public void testNamespaceSplitBundleWithFlowOrQpsAlgorithmUsesRequestedAlgorithm() throws Exception { + final String namespace = "prop-xyz/flow-or-qps-split"; + final String topicName = "persistent://" + namespace + "/topic-1"; + admin.namespaces().createNamespace(namespace, 1); + admin.topics().createNonPartitionedTopic(topicName); + admin.lookups().lookupTopic(topicName); + + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE); + } catch (Exception e) { + fail("split bundle with flow_or_qps_equally_divide shouldn't have thrown exception", e); + } + + assertEquals(admin.namespaces().getBundles(namespace).getNumBundles(), 1); + } + @Test public void testNamespacesGetTopicHashPositions() throws Exception { // Force to create a namespace with only one bundle and create a topic diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index af071f35b01bb..f0a5132cf535d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -124,6 +124,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicDomain; @@ -1023,6 +1024,21 @@ public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception { assertTrue(bundlesData.getBoundaries().contains(midBundle)); assertTrue(bundlesData.getBoundaries().contains(highBundle)); } + + @Test(timeOut = 30 * 1000) + public void testSplitBundleWithFlowOrQpsAdminAPINoValidBoundary() throws Exception { + String namespace = "public/test-split-with-flow-or-qps"; + String topic = "persistent://" + namespace + "/test-split-with-flow-or-qps"; + admin.namespaces().createNamespace(namespace, 1); + admin.topics().createNonPartitionedTopic(topic); + admin.lookups().lookupTopic(topic); + + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE); + + assertEquals(admin.namespaces().getBundles(namespace).getNumBundles(), 1); + } + @Test(timeOut = 30 * 1000) public void testDeleteNamespaceBundle() throws Exception { final String namespace = "public/testDeleteNamespaceBundle";