Skip to content

Commit 3227b37

Browse files
g-talbotclaude
andcommitted
feat: add ParquetMergePipeline supervisor and publisher feedback (Phase 3d+3e)
Phase 3 pipeline integration, combined supervisor and feedback PR: - ParquetMergePipeline supervisor: spawns all merge actors (publisher → sequencer → uploader → executor → downloader → planner), health-checks with periodic supervision loop, respawn on failure with backoff, graceful shutdown via FinishPendingMergesAndShutdownPipeline that disconnects feedback and runs finalize policy. 3 tests. - Publisher feedback: add parquet_merge_planner_mailbox_opt to Publisher (feature-gated behind cfg(feature = "metrics")). After successful ParquetSplitsUpdate publish of new ingested splits, sends ParquetNewSplits to the planner. Merge outputs (non-empty replaced_split_ids) are not fed back to avoid infinite loops. - DisconnectMergePlanner extended to clear both Tantivy and Parquet planner mailboxes, supporting shutdown drain for both pipeline types. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ceba410 commit 3227b37

5 files changed

Lines changed: 607 additions & 0 deletions

File tree

quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ impl Handler<DisconnectMergePlanner> for Publisher {
4141
) -> Result<(), ActorExitStatus> {
4242
info!("disconnecting merge planner mailbox");
4343
self.merge_planner_mailbox_opt = None;
44+
#[cfg(feature = "metrics")]
45+
{
46+
self.parquet_merge_planner_mailbox_opt = None;
47+
}
4448
Ok(())
4549
}
4650
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod parquet_doc_processor;
2727
mod parquet_indexer;
2828
mod parquet_merge_executor;
2929
pub(crate) mod parquet_merge_messages;
30+
mod parquet_merge_pipeline;
3031
mod parquet_merge_planner;
3132
mod parquet_merge_split_downloader;
3233
mod parquet_packager;
@@ -50,6 +51,7 @@ pub use parquet_doc_processor::{
5051
pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
5152
pub use parquet_merge_executor::ParquetMergeExecutor;
5253
pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
54+
pub use parquet_merge_pipeline::{ParquetMergePipeline, ParquetMergePipelineParams};
5355
pub use parquet_merge_planner::ParquetMergePlanner;
5456
pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;
5557
pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};

0 commit comments

Comments
 (0)