Skip to content

Commit 3a7ad73

Browse files
g-talbot and claude committed
fix: nightly rustfmt import ordering in parquet_merge_pipeline
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c4032eb commit 3a7ad73

4 files changed

Lines changed: 71 additions & 14 deletions

File tree

quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ impl Handler<DisconnectMergePlanner> for Publisher {
3939
_: DisconnectMergePlanner,
4040
_ctx: &ActorContext<Self>,
4141
) -> Result<(), ActorExitStatus> {
42+
// Clear both Tantivy and Parquet planner mailboxes. Each Publisher
43+
// instance only has one of these set (depending on which pipeline it
44+
// serves), but clearing both is safe and avoids needing separate
45+
// disconnect message types.
4246
info!("disconnecting merge planner mailbox");
4347
self.merge_planner_mailbox_opt = None;
4448
#[cfg(feature = "metrics")]

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use quickwit_common::temp_dir::TempDirectory;
4141
use quickwit_parquet_engine::merge::policy::ParquetMergePolicy;
4242
use quickwit_parquet_engine::split::ParquetSplitMetadata;
4343
use quickwit_proto::metastore::MetastoreServiceClient;
44+
use quickwit_storage::Storage;
4445
use tokio::sync::Semaphore;
4546
use tracing::{debug, error, info, instrument};
4647

@@ -51,14 +52,16 @@ use super::{METRICS_PUBLISHER_NAME, ParquetUploader};
5152
use crate::actors::pipeline_shared::wait_duration_before_retry;
5253
use crate::actors::publisher::DisconnectMergePlanner;
5354
use crate::actors::{MergeSchedulerService, Publisher, Sequencer, UploaderType};
54-
use quickwit_storage::Storage;
5555

56-
/// Spawning a merge pipeline puts pressure on the metastore, so we limit
57-
/// concurrent spawns (shared with Tantivy merge pipelines).
56+
/// Limits concurrent Parquet merge pipeline spawns to avoid overwhelming the
57+
/// metastore. This is a separate semaphore from the Tantivy merge pipeline's.
5858
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);
5959

6060
pub const SUPERVISE_LOOP_INTERVAL: Duration = Duration::from_secs(1);
6161

