Skip to content

Commit 950ff44

Browse files
[improve][broker] PIP-192: Write the child ownership to ServiceUnitStateChannel instead of ZK when handling bundle split (#18858)
1 parent de4f620 commit 950ff44

4 files changed

Lines changed: 221 additions & 55 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,9 @@ protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundl
10281028
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
10291029
authoritative, false))
10301030
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
1031-
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
1031+
pulsar().getNamespaceService()
1032+
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
1033+
splitBoundaries));
10321034
});
10331035
}
10341036

@@ -1109,18 +1111,6 @@ private CompletableFuture<NamespaceBundle> findHotBundleAsync(NamespaceName name
11091111
.getBundleWithHighestThroughputAsync(namespaceName);
11101112
}
11111113

1112-
private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
1113-
NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
1114-
if (algorithm == null) {
1115-
algorithm = NamespaceBundleSplitAlgorithm.of(
1116-
pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
1117-
}
1118-
if (algorithm == null) {
1119-
algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
1120-
}
1121-
return algorithm;
1122-
}
1123-
11241114
protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
11251115
validateSuperUserAccess();
11261116
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 112 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.channel;
2020

21+
import static java.lang.String.format;
22+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2123
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
2224
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
2325
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
@@ -35,7 +37,9 @@
3537
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
3638
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
3739
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
40+
import com.google.common.annotations.VisibleForTesting;
3841
import java.util.ArrayList;
42+
import java.util.Collections;
3943
import java.util.HashMap;
4044
import java.util.HashSet;
4145
import java.util.List;
@@ -48,6 +52,7 @@
4852
import java.util.concurrent.ScheduledFuture;
4953
import java.util.concurrent.TimeUnit;
5054
import java.util.concurrent.TimeoutException;
55+
import java.util.concurrent.atomic.AtomicInteger;
5156
import java.util.concurrent.atomic.AtomicLong;
5257
import lombok.AllArgsConstructor;
5358
import lombok.Getter;
@@ -60,18 +65,23 @@
6065
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
6166
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
6267
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
68+
import org.apache.pulsar.broker.namespace.NamespaceService;
69+
import org.apache.pulsar.broker.service.BrokerServiceException;
6370
import org.apache.pulsar.client.api.MessageId;
6471
import org.apache.pulsar.client.api.Producer;
6572
import org.apache.pulsar.client.api.PulsarClientException;
6673
import org.apache.pulsar.client.api.Schema;
6774
import org.apache.pulsar.client.api.TableView;
6875
import org.apache.pulsar.common.naming.NamespaceBundle;
69-
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
76+
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
77+
import org.apache.pulsar.common.naming.NamespaceBundles;
7078
import org.apache.pulsar.common.naming.NamespaceName;
7179
import org.apache.pulsar.common.naming.TopicDomain;
7280
import org.apache.pulsar.common.naming.TopicName;
7381
import org.apache.pulsar.common.stats.Metrics;
82+
import org.apache.pulsar.common.util.FutureUtil;
7483
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
84+
import org.apache.pulsar.metadata.api.MetadataStoreException;
7585
import org.apache.pulsar.metadata.api.NotificationType;
7686
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
7787
import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -523,8 +533,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
523533

524534
private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
525535
if (isTargetBroker(data.broker())) {
526-
splitServiceUnit(serviceUnit)
527-
.thenCompose(__ -> tombstoneAsync(serviceUnit))
536+
splitServiceUnit(serviceUnit, data)
528537
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
529538
}
530539
}
@@ -625,25 +634,107 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
625634
});
626635
}
627636

