@@ -20,11 +20,15 @@ use std::sync::Arc;
2020use anyhow:: Context ;
2121use async_trait:: async_trait;
2222use quickwit_actors:: { Actor , ActorContext , ActorExitStatus , Handler , Mailbox } ;
23+ #[ cfg( feature = "metrics" ) ]
24+ use quickwit_parquet_engine:: merge:: policy:: ParquetMergeOperation ;
2325use tantivy:: TrackedObject ;
2426use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
2527use tracing:: error;
2628
2729use super :: MergeSplitDownloader ;
30+ #[ cfg( feature = "metrics" ) ]
31+ use super :: metrics_pipeline:: { ParquetMergeSplitDownloader , ParquetMergeTask } ;
2832use crate :: merge_policy:: { MergeOperation , MergeTask } ;
2933
3034pub struct MergePermit {
@@ -70,6 +74,20 @@ pub async fn schedule_merge(
7074 Ok ( ( ) )
7175}
7276
/// Schedules a Parquet merge operation on the merge scheduler service.
///
/// The scheduler decides when the merge actually starts (respecting the
/// merge concurrency limit) and then forwards it to the supplied split
/// downloader mailbox.
///
/// # Errors
///
/// Returns an error if the scheduler mailbox cannot be reached.
#[cfg(feature = "metrics")]
pub async fn schedule_parquet_merge(
    merge_scheduler_service: &Mailbox<MergeSchedulerService>,
    merge_operation: TrackedObject<ParquetMergeOperation>,
    merge_split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
) -> anyhow::Result<()> {
    let schedule_message = ScheduleParquetMerge::new(merge_operation, merge_split_downloader_mailbox);
    merge_scheduler_service
        .ask(schedule_message)
        .await
        .context("failed to schedule parquet merge")?;
    Ok(())
}
90+
7391struct ScheduledMerge {
7492 score : u64 ,
7593 id : u64 , //< just for total ordering.
@@ -103,6 +121,45 @@ impl Ord for ScheduledMerge {
103121 }
104122}
105123
/// A Parquet merge waiting in the scheduler's priority queue.
#[cfg(feature = "metrics")]
struct ScheduledParquetMerge {
    // Priority of the merge: higher scores are dispatched first.
    score: u64,
    // Monotonically increasing id, used only to break score ties (older first).
    id: u64,
    merge_operation: TrackedObject<ParquetMergeOperation>,
    split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
}
131+
#[cfg(feature = "metrics")]
impl ScheduledParquetMerge {
    /// Key used for heap ordering: highest score first, then lowest id
    /// (i.e. oldest) first thanks to `Reverse`.
    fn order_key(&self) -> (u64, Reverse<u64>) {
        (self.score, Reverse(self.id))
    }
}
138+
// `order_key` yields a total order over (u64, Reverse<u64>), so `Eq` holds.
#[cfg(feature = "metrics")]
impl Eq for ScheduledParquetMerge {}
141+
#[cfg(feature = "metrics")]
impl PartialEq for ScheduledParquetMerge {
    /// Two scheduled merges are equal iff their ordering keys are equal,
    /// keeping `PartialEq` consistent with `Ord`.
    fn eq(&self, other: &Self) -> bool {
        self.order_key() == other.order_key()
    }
}
148+
#[cfg(feature = "metrics")]
impl PartialOrd for ScheduledParquetMerge {
    /// Delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
155+
#[cfg(feature = "metrics")]
impl Ord for ScheduledParquetMerge {
    /// Total order used by the `BinaryHeap`: the max element is the merge
    /// with the highest score (ties broken by lowest id).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (self.order_key(), other.order_key());
        lhs.cmp(&rhs)
    }
}
162+
106163/// The merge scheduler service is in charge of keeping track of all scheduled merge operations,
107164/// and schedule them in the best possible order, respecting the `merge_concurrency` limit.
108165///
@@ -116,6 +173,8 @@ pub struct MergeSchedulerService {
116173 merge_semaphore : Arc < Semaphore > ,
117174 merge_concurrency : usize ,
118175 pending_merge_queue : BinaryHeap < ScheduledMerge > ,
176+ #[ cfg( feature = "metrics" ) ]
177+ pending_parquet_merge_queue : BinaryHeap < ScheduledParquetMerge > ,
119178 next_merge_id : u64 ,
120179 pending_merge_bytes : u64 ,
121180}
@@ -133,6 +192,8 @@ impl MergeSchedulerService {
133192 merge_semaphore,
134193 merge_concurrency,
135194 pending_merge_queue : BinaryHeap :: default ( ) ,
195+ #[ cfg( feature = "metrics" ) ]
196+ pending_parquet_merge_queue : BinaryHeap :: default ( ) ,
136197 next_merge_id : 0 ,
137198 pending_merge_bytes : 0 ,
138199 }
@@ -183,6 +244,50 @@ impl MergeSchedulerService {
183244 }
184245 }
185246 }
247+ // Dispatch pending Parquet merges (shares same semaphore).
248+ #[ cfg( feature = "metrics" ) ]
249+ loop {
250+ let merge_semaphore = self . merge_semaphore . clone ( ) ;
251+ let Some ( next_merge) = self . pending_parquet_merge_queue . peek_mut ( ) else {
252+ break ;
253+ } ;
254+ let Ok ( semaphore_permit) = Semaphore :: try_acquire_owned ( merge_semaphore) else {
255+ break ;
256+ } ;
257+ let merge_permit = MergePermit {
258+ _semaphore_permit : Some ( semaphore_permit) ,
259+ merge_scheduler_mailbox : Some ( ctx. mailbox ( ) . clone ( ) ) ,
260+ } ;
261+ let ScheduledParquetMerge {
262+ merge_operation,
263+ split_downloader_mailbox,
264+ ..
265+ } = PeekMut :: pop ( next_merge) ;
266+ let parquet_merge_task = ParquetMergeTask {
267+ merge_operation,
268+ merge_permit,
269+ } ;
270+ self . pending_merge_bytes -= parquet_merge_task. merge_operation . total_size_bytes ( ) ;
271+ crate :: metrics:: INDEXER_METRICS
272+ . pending_merge_operations
273+ . set (
274+ self . pending_merge_queue . len ( ) as i64
275+ + self . pending_parquet_merge_queue . len ( ) as i64 ,
276+ ) ;
277+ crate :: metrics:: INDEXER_METRICS
278+ . pending_merge_bytes
279+ . set ( self . pending_merge_bytes as i64 ) ;
280+ match split_downloader_mailbox. try_send_message ( parquet_merge_task) {
281+ Ok ( _) => { }
282+ Err ( quickwit_actors:: TrySendError :: Full ( _) ) => {
283+ error ! ( "parquet split downloader queue is full: please report" ) ;
284+ }
285+ Err ( quickwit_actors:: TrySendError :: Disconnected ) => {
286+ // The downloader is dead — pipeline probably restarted.
287+ }
288+ }
289+ }
290+
186291 let num_merges =
187292 self . merge_concurrency as i64 - self . merge_semaphore . available_permits ( ) as i64 ;
188293 crate :: metrics:: INDEXER_METRICS
@@ -293,6 +398,82 @@ impl Handler<PermitReleased> for MergeSchedulerService {
293398 }
294399}
295400
401+ // --- Parquet merge scheduling (feature-gated) ---
402+
/// Scores a Parquet merge operation for prioritization.
///
/// The score is the number of splits removed by the merge (`n - 1` for a
/// merge of `n` splits), scaled by `<< 48`, divided by the total byte size:
/// small merges that remove many splits score highest. A zero-byte operation
/// scores `u64::MAX` so it is dispatched immediately.
#[cfg(feature = "metrics")]
fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 {
    let total_num_bytes = merge_operation.total_size_bytes();
    if total_num_bytes == 0 {
        return u64::MAX;
    }
    // `saturating_sub` guards against an (unexpected) empty split list, which
    // would otherwise underflow `usize` (panic in debug, wrap in release).
    let delta_num_splits = merge_operation.splits.len().saturating_sub(1) as u64;
    // `total_num_bytes` is non-zero here, so `checked_div` cannot fail; it is
    // kept as belt-and-braces against future refactors of the guard above.
    (delta_num_splits << 48)
        .checked_div(total_num_bytes)
        .unwrap_or(1u64)
}
414+
/// Message asking the [`MergeSchedulerService`] to enqueue a Parquet merge.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct ScheduleParquetMerge {
    // Precomputed priority score (see `score_parquet_merge_operation`).
    score: u64,
    merge_operation: TrackedObject<ParquetMergeOperation>,
    split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
}
422+
#[cfg(feature = "metrics")]
impl ScheduleParquetMerge {
    /// Builds the scheduling message, computing the priority score up front
    /// so the scheduler never has to touch the operation to rank it.
    pub fn new(
        merge_operation: TrackedObject<ParquetMergeOperation>,
        split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
    ) -> Self {
        Self {
            score: score_parquet_merge_operation(&merge_operation),
            merge_operation,
            split_downloader_mailbox,
        }
    }
}
437+
#[cfg(feature = "metrics")]
#[async_trait]
impl Handler<ScheduleParquetMerge> for MergeSchedulerService {
    type Reply = ();

