Skip to content

Commit 8364e8a

Browse files
authored
[fix](cloud) fix active-priority rebalancer starvation (apache#60835)
Root cause: - In active-priority mode, performBalancing() returns early when active objects are still unbalanced. Under sustained hot-tablet pressure, this keeps happening and INACTIVE_ONLY never runs. Inactive partitions/tables are starved, so BE scale-out rebalancing becomes very slow. - runAfterCatalogReady() always called statRouteInfo() twice and rebuilt active tablet ids every round. These full-route/stat rebuilds allocate many temporary maps/sets, increasing young GC frequency. Fix: - Run the second statRouteInfo() only when smooth-upgrade migration actually moved tablets. - Add a starvation breaker: after N consecutive active-unbalanced rounds, force one INACTIVE_ONLY round. - Refresh active tablet snapshot by configurable interval (default 60s) instead of every round. Config: - cloud_active_tablet_ids_refresh_interval_second (default 60) - cloud_active_unbalanced_force_inactive_after_rounds (default 10) Test: - Add unit tests for forced inactive trigger, active snapshot refresh interval, and migrate no-op path.
1 parent e66e76f commit 8364e8a

3 files changed

Lines changed: 142 additions & 17 deletions

File tree

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3497,6 +3497,21 @@ public static int metaServiceRpcRetryTimes() {
34973497
+ "Default 10000. <=0 disables TopN segmentation."})
34983498
public static int cloud_active_partition_scheduling_topn = 10000;
34993499

