Skip to content

Commit 7ac7d8b

Browse files
authored
fix(storage): balance distributed recluster task generation (#19840)
* fix(storage): balance distributed recluster task generation * fix test * fix test * fix test * fix * fix
1 parent a08b70e commit 7ac7d8b

3 files changed

Lines changed: 223 additions & 24 deletions

File tree

src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs

Lines changed: 130 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,66 @@ fn test_cluster_key_expr() -> Expr<usize> {
6161
})
6262
}
6363

64+
async fn gen_recluster_segments(
65+
data_accessor: &opendal::Operator,
66+
location_generator: &TableMetaLocationGenerator,
67+
num_segments: usize,
68+
blocks_per_segment: usize,
69+
row_count: u64,
70+
block_size: u64,
71+
file_size: u64,
72+
thresholds: BlockThresholds,
73+
cluster_key_id: u32,
74+
) -> anyhow::Result<Vec<meta::Location>> {
75+
let mut segment_locations = Vec::with_capacity(num_segments);
76+
for _ in 0..num_segments {
77+
let mut blocks = Vec::with_capacity(blocks_per_segment);
78+
for _ in 0..blocks_per_segment {
79+
let block_id = Uuid::new_v4().simple().to_string();
80+
let location = (block_id, DataBlock::VERSION);
81+
blocks.push(Arc::new(BlockMeta::new(
82+
row_count,
83+
block_size,
84+
file_size,
85+
HashMap::default(),
86+
HashMap::default(),
87+
Some(ClusterStatistics::new(
88+
cluster_key_id,
89+
vec![Scalar::from(1i32)],
90+
vec![Scalar::from(100i32)],
91+
0,
92+
None,
93+
)),
94+
location,
95+
None,
96+
0,
97+
None,
98+
None,
99+
None,
100+
None,
101+
None,
102+
None,
103+
None,
104+
None,
105+
meta::Compression::Lz4Raw,
106+
Some(Utc::now()),
107+
)));
108+
}
109+
110+
let block_refs = blocks
111+
.iter()
112+
.map(|block| block.as_ref())
113+
.collect::<Vec<_>>();
114+
let statistics = reduce_block_metas(&block_refs, thresholds, Some(cluster_key_id));
115+
let segment = SegmentInfo::new(blocks, statistics);
116+
let segment_location = location_generator
117+
.gen_segment_info_location(TestFixture::default_table_meta_timestamps(), false);
118+
segment.write_meta(data_accessor, &segment_location).await?;
119+
segment_locations.push((segment_location, SegmentInfo::VERSION));
120+
}
121+
Ok(segment_locations)
122+
}
123+
64124
#[tokio::test(flavor = "multi_thread")]
65125
async fn test_recluster_mutator_block_select() -> anyhow::Result<()> {
66126
let fixture = TestFixture::setup().await?;
@@ -114,8 +174,8 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> {
114174
let mut test_block_locations = vec![];
115175
let (segment_location, block_location) = gen_test_seg(Some(ClusterStatistics::new(
116176
cluster_key_id,
117-
vec![Scalar::from(1i64)],
118-
vec![Scalar::from(3i64)],
177+
vec![Scalar::from(1i32)],
178+
vec![Scalar::from(3i32)],
119179
0,
120180
None,
121181
)))
@@ -125,8 +185,8 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> {
125185

126186
let (segment_location, block_location) = gen_test_seg(Some(ClusterStatistics::new(
127187
cluster_key_id,
128-
vec![Scalar::from(2i64)],
129-
vec![Scalar::from(4i64)],
188+
vec![Scalar::from(2i32)],
189+
vec![Scalar::from(4i32)],
130190
0,
131191
None,
132192
)))
@@ -137,8 +197,8 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> {
137197
let schema = TableSchemaRef::new(TableSchema::empty());
138198
let (segment_location, block_location) = gen_test_seg(Some(ClusterStatistics::new(
139199
cluster_key_id,
140-
vec![Scalar::from(4i64)],
141-
vec![Scalar::from(5i64)],
200+
vec![Scalar::from(4i32)],
201+
vec![Scalar::from(5i32)],
142202
0,
143203
None,
144204
)))
@@ -180,6 +240,70 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> {
180240
Ok(())
181241
}
182242

243+
#[tokio::test(flavor = "multi_thread")]
244+
async fn test_recluster_mutator_split_tasks_by_parallel_budget() -> anyhow::Result<()> {
245+
let fixture = TestFixture::setup().await?;
246+
let ctx = fixture.new_query_ctx().await?;
247+
ctx.get_settings().set_recluster_block_size(1000)?;
248+
249+
let data_accessor = ctx.get_application_level_data_operator()?.operator();
250+
let location_generator = TableMetaLocationGenerator::new("_prefix".to_owned());
251+
let cluster_key_id = 0;
252+
let thresholds = BlockThresholds::new(1000, 10, 10, 1000);
253+
254+
// 200 small blocks with four workers should be selected and split by the
255+
// parallel budget, instead of collapsing into one or two oversized tasks.
256+
let segment_locations = gen_recluster_segments(
257+
&data_accessor,
258+
&location_generator,
259+
20,
260+
10,
261+
1000,
262+
10,
263+
10,
264+
thresholds,
265+
cluster_key_id,
266+
)
267+
.await?;
268+
269+
let schema = TableSchemaRef::new(TableSchema::empty());
270+
let ctx: Arc<dyn TableContext> = ctx.clone();
271+
let segment_locations = create_segment_location_vector(segment_locations, None);
272+
let compact_segments = FuseTable::segment_pruning(
273+
&ctx,
274+
schema.clone(),
275+
data_accessor.clone(),
276+
&None,
277+
segment_locations,
278+
)
279+
.await?;
280+
281+
let mutator = ReclusterMutator::new(
282+
ctx,
283+
data_accessor,
284+
schema,
285+
vec![test_cluster_key_expr()],
286+
1.0,
287+
thresholds,
288+
cluster_key_id,
289+
4,
290+
);
291+
292+
let compact_segments = mutator.select_segments(&compact_segments, 1000)?;
293+
let (block_num, parts) = mutator.target_select(compact_segments).await?;
294+
295+
assert_eq!(block_num, 200);
296+
assert_eq!(parts.tasks.len(), 4);
297+
let task_block_counts = parts
298+
.tasks
299+
.iter()
300+
.map(|task| task.parts.len())
301+
.collect::<Vec<_>>();
302+
assert_eq!(task_block_counts, vec![50, 50, 50, 50]);
303+
304+
Ok(())
305+
}
306+
183307
#[tokio::test(flavor = "multi_thread")]
184308
async fn test_recluster_mutator_zero_task_segment_rebuild() -> anyhow::Result<()> {
185309
let fixture = TestFixture::setup().await?;

src/query/settings/src/settings_default.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,7 @@ impl DefaultSettings {
10061006
desc: "Sets the maximum byte size of blocks for recluster",
10071007
mode: SettingMode::Both,
10081008
scope: SettingScope::Both,
1009-
range: Some(SettingRange::Numeric(0..=u64::MAX)),
1009+
range: Some(SettingRange::Numeric(1..=u64::MAX)),
10101010
}),
10111011
("compact_max_block_selection", DefaultSettingValue {
10121012
value: UserSettingValue::UInt64(1000),
@@ -1784,7 +1784,7 @@ impl DefaultSettings {
17841784
fn recluster_block_size(max_memory_usage: u64) -> u64 {
17851785
// The sort merge consumes more than twice as much memory,
17861786
// so the block size is set relatively conservatively here.
1787-
std::cmp::min(max_memory_usage * 30 / 100, 80 * 1024 * 1024 * 1024)
1787+
std::cmp::min(max_memory_usage * 20 / 100, 80 * 1024 * 1024 * 1024)
17881788
}
17891789

17901790
/// Converts and validates a setting value based on its key.

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use std::collections::HashMap;
1919
use std::collections::HashSet;
2020
use std::sync::Arc;
2121

22-
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2322
use databend_common_base::runtime::execute_futures_in_parallel;
2423
use databend_common_catalog::plan::ReclusterParts;
2524
use databend_common_catalog::plan::ReclusterTask;
@@ -237,16 +236,11 @@ impl ReclusterMutator {
237236
blocks_map.entry(stats.level).or_default().push(idx);
238237
}
239238

240-
// Compute memory threshold and maximum number of blocks allowed for reclustering.
239+
// Use the configured recluster budget as a stable scheduling target. Runtime
240+
// memory usage is intentionally not folded in here because sort spill can
241+
// absorb pressure and the available memory snapshot changes during execution.
241242
let settings = self.ctx.get_settings();
242-
let avail_memory_usage =
243-
settings.get_max_memory_usage()? - GLOBAL_MEM_STAT.get_memory_usage() as u64;
244-
let memory_threshold = settings
245-
.get_recluster_block_size()?
246-
.min(avail_memory_usage * 30 / 100) as usize;
247-
// specify a rather small value, so that `recluster_block_size` might be tuned to lower value.
248-
let max_blocks_num =
249-
(memory_threshold / self.block_thresholds.max_bytes_per_block).max(2) * self.max_tasks;
243+
let memory_threshold = settings.get_recluster_block_size()? as usize;
250244
let block_per_seg = self.block_thresholds.block_per_segment;
251245

252246
// Prepare task generation parameters
@@ -327,7 +321,12 @@ impl ReclusterMutator {
327321
break;
328322
}
329323

330-
// Select blocks for reclustering based on depth threshold and max block size
324+
let max_blocks_num_per_node =
325+
self.max_blocks_num_per_node(total_bytes as usize, block_count, memory_threshold);
326+
let max_blocks_num = max_blocks_num_per_node * self.max_tasks;
327+
// Fetch enough candidates for all workers. The per-node quota is based
328+
// on the observed block size in selected segments instead of the worst
329+
// allowed block size, otherwise distributed recluster can under-select.
331330
let mut selected_idx =
332331
self.fetch_max_depth(points_map, self.depth_threshold, max_blocks_num)?;
333332
if selected_idx.is_empty() {
@@ -337,19 +336,60 @@ impl ReclusterMutator {
337336
selected_idx = IndexSet::from_iter(small_blocks);
338337
}
339338

340-
// Process selected blocks into recluster tasks based on memory threshold
341-
let mut task_bytes = 0;
339+
let max_total_bytes = memory_threshold.saturating_mul(self.max_tasks);
340+
// Keep the first, highest-depth blocks within the total execution budget.
341+
// This is a second-stage guard after candidate selection: the average
342+
// block size is only an estimate, while task generation uses real bytes.
343+
let selected_total_bytes =
344+
Self::limit_selected_blocks_by_budget(&mut selected_idx, &blocks, max_total_bytes);
345+
let selected_block_count = selected_idx.len();
346+
if selected_block_count < 2 {
347+
continue;
348+
}
349+
350+
let min_blocks_per_task = (max_blocks_num_per_node / 2).max(2);
351+
let target_tasks_by_blocks = selected_block_count / min_blocks_per_task;
352+
let target_tasks_by_memory = selected_total_bytes.div_ceil(memory_threshold);
353+
// A recluster task needs at least two blocks, so this caps parallelism
354+
// when the selected candidate set is too small for every worker.
355+
let max_tasks_by_blocks = selected_block_count / 2;
356+
let target_tasks = target_tasks_by_blocks
357+
.max(target_tasks_by_memory)
358+
.max(1)
359+
.min(self.max_tasks)
360+
.min(max_tasks_by_blocks);
361+
let target_task_bytes = selected_total_bytes.div_ceil(target_tasks);
362+
let target_task_blocks = selected_block_count.div_ceil(target_tasks);
363+
364+
// Process selected blocks into recluster tasks based on memory and parallelism targets.
365+
let mut task_bytes = 0usize;
342366
let mut task_rows = 0;
343367
let mut task_compressed = 0;
344368
let mut task_indices = Vec::new();
345369
let mut selected_blocks = Vec::new();
346-
for idx in selected_idx {
370+
for (processed_blocks, idx) in selected_idx.into_iter().enumerate() {
347371
let block = blocks[idx].1.clone();
348372
let block_size = block.block_size as usize;
349373
let row_count = block.row_count as usize;
350374

351-
// If memory threshold exceeded, generate a new task and reset accumulators
352-
if task_bytes + block_size > memory_threshold && selected_blocks.len() > 1 {
375+
let remaining_tasks = target_tasks.saturating_sub(tasks.len() + 1);
376+
let remaining_blocks = selected_block_count.saturating_sub(processed_blocks);
377+
// Only split for parallelism when the remaining blocks can still
378+
// satisfy the minimum task size for the tasks left to create.
379+
let has_enough_remaining_blocks =
380+
remaining_blocks >= remaining_tasks * min_blocks_per_task;
381+
let should_split_for_memory = task_bytes.saturating_add(block_size)
382+
> memory_threshold
383+
&& selected_blocks.len() > 1;
384+
// Memory split is the hard safety guard. Parallel split is a load
385+
// balancing target and is allowed only while preserving task size.
386+
let should_split_for_parallelism = tasks.len() + 1 < target_tasks
387+
&& selected_blocks.len() >= min_blocks_per_task
388+
&& has_enough_remaining_blocks
389+
&& (task_bytes >= target_task_bytes
390+
|| selected_blocks.len() >= target_task_blocks);
391+
392+
if should_split_for_memory || should_split_for_parallelism {
353393
selected_blocks_idx.extend(std::mem::take(&mut task_indices));
354394

355395
tasks.push(self.generate_task(
@@ -489,6 +529,41 @@ impl ReclusterMutator {
489529
}
490530
}
491531

532+
fn max_blocks_num_per_node(
533+
&self,
534+
total_bytes: usize,
535+
block_count: usize,
536+
memory_threshold: usize,
537+
) -> usize {
538+
let avg_block_bytes = (total_bytes / block_count).max(1);
539+
// Clamp the observed average to normal block thresholds so tiny fragments
540+
// do not inflate the candidate count and unusually large blocks do not
541+
// make the distributed selection overly conservative.
542+
let target_block_bytes = avg_block_bytes
543+
.max(self.block_thresholds.min_bytes_per_block)
544+
.min(self.block_thresholds.max_bytes_per_block);
545+
(memory_threshold / target_block_bytes).max(2)
546+
}
547+
548+
fn limit_selected_blocks_by_budget(
549+
selected_idx: &mut IndexSet<usize>,
550+
blocks: &[(BlockIndex, Arc<BlockMeta>)],
551+
max_total_bytes: usize,
552+
) -> usize {
553+
let mut total_bytes = 0usize;
554+
let mut keep_blocks = 0;
555+
for idx in selected_idx.iter().copied() {
556+
let block_size = blocks[idx].1.block_size as usize;
557+
if keep_blocks >= 2 && total_bytes.saturating_add(block_size) > max_total_bytes {
558+
break;
559+
}
560+
total_bytes = total_bytes.saturating_add(block_size);
561+
keep_blocks += 1;
562+
}
563+
selected_idx.truncate(keep_blocks);
564+
total_bytes
565+
}
566+
492567
pub fn select_segments(
493568
&self,
494569
compact_segments: &[(SegmentLocation, Arc<CompactSegmentInfo>)],

0 commit comments

Comments
 (0)