    /// Enqueues the Parquet merge in the dedicated priority queue, refreshes
    /// the pending-merge gauges, and immediately tries to dispatch merges.
    async fn handle(
        &mut self,
        schedule_merge: ScheduleParquetMerge,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        let ScheduleParquetMerge {
            score,
            merge_operation,
            split_downloader_mailbox,
        } = schedule_merge;

        // Assign a fresh id so equal-score merges dispatch oldest-first.
        let merge_id = self.next_merge_id;
        self.next_merge_id += 1;

        let scheduled_merge = ScheduledParquetMerge {
            score,
            id: merge_id,
            merge_operation,
            split_downloader_mailbox,
        };
        // Account for the new merge before publishing metrics.
        self.pending_merge_bytes += scheduled_merge.merge_operation.total_size_bytes();
        self.pending_parquet_merge_queue.push(scheduled_merge);

        // The gauges cover both the regular and the Parquet merge queues.
        let num_pending_merges =
            self.pending_merge_queue.len() + self.pending_parquet_merge_queue.len();
        crate::metrics::INDEXER_METRICS
            .pending_merge_operations
            .set(num_pending_merges as i64);
        crate::metrics::INDEXER_METRICS
            .pending_merge_bytes
            .set(self.pending_merge_bytes as i64);

        self.schedule_pending_merges(ctx);
        Ok(())
    }
}
476+
296477#[ cfg( test) ]
297478mod tests {
298479 use std:: time:: Duration ;
0 commit comments