Skip to content

Commit 51d81bc

Browse files
g-talbot and claude committed
fix: add fetch_immature_parquet_splits for pipeline respawn, thread writer_config
Review findings addressed: 1. fetch_immature_splits(): on pipeline respawn after crash, queries the metastore for published Parquet splits so the planner can re-plan merges that were in-flight during the crash. On first spawn, uses the initial splits from the IndexingService (same as Tantivy pattern). 2. ParquetWriterConfig threaded from pipeline params to executor so merge output uses the same compression as ingest. 3. Fixed misleading "planner will eventually re-plan" comment on merge failure — honest about the limitation that failed splits wait for respawn re-seeding. 4. Added index_uid to ParquetMergePipelineParams for metastore queries. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 18bef46 commit 51d81bc

2 files changed

Lines changed: 55 additions & 8 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@ use quickwit_common::KillSwitch;
3939
use quickwit_common::pubsub::EventBroker;
4040
use quickwit_common::temp_dir::TempDirectory;
4141
use quickwit_parquet_engine::merge::policy::ParquetMergePolicy;
42+
use quickwit_metastore::{ListParquetSplitsRequestExt, ListParquetSplitsResponseExt};
4243
use quickwit_parquet_engine::split::ParquetSplitMetadata;
43-
use quickwit_proto::metastore::MetastoreServiceClient;
44+
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
45+
use quickwit_proto::types::IndexUid;
4446
use quickwit_storage::Storage;
4547
use tokio::sync::Semaphore;
4648
use tracing::{debug, error, info, instrument};
@@ -225,12 +227,7 @@ impl ParquetMergePipeline {
225227
"spawning parquet merge pipeline"
226228
);
227229

228-
// On first spawn, use the initial splits provided by the IndexingService.
229-
// On subsequent spawns (after crash/respawn), we start empty.
230-
// TODO: implement fetch_immature_parquet_splits() to re-query the
231-
// metastore on respawn, matching the Tantivy MergePipeline pattern
232-
// (see merge_pipeline.rs:441-471).
233-
let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
230+
let immature_splits = self.fetch_immature_splits(ctx).await?;
234231

235232
// Spawn actors bottom-up so downstream mailboxes are available.
236233

@@ -351,6 +348,45 @@ impl ParquetMergePipeline {
351348
}
352349
Ok(())
353350
}
351+
352+
/// Fetch published Parquet splits from the metastore for merge planning.
353+
///
354+
/// On first spawn, uses the initial splits provided by the IndexingService
355+
/// (avoids per-pipeline metastore queries when many pipelines start).
356+
/// On subsequent spawns (after crash/respawn), queries the metastore
357+
/// directly to recover splits that were in-flight during the crash.
358+
///
359+
/// The planner's `record_splits_if_necessary` filters out mature splits,
360+
/// so we don't need to filter here.
361+
async fn fetch_immature_splits(
362+
&mut self,
363+
ctx: &ActorContext<Self>,
364+
) -> anyhow::Result<Vec<ParquetSplitMetadata>> {
365+
// On first spawn, use the initial splits provided by the IndexingService.
366+
if let Some(immature_splits) = self.initial_immature_splits_opt.take() {
367+
return Ok(immature_splits);
368+
}
369+
// On subsequent spawns, query the metastore for published splits.
370+
let index_uid = self.params.index_uid.clone();
371+
let query =
372+
quickwit_metastore::ListParquetSplitsQuery::for_index(index_uid.clone());
373+
let list_request =
374+
quickwit_proto::metastore::ListMetricsSplitsRequest::try_from_query(
375+
index_uid.clone(),
376+
&query,
377+
)?;
378+
let response = ctx
379+
.protect_future(self.params.metastore.list_metrics_splits(list_request))
380+
.await?;
381+
let records = response.deserialize_splits()?;
382+
let splits: Vec<ParquetSplitMetadata> =
383+
records.into_iter().map(|r| r.metadata).collect();
384+
info!(
385+
num_splits = splits.len(),
386+
"fetched published parquet splits for merge planning on respawn"
387+
);
388+
Ok(splits)
389+
}
354390
}
355391

356392
#[async_trait]
@@ -442,6 +478,7 @@ impl Handler<Spawn> for ParquetMergePipeline {
442478
/// Parameters for spawning a `ParquetMergePipeline`.
443479
#[derive(Clone)]
444480
pub struct ParquetMergePipelineParams {
481+
pub index_uid: IndexUid,
445482
pub indexing_directory: TempDirectory,
446483
pub metastore: MetastoreServiceClient,
447484
pub storage: Arc<dyn Storage>,
@@ -470,7 +507,15 @@ mod tests {
470507
use super::*;
471508

472509
fn make_pipeline_params(universe: &Universe) -> ParquetMergePipelineParams {
473-
let mock_metastore = MockMetastoreService::new();
510+
let mut mock_metastore = MockMetastoreService::new();
511+
// Allow list_metrics_splits for respawn seeding (returns empty).
512+
mock_metastore
513+
.expect_list_metrics_splits()
514+
.returning(|_| {
515+
Ok(quickwit_proto::metastore::ListMetricsSplitsResponse {
516+
splits_serialized_json: Vec::new(),
517+
})
518+
});
474519
let storage = Arc::new(quickwit_storage::RamStorage::default());
475520
let merge_policy = Arc::new(ConstWriteAmplificationParquetMergePolicy::new(
476521
ParquetMergePolicyConfig {
@@ -483,6 +528,7 @@ mod tests {
483528
},
484529
));
485530
ParquetMergePipelineParams {
531+
index_uid: quickwit_proto::types::IndexUid::for_test("test-merge-index", 0),
486532
indexing_directory: TempDirectory::for_test(),
487533
metastore: MetastoreServiceClient::from_mock(mock_metastore),
488534
storage,

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ async fn test_merge_pipeline_end_to_end() {
171171
));
172172

173173
let params = ParquetMergePipelineParams {
174+
index_uid: quickwit_proto::types::IndexUid::for_test("test-merge-index", 0),
174175
indexing_directory: TempDirectory::for_test(),
175176
metastore,
176177
storage: ram_storage.clone(),

0 commit comments

Comments (0)