628-
private CompletableFuture<Void> splitServiceUnit(String serviceUnit) {
629-
// TODO: after the split we need to write the child ownerships to BSC instead of ZK.
637+
private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnitStateData data) {
638+
// Write the child ownerships to BSC.
630639
long startTime = System.nanoTime();
631-
return pulsar.getNamespaceService()
632-
.splitAndOwnBundle(getNamespaceBundle(serviceUnit),
633-
false,
634-
NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()),
635-
null)
636-
.whenComplete((__, ex) -> {
637-
double splitBundleTime = TimeUnit.NANOSECONDS
638-
.toMillis((System.nanoTime() - startTime));
639-
if (ex == null) {
640-
log.info("Successfully split {} namespace-bundle in {} ms",
641-
serviceUnit, splitBundleTime);
642-
} else {
643-
log.error("Failed to split {} namespace-bundle in {} ms",
644-
serviceUnit, splitBundleTime, ex);
645-
}
646-
});
640+
NamespaceService namespaceService = pulsar.getNamespaceService();
641+
NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
642+
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
643+
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
644+
final AtomicInteger counter = new AtomicInteger(0);
645+
this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data,
646+
counter, startTime, completionFuture);
647+
return completionFuture;
648+
}
649+
650+
@VisibleForTesting
651+
protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
652+
NamespaceBundleFactory bundleFactory,
653+
NamespaceBundle bundle,
654+
String serviceUnit,
655+
ServiceUnitStateData data,
656+
AtomicInteger counter,
657+
long startTime,
658+
CompletableFuture<Void> completionFuture) {
659+
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
660+
661+
pulsar.getNamespaceService().getSplitBoundary(bundle, null).thenAccept(splitBundlesPair -> {
662+
// Split and updateNamespaceBundles. The update may fail because of a concurrent write to ZooKeeper.
663+
if (splitBundlesPair == null) {
664+
String msg = format("Bundle %s not found under namespace", serviceUnit);
665+
updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
666+
return;
667+
}
668+
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker());
669+
NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
670+
List<NamespaceBundle> splitBundles = Collections.unmodifiableList(splitBundlesPair.getRight());
671+
List<NamespaceBundle> successPublishedBundles =
672+
Collections.synchronizedList(new ArrayList<>(splitBundles.size()));
673+
List<CompletableFuture<Void>> futures = new ArrayList<>(splitBundles.size());
674+
for (NamespaceBundle sBundle : splitBundles) {
675+
futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> successPublishedBundles.add(sBundle)));
676+
}
677+
NamespaceName nsname = bundle.getNamespaceObject();
678+
FutureUtil.waitForAll(futures)
679+
.thenCompose(__ -> namespaceService.updateNamespaceBundles(nsname, targetNsBundle))
680+
.thenCompose(__ -> namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle))
681+
.thenRun(() -> {
682+
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
683+
updateFuture.complete(splitBundles);
684+
}).exceptionally(e -> {
685+
// Clean up the newly published bundles when an exception occurs.
686+
List<CompletableFuture<Void>> futureList = new ArrayList<>();
687+
for (NamespaceBundle sBundle : successPublishedBundles) {
688+
futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {}));
689+
}
690+
FutureUtil.waitForAll(futureList)
691+
.whenComplete((__, ex) -> {
692+
if (ex != null) {
693+
log.warn("Clean new bundles failed,", ex);
694+
}
695+
updateFuture.completeExceptionally(e);
696+
});
697+
return null;
698+
});
699+
}).exceptionally(e -> {
700+
updateFuture.completeExceptionally(e);
701+
return null;
702+
});
703+
704+
updateFuture.thenAccept(r -> {
705+
// Free the old bundle
706+
tombstoneAsync(serviceUnit).thenRun(() -> {
707+
// Update bundled_topic cache for load-report-generation
708+
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
709+
// TODO: Update the load data immediately if needed.
710+
completionFuture.complete(null);
711+
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
712+
log.info("Successfully split {} parent namespace-bundle to {} in {} ms", serviceUnit, r,
713+
splitBundleTime);
714+
}).exceptionally(e -> {
715+
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
716+
String msg = format("Failed to free bundle %s in %s ms, under namespace [%s] with error %s",
717+
bundle.getNamespaceObject().toString(), splitBundleTime, bundle, e.getMessage());
718+
completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
719+
return null;
720+
});
721+
}).exceptionally(ex -> {
722+
// Retry several times on BadVersion
723+
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
724+
if ((throwable instanceof MetadataStoreException.BadVersionException)
725+
&& (counter.incrementAndGet() < NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
726+
pulsar.getExecutor().schedule(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
727+
bundle, serviceUnit, data, counter, startTime, completionFuture), 100, MILLISECONDS);
728+
} else if (throwable instanceof IllegalArgumentException) {
729+
completionFuture.completeExceptionally(throwable);
730+
} else {
731+
// Retry limit reached, or another exception was encountered
732+
String msg = format("Bundle: %s not success update nsBundles, counter %d, reason %s",
733+
bundle.toString(), counter.get(), throwable.getMessage());
734+
completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
735+
}
736+
return null;
737+
});
647738
}
648739

