@@ -54,6 +54,7 @@ use super::{METRICS_PUBLISHER_NAME, ParquetUploader};
5454use crate :: actors:: pipeline_shared:: wait_duration_before_retry;
5555use crate :: actors:: publisher:: DisconnectMergePlanner ;
5656use crate :: actors:: { MergeSchedulerService , Publisher , Sequencer , UploaderType } ;
57+ use crate :: models:: MergeStatistics ;
5758
5859/// Spawning a merge pipeline puts pressure on the metastore, so we limit
5960/// concurrent spawns (shared with Tantivy merge pipelines).
@@ -98,10 +99,10 @@ pub struct ParquetMergePipeline {
9899 params : ParquetMergePipelineParams ,
99100 merge_planner_mailbox : Mailbox < ParquetMergePlanner > ,
100101 merge_planner_inbox : Inbox < ParquetMergePlanner > ,
102+ previous_generations_statistics : MergeStatistics ,
103+ statistics : MergeStatistics ,
101104 handles_opt : Option < ParquetMergePipelineHandles > ,
102105 kill_switch : KillSwitch ,
103- generation : usize ,
104- num_spawn_attempts : usize ,
105106 /// Immature splits passed to the planner on first spawn. On subsequent
106107 /// spawns (after crash/respawn), the planner starts empty and picks up
107108 /// new splits from the feedback loop.
@@ -111,9 +112,11 @@ pub struct ParquetMergePipeline {
111112
112113#[ async_trait]
113114impl Actor for ParquetMergePipeline {
114- type ObservableState = ( ) ;
115+ type ObservableState = MergeStatistics ;
115116
116- fn observable_state ( & self ) { }
117+ fn observable_state ( & self ) -> Self :: ObservableState {
118+ self . statistics . clone ( )
119+ }
117120
118121 fn name ( & self ) -> String {
119122 "ParquetMergePipeline" . to_string ( )
@@ -139,10 +142,10 @@ impl ParquetMergePipeline {
139142 ) ;
140143 Self {
141144 params,
145+ previous_generations_statistics : MergeStatistics :: default ( ) ,
146+ statistics : MergeStatistics :: default ( ) ,
142147 handles_opt : None ,
143148 kill_switch : KillSwitch :: default ( ) ,
144- generation : 0 ,
145- num_spawn_attempts : 0 ,
146149 merge_planner_inbox,
147150 merge_planner_mailbox,
148151 initial_immature_splits_opt,
@@ -187,7 +190,7 @@ impl ParquetMergePipeline {
187190 }
188191 if !failure_or_unhealthy_actors. is_empty ( ) {
189192 error ! (
190- generation = self . generation,
193+ generation = self . generation( ) ,
191194 healthy_actors = ?healthy_actors,
192195 failed_or_unhealthy_actors = ?failure_or_unhealthy_actors,
193196 success_actors = ?success_actors,
@@ -197,32 +200,36 @@ impl ParquetMergePipeline {
197200 }
198201 if healthy_actors. is_empty ( ) {
199202 info ! (
200- generation = self . generation,
203+ generation = self . generation( ) ,
201204 "parquet merge pipeline completed successfully"
202205 ) ;
203206 return Health :: Success ;
204207 }
205208 debug ! (
206- generation = self . generation,
209+ generation = self . generation( ) ,
207210 healthy_actors = ?healthy_actors,
208211 success_actors = ?success_actors,
209212 "parquet merge pipeline is running and healthy"
210213 ) ;
211214 Health :: Healthy
212215 }
213216
214- #[ instrument( name="spawn_parquet_merge_pipeline" , level="info" , skip_all, fields( generation=self . generation) ) ]
217+ fn generation ( & self ) -> usize {
218+ self . statistics . generation
219+ }
220+
221+ #[ instrument( name="spawn_parquet_merge_pipeline" , level="info" , skip_all, fields( generation=self . generation( ) ) ) ]
215222 async fn spawn_pipeline ( & mut self , ctx : & ActorContext < Self > ) -> anyhow:: Result < ( ) > {
216223 let _spawn_permit = ctx
217224 . protect_future ( SPAWN_PIPELINE_SEMAPHORE . acquire ( ) )
218225 . await
219226 . expect ( "semaphore should not be closed" ) ;
220227
221- self . num_spawn_attempts += 1 ;
228+ self . statistics . num_spawn_attempts += 1 ;
222229 self . kill_switch = ctx. kill_switch ( ) . child ( ) ;
223230
224231 info ! (
225- generation = self . generation,
232+ generation = self . generation( ) ,
226233 root_dir = %self . params. indexing_directory. path( ) . display( ) ,
227234 "spawning parquet merge pipeline"
228235 ) ;
@@ -305,7 +312,8 @@ impl ParquetMergePipeline {
305312 )
306313 . spawn ( merge_planner) ;
307314
308- self . generation += 1 ;
315+ self . previous_generations_statistics = self . statistics . clone ( ) ;
316+ self . statistics . generation += 1 ;
309317 self . handles_opt = Some ( ParquetMergePipelineHandles {
310318 merge_planner : merge_planner_handle,
311319 merge_split_downloader : merge_split_downloader_handle,
@@ -330,6 +338,28 @@ impl ParquetMergePipeline {
330338 }
331339 }
332340
341+ async fn perform_observe ( & mut self ) {
342+ let Some ( handles) = & self . handles_opt else {
343+ return ;
344+ } ;
345+ handles. merge_planner . refresh_observe ( ) ;
346+ handles. merge_uploader . refresh_observe ( ) ;
347+ handles. merge_publisher . refresh_observe ( ) ;
348+ let num_ongoing_merges = crate :: metrics:: INDEXER_METRICS
349+ . ongoing_merge_operations
350+ . get ( ) ;
351+ self . statistics = self
352+ . previous_generations_statistics
353+ . clone ( )
354+ . add_actor_counters (
355+ & handles. merge_uploader . last_observation ( ) ,
356+ & handles. merge_publisher . last_observation ( ) ,
357+ )
358+ . set_generation ( self . statistics . generation )
359+ . set_num_spawn_attempts ( self . statistics . num_spawn_attempts )
360+ . set_ongoing_merges ( usize:: try_from ( num_ongoing_merges) . unwrap_or ( 0 ) ) ;
361+ }
362+
333363 async fn perform_health_check (
334364 & mut self ,
335365 ctx : & ActorContext < Self > ,
@@ -402,6 +432,7 @@ impl Handler<SuperviseLoop> for ParquetMergePipeline {
402432 supervise_loop_token : SuperviseLoop ,
403433 ctx : & ActorContext < Self > ,
404434 ) -> Result < ( ) , ActorExitStatus > {
435+ self . perform_observe ( ) . await ;
405436 self . perform_health_check ( ctx) . await ?;
406437 ctx. schedule_self_msg ( SUPERVISE_LOOP_INTERVAL , supervise_loop_token) ;
407438 Ok ( ( ) )
0 commit comments