Skip to content

Commit 83a2a98

Browse files
g-talbot and claude committed
fix: thread ParquetWriterConfig through pipeline, fix misleading retry comment
Review findings: 1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest uses custom compression, merge output would differ. Now threaded from ParquetMergePipelineParams through to the executor. 2. Fixed misleading comment claiming "planner will eventually re-plan" on merge failure. In reality, input splits are drained by operations() and won't be re-planned until the pipeline restarts with metastore re-seeding (not yet implemented — TODO added). 3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn, matching the Tantivy MergePipeline pattern. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4dbf884 commit 83a2a98

3 files changed

Lines changed: 35 additions & 15 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,18 @@ use crate::models::PublishLock;
4343
/// ready-to-upload Parquet files with complete metadata.
4444
pub struct ParquetMergeExecutor {
4545
uploader_mailbox: Mailbox<ParquetUploader>,
46+
writer_config: ParquetWriterConfig,
4647
}
4748

4849
impl ParquetMergeExecutor {
49-
pub fn new(uploader_mailbox: Mailbox<ParquetUploader>) -> Self {
50-
Self { uploader_mailbox }
50+
pub fn new(
51+
uploader_mailbox: Mailbox<ParquetUploader>,
52+
writer_config: ParquetWriterConfig,
53+
) -> Self {
54+
Self {
55+
uploader_mailbox,
56+
writer_config,
57+
}
5158
}
5259
}
5360

@@ -98,36 +105,38 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
98105
// Run the CPU-intensive merge on the dedicated thread pool.
99106
let input_paths = scratch.downloaded_parquet_files.clone();
100107
let output_dir_clone = output_dir.clone();
108+
let writer_config = self.writer_config.clone();
101109
let merge_result = run_cpu_intensive(move || {
102110
let config = MergeConfig {
103111
num_outputs: 1,
104-
writer_config: ParquetWriterConfig::default(),
112+
writer_config,
105113
};
106114
merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config)
107115
})
108116
.await;
109117

110-
// We return Ok(()) on merge failure rather than Err to keep the actor
111-
// alive — same strategy as Tantivy's MergeExecutor. This prevents a
112-
// single "split of death" from crash-looping the entire pipeline.
113-
// The trade-off: failed splits aren't retried until pipeline respawn.
114118
let outputs: Vec<MergeOutputFile> = match merge_result {
115119
Ok(Ok(outputs)) => outputs,
116120
Ok(Err(merge_err)) => {
117121
warn!(
118122
error = %merge_err,
119123
merge_split_id = %merge_split_id,
120-
"parquet merge failed"
124+
"parquet merge failed — input splits will not be retried until \
125+
the pipeline restarts with metastore re-seeding"
121126
);
122-
// Input splits were drained from the planner by operations().
123-
// They remain published but won't be re-planned until respawn.
127+
// The input splits were drained from the planner by operations().
128+
// They remain published in the metastore but won't be re-planned
129+
// until the pipeline restarts and re-seeds from the metastore.
130+
// TODO: implement fetch_immature_parquet_splits() for respawn
131+
// (same as Tantivy's fetch_immature_splits pattern).
124132
return Ok(());
125133
}
126134
Err(panicked) => {
127135
warn!(
128136
error = %panicked,
129137
merge_split_id = %merge_split_id,
130-
"parquet merge panicked"
138+
"parquet merge panicked — input splits will not be retried until \
139+
the pipeline restarts with metastore re-seeding"
131140
);
132141
return Ok(());
133142
}
@@ -160,9 +169,8 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
160169
.context("failed to build merge output metadata")
161170
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;
162171

163-
// The merge engine writes to a temp filename (merge_output_*.parquet).
164-
// Rename to {split_id}.parquet so the uploader can find it at the
165-
// path derived from ParquetSplitMetadata::parquet_filename().
172+
// Rename the output file to match the generated split ID.
173+
// The uploader expects files at `output_dir/{split_id}.parquet`.
166174
let expected_path = output_dir.join(&metadata.parquet_file);
167175
if output.path != expected_path {
168176
std::fs::rename(&output.path, &expected_path)

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,11 @@ impl ParquetMergePipeline {
225225
"spawning parquet merge pipeline"
226226
);
227227

228+
// On first spawn, use the initial splits provided by the IndexingService.
229+
// On subsequent spawns (after crash/respawn), we start empty.
230+
// TODO: implement fetch_immature_parquet_splits() to re-query the
231+
// metastore on respawn, matching the Tantivy MergePipeline pattern
232+
// (see merge_pipeline.rs:441-471).
228233
let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
229234

230235
// Spawn actors bottom-up: each actor's constructor needs a mailbox
@@ -269,7 +274,8 @@ impl ParquetMergePipeline {
269274
.spawn(merge_uploader);
270275

271276
// 4. Merge executor
272-
let merge_executor = ParquetMergeExecutor::new(merge_uploader_mailbox);
277+
let merge_executor =
278+
ParquetMergeExecutor::new(merge_uploader_mailbox, self.params.writer_config.clone());
273279
let (merge_executor_mailbox, merge_executor_handle) = ctx
274280
.spawn_actor()
275281
.set_kill_switch(self.kill_switch.clone())
@@ -447,6 +453,10 @@ pub struct ParquetMergePipelineParams {
447453
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
448454
pub max_concurrent_split_uploads: usize,
449455
pub event_broker: EventBroker,
456+
/// Parquet writer config for merge output (compression, page size, etc.).
457+
/// Should match the ingest pipeline's writer config so merged files have
458+
/// consistent compression.
459+
pub writer_config: quickwit_parquet_engine::storage::ParquetWriterConfig,
450460
}
451461

452462
#[cfg(test)]
@@ -484,6 +494,7 @@ mod tests {
484494
merge_scheduler_service: universe.get_or_spawn_one(),
485495
max_concurrent_split_uploads: 4,
486496
event_broker: EventBroker::default(),
497+
writer_config: quickwit_parquet_engine::storage::ParquetWriterConfig::default(),
487498
}
488499
}
489500

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ async fn test_merge_pipeline_end_to_end() {
178178
merge_scheduler_service: universe.get_or_spawn_one(),
179179
max_concurrent_split_uploads: 4,
180180
event_broker: EventBroker::default(),
181+
writer_config: ParquetWriterConfig::default(),
181182
};
182183

183184
let initial_splits = vec![meta_a, meta_b];

0 commit comments

Comments (0)