Skip to content

Commit 16afcb0

Browse files
Merge pull request #29959 from redpanda-data/stephan/greedy-improvements-fix
cluster: Improve greedy leader balancer
2 parents a3cdc6d + a989eeb commit 16afcb0

5 files changed

Lines changed: 145 additions & 86 deletions

File tree

src/v/cluster/scheduling/leader_balancer.cc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,16 +646,26 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
646646
std::move(muted_index),
647647
std::move(preference_index));
648648
break;
649-
case model::leader_balancer_mode::greedy:
649+
case model::leader_balancer_mode::greedy: {
650650
vlog(clusterlog.debug, "using greedy strategy");
651+
// Collect non-user topic IDs so the greedy strategy can exclude
652+
// them from cross-topic global counts.
653+
absl::flat_hash_set<leader_balancer_types::topic_id_t> internal_topics;
654+
for (const auto& t : _topics.topics_map()) {
655+
if (!model::is_user_topic(t.first)) {
656+
internal_topics.emplace(t.second.get_revision());
657+
}
658+
}
651659
strategy = std::make_unique<
652660
leader_balancer_types::greedy_topic_aware_strategy>(
653661
_members.node_count(),
654662
std::move(index),
655663
std::move(group_id_to_topic),
664+
std::move(internal_topics),
656665
std::move(muted_index),
657666
std::move(preference_index));
658667
break;
668+
}
659669
case model::leader_balancer_mode::calibrated:
660670
vlog(clusterlog.debug, "using calibrated_hill_climbing strategy");
661671
strategy = std::make_unique<

