Skip to content

Commit 18bef46

Browse files
g-talbot and claude committed
fix: thread ParquetWriterConfig through pipeline, fix misleading retry comment
Review findings: 1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest uses custom compression, merge output would differ. Now threaded from ParquetMergePipelineParams through to the executor. 2. Fixed misleading comment claiming "planner will eventually re-plan" on merge failure. In reality, input splits are drained by operations() and won't be re-planned until the pipeline restarts with metastore re-seeding (not yet implemented — TODO added). 3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn, matching the Tantivy MergePipeline pattern. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent fb8545f commit 18bef46

3 files changed

Lines changed: 33 additions & 7 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -43,11 +43,18 @@ use crate::models::PublishLock;
4343
/// ready-to-upload Parquet files with complete metadata.
4444
pub struct ParquetMergeExecutor {
4545
uploader_mailbox: Mailbox<ParquetUploader>,
46+
writer_config: ParquetWriterConfig,
4647
}
4748

4849
impl ParquetMergeExecutor {
49-
pub fn new(uploader_mailbox: Mailbox<ParquetUploader>) -> Self {
50-
Self { uploader_mailbox }
50+
pub fn new(
51+
uploader_mailbox: Mailbox<ParquetUploader>,
52+
writer_config: ParquetWriterConfig,
53+
) -> Self {
54+
Self {
55+
uploader_mailbox,
56+
writer_config,
57+
}
5158
}
5259
}
5360

@@ -98,10 +105,11 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
98105
// Run the CPU-intensive merge on the dedicated thread pool.
99106
let input_paths = scratch.downloaded_parquet_files.clone();
100107
let output_dir_clone = output_dir.clone();
108+
let writer_config = self.writer_config.clone();
101109
let merge_result = run_cpu_intensive(move || {
102110
let config = MergeConfig {
103111
num_outputs: 1,
104-
writer_config: ParquetWriterConfig::default(),
112+
writer_config,
105113
};
106114
merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config)
107115
})
@@ -113,16 +121,22 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
113121
warn!(
114122
error = %merge_err,
115123
merge_split_id = %merge_split_id,
116-
"parquet merge failed"
124+
"parquet merge failed — input splits will not be retried until \
125+
the pipeline restarts with metastore re-seeding"
117126
);
118-
// Drop the merge permit — the planner will eventually re-plan.
127+
// The input splits were drained from the planner by operations().
128+
// They remain published in the metastore but won't be re-planned
129+
// until the pipeline restarts and re-seeds from the metastore.
130+
// TODO: implement fetch_immature_parquet_splits() for respawn
131+
// (same as Tantivy's fetch_immature_splits pattern).
119132
return Ok(());
120133
}
121134
Err(panicked) => {
122135
warn!(
123136
error = %panicked,
124137
merge_split_id = %merge_split_id,
125-
"parquet merge panicked"
138+
"parquet merge panicked — input splits will not be retried until \
139+
the pipeline restarts with metastore re-seeding"
126140
);
127141
return Ok(());
128142
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -225,6 +225,11 @@ impl ParquetMergePipeline {
225225
"spawning parquet merge pipeline"
226226
);
227227

228+
// On first spawn, use the initial splits provided by the IndexingService.
229+
// On subsequent spawns (after crash/respawn), we start empty.
230+
// TODO: implement fetch_immature_parquet_splits() to re-query the
231+
// metastore on respawn, matching the Tantivy MergePipeline pattern
232+
// (see merge_pipeline.rs:441-471).
228233
let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
229234

230235
// Spawn actors bottom-up so downstream mailboxes are available.
@@ -265,7 +270,8 @@ impl ParquetMergePipeline {
265270
.spawn(merge_uploader);
266271

267272
// 4. Merge executor
268-
let merge_executor = ParquetMergeExecutor::new(merge_uploader_mailbox);
273+
let merge_executor =
274+
ParquetMergeExecutor::new(merge_uploader_mailbox, self.params.writer_config.clone());
269275
let (merge_executor_mailbox, merge_executor_handle) = ctx
270276
.spawn_actor()
271277
.set_kill_switch(self.kill_switch.clone())
@@ -443,6 +449,10 @@ pub struct ParquetMergePipelineParams {
443449
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
444450
pub max_concurrent_split_uploads: usize,
445451
pub event_broker: EventBroker,
452+
/// Parquet writer config for merge output (compression, page size, etc.).
453+
/// Should match the ingest pipeline's writer config so merged files have
454+
/// consistent compression.
455+
pub writer_config: quickwit_parquet_engine::storage::ParquetWriterConfig,
446456
}
447457

448458
#[cfg(test)]
@@ -480,6 +490,7 @@ mod tests {
480490
merge_scheduler_service: universe.get_or_spawn_one(),
481491
max_concurrent_split_uploads: 4,
482492
event_broker: EventBroker::default(),
493+
writer_config: quickwit_parquet_engine::storage::ParquetWriterConfig::default(),
483494
}
484495
}
485496

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -178,6 +178,7 @@ async fn test_merge_pipeline_end_to_end() {
178178
merge_scheduler_service: universe.get_or_spawn_one(),
179179
max_concurrent_split_uploads: 4,
180180
event_broker: EventBroker::default(),
181+
writer_config: ParquetWriterConfig::default(),
181182
};
182183

183184
let initial_splits = vec![meta_a, meta_b];

0 commit comments

Comments (0)