@@ -20,11 +20,15 @@ use std::sync::Arc;
2020use anyhow:: Context ;
2121use async_trait:: async_trait;
2222use quickwit_actors:: { Actor , ActorContext , ActorExitStatus , Handler , Mailbox } ;
23+ #[ cfg( feature = "metrics" ) ]
24+ use quickwit_parquet_engine:: merge:: policy:: ParquetMergeOperation ;
2325use tantivy:: TrackedObject ;
2426use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
2527use tracing:: error;
2628
2729use super :: MergeSplitDownloader ;
30+ #[ cfg( feature = "metrics" ) ]
31+ use super :: metrics_pipeline:: { ParquetMergeSplitDownloader , ParquetMergeTask } ;
2832use crate :: merge_policy:: { MergeOperation , MergeTask } ;
2933
3034pub struct MergePermit {
@@ -70,6 +74,20 @@ pub async fn schedule_merge(
7074 Ok ( ( ) )
7175}
7276
/// Submits a Parquet merge operation to the merge scheduler service.
///
/// The scheduler decides when the operation is dispatched to the given
/// split downloader, based on its score and the available merge permits.
/// Returns once the scheduler has acknowledged (enqueued) the request.
#[cfg(feature = "metrics")]
pub async fn schedule_parquet_merge(
    merge_scheduler_service: &Mailbox<MergeSchedulerService>,
    merge_operation: TrackedObject<ParquetMergeOperation>,
    merge_split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
) -> anyhow::Result<()> {
    let request = ScheduleParquetMerge::new(merge_operation, merge_split_downloader_mailbox);
    merge_scheduler_service
        .ask(request)
        .await
        .context("failed to schedule parquet merge")?;
    Ok(())
}
90+
7391struct ScheduledMerge {
7492 score : u64 ,
7593 id : u64 , //< just for total ordering.
@@ -103,6 +121,45 @@ impl Ord for ScheduledMerge {
103121 }
104122}
105123
/// A Parquet merge operation waiting in the scheduler's priority queue.
#[cfg(feature = "metrics")]
struct ScheduledParquetMerge {
    // Scheduling priority: higher scores are dispatched first.
    score: u64,
    // Monotonically increasing id assigned at enqueue time; used as a
    // tie-breaker so equally scored merges dispatch in FIFO order.
    id: u64,
    merge_operation: TrackedObject<ParquetMergeOperation>,
    // Where the merge task is sent once a permit is acquired.
    split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
}
131+
#[cfg(feature = "metrics")]
impl ScheduledParquetMerge {
    // Key used for ordering in the max-heap: highest score first; among
    // equal scores, `Reverse(id)` makes the lowest (oldest) id win.
    fn order_key(&self) -> (u64, Reverse<u64>) {
        (self.score, Reverse(self.id))
    }
}

#[cfg(feature = "metrics")]
impl Eq for ScheduledParquetMerge {}

#[cfg(feature = "metrics")]
impl PartialEq for ScheduledParquetMerge {
    fn eq(&self, other: &Self) -> bool {
        // Delegates to `cmp` so equality stays consistent with the ordering.
        self.cmp(other).is_eq()
    }
}

#[cfg(feature = "metrics")]
impl PartialOrd for ScheduledParquetMerge {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

#[cfg(feature = "metrics")]
impl Ord for ScheduledParquetMerge {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.order_key().cmp(&other.order_key())
    }
}
162+
106163/// The merge scheduler service is in charge of keeping track of all scheduled merge operations,
107164/// and schedule them in the best possible order, respecting the `merge_concurrency` limit.
108165///
@@ -116,6 +173,8 @@ pub struct MergeSchedulerService {
116173 merge_semaphore : Arc < Semaphore > ,
117174 merge_concurrency : usize ,
118175 pending_merge_queue : BinaryHeap < ScheduledMerge > ,
176+ #[ cfg( feature = "metrics" ) ]
177+ pending_parquet_merge_queue : BinaryHeap < ScheduledParquetMerge > ,
119178 next_merge_id : u64 ,
120179 pending_merge_bytes : u64 ,
121180}
@@ -133,6 +192,8 @@ impl MergeSchedulerService {
133192 merge_semaphore,
134193 merge_concurrency,
135194 pending_merge_queue : BinaryHeap :: default ( ) ,
195+ #[ cfg( feature = "metrics" ) ]
196+ pending_parquet_merge_queue : BinaryHeap :: default ( ) ,
136197 next_merge_id : 0 ,
137198 pending_merge_bytes : 0 ,
138199 }
@@ -183,6 +244,54 @@ impl MergeSchedulerService {
183244 }
184245 }
185246 }
247+ // Dispatch pending Parquet merges. Shares the same semaphore as
248+ // Tantivy merges so the node doesn't exceed its merge concurrency
249+ // limit regardless of how many pipelines of each type are running.
250+ #[ cfg( feature = "metrics" ) ]
251+ loop {
252+ let merge_semaphore = self . merge_semaphore . clone ( ) ;
253+ let Some ( next_merge) = self . pending_parquet_merge_queue . peek_mut ( ) else {
254+ break ;
255+ } ;
256+ let Ok ( semaphore_permit) = Semaphore :: try_acquire_owned ( merge_semaphore) else {
257+ break ;
258+ } ;
259+ let merge_permit = MergePermit {
260+ _semaphore_permit : Some ( semaphore_permit) ,
261+ merge_scheduler_mailbox : Some ( ctx. mailbox ( ) . clone ( ) ) ,
262+ } ;
263+ let ScheduledParquetMerge {
264+ merge_operation,
265+ split_downloader_mailbox,
266+ ..
267+ } = PeekMut :: pop ( next_merge) ;
268+ // The permit is owned by the task and released via Drop when
269+ // the executor finishes, triggering PermitReleased back here.
270+ let parquet_merge_task = ParquetMergeTask {
271+ merge_operation,
272+ merge_permit,
273+ } ;
274+ self . pending_merge_bytes -= parquet_merge_task. merge_operation . total_size_bytes ( ) ;
275+ crate :: metrics:: INDEXER_METRICS
276+ . pending_merge_operations
277+ . set (
278+ self . pending_merge_queue . len ( ) as i64
279+ + self . pending_parquet_merge_queue . len ( ) as i64 ,
280+ ) ;
281+ crate :: metrics:: INDEXER_METRICS
282+ . pending_merge_bytes
283+ . set ( self . pending_merge_bytes as i64 ) ;
284+ match split_downloader_mailbox. try_send_message ( parquet_merge_task) {
285+ Ok ( _) => { }
286+ Err ( quickwit_actors:: TrySendError :: Full ( _) ) => {
287+ error ! ( "parquet split downloader queue is full: please report" ) ;
288+ }
289+ Err ( quickwit_actors:: TrySendError :: Disconnected ) => {
290+ // The downloader is dead — pipeline probably restarted.
291+ }
292+ }
293+ }
294+
186295 let num_merges =
187296 self . merge_concurrency as i64 - self . merge_semaphore . available_permits ( ) as i64 ;
188297 crate :: metrics:: INDEXER_METRICS
@@ -293,6 +402,82 @@ impl Handler<PermitReleased> for MergeSchedulerService {
293402 }
294403}
295404
405+ // --- Parquet merge scheduling (feature-gated) ---
406+
/// Scores a Parquet merge operation for scheduling priority.
///
/// Favors merges that remove the most splits per byte processed: the number
/// of splits eliminated (`len - 1`) is scaled by 2^48, then divided by the
/// operation's total byte size. Zero-byte operations get the maximum score
/// so they are dispatched immediately.
#[cfg(feature = "metrics")]
fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 {
    let total_num_bytes = merge_operation.total_size_bytes();
    if total_num_bytes == 0 {
        return u64::MAX;
    }
    // `saturating_sub` guards against an underflow panic if the operation
    // somehow carries no splits while still reporting a non-zero byte size.
    let delta_num_splits = merge_operation.splits.len().saturating_sub(1) as u64;
    // `total_num_bytes` is non-zero here, so the division cannot fail; the
    // fallback of 1 is kept for defensiveness.
    (delta_num_splits << 48)
        .checked_div(total_num_bytes)
        .unwrap_or(1u64)
}
418+
/// Message asking the `MergeSchedulerService` to enqueue a Parquet merge.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct ScheduleParquetMerge {
    score: u64,
    merge_operation: TrackedObject<ParquetMergeOperation>,
    split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
}

