185 changes: 185 additions & 0 deletions quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
@@ -20,11 +20,15 @@ use std::sync::Arc;
use anyhow::Context;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
#[cfg(feature = "metrics")]
use quickwit_parquet_engine::merge::policy::ParquetMergeOperation;
use tantivy::TrackedObject;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::error;

use super::MergeSplitDownloader;
#[cfg(feature = "metrics")]
use super::metrics_pipeline::{ParquetMergeSplitDownloader, ParquetMergeTask};
use crate::merge_policy::{MergeOperation, MergeTask};

pub struct MergePermit {
@@ -70,6 +74,20 @@ pub async fn schedule_merge(
Ok(())
}

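/// Schedules a Parquet merge operation, mirroring `schedule_merge` above.
/// The `ask` resolves once the scheduler has enqueued the operation; the
/// merge itself is dispatched later, when a merge permit becomes available.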
#[cfg(feature = "metrics")]
pub async fn schedule_parquet_merge(
merge_scheduler_service: &Mailbox<MergeSchedulerService>,
merge_operation: TrackedObject<ParquetMergeOperation>,
merge_split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
) -> anyhow::Result<()> {
let schedule_merge = ScheduleParquetMerge::new(merge_operation, merge_split_downloader_mailbox);
merge_scheduler_service
.ask(schedule_merge)
.await
.context("failed to schedule parquet merge")?;
Ok(())
}

struct ScheduledMerge {
score: u64,
id: u64, //< just for total ordering.
@@ -103,6 +121,45 @@ impl Ord for ScheduledMerge {
}
}

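/// Parquet counterpart of `ScheduledMerge`. `BinaryHeap` is a max-heap, so
/// `order_key` pops the highest score first and, on ties, the lowest id
/// (i.e. the earliest scheduled merge) thanks to `Reverse(self.id)`.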
#[cfg(feature = "metrics")]
struct ScheduledParquetMerge {
score: u64,
id: u64,
merge_operation: TrackedObject<ParquetMergeOperation>,
split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
}

#[cfg(feature = "metrics")]
impl ScheduledParquetMerge {
fn order_key(&self) -> (u64, Reverse<u64>) {
(self.score, Reverse(self.id))
}
}

#[cfg(feature = "metrics")]
impl Eq for ScheduledParquetMerge {}

#[cfg(feature = "metrics")]
impl PartialEq for ScheduledParquetMerge {
fn eq(&self, other: &Self) -> bool {
self.cmp(other).is_eq()
}
}

#[cfg(feature = "metrics")]
impl PartialOrd for ScheduledParquetMerge {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

#[cfg(feature = "metrics")]
impl Ord for ScheduledParquetMerge {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.order_key().cmp(&other.order_key())
}
}

/// The merge scheduler service is in charge of keeping track of all scheduled merge operations,
/// and of scheduling them in the best possible order, respecting the `merge_concurrency` limit.
///
@@ -116,6 +173,8 @@ pub struct MergeSchedulerService {
merge_semaphore: Arc<Semaphore>,
merge_concurrency: usize,
pending_merge_queue: BinaryHeap<ScheduledMerge>,
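// Parquet merges are queued separately but compete for the same
// semaphore permits as Tantivy merges.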
#[cfg(feature = "metrics")]
pending_parquet_merge_queue: BinaryHeap<ScheduledParquetMerge>,
next_merge_id: u64,
pending_merge_bytes: u64,
}
@@ -133,6 +192,8 @@ impl MergeSchedulerService {
merge_semaphore,
merge_concurrency,
pending_merge_queue: BinaryHeap::default(),
#[cfg(feature = "metrics")]
pending_parquet_merge_queue: BinaryHeap::default(),
next_merge_id: 0,
pending_merge_bytes: 0,
}
@@ -183,6 +244,54 @@ impl MergeSchedulerService {
}
}
}
// Dispatch pending Parquet merges. Shares the same semaphore as
// Tantivy merges so the node doesn't exceed its merge concurrency
// limit regardless of how many pipelines of each type are running.
#[cfg(feature = "metrics")]
loop {
let merge_semaphore = self.merge_semaphore.clone();
let Some(next_merge) = self.pending_parquet_merge_queue.peek_mut() else {
break;
};
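// All merge permits are currently in use: leave the remaining parquet
// merges queued until a `PermitReleased` message re-triggers scheduling.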
let Ok(semaphore_permit) = Semaphore::try_acquire_owned(merge_semaphore) else {
break;
};
let merge_permit = MergePermit {
_semaphore_permit: Some(semaphore_permit),
merge_scheduler_mailbox: Some(ctx.mailbox().clone()),
};
let ScheduledParquetMerge {
merge_operation,
split_downloader_mailbox,
..
} = PeekMut::pop(next_merge);
// The permit is owned by the task and released via Drop when
// the executor finishes, triggering PermitReleased back here.
let parquet_merge_task = ParquetMergeTask {
merge_operation,
merge_permit,
};
self.pending_merge_bytes -= parquet_merge_task.merge_operation.total_size_bytes();
crate::metrics::INDEXER_METRICS
.pending_merge_operations
.set(
self.pending_merge_queue.len() as i64
+ self.pending_parquet_merge_queue.len() as i64,
);
crate::metrics::INDEXER_METRICS
.pending_merge_bytes
.set(self.pending_merge_bytes as i64);
match split_downloader_mailbox.try_send_message(parquet_merge_task) {
Ok(_) => {}
Err(quickwit_actors::TrySendError::Full(_)) => {
error!("parquet split downloader queue is full: please report");
}
Err(quickwit_actors::TrySendError::Disconnected) => {
// The downloader is dead; the pipeline was probably restarted.
// Dropping the task here releases the merge permit via `Drop`.
}
}
}

let num_merges =
self.merge_concurrency as i64 - self.merge_semaphore.available_permits() as i64;
crate::metrics::INDEXER_METRICS
@@ -293,6 +402,82 @@ impl Handler<PermitReleased> for MergeSchedulerService {
}
}

// --- Parquet merge scheduling (feature-gated) ---

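/// Scores a Parquet merge so that merges folding many splits per byte win.
/// With hypothetical sizes: merging 11 splits totalling 1 GiB scores
/// (10 << 48) / 2^30 = 2_621_440, while merging 2 splits of the same total
/// size scores (1 << 48) / 2^30 = 262_144, so the former is dispatched first.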
#[cfg(feature = "metrics")]
fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 {
let total_num_bytes = merge_operation.total_size_bytes();
if total_num_bytes == 0 {
return u64::MAX;
}
let delta_num_splits = (merge_operation.splits.len() - 1) as u64;
(delta_num_splits << 48)
.checked_div(total_num_bytes)
.unwrap_or(1u64)
}

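/// Actor message asking the scheduler to enqueue a single Parquet merge.
/// The score is computed once, at construction time, from the operation.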
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct ScheduleParquetMerge {
score: u64,
merge_operation: TrackedObject<ParquetMergeOperation>,
split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
}

#[cfg(feature = "metrics")]
impl ScheduleParquetMerge {
pub fn new(
merge_operation: TrackedObject<ParquetMergeOperation>,
split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
) -> Self {
let score = score_parquet_merge_operation(&merge_operation);
Self {
score,
merge_operation,
split_downloader_mailbox,
}
}
}

#[cfg(feature = "metrics")]
#[async_trait]
impl Handler<ScheduleParquetMerge> for MergeSchedulerService {
type Reply = ();

async fn handle(
&mut self,
schedule_merge: ScheduleParquetMerge,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let ScheduleParquetMerge {
score,
merge_operation,
split_downloader_mailbox,
} = schedule_merge;
let merge_id = self.next_merge_id;
self.next_merge_id += 1;
let scheduled = ScheduledParquetMerge {
score,
id: merge_id,
merge_operation,
split_downloader_mailbox,
};
self.pending_merge_bytes += scheduled.merge_operation.total_size_bytes();
self.pending_parquet_merge_queue.push(scheduled);
crate::metrics::INDEXER_METRICS
.pending_merge_operations
.set(
self.pending_merge_queue.len() as i64
+ self.pending_parquet_merge_queue.len() as i64,
);
crate::metrics::INDEXER_METRICS
.pending_merge_bytes
.set(self.pending_merge_bytes as i64);
self.schedule_pending_merges(ctx);
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
@@ -26,6 +26,8 @@ mod indexing_service_impl;
mod parquet_doc_processor;
mod parquet_indexer;
pub(crate) mod parquet_merge_messages;
mod parquet_merge_planner;
mod parquet_merge_split_downloader;
mod parquet_packager;
mod parquet_splits_update;
mod parquet_uploader;
@@ -46,6 +48,8 @@ pub use parquet_doc_processor::{
};
pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
pub use parquet_merge_planner::ParquetMergePlanner;
pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;
pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};
pub use parquet_splits_update::ParquetSplitsUpdate;
pub use parquet_uploader::ParquetUploader;