Skip to content

Commit ebc1b66

Browse files
g-talbotclaude
andauthored
feat: Phase 3d+3e — ParquetMergePipeline supervisor + publisher feedback (#6354)
* feat: add ParquetMergePipeline supervisor and publisher feedback (Phase 3d+3e) Phase 3 pipeline integration, combined supervisor and feedback PR: - ParquetMergePipeline supervisor: spawns all merge actors (publisher → sequencer → uploader → executor → downloader → planner), health-checks with periodic supervision loop, respawn on failure with backoff, graceful shutdown via FinishPendingMergesAndShutdownPipeline that disconnects feedback and runs finalize policy. 3 tests. - Publisher feedback: add parquet_merge_planner_mailbox_opt to Publisher (feature-gated behind cfg(feature = "metrics")). After successful ParquetSplitsUpdate publish of new ingested splits, sends ParquetNewSplits to the planner. Merge outputs (non-empty replaced_split_ids) are not fed back to avoid infinite loops. - DisconnectMergePlanner extended to clear both Tantivy and Parquet planner mailboxes, supporting shutdown drain for both pipeline types. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: nightly rustfmt import ordering in parquet_merge_pipeline Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: reflow comment to satisfy nightly rustfmt wrap_comments Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: feed back merge outputs to planner, matching Tantivy behavior The guard that filtered out merge outputs (non-empty replaced_split_ids) from the feedback loop was incorrect. Tantivy feeds ALL new splits to the merge planner — both ingest and merge outputs. Infinite loops are prevented by the merge policy's maturity checks (max_merge_ops, size, maturation_period), not by the publisher. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7b352cd commit ebc1b66

5 files changed

Lines changed: 662 additions & 0 deletions

File tree

quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,16 @@ impl Handler<DisconnectMergePlanner> for Publisher {
3939
_: DisconnectMergePlanner,
4040
_ctx: &ActorContext<Self>,
4141
) -> Result<(), ActorExitStatus> {
42+
// Clear both Tantivy and Parquet planner mailboxes. Each Publisher
43+
// instance only has one of these set (depending on which pipeline it
44+
// serves), but clearing both is safe and avoids needing separate
45+
// disconnect message types.
4246
info!("disconnecting merge planner mailbox");
4347
self.merge_planner_mailbox_opt = None;
48+
#[cfg(feature = "metrics")]
49+
{
50+
self.parquet_merge_planner_mailbox_opt = None;
51+
}
4452
Ok(())
4553
}
4654
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod parquet_doc_processor;
2727
mod parquet_indexer;
2828
mod parquet_merge_executor;
2929
pub(crate) mod parquet_merge_messages;
30+
mod parquet_merge_pipeline;
3031
mod parquet_merge_planner;
3132
mod parquet_merge_split_downloader;
3233
mod parquet_packager;
@@ -50,6 +51,7 @@ pub use parquet_doc_processor::{
5051
pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
5152
pub use parquet_merge_executor::ParquetMergeExecutor;
5253
pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
54+
pub use parquet_merge_pipeline::{ParquetMergePipeline, ParquetMergePipelineParams};
5355
pub use parquet_merge_planner::ParquetMergePlanner;
5456
pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;
5557
pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};

0 commit comments

Comments
 (0)