Skip to content

Commit 761b379

Browse files
g-talbotclaude
andcommitted
fix: nightly rustfmt import ordering in parquet_merge_pipeline
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 792243f commit 761b379

1 file changed

Lines changed: 8 additions & 4 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use quickwit_common::temp_dir::TempDirectory;
4141
use quickwit_parquet_engine::merge::policy::ParquetMergePolicy;
4242
use quickwit_parquet_engine::split::ParquetSplitMetadata;
4343
use quickwit_proto::metastore::MetastoreServiceClient;
44+
use quickwit_storage::Storage;
4445
use tokio::sync::Semaphore;
4546
use tracing::{debug, error, info, instrument};
4647

@@ -51,7 +52,6 @@ use super::{METRICS_PUBLISHER_NAME, ParquetUploader};
5152
use crate::actors::pipeline_shared::wait_duration_before_retry;
5253
use crate::actors::publisher::DisconnectMergePlanner;
5354
use crate::actors::{MergeSchedulerService, Publisher, Sequencer, UploaderType};
54-
use quickwit_storage::Storage;
5555

5656
/// Spawning a merge pipeline puts pressure on the metastore, so we limit
5757
/// concurrent spawns (shared with Tantivy merge pipelines).
@@ -227,9 +227,12 @@ impl ParquetMergePipeline {
227227

228228
let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
229229

230-
// Spawn actors bottom-up so downstream mailboxes are available.
230+
// Spawn actors bottom-up: each actor's constructor needs a mailbox
231+
// for the actor below it in the chain, so we start from the publisher
232+
// (bottom) and work up to the planner (top).
231233

232-
// 1. Merge publisher
234+
// 1. Merge publisher — publishes merged splits to the metastore and
235+
// feeds back ParquetNewSplits to the planner for further merging.
233236
let merge_publisher = Publisher::new(
234237
METRICS_PUBLISHER_NAME,
235238
QueueCapacity::Unbounded,
@@ -244,7 +247,8 @@ impl ParquetMergePipeline {
244247
.set_kill_switch(self.kill_switch.clone())
245248
.spawn(merge_publisher);
246249

247-
// 2. Sequencer (preserves publish ordering)
250+
// 2. Sequencer — ensures merged splits are published in the order
251+
// they were uploaded, even if uploads complete out of order.
248252
let sequencer = Sequencer::new(merge_publisher_mailbox.clone());
249253
let (sequencer_mailbox, _sequencer_handle) = ctx
250254
.spawn_actor()

0 commit comments

Comments
 (0)