649740
public void handleMetadataSessionEvent(SessionEvent e) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public class NamespaceService implements AutoCloseable {
124124
private final NamespaceBundleFactory bundleFactory;
125125
private final String host;
126126

127-
private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
127+
public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
128128
public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
129129
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
130130
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");
@@ -828,18 +828,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
828828
CompletableFuture<Void> completionFuture,
829829
NamespaceBundleSplitAlgorithm splitAlgorithm,
830830
List<Long> boundaries) {
831-
BundleSplitOption bundleSplitOption;
832-
if (config.getDefaultNamespaceBundleSplitAlgorithm()
833-
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
834-
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
835-
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
836-
topicStatsMap,
837-
config.getLoadBalancerNamespaceBundleMaxMsgRate(),
838-
config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
839-
config.getFlowOrQpsDifferenceThresholdPercentage());
840-
} else {
841-
bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
842-
}
831+
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
843832

844833
splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
845834
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
@@ -957,6 +946,61 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
957946
});
958947
}
959948

949+
/**
950+
* Get the split boundaries.
951+
*
952+
* @param bundle The bundle to split.
953+
* @param boundaries The specified positions,
954+
* used by {@link org.apache.pulsar.common.naming.SpecifiedPositionsBundleSplitAlgorithm}.
955+
* @return A pair, left is target namespace bundle, right is split bundles.
956+
*/
957+
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(
958+
NamespaceBundle bundle, List<Long> boundaries) {
959+
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
960+
NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm =
961+
getNamespaceBundleSplitAlgorithmByName(config.getDefaultNamespaceBundleSplitAlgorithm());
962+
CompletableFuture<List<Long>> splitBoundary =
963+
nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
964+
return splitBoundary.thenCompose(splitBoundaries -> {
965+
if (splitBoundaries == null || splitBoundaries.size() == 0) {
966+
LOG.info("[{}] No valid boundary found in {} to split bundle {}",
967+
bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange());
968+
return CompletableFuture.completedFuture(null);
969+
}
970+
return pulsar.getNamespaceService().getNamespaceBundleFactory()
971+
.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries);
972+
});
973+
}
974+
975+
private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
976+
List<Long> boundaries,
977+
ServiceConfiguration config) {
978+
BundleSplitOption bundleSplitOption;
979+
if (config.getDefaultNamespaceBundleSplitAlgorithm()
980+
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
981+
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
982+
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
983+
topicStatsMap,
984+
config.getLoadBalancerNamespaceBundleMaxMsgRate(),
985+
config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
986+
config.getFlowOrQpsDifferenceThresholdPercentage());
987+
} else {
988+
bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
989+
}
990+
return bundleSplitOption;
991+
}
992+
993+
public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
994+
NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
995+
if (algorithm == null) {
996+
algorithm = NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
997+
}
998+
if (algorithm == null) {
999+
algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
1000+
}
1001+
return algorithm;
1002+
}
1003+
9601004
/**
9611005
* Update new bundle-range to admin/policies/namespace.
9621006
* Update may fail because of concurrent write to Zookeeper.
@@ -965,7 +1009,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
9651009
* @param nsBundles
9661010
* @throws Exception
9671011
*/
968-
private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
1012+
public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
9691013
NamespaceBundles nsBundles) {
9701014
Objects.requireNonNull(nsname);
9711015
Objects.requireNonNull(nsBundles);
@@ -994,7 +1038,7 @@ private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName
9941038
* @param nsBundles
9951039
* @throws Exception
9961040
*/
997-
private CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
1041+
public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
9981042
Objects.requireNonNull(nsname);
9991043
Objects.requireNonNull(nsBundles);
10001044

0 commit comments

Comments
 (0)