62+
/// Holds actor handles for health-checking and lifecycle management.
63+
/// When `None` in the parent pipeline, no actors are running (pre-spawn or
64+
/// post-terminate). The supervisor checks these on each `SuperviseLoop` tick.
6265
struct ParquetMergePipelineHandles {
6366
merge_planner: ActorHandle<ParquetMergePlanner>,
6467
merge_split_downloader: ActorHandle<ParquetMergeSplitDownloader>,
@@ -69,6 +72,9 @@ struct ParquetMergePipelineHandles {
6972
}
7073

7174
impl ParquetMergePipelineHandles {
75+
/// Rate-limits progress checks to once per HEARTBEAT interval.
76+
/// Without this, every supervision tick would check progress, which
77+
/// is wasteful — actors only need to demonstrate liveness periodically.
7278
fn should_check_for_progress(&mut self) -> bool {
7379
let now = Instant::now();
7480
let check_for_progress = now > self.next_check_for_progress;
@@ -94,10 +100,17 @@ struct Spawn {
94100
/// shutdown, in-flight merges drain to completion.
95101
pub struct ParquetMergePipeline {
96102
params: ParquetMergePipelineParams,
103+
/// The planner's mailbox and inbox are created once and recycled across
104+
/// pipeline restarts. This lets the publisher's feedback loop survive a
105+
/// respawn — messages sent to the old planner's mailbox are delivered to
106+
/// the new planner instance.
97107
merge_planner_mailbox: Mailbox<ParquetMergePlanner>,
98108
merge_planner_inbox: Inbox<ParquetMergePlanner>,
99109
handles_opt: Option<ParquetMergePipelineHandles>,
110+
/// Child kill switch — killing this kills all actors in the pipeline
111+
/// without affecting the supervisor itself.
100112
kill_switch: KillSwitch,
113+
/// Increments on each spawn. Used for log correlation.
101114
generation: usize,
102115
num_spawn_attempts: usize,
103116
/// Immature splits passed to the planner on first spawn. On subsequent
@@ -117,6 +130,8 @@ impl Actor for ParquetMergePipeline {
117130
"ParquetMergePipeline".to_string()
118131
}
119132

133+
/// Kicks off the pipeline by sending Spawn (which spawns all actors)
134+
/// followed by SuperviseLoop (which starts periodic health checks).
120135
async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
121136
self.handle(Spawn::default(), ctx).await?;
122137
self.handle(SuperviseLoop, ctx).await?;
@@ -165,6 +180,10 @@ impl ParquetMergePipeline {
165180
]
166181
}
167182

183+
/// Consolidates health from all supervised actors into a single verdict.
184+
/// Any single actor failure makes the whole pipeline unhealthy (triggers
185+
/// terminate + respawn). All actors exiting with Success means the pipeline
186+
/// completed (e.g., after shutdown drain).
168187
fn healthcheck(&self, check_for_progress: bool) -> Health {
169188
let mut healthy_actors: Vec<&str> = Vec::new();
170189
let mut failure_or_unhealthy_actors: Vec<&str> = Vec::new();
@@ -227,9 +246,12 @@ impl ParquetMergePipeline {
227246

228247
let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
229248

230-
// Spawn actors bottom-up so downstream mailboxes are available.
249+
// Spawn actors bottom-up: each actor's constructor needs a mailbox
250+
// for the actor below it in the chain, so we start from the publisher
251+
// (bottom) and work up to the planner (top).
231252

232-
// 1. Merge publisher
253+
// 1. Merge publisher — publishes merged splits to the metastore and feeds back
254+
// ParquetNewSplits to the planner for further merging.
233255
let merge_publisher = Publisher::new(
234256
METRICS_PUBLISHER_NAME,
235257
QueueCapacity::Unbounded,
@@ -244,7 +266,8 @@ impl ParquetMergePipeline {
244266
.set_kill_switch(self.kill_switch.clone())
245267
.spawn(merge_publisher);
246268

247-
// 2. Sequencer (preserves publish ordering)
269+
// 2. Sequencer — ensures merged splits are published in the order they were uploaded, even
270+
// if uploads complete out of order.
248271
let sequencer = Sequencer::new(merge_publisher_mailbox.clone());
249272
let (sequencer_mailbox, _sequencer_handle) = ctx
250273
.spawn_actor()
@@ -282,7 +305,9 @@ impl ParquetMergePipeline {
282305
.set_kill_switch(self.kill_switch.clone())
283306
.spawn(merge_split_downloader);
284307

285-
// 6. Merge planner (uses recycled mailbox)
308+
// 6. Merge planner — uses recycled mailbox/inbox so the publisher's
309+
// feedback loop (which holds a clone of the planner mailbox) survives
310+
// pipeline restarts without needing to be re-wired.
286311
let merge_planner = ParquetMergePlanner::new(
287312
immature_splits,
288313
self.params.merge_policy.clone(),
@@ -310,6 +335,8 @@ impl ParquetMergePipeline {
310335
Ok(())
311336
}
312337

338+
/// Kills all actors in the pipeline immediately. Used when the health check
339+
/// detects a failure — we tear everything down and schedule a respawn.
313340
async fn terminate(&mut self) {
314341
self.kill_switch.kill();
315342
if let Some(handles) = self.handles_opt.take() {
@@ -378,17 +405,24 @@ impl Handler<FinishPendingMergesAndShutdownPipeline> for ParquetMergePipeline {
378405
_ctx: &ActorContext<Self>,
379406
) -> Result<(), ActorExitStatus> {
380407
info!("shutdown parquet merge pipeline initiated");
408+
// Prevent respawn on failure from this point forward.
381409
self.shutdown_initiated = true;
382410
if let Some(handles) = &self.handles_opt {
383-
// Break the feedback loop: publisher stops sending ParquetNewSplits
384-
// to the planner.
411+
// Two-phase graceful shutdown:
412+
//
413+
// Phase 1: Break the feedback loop so completed merges don't
414+
// trigger new merge planning. Without this, the pipeline would
415+
// never drain — each completed merge feeds back new splits that
416+
// trigger more merges.
385417
let _ = handles
386418
.merge_publisher
387419
.mailbox()
388420
.send_message(DisconnectMergePlanner)
389421
.await;
390422

391-
// Run finalize policy for cold windows, then planner exits.
423+
// Phase 2: Run the finalize policy (merges cold-window stragglers
424+
// with a lower merge factor), then the planner exits. Downstream
425+
// actors drain naturally as their inboxes empty.
392426
let _ = handles
393427
.merge_planner
394428
.mailbox()
@@ -408,9 +442,11 @@ impl Handler<Spawn> for ParquetMergePipeline {
408442
spawn: Spawn,
409443
ctx: &ActorContext<Self>,
410444
) -> Result<(), ActorExitStatus> {
445+
// Don't respawn after graceful shutdown was requested.
411446
if self.shutdown_initiated {
412447
return Ok(());
413448
}
449+
// Don't spawn if actors are already running (duplicate Spawn message).
414450
if self.handles_opt.is_some() {
415451
return Ok(());
416452
}
@@ -434,12 +470,23 @@ impl Handler<Spawn> for ParquetMergePipeline {
434470
}
435471

436472
/// Parameters for spawning a `ParquetMergePipeline`.
473+
///
474+
/// Constructed by the IndexingService from `IndexConfig` + node-level settings.
475+
/// All actors in the pipeline share these resources via `Arc`/`Clone`.
437476
#[derive(Clone)]
438477
pub struct ParquetMergePipelineParams {
478+
/// Root temp directory for scratch files (downloads, merge output).
439479
pub indexing_directory: TempDirectory,
480+
/// Metastore client for staging/publishing merged splits and for
481+
/// re-seeding the planner with immature splits on respawn.
440482
pub metastore: MetastoreServiceClient,
483+
/// Object storage for downloading input splits and uploading merge output.
441484
pub storage: Arc<dyn Storage>,
485+
/// Determines which splits to merge and when. Read from the index's
486+
/// `parquet_merge_policy` config section.
442487
pub merge_policy: Arc<dyn ParquetMergePolicy>,
488+
/// Node-wide merge scheduler — shared with Tantivy merge pipelines for
489+
/// global concurrency control via a single semaphore.
443490
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
444491
pub max_concurrent_split_uploads: usize,
445492
pub event_broker: EventBroker,

quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,13 @@ impl Handler<ParquetSplitsUpdate> for Publisher {
8787
info!("publish-metrics-splits");
8888
suggest_truncate(ctx, &self.source_mailbox_opt, checkpoint_delta_opt).await;
8989

90-
// Feedback: notify the Parquet merge planner about newly published splits
91-
// so it can plan merges. Only for non-empty, non-replace operations (i.e.,
92-
// new ingested splits, not merge outputs which would cause infinite loops).
90+
// Feedback loop: notify the merge planner about newly ingested splits so
91+
// it can plan compaction merges.
92+
//
93+
// Guard: only feed back splits from ingest (replaced_split_ids is empty).
94+
// Merge outputs have non-empty replaced_split_ids — feeding those back
95+
// would create an infinite loop where each merge triggers another merge.
96+
// The DisconnectMergePlanner message breaks this loop during shutdown.
9397
if let Some(planner_mailbox) = &self.parquet_merge_planner_mailbox_opt
9498
&& !new_splits.is_empty()
9599
&& replaced_split_ids.is_empty()

quickwit/quickwit-indexing/src/actors/publisher.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ impl Publisher {
6767
}
6868
}
6969

70-
/// Set the Parquet merge planner mailbox for merge feedback.
70+
/// Sets the Parquet merge planner mailbox for merge feedback.
71+
/// Post-construction setter because the Publisher is created before the
72+
/// planner mailbox is available (bottom-up actor spawn order).
7173
#[cfg(feature = "metrics")]
7274
pub fn set_parquet_merge_planner_mailbox(
7375
mut self,

0 commit comments

Comments (0)