src/v/cluster/scheduling/leader_balancer_greedy.cc

Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@ greedy_topic_aware_strategy::greedy_topic_aware_strategy(
2424
size_t node_count,
2525
index_type index,
2626
group_id_to_topic_id group_to_topic,
27+
absl::flat_hash_set<topic_id_t> internal_topics,
2728
muted_index muted_index_value,
2829
std::optional<preference_index> preference_idx)
2930
: _muted_index(std::move(muted_index_value))
3031
, _group_to_topic(std::move(group_to_topic))
3132
, _shard_index(std::move(index))
3233
, _topic_distribution_constraint(_group_to_topic, _shard_index, _muted_index)
3334
, _shard_load_constraint(_shard_index, _muted_index)
35+
, _internal_topics(std::move(internal_topics))
3436
, _node_count(node_count) {
3537
if (preference_idx) {
3638
_pinning_constraint.emplace(
@@ -85,38 +87,36 @@ std::vector<shard_load> greedy_topic_aware_strategy::stats() const {
8587
}
8688

8789
// Lower-is-better on every element:
88-
// 1) excess above per-topic floor (0 while at or below
89-
// fair share; positive once exceeded — this gates the
90-
// remainder allocation so global counts can steer it)
90+
// 1) at_quota: 0 while broker is below its per-topic fair share,
91+
// 1 once reached — guarantees each broker fills to floor_quota
92+
// before any gets more, so the shard preference in field 2
93+
// cannot cause broker imbalance.
9194
// 2) fewest topic leaders on this broker-shard
92-
// 3) fewest global leaders on this broker
93-
// 4) fewest topic leaders on this broker
94-
// 5) fewest global leaders on this broker-shard
95-
// 6) deterministic tie-break by broker-shard ordering
95+
// 3) fewest topic leaders on this broker (tie-break to spread
96+
// across nodes when shards tie)
97+
// 4) fewest global leaders on this broker-shard
98+
// 5) deterministic tie-break by broker-shard ordering
9699
struct leader_preference {
97-
size_t excess;
100+
size_t at_quota;
98101
size_t shard_count;
99-
size_t global_broker_count;
100102
size_t broker_count;
101-
size_t global_shard_counts;
103+
size_t global_shard_count;
102104
model::broker_shard replica;
103105

104106
auto operator<=>(const leader_preference&) const = default;
105107
};
106108

107109
void greedy_topic_aware_strategy::build_target_assignment() {
108110
// `partitions_by_topic` is the input to the greedy planner grouped by
109-
// topic. Each entry stores the raft groups for one topic along with the
110-
// broker-shards that can legally host leadership for those groups.
111+
// topic. Each entry stores the raft groups for one topic along with
112+
// the broker-shards that can legally host leadership for those groups.
111113
std::map<topic_id_t, chunked_vector<partition_info>> partitions_by_topic;
112114

113-
// `global_shard_counts` and `global_broker_counts` track how many target
114-
// leaders have been assigned to each broker-shard and broker respectively
115-
// across previously processed topics. They serve as cross-topic
116-
// tie-breakers: when two replicas have equal per-topic counts, the one on
117-
// the globally least-loaded broker (then shard) wins.
115+
// `global_shard_counts` tracks how many target leaders have been
116+
// assigned to each broker-shard across the user (non-internal) topics processed so far.
117+
// It serves as a cross-topic tie-breaker so that topics processed
118+
// later inherit shard awareness from earlier ones.
118119
chunked_hash_map<model::broker_shard, size_t> global_shard_counts;
119-
chunked_hash_map<model::node_id, size_t> global_broker_counts;
120120

121121
for (const auto& [leader, groups] : _shard_index.shards()) {
122122
for (const auto& [group, replicas] : groups) {
@@ -126,18 +126,18 @@ void greedy_topic_aware_strategy::build_target_assignment() {
126126
clusterlog.warn, "missing topic mapping for group {}", group);
127127
continue;
128128
}
129-
130129
partitions_by_topic[topic_iterator->second].emplace_back(
131130
group, topic_iterator->second, leader, replicas);
132131
}
133132
}
134133

135-
// Build targets topic-by-topic. For each topic the greedy walk assigns
136-
// each partition to the replica that minimises a scoring tuple. A
137-
// per-topic floor (partitions / brokers) gates the first element so
138-
// that a broker that has already received its fair share is penalised,
139-
// letting globally under-loaded brokers absorb the remainder.
134+
// Build targets topic-by-topic. For each topic the greedy walk
135+
// assigns each partition to the replica that minimises the
136+
// leader_preference scoring tuple defined above. The at_quota
137+
// gate guarantees broker balance while shard_count optimises
138+
// shard placement within that constraint.
140139
for (auto& [topic, partitions] : partitions_by_topic) {
140+
bool is_internal = _internal_topics.contains(topic);
141141
std::ranges::sort(
142142
partitions, std::ranges::less{}, &partition_info::group);
143143

@@ -155,48 +155,35 @@ void greedy_topic_aware_strategy::build_target_assignment() {
155155
std::optional<leader_preference> best_assignment;
156156
for (const model::broker_shard& replica : partition.replicas) {
157157
size_t broker_count = assigned_broker_counts[replica.node_id];
158-
size_t excess = broker_count > floor_quota
159-
? broker_count - floor_quota
160-
: 0;
161-
auto candidate_assignment = leader_preference{
162-
.excess = excess,
158+
auto candidate = leader_preference{
159+
.at_quota = broker_count >= floor_quota ? 1u : 0u,
163160
.shard_count = assigned_shard_counts[replica],
164-
.global_broker_count = global_broker_counts[replica.node_id],
165161
.broker_count = broker_count,
166-
.global_shard_counts = global_shard_counts[replica],
162+
.global_shard_count = global_shard_counts[replica],
167163
.replica = replica};
168-
if (
169-
!best_assignment || candidate_assignment < *best_assignment) {
170-
best_assignment = candidate_assignment;
164+
if (!best_assignment || candidate < *best_assignment) {
165+
best_assignment = candidate;
171166
}
172167
}
173168

174169
if (!best_assignment.has_value()) {
175170
// group has no replicas
176171
continue;
177172
}
178-
model::broker_shard selected_leader_shard
179-
= best_assignment->replica;
180-
if (partition.leader != selected_leader_shard) {
173+
model::broker_shard selected = best_assignment->replica;
174+
if (partition.leader != selected) {
181175
_pending_moves.emplace_back(
182-
partition.group, partition.leader, selected_leader_shard);
176+
partition.group, partition.leader, selected);
177+
}
178+
assigned_broker_counts[selected.node_id] += 1;
179+
assigned_shard_counts[selected] += 1;
180+
181+
// Internal topics (e.g. id_allocator, tx_manager) are
182+
// excluded from global counts so their fixed placement
183+
// does not distort the cross-topic balance of user topics.
184+
if (!is_internal) {
185+
global_shard_counts[selected] += 1;
183186
}
184-
assigned_broker_counts[selected_leader_shard.node_id] += 1;
185-
assigned_shard_counts[selected_leader_shard] += 1;
186-
}
187-
188-
// Commit this topic's assignments to the global counters after
189-
// the entire topic is processed, not per-partition. If globals
190-
// were updated per-partition, the first assignment to an
191-
// under-loaded broker (e.g. global {3,3,2} → {3,3,3}) would
192-
// erase the imbalance before the remaining partitions are
193-
// scored, preventing global_broker_counts from steering subsequent
194-
// remainder assignments to that broker.
195-
for (const auto& [node, count] : assigned_broker_counts) {
196-
global_broker_counts[node] += count;
197-
}
198-
for (const auto& [shard, count] : assigned_shard_counts) {
199-
global_shard_counts[shard] += count;
200187
}
201188
}
202189
}

src/v/cluster/scheduling/leader_balancer_greedy.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class greedy_topic_aware_strategy final : public leader_balancer_strategy {
3535
size_t node_count,
3636
index_type index,
3737
group_id_to_topic_id group_to_topic,
38+
absl::flat_hash_set<topic_id_t> internal_topics,
3839
muted_index muted_index_value,
3940
std::optional<preference_index> preference_idx);
4041

@@ -66,6 +67,8 @@ class greedy_topic_aware_strategy final : public leader_balancer_strategy {
6667

6768
std::optional<pinning_constraint> _pinning_constraint;
6869

70+
absl::flat_hash_set<topic_id_t> _internal_topics;
71+
6972
chunked_vector<reassignment> _pending_moves;
7073
size_t _next_pending{0};
7174
size_t _node_count;

src/v/cluster/tests/leader_balancer_greedy_test.cc

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <cstdint>
2424
#include <iterator>
2525
#include <memory>
26+
#include <random>
2627
#include <ranges>
2728
#include <tuple>
2829
#include <vector>
@@ -98,6 +99,7 @@ make_strategy_from_partitions(
9899
nodes.size(),
99100
std::move(index),
100101
std::move(group_to_topic),
102+
absl::flat_hash_set<cluster::leader_balancer_types::topic_id_t>{},
101103
cluster::leader_balancer_types::muted_index{muted_nodes, {}},
102104
std::move(preference_idx));
103105

@@ -125,18 +127,38 @@ make_greedy_strategy(
125127
}
126128
}
127129

130+
// Each node independently assigns partitions to shards in a
131+
// balanced-but-shuffled order: a permutation of [0..shards-1]
132+
// that repeats, so every shard gets the same number of replicas.
133+
// Different nodes use different permutations (seeded by node id
134+
// and topic) so replicas of the same partition land on different
135+
// shards across nodes, matching real node-local shard placement.
136+
auto make_shard_sequence = [&](int node, int topic) {
137+
std::vector<uint32_t> perm(static_cast<size_t>(shards_per_broker));
138+
std::iota(perm.begin(), perm.end(), uint32_t{0});
139+
std::mt19937 rng(static_cast<uint32_t>(node * 137 + topic * 31));
140+
std::ranges::shuffle(perm, rng);
141+
return perm;
142+
};
143+
128144
for (int topic : std::views::iota(0, topic_count)) {
145+
// Build per-node shard permutations and counters.
146+
std::vector<std::vector<uint32_t>> node_perms;
147+
std::vector<size_t> node_counters(static_cast<size_t>(broker_count), 0);
148+
for (int n = 0; n < broker_count; ++n) {
149+
node_perms.push_back(make_shard_sequence(n, topic));
150+
}
151+
129152
for (int partition : std::views::iota(0, partitions_per_topic)) {
130153
std::vector<model::broker_shard> replicas;
131-
uint32_t topic_shard_offset = static_cast<uint32_t>(
132-
std::max(1, shards_per_broker / 2));
133154
int broker_offset = partition + topic;
134155
for (int replica : std::views::iota(0, replication_factor)) {
135-
replicas.push_back(bs(
136-
(broker_offset + replica) % broker_count,
137-
static_cast<uint32_t>(
138-
(partition + topic * topic_shard_offset)
139-
% shards_per_broker)));
156+
int node = (broker_offset + replica) % broker_count;
157+
auto& ctr = node_counters[static_cast<size_t>(node)];
158+
auto& perm = node_perms[static_cast<size_t>(node)];
159+
auto shard = perm[ctr % perm.size()];
160+
++ctr;
161+
replicas.push_back(bs(node, shard));
140162
}
141163
partitions.push_back(
142164
partition_spec{
@@ -419,28 +441,27 @@ TEST(GreedyLeaderBalancerTest, TwoTopicsFourBrokersThreeShards) {
419441
balanced,
420442
0,
421443
std::array<std::array<int, 3>, 4>{{
422-
{{2, 2, 1}},
444+
{{2, 1, 1}},
423445
{{1, 2, 2}},
446+
{{2, 2, 1}},
424447
{{1, 1, 2}},
425-
{{2, 1, 1}},
426448
}});
427449
expect_shard_counts_per_broker(
428450
balanced,
429451
1,
430452
std::array<std::array<int, 3>, 4>{{
453+
{{1, 1, 2}},
454+
{{1, 2, 1}},
455+
{{2, 2, 1}},
431456
{{2, 2, 1}},
432-
{{1, 1, 1}},
433-
{{1, 2, 2}},
434-
{{2, 1, 2}},
435457
}});
436-
// Not perfect
437458
expect_combined_shard_counts(
438459
balanced,
439460
std::array<std::array<int, 3>, 4>{{
461+
{{3, 2, 3}},
462+
{{2, 4, 3}},
440463
{{4, 4, 2}},
441-
{{2, 3, 3}},
442-
{{2, 3, 4}},
443-
{{4, 2, 3}},
464+
{{3, 3, 3}},
444465
}});
445466
}
446467

@@ -468,10 +489,10 @@ TEST(GreedyLeaderBalancerTest, RemainderPartitions) {
468489
expect_shard_counts_per_broker(
469490
balanced_leaders,
470491
1,
471-
std::array<std::array<int, 2>, 3>{{{{1, 1}}, {{2, 1}}, {{1, 2}}}});
492+
std::array<std::array<int, 2>, 3>{{{{1, 2}}, {{1, 1}}, {{1, 2}}}});
472493
expect_combined_shard_counts(
473494
balanced_leaders,
474-
std::array<std::array<int, 2>, 3>{{{{3, 2}}, {{3, 3}}, {{2, 3}}}});
495+
std::array<std::array<int, 2>, 3>{{{{3, 3}}, {{2, 3}}, {{2, 3}}}});
475496
}
476497

477498
TEST(GreedyLeaderBalancerTest, RemainderPartitionsThreeTopics) {
@@ -486,11 +507,11 @@ TEST(GreedyLeaderBalancerTest, RemainderPartitionsThreeTopics) {
486507
expect_shard_counts_per_broker(
487508
balanced_leaders,
488509
1,
489-
std::array<std::array<int, 2>, 3>{{{{1, 1}}, {{2, 1}}, {{1, 2}}}});
510+
std::array<std::array<int, 2>, 3>{{{{1, 2}}, {{1, 1}}, {{1, 2}}}});
490511
expect_shard_counts_per_broker(
491512
balanced_leaders,
492513
2,
493-
std::array<std::array<int, 2>, 3>{{{{1, 2}}, {{1, 1}}, {{2, 1}}}});
514+
std::array<std::array<int, 2>, 3>{{{{1, 1}}, {{2, 1}}, {{2, 1}}}});
494515
expect_combined_shard_counts(
495516
balanced_leaders,
496517
std::array<std::array<int, 2>, 3>{{{{4, 4}}, {{4, 4}}, {{4, 4}}}});
@@ -506,23 +527,23 @@ TEST(GreedyLeaderBalancerTest, BalancesShardsWithinBroker) {
506527
0,
507528
std::array<std::array<int, 4>, 3>{{
508529
{{2, 1, 1, 2}},
509-
{{2, 2, 1, 1}},
510-
{{1, 2, 2, 1}},
530+
{{1, 1, 2, 2}},
531+
{{1, 2, 1, 2}},
511532
}});
512533
expect_shard_counts_per_broker(
513534
balanced_leaders,
514535
1,
515536
std::array<std::array<int, 4>, 3>{{
516537
{{1, 2, 2, 1}},
517-
{{1, 1, 2, 2}},
518538
{{2, 1, 1, 2}},
539+
{{2, 2, 1, 1}},
519540
}});
520541
expect_combined_shard_counts(
521542
balanced_leaders,
522543
std::array<std::array<int, 4>, 3>{{
523544
{{3, 3, 3, 3}},
524-
{{3, 3, 3, 3}},
525-
{{3, 3, 3, 3}},
545+
{{3, 2, 3, 4}},
546+
{{3, 4, 2, 3}},
526547
}});
527548
}
528549

@@ -536,17 +557,16 @@ TEST(GreedyLeaderBalancerTest, TwoTopicsSixBrokersTwoShardsRfThree) {
536557
balanced_leaders,
537558
0,
538559
std::array<std::array<int, 2>, 6>{
539-
{{{2, 1}}, {{1, 2}}, {{2, 1}}, {{1, 2}}, {{2, 1}}, {{1, 2}}}});
560+
{{{2, 1}}, {{1, 2}}, {{2, 1}}, {{2, 1}}, {{1, 2}}, {{1, 2}}}});
540561
expect_shard_counts_per_broker(
541562
balanced_leaders,
542563
1,
543564
std::array<std::array<int, 2>, 6>{
544-
{{{2, 2}}, {{2, 1}}, {{1, 2}}, {{1, 1}}, {{1, 2}}, {{2, 1}}}});
565+
{{{2, 2}}, {{2, 1}}, {{1, 2}}, {{1, 1}}, {{1, 2}}, {{1, 2}}}});
545566
expect_combined_shard_counts(
546-
// Not perfect
547567
balanced_leaders,
548568
std::array<std::array<int, 2>, 6>{
549-
{{{4, 3}}, {{3, 3}}, {{3, 3}}, {{2, 3}}, {{3, 3}}, {{3, 3}}}});
569+
{{{4, 3}}, {{3, 3}}, {{3, 3}}, {{3, 2}}, {{2, 4}}, {{2, 4}}}});
550570
}
551571

552572
TEST(GreedyLeaderBalancerTest, SkippedMoveDoesNotDesyncIndex) {

0 commit comments

Comments
 (0)