
Commit 4cafb74

g-talbot and claude authored
feat: compute per-output split metadata in merge engine (#6359)
* feat: add configurable ParquetMergePolicyConfig to index settings Adds `parquet_merge_policy` section to `IndexingSettings`, making the Parquet merge policy configurable per-index via YAML. Parameters: - merge_factor (default 10): min splits to trigger a merge - max_merge_factor (default 12): max splits per merge - max_merge_ops (default 4): bounds write amplification - target_split_size_bytes (default 256 MiB): target output size - maturation_period (default 48h): split maturity timeout - max_finalize_merge_operations (default 3): cold-window shutdown limit Mirrors the existing merge_policy config pattern for logs/traces. Updates index-config.md documentation with the new section. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: add ParquetIndexingConfig with sort_fields and window_duration_secs Adds `parquet_indexing` section to `IndexingSettings` for per-index Parquet pipeline configuration: - `sort_fields`: sort schema override (Husky-style pipe-delimited syntax with /V2 suffix). Controls row ordering, query pruning, compression locality, and compaction scope. When omitted, uses the product-type default. - `window_duration_secs`: time window for split partitioning (default 900s / 15 min). Must divide 3600. Updates docs/configuration/index-config.md with: - "Parquet indexing settings" section explaining both parameters - Full sort schema syntax reference (column types, direction overrides, & LSM cutoff marker) - Examples showing minimal, custom, and advanced configurations Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: update indexing service fingerprint constants and nightly fmt Adding ParquetMergePolicyConfig and ParquetIndexingConfig to IndexingSettings changes the Hash output, which changes the pipeline params fingerprints. Updated the hardcoded test constants. Added a comment explaining how to recompute them when IndexingSettings fields change. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: compute per-output split metadata in merge engine The merge engine now extracts metric_names, time_range, and low_cardinality_tags from each output file's actual rows during the merge write pass. Previously, MergeOutputFile only contained physical metadata (num_rows, size_bytes, row_keys, zonemaps). The downstream metadata_aggregation function inferred logical metadata by unioning all input splits — which is incorrect when num_outputs > 1, since each output contains only a subset of the globally sorted rows. Now each MergeOutputFile carries: - metric_names: distinct metrics in this output's rows - time_range: min/max timestamp_secs from this output's rows - low_cardinality_tags: service names from this output's rows Reuses existing extract_metric_names, extract_service_names, and extract_time_range from split_writer (made pub(crate)). Includes test that verifies per-output metadata is computed from actual rows when merging 2 inputs into 2 outputs with different metric names. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: nightly rustfmt import ordering Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a36f4fe commit 4cafb74

5 files changed: 174 additions & 46 deletions

quickwit/quickwit-parquet-engine/src/merge/mod.rs

Lines changed: 15 additions & 0 deletions
```diff
@@ -70,6 +70,11 @@ struct InputMetadata {
 }
 
 /// Result of a single output file from the merge.
+///
+/// Contains both physical metadata (file size, row count) and per-output
+/// logical metadata (metric names, tags, time range) extracted from the
+/// actual rows in this output file. When the merge produces multiple
+/// outputs, each has metadata reflecting only its own rows.
 pub struct MergeOutputFile {
     /// Path to the output Parquet file.
     pub path: PathBuf,
@@ -85,6 +90,16 @@ pub struct MergeOutputFile {
 
     /// Per-column zonemap regex strings.
     pub zonemap_regexes: std::collections::HashMap<String, String>,
+
+    /// Distinct metric names in this output file.
+    pub metric_names: std::collections::HashSet<String>,
+
+    /// Time range covered by rows in this output file.
+    pub time_range: crate::split::TimeRange,
+
+    /// Low-cardinality tag values extracted from this output file's rows.
+    /// Currently tracks "service" to match the ingest path.
+    pub low_cardinality_tags: std::collections::HashMap<String, std::collections::HashSet<String>>,
 }
 
 /// Merge N sorted Parquet files into M sorted output files.
```
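The point of carrying this metadata on each output is that downstream split registration can derive a split's logical metadata from its own MergeOutputFile instead of unioning all inputs. A minimal sketch of such a consumer, under assumed types (`SplitMetadata` is an illustrative stand-in and the `start_secs`/`end_secs` integer width is assumed; the real metadata_aggregation code is not part of this diff):

```rust
use std::collections::{HashMap, HashSet};

// Illustrative stand-in for whatever record the downstream consumer
// registers per split; not the crate's actual type.
struct SplitMetadata {
    metric_names: HashSet<String>,
    time_range_secs: (i64, i64), // (start, end-exclusive); width assumed
    low_cardinality_tags: HashMap<String, HashSet<String>>,
}

// One metadata record per output file, built from that file's own rows,
// rather than one unioned record stamped onto every output.
fn per_output_metadata(outputs: &[MergeOutputFile]) -> Vec<SplitMetadata> {
    outputs
        .iter()
        .map(|out| SplitMetadata {
            metric_names: out.metric_names.clone(),
            time_range_secs: (out.time_range.start_secs, out.time_range.end_secs),
            low_cardinality_tags: out.low_cardinality_tags.clone(),
        })
        .collect()
}
```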

quickwit/quickwit-parquet-engine/src/merge/tests.rs

Lines changed: 77 additions & 1 deletion
```diff
@@ -274,10 +274,86 @@ fn test_merge_multiple_outputs() {
     let total_rows: usize = outputs.iter().map(|o| o.num_rows).sum();
     assert_eq!(total_rows, 6);
 
-    // Each output should have row keys.
+    // Each output should have row keys and per-output metadata.
+    let mut all_metric_names: std::collections::HashSet<String> = std::collections::HashSet::new();
     for output in &outputs {
         assert!(output.row_keys_proto.is_some());
+        // Each output has its own metric names (subset of all inputs).
+        assert!(!output.metric_names.is_empty());
+        all_metric_names.extend(output.metric_names.iter().cloned());
+        // Time range should be valid.
+        assert!(output.time_range.start_secs <= output.time_range.end_secs);
     }
+    // The union of all output metric names should be the full set.
+    assert!(all_metric_names.contains("alpha"));
+    assert!(all_metric_names.contains("beta"));
+    assert!(all_metric_names.contains("gamma"));
+}
+
+/// Verifies that per-output metadata (metric_names, time_range) is computed
+/// from the actual rows in each output file, not aggregated from all inputs.
+#[test]
+fn test_merge_per_output_metadata_from_actual_rows() {
+    let dir = TempDir::new().unwrap();
+
+    // Input 1: metric "cpu" at timestamps 100, 200
+    let input1 = write_test_split(
+        dir.path(),
+        "in1.parquet",
+        &["cpu", "cpu"],
+        &[100, 200],
+        &[1.0, 2.0],
+        &[1, 1],
+    );
+    // Input 2: metric "mem" at timestamps 300, 400
+    let input2 = write_test_split(
+        dir.path(),
+        "in2.parquet",
+        &["mem", "mem"],
+        &[300, 400],
+        &[3.0, 4.0],
+        &[2, 2],
+    );
+
+    let output_dir = dir.path().join("output");
+    std::fs::create_dir_all(&output_dir).unwrap();
+
+    // Single output: should contain all metrics and full time range.
+    let config = MergeConfig {
+        num_outputs: 1,
+        writer_config: ParquetWriterConfig::default(),
+    };
+    let outputs =
+        merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config)
+            .unwrap();
+    assert_eq!(outputs.len(), 1);
+    let output = &outputs[0];
+    assert!(output.metric_names.contains("cpu"));
+    assert!(output.metric_names.contains("mem"));
+    assert_eq!(output.time_range.start_secs, 100);
+    assert_eq!(output.time_range.end_secs, 401); // end is exclusive
+
+    // Two outputs: each should have only its own metrics and time range.
+    let output_dir2 = dir.path().join("output2");
+    std::fs::create_dir_all(&output_dir2).unwrap();
+    let config2 = MergeConfig {
+        num_outputs: 2,
+        writer_config: ParquetWriterConfig::default(),
+    };
+    let outputs2 = merge_sorted_parquet_files(&[input1, input2], &output_dir2, &config2).unwrap();
+    assert_eq!(outputs2.len(), 2);
+
+    // After sorted merge, "cpu" (sorted_series for tsid=1) and "mem" (tsid=2)
+    // are in separate outputs. Each output should have only one metric.
+    let output_a = &outputs2[0];
+    let output_b = &outputs2[1];
+    assert_eq!(output_a.metric_names.len(), 1);
+    assert_eq!(output_b.metric_names.len(), 1);
+    assert_ne!(output_a.metric_names, output_b.metric_names);
+
+    // Time ranges should be disjoint or specific to each output's rows.
+    assert!(output_a.time_range.start_secs <= output_a.time_range.end_secs);
+    assert!(output_b.time_range.start_secs <= output_b.time_range.end_secs);
 }
 
 #[test]
```

quickwit/quickwit-parquet-engine/src/merge/writer.rs

Lines changed: 20 additions & 0 deletions
```diff
@@ -39,6 +39,10 @@ use super::{InputMetadata, MergeConfig, MergeOutputFile};
 use crate::row_keys;
 use crate::sort_fields::parse_sort_fields;
 use crate::sorted_series::SORTED_SERIES_COLUMN;
+use crate::split::TAG_SERVICE;
+use crate::storage::split_writer::{
+    extract_metric_names, extract_service_names, extract_time_range,
+};
 use crate::storage::{
     PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON,
     PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START,
@@ -114,6 +118,19 @@ pub fn write_merge_outputs(
         let output_filename = format!("merge_output_{}.parquet", Ulid::new());
         let output_path = output_dir.join(&output_filename);
 
+        // Extract per-output logical metadata from the actual rows.
+        let metric_names = extract_metric_names(&sorted_batch)
+            .context("extracting metric names from merge output")?;
+        let time_range =
+            extract_time_range(&sorted_batch).context("extracting time range from merge output")?;
+        let service_names = extract_service_names(&sorted_batch)
+            .context("extracting service names from merge output")?;
+
+        let mut low_cardinality_tags = std::collections::HashMap::new();
+        if !service_names.is_empty() {
+            low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names);
+        }
+
         let size_bytes = write_parquet_file(&sorted_batch, &output_path, props)?;
 
         outputs.push(MergeOutputFile {
@@ -122,6 +139,9 @@
             size_bytes,
             row_keys_proto,
             zonemap_regexes,
+            metric_names,
+            time_range,
+            low_cardinality_tags,
         });
     }
 
```
quickwit/quickwit-parquet-engine/src/storage/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -15,7 +15,7 @@
 //! Storage layer for Parquet files.
 
 mod config;
-mod split_writer;
+pub(crate) mod split_writer;
 mod writer;
 
 pub use config::{Compression, ParquetWriterConfig};
```

quickwit/quickwit-parquet-engine/src/storage/split_writer.rs

Lines changed: 61 additions & 44 deletions
```diff
@@ -170,7 +170,7 @@ impl ParquetSplitWriter {
 }
 
 /// Extracts the time range (min/max timestamp_secs) from a RecordBatch.
-fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteError> {
+pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteError> {
     let timestamp_idx = batch
         .schema()
         .index_of("timestamp_secs")
@@ -194,69 +194,86 @@ fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteError> {
 }
 
 /// Extracts distinct metric names from a RecordBatch.
-fn extract_metric_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
+pub(crate) fn extract_metric_names(
+    batch: &RecordBatch,
+) -> Result<HashSet<String>, ParquetWriteError> {
     let metric_idx = batch
         .schema()
         .index_of("metric_name")
         .map_err(|_| ParquetWriteError::SchemaValidation("missing metric_name column".into()))?;
-    let metric_col = batch.column(metric_idx);
-    let mut names = HashSet::new();
-
-    // The column is Dictionary(Int32, Utf8)
-    if let Some(dict_array) = metric_col
-        .as_any()
-        .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
-    {
-        let values = dict_array.values();
-        if let Some(string_values) = values.as_any().downcast_ref::<arrow::array::StringArray>() {
-            // Get all dictionary values that are actually used
-            for i in 0..dict_array.len() {
-                if !dict_array.is_null(i)
-                    && let Ok(key) = dict_array.keys().value(i).try_into()
-                {
-                    let key: usize = key;
-                    if key < string_values.len() && !string_values.is_null(key) {
-                        names.insert(string_values.value(key).to_string());
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(names)
+    extract_distinct_strings(batch.column(metric_idx))
 }
 
 /// Extracts distinct service names from a RecordBatch.
-fn extract_service_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
+pub(crate) fn extract_service_names(
+    batch: &RecordBatch,
+) -> Result<HashSet<String>, ParquetWriteError> {
     let service_idx = match batch.schema().index_of("service").ok() {
         Some(idx) => idx,
        None => return Ok(HashSet::new()),
     };
-    let service_col = batch.column(service_idx);
-    let mut names = HashSet::new();
+    extract_distinct_strings(batch.column(service_idx))
+}
 
-    // The column is Dictionary(Int32, Utf8)
-    if let Some(dict_array) = service_col
+/// Extracts distinct non-null string values from a column.
+///
+/// Handles both Dictionary(Int32, Utf8) encoding (common at ingest) and
+/// plain Utf8/LargeUtf8 (possible after optimize_output_batch in the merge
+/// path when cardinality is too high for dictionary encoding).
+fn extract_distinct_strings(
+    col: &dyn arrow::array::Array,
+) -> Result<HashSet<String>, ParquetWriteError> {
+    let mut values = HashSet::new();
+
+    // Try Dictionary(Int32, Utf8) first — the common case at ingest.
+    if let Some(dict_array) = col
         .as_any()
         .downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
+        && let Some(string_values) = dict_array
+            .values()
+            .as_any()
+            .downcast_ref::<arrow::array::StringArray>()
     {
-        let values = dict_array.values();
-        if let Some(string_values) = values.as_any().downcast_ref::<arrow::array::StringArray>() {
-            // Get all dictionary values that are actually used
-            for i in 0..dict_array.len() {
-                if !dict_array.is_null(i)
-                    && let Ok(key) = dict_array.keys().value(i).try_into()
-                {
-                    let key: usize = key;
-                    if key < string_values.len() && !string_values.is_null(key) {
-                        names.insert(string_values.value(key).to_string());
-                    }
+        for i in 0..dict_array.len() {
+            if !dict_array.is_null(i)
+                && let Ok(key) = dict_array.keys().value(i).try_into()
+            {
+                let key: usize = key;
+                if key < string_values.len() && !string_values.is_null(key) {
+                    values.insert(string_values.value(key).to_string());
                 }
             }
         }
+        return Ok(values);
+    }
+
+    // Fall back to plain Utf8 (after optimize_output_batch strips dictionary
+    // encoding for high-cardinality columns).
+    if let Some(string_array) = col.as_any().downcast_ref::<arrow::array::StringArray>() {
+        for i in 0..string_array.len() {
+            if !string_array.is_null(i) {
+                values.insert(string_array.value(i).to_string());
+            }
+        }
+        return Ok(values);
+    }
+
+    // LargeUtf8 variant.
+    if let Some(string_array) = col
+        .as_any()
+        .downcast_ref::<arrow::array::LargeStringArray>()
+    {
+        for i in 0..string_array.len() {
+            if !string_array.is_null(i) {
+                values.insert(string_array.value(i).to_string());
+            }
+        }
+        return Ok(values);
     }
 
-    Ok(names)
+    // Unrecognized column type — return empty rather than error, since the
+    // column may legitimately be a type we don't extract strings from.
+    Ok(values)
 }
 
 #[cfg(test)]
```
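Since extract_distinct_strings now has three code paths, here is a sketch of a unit test one could place alongside the existing tests in split_writer.rs, showing the intended equivalence between the dictionary and plain encodings. This test is not part of the commit; it assumes arrow's FromIterator impl for dictionary arrays and same-module visibility of the private helper:

```rust
#[cfg(test)]
mod distinct_strings_sketch {
    use arrow::array::{DictionaryArray, StringArray};
    use arrow::datatypes::Int32Type;

    use super::extract_distinct_strings;

    #[test]
    fn dictionary_and_plain_encodings_agree() {
        // Dictionary(Int32, Utf8): keys [0, 1, 0] over values ["cpu", "mem"].
        let dict: DictionaryArray<Int32Type> =
            vec!["cpu", "mem", "cpu"].into_iter().collect();
        // The same logical column as plain Utf8.
        let plain = StringArray::from(vec!["cpu", "mem", "cpu"]);

        let from_dict = extract_distinct_strings(&dict).unwrap();
        let from_plain = extract_distinct_strings(&plain).unwrap();

        // Both paths should yield the same distinct set {"cpu", "mem"}.
        assert_eq!(from_dict, from_plain);
        assert_eq!(from_dict.len(), 2);
    }
}
```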
