Skip to content
39 changes: 21 additions & 18 deletions quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl MergeSchedulerService {
} = PeekMut::pop(next_merge);
// The permit is owned by the task and released via Drop when
// the executor finishes, triggering PermitReleased back here.
// Drop-based release ensures the semaphore is freed even on panic.
let parquet_merge_task = ParquetMergeTask {
merge_operation,
merge_permit,
Expand Down Expand Up @@ -318,24 +319,30 @@ struct ScheduleMerge {
split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
}

/// The higher, the sooner we will execute the merge operation.
/// A good merge operation
/// - strongly reduces the number splits
/// - is light.
fn score_merge_operation(merge_operation: &MergeOperation) -> u64 {
let total_num_bytes: u64 = merge_operation.total_num_bytes();
/// Scores a merge operation for priority scheduling.
///
/// Higher score = scheduled sooner. Prefers merges that strongly reduce
/// split count relative to their total byte cost. Used by both Tantivy
/// and Parquet merge scheduling.
fn score_merge(num_splits: usize, total_num_bytes: u64) -> u64 {
    if total_num_bytes == 0 {
        // Silly corner case that should never happen: treat a zero-byte
        // merge as infinitely urgent so it drains immediately.
        return u64::MAX;
    }
    // We will remove `num_splits` splits and add 1 merged split.
    // `saturating_sub` guards against an underflow panic if an (invalid)
    // empty operation ever reaches this function: it scores 0 instead.
    let delta_num_splits = num_splits.saturating_sub(1) as u64;
    // Integer arithmetic to avoid `f64 are not ordered` silliness.
    // `saturating_mul` replaces `<< 48`, which would overflow-panic in
    // debug builds for absurd split counts (>= 2^16); realistic inputs
    // produce identical results.
    delta_num_splits
        .saturating_mul(1u64 << 48)
        .checked_div(total_num_bytes)
        .unwrap_or(1u64)
}

/// Scores a Tantivy merge operation by delegating to the shared
/// `score_merge` helper on the operation's split count and total size.
fn score_merge_operation(merge_operation: &MergeOperation) -> u64 {
    let num_splits = merge_operation.splits.len();
    let total_num_bytes = merge_operation.total_num_bytes();
    score_merge(num_splits, total_num_bytes)
}

impl ScheduleMerge {
pub fn new(
merge_operation: TrackedObject<MergeOperation>,
Expand Down Expand Up @@ -406,14 +413,10 @@ impl Handler<PermitReleased> for MergeSchedulerService {

/// Scores a Parquet merge operation for priority scheduling by delegating
/// to the shared `score_merge` helper, so Parquet and Tantivy merges are
/// ranked with identical semantics.
#[cfg(feature = "metrics")]
fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 {
    score_merge(
        merge_operation.splits.len(),
        merge_operation.total_size_bytes(),
    )
}

#[cfg(feature = "metrics")]
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ mod publisher_impl;
)]
mod parquet_e2e_test;

#[cfg(test)]
#[allow(clippy::disallowed_methods)]
mod parquet_merge_pipeline_test;

pub use parquet_doc_processor::{
ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,18 @@ use crate::models::PublishLock;
/// ready-to-upload Parquet files with complete metadata.
pub struct ParquetMergeExecutor {
/// Mailbox used to forward merge output to the uploader actor.
uploader_mailbox: Mailbox<ParquetUploader>,
/// Writer settings applied when producing merged Parquet output files.
writer_config: ParquetWriterConfig,
}

impl ParquetMergeExecutor {
pub fn new(uploader_mailbox: Mailbox<ParquetUploader>) -> Self {
Self { uploader_mailbox }
pub fn new(
uploader_mailbox: Mailbox<ParquetUploader>,
writer_config: ParquetWriterConfig,
) -> Self {
Self {
uploader_mailbox,
writer_config,
}
}
}

Expand Down Expand Up @@ -99,36 +106,38 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
// Run the CPU-intensive merge on the dedicated thread pool.
let input_paths = scratch.downloaded_parquet_files.clone();
let output_dir_clone = output_dir.clone();
let writer_config = self.writer_config.clone();
let merge_result = run_cpu_intensive(move || {
let config = MergeConfig {
num_outputs: 1,
writer_config: ParquetWriterConfig::default(),
writer_config,
};
merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config)
})
.await;

// We return Ok(()) on merge failure rather than Err to keep the actor
// alive — same strategy as Tantivy's MergeExecutor. This prevents a
// single "split of death" from crash-looping the entire pipeline.
// The trade-off: failed splits aren't retried until pipeline respawn.
let outputs: Vec<MergeOutputFile> = match merge_result {
Ok(Ok(outputs)) => outputs,
Ok(Err(merge_err)) => {
warn!(
error = %merge_err,
merge_split_id = %merge_split_id,
"parquet merge failed"
"parquet merge failed — input splits will not be retried until \
the pipeline restarts with metastore re-seeding"
);
// Input splits were drained from the planner by operations().
// They remain published but won't be re-planned until respawn.
// The input splits were drained from the planner by operations().
// They remain published in the metastore but won't be re-planned
// until the pipeline restarts and re-seeds from the metastore.
// TODO: implement fetch_immature_parquet_splits() for respawn
// (same as Tantivy's fetch_immature_splits pattern).
return Ok(());
}
Err(panicked) => {
warn!(
error = %panicked,
merge_split_id = %merge_split_id,
"parquet merge panicked"
"parquet merge panicked — input splits will not be retried until \
the pipeline restarts with metastore re-seeding"
);
return Ok(());
}
Expand Down Expand Up @@ -162,9 +171,8 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
.context("failed to build merge output metadata")
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;

// The merge engine writes to a temp filename (merge_output_*.parquet).
// Rename to {split_id}.parquet so the uploader can find it at the
// path derived from ParquetSplitMetadata::parquet_filename().
// Rename the output file to match the generated split ID.
// The uploader expects files at `output_dir/{split_id}.parquet`.
let expected_path = output_dir.join(&metadata.parquet_file);
if output.path != expected_path {
std::fs::rename(&output.path, &expected_path)
Expand Down
Loading
Loading