Skip to content

Commit 927dc9f

Browse files
g-talbotclaude
andcommitted
fix: nightly rustfmt import ordering in parquet_merge_pipeline
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1b3fdfd commit 927dc9f

2 files changed

Lines changed: 11 additions & 5 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use quickwit_common::temp_dir::TempDirectory;
4141
use quickwit_parquet_engine::merge::policy::ParquetMergePolicy;
4242
use quickwit_parquet_engine::split::ParquetSplitMetadata;
4343
use quickwit_proto::metastore::MetastoreServiceClient;
44+
use quickwit_storage::Storage;
4445
use tokio::sync::Semaphore;
4546
use tracing::{debug, error, info, instrument};
4647

@@ -51,7 +52,6 @@ use super::{METRICS_PUBLISHER_NAME, ParquetUploader};
5152
use crate::actors::pipeline_shared::wait_duration_before_retry;
5253
use crate::actors::publisher::DisconnectMergePlanner;
5354
use crate::actors::{MergeSchedulerService, Publisher, Sequencer, UploaderType};
54-
use quickwit_storage::Storage;
5555

5656
/// Spawning a merge pipeline puts pressure on the metastore, so we limit
5757
/// concurrent spawns (shared with Tantivy merge pipelines).
@@ -227,9 +227,12 @@ impl ParquetMergePipeline {
227227

228228
let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
229229

230-
// Spawn actors bottom-up so downstream mailboxes are available.
230+
// Spawn actors bottom-up: each actor's constructor needs a mailbox
231+
// for the actor below it in the chain, so we start from the publisher
232+
// (bottom) and work up to the planner (top).
231233

232-
// 1. Merge publisher
234+
// 1. Merge publisher — publishes merged splits to the metastore and
235+
// feeds back ParquetNewSplits to the planner for further merging.
233236
let merge_publisher = Publisher::new(
234237
METRICS_PUBLISHER_NAME,
235238
QueueCapacity::Unbounded,
@@ -244,7 +247,8 @@ impl ParquetMergePipeline {
244247
.set_kill_switch(self.kill_switch.clone())
245248
.spawn(merge_publisher);
246249

247-
// 2. Sequencer (preserves publish ordering)
250+
// 2. Sequencer — ensures merged splits are published in the order
251+
// they were uploaded, even if uploads complete out of order.
248252
let sequencer = Sequencer::new(merge_publisher_mailbox.clone());
249253
let (sequencer_mailbox, _sequencer_handle) = ctx
250254
.spawn_actor()

quickwit/quickwit-indexing/src/actors/publisher.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ impl Publisher {
6767
}
6868
}
6969

70-
/// Set the Parquet merge planner mailbox for merge feedback.
70+
/// Sets the Parquet merge planner mailbox for merge feedback.
71+
/// Post-construction setter because the Publisher is created before the
72+
/// planner mailbox is available (bottom-up actor spawn order).
7173
#[cfg(feature = "metrics")]
7274
pub fn set_parquet_merge_planner_mailbox(
7375
mut self,

0 commit comments

Comments
 (0)