Skip to content

Commit 4b9bf68

Browse files
g-talbot and claude committed
fix: add fetch_immature_parquet_splits for pipeline respawn, thread writer_config
Review findings addressed:
1. fetch_immature_splits(): on pipeline respawn after a crash, queries the metastore for published Parquet splits so the planner can re-plan merges that were in-flight during the crash. On first spawn, uses the initial splits from the IndexingService (same as the Tantivy pattern).
2. ParquetWriterConfig is threaded from the pipeline params to the executor so merge output uses the same compression as ingest.
3. Fixed the misleading "planner will eventually re-plan" comment on merge failure — it is now honest about the limitation that failed splits wait for respawn re-seeding.
4. Added index_uid to ParquetMergePipelineParams for metastore queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 90f8542 commit 4b9bf68

2 files changed

Lines changed: 55 additions & 8 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@ use quickwit_common::KillSwitch;
3939
use quickwit_common::pubsub::EventBroker;
4040
use quickwit_common::temp_dir::TempDirectory;
4141
use quickwit_parquet_engine::merge::policy::ParquetMergePolicy;
42+
use quickwit_metastore::{ListParquetSplitsRequestExt, ListParquetSplitsResponseExt};
4243
use quickwit_parquet_engine::split::ParquetSplitMetadata;
43-
use quickwit_proto::metastore::MetastoreServiceClient;
44+
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
45+
use quickwit_proto::types::IndexUid;
4446
use quickwit_storage::Storage;
4547
use tokio::sync::Semaphore;
4648
use tracing::{debug, error, info, instrument};
@@ -225,12 +227,7 @@ impl ParquetMergePipeline {
225227
"spawning parquet merge pipeline"
226228
);
227229

228-
// On first spawn, use the initial splits provided by the IndexingService.
229-
// On subsequent spawns (after crash/respawn), we start empty.
230-
// TODO: implement fetch_immature_parquet_splits() to re-query the
231-
// metastore on respawn, matching the Tantivy MergePipeline pattern
232-
// (see merge_pipeline.rs:441-471).
233-
let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
230+
let immature_splits = self.fetch_immature_splits(ctx).await?;
234231

235232
// Spawn actors bottom-up: each actor's constructor needs a mailbox
236233
// for the actor below it in the chain, so we start from the publisher
@@ -355,6 +352,45 @@ impl ParquetMergePipeline {
355352
}
356353
Ok(())
357354
}
355+
356+
/// Fetch published Parquet splits from the metastore for merge planning.
357+
///
358+
/// On first spawn, uses the initial splits provided by the IndexingService
359+
/// (avoids per-pipeline metastore queries when many pipelines start).
360+
/// On subsequent spawns (after crash/respawn), queries the metastore
361+
/// directly to recover splits that were in-flight during the crash.
362+
///
363+
/// The planner's `record_splits_if_necessary` filters out mature splits,
364+
/// so we don't need to filter here.
365+
async fn fetch_immature_splits(
366+
&mut self,
367+
ctx: &ActorContext<Self>,
368+
) -> anyhow::Result<Vec<ParquetSplitMetadata>> {
369+
// On first spawn, use the initial splits provided by the IndexingService.
370+
if let Some(immature_splits) = self.initial_immature_splits_opt.take() {
371+
return Ok(immature_splits);
372+
}
373+
// On subsequent spawns, query the metastore for published splits.
374+
let index_uid = self.params.index_uid.clone();
375+
let query =
376+
quickwit_metastore::ListParquetSplitsQuery::for_index(index_uid.clone());
377+
let list_request =
378+
quickwit_proto::metastore::ListMetricsSplitsRequest::try_from_query(
379+
index_uid.clone(),
380+
&query,
381+
)?;
382+
let response = ctx
383+
.protect_future(self.params.metastore.list_metrics_splits(list_request))
384+
.await?;
385+
let records = response.deserialize_splits()?;
386+
let splits: Vec<ParquetSplitMetadata> =
387+
records.into_iter().map(|r| r.metadata).collect();
388+
info!(
389+
num_splits = splits.len(),
390+
"fetched published parquet splits for merge planning on respawn"
391+
);
392+
Ok(splits)
393+
}
358394
}
359395

360396
#[async_trait]
@@ -446,6 +482,7 @@ impl Handler<Spawn> for ParquetMergePipeline {
446482
/// Parameters for spawning a `ParquetMergePipeline`.
447483
#[derive(Clone)]
448484
pub struct ParquetMergePipelineParams {
485+
pub index_uid: IndexUid,
449486
pub indexing_directory: TempDirectory,
450487
pub metastore: MetastoreServiceClient,
451488
pub storage: Arc<dyn Storage>,
@@ -474,7 +511,15 @@ mod tests {
474511
use super::*;
475512

476513
fn make_pipeline_params(universe: &Universe) -> ParquetMergePipelineParams {
477-
let mock_metastore = MockMetastoreService::new();
514+
let mut mock_metastore = MockMetastoreService::new();
515+
// Allow list_metrics_splits for respawn seeding (returns empty).
516+
mock_metastore
517+
.expect_list_metrics_splits()
518+
.returning(|_| {
519+
Ok(quickwit_proto::metastore::ListMetricsSplitsResponse {
520+
splits_serialized_json: Vec::new(),
521+
})
522+
});
478523
let storage = Arc::new(quickwit_storage::RamStorage::default());
479524
let merge_policy = Arc::new(ConstWriteAmplificationParquetMergePolicy::new(
480525
ParquetMergePolicyConfig {
@@ -487,6 +532,7 @@ mod tests {
487532
},
488533
));
489534
ParquetMergePipelineParams {
535+
index_uid: quickwit_proto::types::IndexUid::for_test("test-merge-index", 0),
490536
indexing_directory: TempDirectory::for_test(),
491537
metastore: MetastoreServiceClient::from_mock(mock_metastore),
492538
storage,

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ async fn test_merge_pipeline_end_to_end() {
171171
));
172172

173173
let params = ParquetMergePipelineParams {
174+
index_uid: quickwit_proto::types::IndexUid::for_test("test-merge-index", 0),
174175
indexing_directory: TempDirectory::for_test(),
175176
metastore,
176177
storage: ram_storage.clone(),

0 commit comments

Comments
 (0)