#[cfg(feature = "metrics")]
impl ScheduleParquetMerge {
    /// Builds the message, computing the scheduling score up front so the
    /// scheduler never has to re-score an operation.
    pub fn new(
        merge_operation: TrackedObject<ParquetMergeOperation>,
        split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
    ) -> Self {
        Self {
            // Score is computed before `merge_operation` is moved into place.
            score: score_parquet_merge_operation(&merge_operation),
            merge_operation,
            split_downloader_mailbox,
        }
    }
}
441+
#[cfg(feature = "metrics")]
#[async_trait]
impl Handler<ScheduleParquetMerge> for MergeSchedulerService {
    type Reply = ();

    /// Enqueues the Parquet merge, refreshes the pending-merge gauges, and
    /// immediately attempts to dispatch whatever the concurrency limit allows.
    async fn handle(
        &mut self,
        schedule_merge: ScheduleParquetMerge,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        // Assign a unique, monotonically increasing id (FIFO tie-breaker).
        let merge_id = self.next_merge_id;
        self.next_merge_id += 1;
        let scheduled_merge = ScheduledParquetMerge {
            score: schedule_merge.score,
            id: merge_id,
            merge_operation: schedule_merge.merge_operation,
            split_downloader_mailbox: schedule_merge.split_downloader_mailbox,
        };
        self.pending_merge_bytes += scheduled_merge.merge_operation.total_size_bytes();
        self.pending_parquet_merge_queue.push(scheduled_merge);
        // Both queues feed the same gauge, so operators see one global backlog.
        let num_pending_merges =
            self.pending_merge_queue.len() + self.pending_parquet_merge_queue.len();
        crate::metrics::INDEXER_METRICS
            .pending_merge_operations
            .set(num_pending_merges as i64);
        crate::metrics::INDEXER_METRICS
            .pending_merge_bytes
            .set(self.pending_merge_bytes as i64);
        self.schedule_pending_merges(ctx);
        Ok(())
    }
}
480+
296481#[ cfg( test) ]
297482mod tests {
298483 use std:: time:: Duration ;
0 commit comments