Skip to content

Commit 9613d6b

Browse files
authored
[BALANCER] Implement balancer config balancer.broker.balancing.mode (#1737)
1 parent 6db827e commit 9613d6b

10 files changed

Lines changed: 938 additions & 62 deletions

File tree

app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -224,27 +224,19 @@ private static Set<String> createAndProduceTopic(
224224
void testBestPlan() {
225225
try (var admin = Admin.of(SERVICE.bootstrapServers())) {
226226
var currentClusterInfo =
227-
ClusterInfo.of(
228-
"fake",
229-
List.of(NodeInfo.of(10, "host", 22), NodeInfo.of(11, "host", 22)),
230-
Map.of(),
231-
List.of(
232-
Replica.builder()
233-
.topic("topic")
234-
.partition(0)
235-
.nodeInfo(NodeInfo.of(10, "host", 22))
236-
.lag(0)
237-
.size(100)
238-
.isLeader(true)
239-
.isSync(true)
240-
.isFuture(false)
241-
.isOffline(false)
242-
.isPreferredLeader(true)
243-
.path("/tmp/aa")
244-
.build()));
227+
ClusterInfo.builder()
228+
.addNode(Set.of(1, 2))
229+
.addFolders(
230+
Map.ofEntries(Map.entry(1, Set.of("/folder")), Map.entry(2, Set.of("/folder"))))
231+
.addTopic("topic", 1, (short) 1)
232+
.build();
245233

246234
HasClusterCost clusterCostFunction =
247-
(clusterInfo, clusterBean) -> () -> clusterInfo == currentClusterInfo ? 100D : 10D;
235+
(clusterInfo, clusterBean) ->
236+
() ->
237+
ClusterInfo.findNonFulfilledAllocation(currentClusterInfo, clusterInfo).isEmpty()
238+
? 100D
239+
: 10D;
248240
HasMoveCost moveCostFunction = HasMoveCost.EMPTY;
249241
HasMoveCost failMoveCostFunction = (before, after, clusterBean) -> () -> true;
250242

common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.function.BiFunction;
2929
import java.util.function.Function;
30+
import java.util.function.Predicate;
3031
import java.util.stream.Collectors;
3132
import java.util.stream.IntStream;
3233
import java.util.stream.Stream;
@@ -92,6 +93,18 @@ public ClusterInfoBuilder addNode(Set<Integer> brokerIds) {
9293
});
9394
}
9495

96+
/**
97+
* Remove specific brokers from the cluster state.
98+
*
99+
* @param toRemove id to remove
100+
* @return this
101+
*/
102+
public ClusterInfoBuilder removeNodes(Predicate<Integer> toRemove) {
103+
return applyNodes(
104+
(nodes, replicas) ->
105+
nodes.stream().filter(node -> toRemove.negate().test(node.id())).toList());
106+
}
107+
95108
/**
96109
* Add some fake folders to a specific broker.
97110
*

common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,53 @@ private BalancerConfigs() {}
3535
public static final String BALANCER_ALLOWED_TOPICS_REGEX = "balancer.allowed.topics.regex";
3636

3737
/**
38-
* A regular expression indicates which brokers are eligible for moving loading. When specified, a
39-
* broker with an id that doesn't match this expression cannot accept a partition from the other
40-
* broker or move its partition to other brokers.
38+
* This configuration indicates the balancing mode for each broker.
39+
*
40+
* <p>This configuration requires a string with a series of key-value pairs, each pair is
41+
* separated by a comma, and the key and value are separated by a colon. <code>
42+
* (brokerId_A|"default"):(mode),(brokerId_B):(mode), ...</code> The key indicates the integer id
43+
* for a broker. And the value indicates the balancing mode for the associated broker. When the
44+
* key is a string value <code>"default"</code>(without the double quotes), it indicates the
45+
* associated balancing mode should be the default mode for the rest of the brokers that are not
46+
* addressed in the configuration. By default, all the brokers use <code> "balancing"</code> mode.
47+
*
48+
* <h3>Possible balancing modes</h3>
49+
*
50+
* <ul>
51+
* <li><code>balancing</code>: The broker will participate in the load balancing process. The
52+
* replica assignment for this broker is eligible for changes.
53+
* <li><code>demoted</code>: The broker should become empty after the rebalance. This mode
54+
* allows the user to clear all the loadings for certain brokers, enabling a graceful
55+
* removal of those brokers. Note to the balancer implementation: A broker in this mode
56+
* assumes it will be out of service after the balancing is finished. Therefore, when
57+
* evaluating the cluster cost, the brokers to demote should be excluded. However, these
58+
* brokers will be included in the move cost evaluation. Since these brokers are still part
59+
* of the cluster right now, and move cost focusing on the cost associated during the
60+
* ongoing balancing process itself.
61+
* <li><code>excluded</code>: The broker will not participate in the load balancing process. The
62+
* replica assignment for this broker is not eligible for changes. It will neither accept
63+
* replicas from other brokers nor reassign replicas to other brokers.
64+
* </ul>
65+
*
66+
* <h3>Flag Interaction:</h3>
67+
*
68+
* <ol>
69+
* <li>When this flag is used in conjunction with {@link
70+
* BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX}, if a demoted broker contains partition
71+
* from those forbidden topics, an exception should be raised.
72+
* </ol>
73+
*
74+
* <h3>Limitation:</h3>
75+
*
76+
* <ol>
77+
* <li>Demoting a broker may be infeasible if there are not enough brokers to fit the required
78+
* replica factor for a specific partition. This situation is more likely to occur if there
79+
* are many <code>excluded</code> brokers that reject accepting new replicas. If such a case
80+
* is detected, an exception should be raised.
81+
* <li>Any broker with ongoing replica-move-in, replica-move-out, or inter-folder movement
82+
* cannot be the demoting target. An exception will be raised if any of the demoting brokers
83+
* have such ongoing events. *
84+
* </ol>
4185
*/
42-
public static final String BALANCER_ALLOWED_BROKERS_REGEX = "balancer.allowed.brokers.regex";
86+
public static final String BALANCER_BROKER_BALANCING_MODE = "balancer.broker.balancing.mode";
4387
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.common.balancer;
18+
19+
import java.util.Arrays;
20+
import java.util.Collection;
21+
import java.util.Map;
22+
import java.util.function.Function;
23+
import java.util.function.Predicate;
24+
import java.util.regex.Pattern;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.IntStream;
27+
import java.util.stream.Stream;
28+
import org.astraea.common.EnumInfo;
29+
import org.astraea.common.admin.ClusterInfo;
30+
import org.astraea.common.admin.NodeInfo;
31+
import org.astraea.common.admin.Replica;
32+
33+
public final class BalancerUtils {
34+
35+
private BalancerUtils() {}
36+
37+
public static Map<Integer, BalancingModes> balancingMode(ClusterInfo cluster, String config) {
38+
var num = Pattern.compile("[0-9]+");
39+
40+
var map =
41+
Arrays.stream(config.split(","))
42+
.filter(Predicate.not(String::isEmpty))
43+
.map(x -> x.split(":"))
44+
.collect(
45+
Collectors.toUnmodifiableMap(
46+
s -> (Object) (num.matcher(s[0]).find() ? Integer.parseInt(s[0]) : s[0]),
47+
s ->
48+
switch (s[1]) {
49+
case "balancing" -> BalancingModes.BALANCING;
50+
case "demoted" -> BalancingModes.DEMOTED;
51+
case "excluded" -> BalancingModes.EXCLUDED;
52+
default -> throw new IllegalArgumentException(
53+
"Unsupported balancing mode: " + s[1]);
54+
}));
55+
56+
Function<Integer, BalancingModes> mode =
57+
(id) -> map.getOrDefault(id, map.getOrDefault("default", BalancingModes.BALANCING));
58+
59+
return cluster.brokers().stream()
60+
.map(NodeInfo::id)
61+
.collect(Collectors.toUnmodifiableMap(Function.identity(), mode));
62+
}
63+
64+
/**
65+
* Verify there is no logic conflict between {@link BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX}
66+
* and {@link BalancerConfigs#BALANCER_BROKER_BALANCING_MODE}. It also performs other common
67+
* validness checks to the cluster.
68+
*/
69+
public static void verifyClearBrokerValidness(
70+
ClusterInfo cluster, Predicate<Integer> isDemoted, Predicate<String> allowedTopics) {
71+
var disallowedTopicsToClear =
72+
cluster.topicPartitionReplicas().stream()
73+
.filter(tpr -> isDemoted.test(tpr.brokerId()))
74+
.filter(tpr -> !allowedTopics.test(tpr.topic()))
75+
.collect(Collectors.toUnmodifiableSet());
76+
if (!disallowedTopicsToClear.isEmpty())
77+
throw new IllegalArgumentException(
78+
"Attempts to clear some brokers, but some of them contain topics that forbidden from being changed due to \""
79+
+ BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX
80+
+ "\": "
81+
+ disallowedTopicsToClear);
82+
83+
var ongoingEventReplica =
84+
cluster.replicas().stream()
85+
.filter(r -> isDemoted.test(r.nodeInfo().id()))
86+
.filter(r -> r.isAdding() || r.isRemoving() || r.isFuture())
87+
.map(Replica::topicPartitionReplica)
88+
.collect(Collectors.toUnmodifiableSet());
89+
if (!ongoingEventReplica.isEmpty())
90+
throw new IllegalArgumentException(
91+
"Attempts to clear broker with ongoing migration event (adding/removing/future replica): "
92+
+ ongoingEventReplica);
93+
}
94+
95+
/**
96+
* Move all the replicas at the demoting broker to other allowed brokers. <b>BE CAREFUL, The
97+
* implementation made no assumption for MoveCost or ClusterCost of the returned ClusterInfo.</b>
98+
* Be aware of this limitation before using it as the starting point for a solution search. Some
99+
* balancer implementation might have trouble finding answer when starting at a state where the
100+
* MoveCost is already violated.
101+
*/
102+
public static ClusterInfo clearedCluster(
103+
ClusterInfo initial, Predicate<Integer> clearBrokers, Predicate<Integer> allowedBrokers) {
104+
final var allowed =
105+
initial.nodes().stream()
106+
.filter(node -> allowedBrokers.test(node.id()))
107+
.filter(node -> Predicate.not(clearBrokers).test(node.id()))
108+
.collect(Collectors.toUnmodifiableSet());
109+
final var nextBroker = Stream.generate(() -> allowed).flatMap(Collection::stream).iterator();
110+
final var nextBrokerFolder =
111+
initial.brokerFolders().entrySet().stream()
112+
.collect(
113+
Collectors.toUnmodifiableMap(
114+
Map.Entry::getKey,
115+
x -> Stream.generate(x::getValue).flatMap(Collection::stream).iterator()));
116+
117+
var trackingReplicaList =
118+
initial.topicPartitions().stream()
119+
.collect(
120+
Collectors.toUnmodifiableMap(
121+
tp -> tp,
122+
tp ->
123+
initial.replicas(tp).stream()
124+
.map(Replica::nodeInfo)
125+
.collect(Collectors.toSet())));
126+
return ClusterInfo.builder(initial)
127+
.mapLog(
128+
replica -> {
129+
if (!clearBrokers.test(replica.nodeInfo().id())) return replica;
130+
var currentReplicaList = trackingReplicaList.get(replica.topicPartition());
131+
var broker =
132+
IntStream.range(0, allowed.size())
133+
.mapToObj(i -> nextBroker.next())
134+
.filter(b -> !currentReplicaList.contains(b))
135+
.findFirst()
136+
.orElseThrow(
137+
() ->
138+
new IllegalStateException(
139+
"Unable to clear replica "
140+
+ replica.topicPartitionReplica()
141+
+ " for broker "
142+
+ replica.nodeInfo().id()
143+
+ ", the allowed destination brokers are "
144+
+ allowed.stream()
145+
.map(NodeInfo::id)
146+
.collect(Collectors.toUnmodifiableSet())
147+
+ " but all of them already hosting a replica for this partition. "
148+
+ "There is no broker can adopt this replica."));
149+
var folder = nextBrokerFolder.get(broker.id()).next();
150+
151+
// update the tracking list. have to do this to avoid putting two replicas from the
152+
// same tp to one broker.
153+
currentReplicaList.remove(replica.nodeInfo());
154+
currentReplicaList.add(broker);
155+
156+
return Replica.builder(replica).nodeInfo(broker).path(folder).build();
157+
})
158+
.build();
159+
}
160+
161+
public enum BalancingModes implements EnumInfo {
162+
BALANCING,
163+
DEMOTED,
164+
EXCLUDED;
165+
166+
public static BalancingModes ofAlias(String alias) {
167+
return EnumInfo.ignoreCaseEnum(BalancingModes.class, alias);
168+
}
169+
170+
@Override
171+
public String alias() {
172+
return name();
173+
}
174+
175+
@Override
176+
public String toString() {
177+
return alias();
178+
}
179+
}
180+
}

0 commit comments

Comments
 (0)