3500+
@ConfField(mutable = true, masterOnly = true, description = {
3501+
"活跃 tablet 优先调度开启时,active 集合刷新间隔(秒)。默认 60 秒,"
3502+
+ "表示 60 秒内复用同一批 active tablet,避免每轮重算。",
3503+
"Refresh interval in seconds for the active-tablet snapshot when active priority scheduling is enabled. "
3504+
+ "Default 60 seconds. Reuses the same active-tablet set within the interval."})
3505+
public static long cloud_active_tablet_ids_refresh_interval_second = 60L;
3506+
3507+
@ConfField(mutable = true, masterOnly = true, description = {
3508+
"活跃 tablet 优先调度开启时,若 active 阶段连续 N 轮未达均衡,"
3509+
+ "则强制执行一轮 inactive 阶段以避免长期饥饿。默认 10,<=0 表示关闭该强制机制。",
3510+
"When active priority scheduling is enabled and the active phase remains unbalanced for N consecutive "
3511+
+ "rounds, force one inactive phase round to avoid long-term starvation. "
3512+
+ "Default 10. <=0 disables this forced mechanism."})
3513+
public static int cloud_active_unbalanced_force_inactive_after_rounds = 10;
3514+
35003515
@ConfField(mutable = true, masterOnly = false)
35013516
public static String security_checker_class_name = "";
35023517

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ private enum ActiveSchedulePhase {
149149
private BalanceTypeEnum globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
150150

151151
private Set<Long> activeTabletIds = new HashSet<>();
152+
private long lastActiveTabletIdsRefreshMs = 0L;
153+
private int consecutiveActiveUnbalancedRounds = 0;
152154

153155
// cache for scheduling order in one daemon run (rebuilt in statRouteInfo)
154156
// table/partition active count is computed from activeTabletIds
@@ -510,7 +512,7 @@ protected void runAfterCatalogReady() {
510512

511513
LOG.info("cloud tablet rebalance begin");
512514
long start = System.currentTimeMillis();
513-
activeTabletIds = getActiveTabletIds();
515+
refreshActiveTabletIdsIfNeeded();
514516
globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
515517

516518
buildClusterToBackendMap();
@@ -519,8 +521,10 @@ protected void runAfterCatalogReady() {
519521
}
520522

521523
statRouteInfo();
522-
migrateTabletsForSmoothUpgrade();
523-
statRouteInfo();
524+
boolean migrated = migrateTabletsForSmoothUpgrade();
525+
if (migrated) {
526+
statRouteInfo();
527+
}
524528

525529
indexBalanced = true;
526530
tableBalanced = true;
@@ -564,18 +568,15 @@ private void buildClusterToBackendMap() {
564568
LOG.info("cluster to backends {}", clusterToBes);
565569
}
566570

567-
private void migrateTabletsForSmoothUpgrade() {
571+
private boolean migrateTabletsForSmoothUpgrade() {
572+
boolean migrated = false;
568573
Pair<Long, Long> pair;
569-
while (!tabletsMigrateTasks.isEmpty()) {
570-
try {
571-
pair = tabletsMigrateTasks.take();
572-
LOG.debug("begin tablets migration from be {} to be {}", pair.first, pair.second);
573-
migrateTablets(pair.first, pair.second);
574-
} catch (InterruptedException e) {
575-
LOG.warn("migrate tablets failed", e);
576-
throw new RuntimeException(e);
577-
}
574+
while ((pair = tabletsMigrateTasks.poll()) != null) {
575+
LOG.debug("begin tablets migration from be {} to be {}", pair.first, pair.second);
576+
migrateTablets(pair.first, pair.second);
577+
migrated = true;
578578
}
579+
return migrated;
579580
}
580581

581582
private void performBalancing() {
@@ -585,6 +586,7 @@ private void performBalancing() {
585586
// to the same table or partition onto the same BE, while `partition` scheduling later requires these tablets
586587
// to be dispersed across different BEs, resulting in unnecessary scheduling.
587588
if (!Config.enable_cloud_active_tablet_priority_scheduling) {
589+
consecutiveActiveUnbalancedRounds = 0;
588590
// Legacy scheduling: schedule the full set.
589591
if (Config.enable_cloud_partition_balance) {
590592
balanceAllPartitionsByPhase(ActiveSchedulePhase.ALL);
@@ -621,10 +623,15 @@ private void performBalancing() {
621623
activeIndexBalanced, activeTableBalanced, activeBalanced, clusterToBes.size());
622624
}
623625

624-
if (!activeBalanced) {
626+
boolean forceInactivePhase = shouldForceInactivePhase(activeBalanced);
627+
if (!activeBalanced && !forceInactivePhase) {
625628
// Active objects are not balanced yet; skip phase2 to avoid diluting scheduling budget.
626629
return;
627630
}
631+
if (forceInactivePhase) {
632+
LOG.info("active phase is still unbalanced for {} consecutive rounds, force run INACTIVE_ONLY once",
633+
Config.cloud_active_unbalanced_force_inactive_after_rounds);
634+
}
628635

629636
// Phase 2: inactive (all - active), then global if enabled and ready.
630637
boolean phase2IndexBalanced = true;
@@ -635,12 +642,30 @@ private void performBalancing() {
635642
if (Config.enable_cloud_table_balance && phase2IndexBalanced) {
636643
phase2TableBalanced = balanceAllTablesByPhase(ActiveSchedulePhase.INACTIVE_ONLY);
637644
}
638-
if (Config.enable_cloud_global_balance && phase2IndexBalanced && phase2TableBalanced) {
645+
if (Config.enable_cloud_global_balance && activeBalanced && phase2IndexBalanced && phase2TableBalanced) {
639646
globalBalance();
640647
}
641648
}
642649
}
643650

651+
private boolean shouldForceInactivePhase(boolean activeBalanced) {
652+
if (activeBalanced) {
653+
consecutiveActiveUnbalancedRounds = 0;
654+
return false;
655+
}
656+
int forceAfterRounds = Config.cloud_active_unbalanced_force_inactive_after_rounds;
657+
if (forceAfterRounds <= 0) {
658+
consecutiveActiveUnbalancedRounds = 0;
659+
return false;
660+
}
661+
consecutiveActiveUnbalancedRounds++;
662+
if (consecutiveActiveUnbalancedRounds < forceAfterRounds) {
663+
return false;
664+
}
665+
consecutiveActiveUnbalancedRounds = 0;
666+
return true;
667+
}
668+
644669
private boolean balanceAllPartitionsByPhase(ActiveSchedulePhase phase) {
645670
// Reuse existing "balanced" flags as a per-phase signal.
646671
indexBalanced = true;
@@ -1682,7 +1707,7 @@ private long findSourceBackend(List<Long> bes, Map<Long, Set<Tablet>> beToTablet
16821707
maxTabletsNum = tabletNum;
16831708
}
16841709
} else {
1685-
LOG.info("backend {} not found", be);
1710+
LOG.debug("backend {} not found", be);
16861711
}
16871712
}
16881713
return srcBe;
@@ -1939,6 +1964,21 @@ private Set<Long> getActiveTabletIds() {
19391964
}
19401965
}
19411966

1967+
private void refreshActiveTabletIdsIfNeeded() {
1968+
long nowMs = System.currentTimeMillis();
1969+
if (!shouldRefreshActiveTabletIds(nowMs)) {
1970+
return;
1971+
}
1972+
activeTabletIds = getActiveTabletIds();
1973+
lastActiveTabletIdsRefreshMs = nowMs;
1974+
}
1975+
1976+
private boolean shouldRefreshActiveTabletIds(long nowMs) {
1977+
long refreshIntervalSeconds = Math.max(1L, Config.cloud_active_tablet_ids_refresh_interval_second);
1978+
long refreshIntervalMs = TimeUnit.SECONDS.toMillis(refreshIntervalSeconds);
1979+
return lastActiveTabletIdsRefreshMs <= 0L || nowMs - lastActiveTabletIdsRefreshMs >= refreshIntervalMs;
1980+
}
1981+
19421982
// Choose non-active (cold) tablet first to re-balance, to reduce impact on hot tablets.
19431983
// Fallback to active/random if no cold tablet is available.
19441984
private Tablet pickTabletPreferCold(long srcBe, Set<Tablet> tablets, Set<Long> activeTabletIds,

fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,22 @@
4343
public class CloudTabletRebalancerTest {
4444

4545
private boolean oldEnableActiveScheduling;
46+
private long oldActiveTabletIdsRefreshIntervalSecond;
47+
private int oldForceInactiveAfterRounds;
4648

4749
@BeforeEach
4850
public void setUp() {
4951
oldEnableActiveScheduling = Config.enable_cloud_active_tablet_priority_scheduling;
52+
oldActiveTabletIdsRefreshIntervalSecond = Config.cloud_active_tablet_ids_refresh_interval_second;
53+
oldForceInactiveAfterRounds = Config.cloud_active_unbalanced_force_inactive_after_rounds;
5054
Config.enable_cloud_active_tablet_priority_scheduling = true;
5155
}
5256

5357
@AfterEach
5458
public void tearDown() {
5559
Config.enable_cloud_active_tablet_priority_scheduling = oldEnableActiveScheduling;
60+
Config.cloud_active_tablet_ids_refresh_interval_second = oldActiveTabletIdsRefreshIntervalSecond;
61+
Config.cloud_active_unbalanced_force_inactive_after_rounds = oldForceInactiveAfterRounds;
5662
}
5763

5864
private static class TestRebalancer extends CloudTabletRebalancer {
@@ -79,6 +85,13 @@ private static void setField(Object obj, String name, Object value) throws Excep
7985
f.set(obj, value);
8086
}
8187

88+
@SuppressWarnings("unchecked")
89+
private static <T> T getField(Object obj, String name) throws Exception {
90+
Field f = CloudTabletRebalancer.class.getDeclaredField(name);
91+
f.setAccessible(true);
92+
return (T) f.get(obj);
93+
}
94+
8295
@SuppressWarnings("unchecked")
8396
private static <T> T invokePrivate(Object obj, String method, Class<?>[] types, Object[] args) throws Exception {
8497
Method m = CloudTabletRebalancer.class.getDeclaredMethod(method, types);
@@ -236,6 +249,63 @@ public void testPartitionEntryComparator_internalDbLastAndIdDescTieBreak() throw
236249
Assertions.assertEquals(200L, list.get(1).getKey());
237250
Assertions.assertEquals(100L, list.get(2).getKey(), "Internal db partition should be scheduled last");
238251
}
239-
}
240252

253+
@Test
254+
public void testShouldForceInactivePhase_afterConsecutiveUnbalancedRounds() throws Exception {
255+
TestRebalancer r = new TestRebalancer();
256+
Config.cloud_active_unbalanced_force_inactive_after_rounds = 3;
257+
258+
boolean forceRound1 = invokePrivate(r, "shouldForceInactivePhase",
259+
new Class<?>[] {boolean.class}, new Object[] {false});
260+
boolean forceRound2 = invokePrivate(r, "shouldForceInactivePhase",
261+
new Class<?>[] {boolean.class}, new Object[] {false});
262+
boolean forceRound3 = invokePrivate(r, "shouldForceInactivePhase",
263+
new Class<?>[] {boolean.class}, new Object[] {false});
264+
265+
Assertions.assertFalse(forceRound1);
266+
Assertions.assertFalse(forceRound2);
267+
Assertions.assertTrue(forceRound3);
268+
Assertions.assertEquals(0, (int) getField(r, "consecutiveActiveUnbalancedRounds"));
269+
270+
boolean forceAfterBalanced = invokePrivate(r, "shouldForceInactivePhase",
271+
new Class<?>[] {boolean.class}, new Object[] {true});
272+
Assertions.assertFalse(forceAfterBalanced);
273+
Assertions.assertEquals(0, (int) getField(r, "consecutiveActiveUnbalancedRounds"));
274+
}
275+
276+
@Test
277+
public void testShouldRefreshActiveTabletIds_respectsIntervalAndClamp() throws Exception {
278+
TestRebalancer r = new TestRebalancer();
279+
280+
Config.cloud_active_tablet_ids_refresh_interval_second = 60L;
281+
setField(r, "lastActiveTabletIdsRefreshMs", 0L);
282+
boolean firstRound = invokePrivate(r, "shouldRefreshActiveTabletIds",
283+
new Class<?>[] {long.class}, new Object[] {1000L});
284+
Assertions.assertTrue(firstRound);
285+
286+
setField(r, "lastActiveTabletIdsRefreshMs", 1000L);
287+
boolean beforeInterval = invokePrivate(r, "shouldRefreshActiveTabletIds",
288+
new Class<?>[] {long.class}, new Object[] {60000L});
289+
boolean atInterval = invokePrivate(r, "shouldRefreshActiveTabletIds",
290+
new Class<?>[] {long.class}, new Object[] {61000L});
291+
Assertions.assertFalse(beforeInterval);
292+
Assertions.assertTrue(atInterval);
293+
294+
Config.cloud_active_tablet_ids_refresh_interval_second = 0L; // clamp to 1s
295+
setField(r, "lastActiveTabletIdsRefreshMs", 1000L);
296+
boolean beforeClampInterval = invokePrivate(r, "shouldRefreshActiveTabletIds",
297+
new Class<?>[] {long.class}, new Object[] {1500L});
298+
boolean atClampInterval = invokePrivate(r, "shouldRefreshActiveTabletIds",
299+
new Class<?>[] {long.class}, new Object[] {2000L});
300+
Assertions.assertFalse(beforeClampInterval);
301+
Assertions.assertTrue(atClampInterval);
302+
}
303+
304+
@Test
305+
public void testMigrateTabletsForSmoothUpgrade_emptyQueueReturnsFalse() throws Exception {
306+
TestRebalancer r = new TestRebalancer();
307+
boolean migrated = invokePrivate(r, "migrateTabletsForSmoothUpgrade", new Class<?>[] {}, new Object[] {});
308+
Assertions.assertFalse(migrated);
309+
}
310+
}
241311

0 commit comments

Comments
 (0)