Skip to content

Commit 4a4679e

Browse files
g-talbotclaude
andcommitted
refactor: extract shared score_merge() function for scheduler
Both Tantivy and Parquet merge scheduling used identical score logic (prefer merges that reduce more splits for less total bytes). Extracted the core arithmetic into score_merge(num_splits, total_bytes) and have both score_merge_operation() and score_parquet_merge_operation() call it. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 73bd817 commit 4a4679e

1 file changed

Lines changed: 20 additions & 18 deletions

File tree

quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -314,24 +314,30 @@ struct ScheduleMerge {
314314
split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
315315
}
316316

317-
/// The higher, the sooner we will execute the merge operation.
318-
/// A good merge operation
319-
/// - strongly reduces the number splits
320-
/// - is light.
321-
fn score_merge_operation(merge_operation: &MergeOperation) -> u64 {
322-
let total_num_bytes: u64 = merge_operation.total_num_bytes();
317+
/// Scores a merge operation for priority scheduling.
318+
///
319+
/// Higher score = scheduled sooner. Prefers merges that strongly reduce
320+
/// split count relative to their total byte cost. Used by both Tantivy
321+
/// and Parquet merge scheduling.
322+
fn score_merge(num_splits: usize, total_num_bytes: u64) -> u64 {
323323
if total_num_bytes == 0 {
324-
// Silly corner case that should never happen.
325324
return u64::MAX;
326325
}
327-
// We will remove splits.len() and add 1 merge splits.
328-
let delta_num_splits = (merge_operation.splits.len() - 1) as u64;
329-
// We use integer arithmetic to avoid `f64 are not ordered` silliness.
326+
// We will remove num_splits and add 1 merged split.
327+
let delta_num_splits = (num_splits - 1) as u64;
328+
// Integer arithmetic to avoid `f64 are not ordered` silliness.
330329
(delta_num_splits << 48)
331330
.checked_div(total_num_bytes)
332331
.unwrap_or(1u64)
333332
}
334333

334+
fn score_merge_operation(merge_operation: &MergeOperation) -> u64 {
335+
score_merge(
336+
merge_operation.splits.len(),
337+
merge_operation.total_num_bytes(),
338+
)
339+
}
340+
335341
impl ScheduleMerge {
336342
pub fn new(
337343
merge_operation: TrackedObject<MergeOperation>,
@@ -402,14 +408,10 @@ impl Handler<PermitReleased> for MergeSchedulerService {
402408

403409
#[cfg(feature = "metrics")]
404410
fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 {
405-
let total_num_bytes = merge_operation.total_size_bytes();
406-
if total_num_bytes == 0 {
407-
return u64::MAX;
408-
}
409-
let delta_num_splits = (merge_operation.splits.len() - 1) as u64;
410-
(delta_num_splits << 48)
411-
.checked_div(total_num_bytes)
412-
.unwrap_or(1u64)
411+
score_merge(
412+
merge_operation.splits.len(),
413+
merge_operation.total_size_bytes(),
414+
)
413415
}
414416

415417
#[cfg(feature = "metrics")]

0 commit comments

Comments
 (0)