Skip to content

Commit dd1f8fc

Browse files
g-talbot and claude committed
feat: wire ParquetIndexingConfig and ParquetMergePolicyConfig to pipeline
Reads `parquet_indexing.sort_fields` and `parquet_indexing.window_duration_secs` from `IndexingSettings` when constructing the ingest pipeline's `TableConfig` (previously these were hardcoded to the defaults).

Adds `parquet_merge_policy_from_settings()`, which converts the config-layer `ParquetMergePolicyConfig` into an `Arc<dyn ParquetMergePolicy>` runtime policy, paralleling `merge_policy_from_settings()` for Tantivy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 912301e commit dd1f8fc

2 files changed

Lines changed: 28 additions & 2 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/pipeline.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -346,9 +346,14 @@ impl MetricsPipeline {
346346
.set_kill_switch(self.kill_switch.clone())
347347
.spawn(uploader);
348348

349-
// ParquetPackager
349+
// ParquetPackager — read sort schema and window duration from index config.
350350
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
351-
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
351+
let parquet_indexing_config = &self.params.indexing_settings.parquet_indexing;
352+
let mut table_config = quickwit_parquet_engine::table_config::TableConfig::default();
353+
if let Some(ref sort_fields) = parquet_indexing_config.sort_fields {
354+
table_config.sort_fields = Some(sort_fields.clone());
355+
}
356+
table_config.window_duration_secs = parquet_indexing_config.window_duration_secs;
352357
let split_kind = if self.params.use_sketch_processors {
353358
quickwit_parquet_engine::split::ParquetSplitKind::Sketches
354359
} else {

quickwit/quickwit-indexing/src/merge_policy/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -192,6 +192,27 @@ pub fn default_merge_policy() -> Arc<dyn MergePolicy> {
192192
merge_policy_from_settings(&indexing_settings)
193193
}
194194

195+
/// Creates a Parquet merge policy from the index's `ParquetMergePolicyConfig`.
///
/// Reads `settings.parquet_merge_policy`, copies each of its fields into the
/// engine-level `quickwit_parquet_engine::merge::policy::ParquetMergePolicyConfig`,
/// and returns a `ConstWriteAmplificationParquetMergePolicy` built from that
/// config, wrapped in an `Arc` as a `dyn ParquetMergePolicy` trait object.
///
/// Only compiled when the `metrics` feature is enabled.
196+
#[cfg(feature = "metrics")]
197+
pub fn parquet_merge_policy_from_settings(
198+
settings: &IndexingSettings,
199+
) -> Arc<dyn quickwit_parquet_engine::merge::policy::ParquetMergePolicy> {
200+
let config = &settings.parquet_merge_policy;
// Map the settings value onto the engine's own config struct field by field.
// The struct literal has no `..` rest pattern, so a field added to the engine
// struct will fail to compile here until it is mapped explicitly.
201+
let engine_config = quickwit_parquet_engine::merge::policy::ParquetMergePolicyConfig {
202+
merge_factor: config.merge_factor,
203+
max_merge_factor: config.max_merge_factor,
204+
max_merge_ops: config.max_merge_ops,
205+
target_split_size_bytes: config.target_split_size_bytes,
206+
maturation_period: config.maturation_period,
207+
max_finalize_merge_operations: config.max_finalize_merge_operations,
208+
};
// The concrete policy type is fixed here; callers only see the trait object.
209+
Arc::new(
210+
quickwit_parquet_engine::merge::policy::ConstWriteAmplificationParquetMergePolicy::new(
211+
engine_config,
212+
),
213+
)
214+
}
215+
195216
/// Returns an `Arc`-wrapped `NopMergePolicy` as a `dyn MergePolicy` trait object.
// NOTE(review): presumably a policy that never schedules merges — confirm against `NopMergePolicy`.
pub fn nop_merge_policy() -> Arc<dyn MergePolicy> {
196217
Arc::new(NopMergePolicy)
197218
}

0 commit comments

Comments
 (0)