Skip to content

Commit a3ebcdb

Browse files
g-talbot and claude committed
feat: add MergeStatistics tracking to ParquetMergePipeline
Matches the Tantivy MergePipeline pattern:

- ObservableState is now MergeStatistics (was unit type)
- perform_observe() collects counters from uploader + publisher handles
- Tracks generation, num_spawn_attempts, num_ongoing_merges, num_uploaded_splits, num_published_splits
- previous_generations_statistics preserved across respawns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 51d81bc commit a3ebcdb

1 file changed

Lines changed: 44 additions & 13 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 44 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@ use super::{METRICS_PUBLISHER_NAME, ParquetUploader};
5454
use crate::actors::pipeline_shared::wait_duration_before_retry;
5555
use crate::actors::publisher::DisconnectMergePlanner;
5656
use crate::actors::{MergeSchedulerService, Publisher, Sequencer, UploaderType};
57+
use crate::models::MergeStatistics;
5758

5859
/// Spawning a merge pipeline puts pressure on the metastore, so we limit
5960
/// concurrent spawns (shared with Tantivy merge pipelines).
@@ -98,10 +99,10 @@ pub struct ParquetMergePipeline {
9899
params: ParquetMergePipelineParams,
99100
merge_planner_mailbox: Mailbox<ParquetMergePlanner>,
100101
merge_planner_inbox: Inbox<ParquetMergePlanner>,
102+
previous_generations_statistics: MergeStatistics,
103+
statistics: MergeStatistics,
101104
handles_opt: Option<ParquetMergePipelineHandles>,
102105
kill_switch: KillSwitch,
103-
generation: usize,
104-
num_spawn_attempts: usize,
105106
/// Immature splits passed to the planner on first spawn. On subsequent
106107
/// spawns (after crash/respawn), the planner starts empty and picks up
107108
/// new splits from the feedback loop.
@@ -111,9 +112,11 @@ pub struct ParquetMergePipeline {
111112

112113
#[async_trait]
113114
impl Actor for ParquetMergePipeline {
114-
type ObservableState = ();
115+
type ObservableState = MergeStatistics;
115116

116-
fn observable_state(&self) {}
117+
fn observable_state(&self) -> Self::ObservableState {
118+
self.statistics.clone()
119+
}
117120

118121
fn name(&self) -> String {
119122
"ParquetMergePipeline".to_string()
@@ -139,10 +142,10 @@ impl ParquetMergePipeline {
139142
);
140143
Self {
141144
params,
145+
previous_generations_statistics: MergeStatistics::default(),
146+
statistics: MergeStatistics::default(),
142147
handles_opt: None,
143148
kill_switch: KillSwitch::default(),
144-
generation: 0,
145-
num_spawn_attempts: 0,
146149
merge_planner_inbox,
147150
merge_planner_mailbox,
148151
initial_immature_splits_opt,
@@ -187,7 +190,7 @@ impl ParquetMergePipeline {
187190
}
188191
if !failure_or_unhealthy_actors.is_empty() {
189192
error!(
190-
generation = self.generation,
193+
generation = self.generation(),
191194
healthy_actors = ?healthy_actors,
192195
failed_or_unhealthy_actors = ?failure_or_unhealthy_actors,
193196
success_actors = ?success_actors,
@@ -197,32 +200,36 @@ impl ParquetMergePipeline {
197200
}
198201
if healthy_actors.is_empty() {
199202
info!(
200-
generation = self.generation,
203+
generation = self.generation(),
201204
"parquet merge pipeline completed successfully"
202205
);
203206
return Health::Success;
204207
}
205208
debug!(
206-
generation = self.generation,
209+
generation = self.generation(),
207210
healthy_actors = ?healthy_actors,
208211
success_actors = ?success_actors,
209212
"parquet merge pipeline is running and healthy"
210213
);
211214
Health::Healthy
212215
}
213216

214-
#[instrument(name="spawn_parquet_merge_pipeline", level="info", skip_all, fields(generation=self.generation))]
217+
fn generation(&self) -> usize {
218+
self.statistics.generation
219+
}
220+
221+
#[instrument(name="spawn_parquet_merge_pipeline", level="info", skip_all, fields(generation=self.generation()))]
215222
async fn spawn_pipeline(&mut self, ctx: &ActorContext<Self>) -> anyhow::Result<()> {
216223
let _spawn_permit = ctx
217224
.protect_future(SPAWN_PIPELINE_SEMAPHORE.acquire())
218225
.await
219226
.expect("semaphore should not be closed");
220227

221-
self.num_spawn_attempts += 1;
228+
self.statistics.num_spawn_attempts += 1;
222229
self.kill_switch = ctx.kill_switch().child();
223230

224231
info!(
225-
generation = self.generation,
232+
generation = self.generation(),
226233
root_dir = %self.params.indexing_directory.path().display(),
227234
"spawning parquet merge pipeline"
228235
);
@@ -301,7 +308,8 @@ impl ParquetMergePipeline {
301308
)
302309
.spawn(merge_planner);
303310

304-
self.generation += 1;
311+
self.previous_generations_statistics = self.statistics.clone();
312+
self.statistics.generation += 1;
305313
self.handles_opt = Some(ParquetMergePipelineHandles {
306314
merge_planner: merge_planner_handle,
307315
merge_split_downloader: merge_split_downloader_handle,
@@ -326,6 +334,28 @@ impl ParquetMergePipeline {
326334
}
327335
}
328336

337+
async fn perform_observe(&mut self) {
338+
let Some(handles) = &self.handles_opt else {
339+
return;
340+
};
341+
handles.merge_planner.refresh_observe();
342+
handles.merge_uploader.refresh_observe();
343+
handles.merge_publisher.refresh_observe();
344+
let num_ongoing_merges = crate::metrics::INDEXER_METRICS
345+
.ongoing_merge_operations
346+
.get();
347+
self.statistics = self
348+
.previous_generations_statistics
349+
.clone()
350+
.add_actor_counters(
351+
&handles.merge_uploader.last_observation(),
352+
&handles.merge_publisher.last_observation(),
353+
)
354+
.set_generation(self.statistics.generation)
355+
.set_num_spawn_attempts(self.statistics.num_spawn_attempts)
356+
.set_ongoing_merges(usize::try_from(num_ongoing_merges).unwrap_or(0));
357+
}
358+
329359
async fn perform_health_check(
330360
&mut self,
331361
ctx: &ActorContext<Self>,
@@ -398,6 +428,7 @@ impl Handler<SuperviseLoop> for ParquetMergePipeline {
398428
supervise_loop_token: SuperviseLoop,
399429
ctx: &ActorContext<Self>,
400430
) -> Result<(), ActorExitStatus> {
431+
self.perform_observe().await;
401432
self.perform_health_check(ctx).await?;
402433
ctx.schedule_self_msg(SUPERVISE_LOOP_INTERVAL, supervise_loop_token);
403434
Ok(())

0 commit comments

Comments (0)