Skip to content

Commit 72521f1

Browse files
g-talbot authored and claude committed
feat: compute per-output split metadata in merge engine
The merge engine now extracts metric_names, time_range, and low_cardinality_tags from each output file's actual rows during the merge write pass. Previously, MergeOutputFile only contained physical metadata (num_rows, size_bytes, row_keys, zonemaps). The downstream metadata_aggregation function inferred logical metadata by unioning all input splits — which is incorrect when num_outputs > 1, since each output contains only a subset of the globally sorted rows. Now each MergeOutputFile carries: - metric_names: distinct metrics in this output's rows - time_range: min/max timestamp_secs from this output's rows - low_cardinality_tags: service names from this output's rows Reuses existing extract_metric_names, extract_service_names, and extract_time_range from split_writer (made pub(crate)). Includes test that verifies per-output metadata is computed from actual rows when merging 2 inputs into 2 outputs with different metric names. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a16ef62 commit 72521f1

5 files changed

Lines changed: 112 additions & 5 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ struct InputMetadata {
7070
}
7171

7272
/// Result of a single output file from the merge.
73+
///
74+
/// Contains both physical metadata (file size, row count) and per-output
75+
/// logical metadata (metric names, tags, time range) extracted from the
76+
/// actual rows in this output file. When the merge produces multiple
77+
/// outputs, each has metadata reflecting only its own rows.
7378
pub struct MergeOutputFile {
7479
/// Path to the output Parquet file.
7580
pub path: PathBuf,
@@ -85,6 +90,16 @@ pub struct MergeOutputFile {
8590

8691
/// Per-column zonemap regex strings.
8792
pub zonemap_regexes: std::collections::HashMap<String, String>,
93+
94+
/// Distinct metric names in this output file.
95+
pub metric_names: std::collections::HashSet<String>,
96+
97+
/// Time range covered by rows in this output file.
98+
pub time_range: crate::split::TimeRange,
99+
100+
/// Low-cardinality tag values extracted from this output file's rows.
101+
/// Currently tracks "service" to match the ingest path.
102+
pub low_cardinality_tags: std::collections::HashMap<String, std::collections::HashSet<String>>,
88103
}
89104

90105
/// Merge N sorted Parquet files into M sorted output files.

quickwit/quickwit-parquet-engine/src/merge/tests.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,84 @@ fn test_merge_multiple_outputs() {
274274
let total_rows: usize = outputs.iter().map(|o| o.num_rows).sum();
275275
assert_eq!(total_rows, 6);
276276

277-
// Each output should have row keys.
277+
// Each output should have row keys and per-output metadata.
278+
let mut all_metric_names: std::collections::HashSet<String> = std::collections::HashSet::new();
278279
for output in &outputs {
279280
assert!(output.row_keys_proto.is_some());
281+
// Each output has its own metric names (subset of all inputs).
282+
assert!(!output.metric_names.is_empty());
283+
all_metric_names.extend(output.metric_names.iter().cloned());
284+
// Time range should be valid.
285+
assert!(output.time_range.start_secs <= output.time_range.end_secs);
280286
}
287+
// The union of all output metric names should be the full set.
288+
assert!(all_metric_names.contains("alpha"));
289+
assert!(all_metric_names.contains("beta"));
290+
assert!(all_metric_names.contains("gamma"));
291+
}
292+
293+
/// Verifies that per-output metadata (metric_names, time_range) is computed
294+
/// from the actual rows in each output file, not aggregated from all inputs.
295+
#[test]
296+
fn test_merge_per_output_metadata_from_actual_rows() {
297+
let dir = TempDir::new().unwrap();
298+
299+
// Input 1: metric "cpu" at timestamps 100, 200
300+
let input1 = write_test_split(
301+
dir.path(),
302+
"in1.parquet",
303+
&["cpu", "cpu"],
304+
&[100, 200],
305+
&[1.0, 2.0],
306+
&[1, 1],
307+
);
308+
// Input 2: metric "mem" at timestamps 300, 400
309+
let input2 = write_test_split(
310+
dir.path(),
311+
"in2.parquet",
312+
&["mem", "mem"],
313+
&[300, 400],
314+
&[3.0, 4.0],
315+
&[2, 2],
316+
);
317+
318+
let output_dir = dir.path().join("output");
319+
std::fs::create_dir_all(&output_dir).unwrap();
320+
321+
// Single output: should contain all metrics and full time range.
322+
let config = MergeConfig {
323+
num_outputs: 1,
324+
writer_config: ParquetWriterConfig::default(),
325+
};
326+
let outputs = merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config).unwrap();
327+
assert_eq!(outputs.len(), 1);
328+
let output = &outputs[0];
329+
assert!(output.metric_names.contains("cpu"));
330+
assert!(output.metric_names.contains("mem"));
331+
assert_eq!(output.time_range.start_secs, 100);
332+
assert_eq!(output.time_range.end_secs, 401); // end is exclusive
333+
334+
// Two outputs: each should have only its own metrics and time range.
335+
let output_dir2 = dir.path().join("output2");
336+
std::fs::create_dir_all(&output_dir2).unwrap();
337+
let config2 = MergeConfig {
338+
num_outputs: 2,
339+
writer_config: ParquetWriterConfig::default(),
340+
};
341+
let outputs2 = merge_sorted_parquet_files(&[input1, input2], &output_dir2, &config2).unwrap();
342+
assert_eq!(outputs2.len(), 2);
343+
344+
// After sorted merge, "cpu" (sorted_series for tsid=1) and "mem" (tsid=2)
345+
// are in separate outputs. Each output should have only one metric.
346+
let output_a = &outputs2[0];
347+
let output_b = &outputs2[1];
348+
assert_eq!(output_a.metric_names.len(), 1);
349+
assert_eq!(output_b.metric_names.len(), 1);
350+
assert_ne!(output_a.metric_names, output_b.metric_names);
351+
352+
// Time ranges should be disjoint or specific to each output's rows.
353+
assert!(output_a.time_range.start_secs <= output_a.time_range.end_secs);
354+
assert!(output_b.time_range.start_secs <= output_b.time_range.end_secs);
281355
}
282356

283357
#[test]

quickwit/quickwit-parquet-engine/src/merge/writer.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ use super::{InputMetadata, MergeConfig, MergeOutputFile};
3939
use crate::row_keys;
4040
use crate::sort_fields::parse_sort_fields;
4141
use crate::sorted_series::SORTED_SERIES_COLUMN;
42+
use crate::split::TAG_SERVICE;
43+
use crate::storage::split_writer::{extract_metric_names, extract_service_names, extract_time_range};
4244
use crate::storage::{
4345
PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON,
4446
PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START,
@@ -114,6 +116,19 @@ pub fn write_merge_outputs(
114116
let output_filename = format!("merge_output_{}.parquet", Ulid::new());
115117
let output_path = output_dir.join(&output_filename);
116118

119+
// Extract per-output logical metadata from the actual rows.
120+
let metric_names = extract_metric_names(&sorted_batch)
121+
.context("extracting metric names from merge output")?;
122+
let time_range = extract_time_range(&sorted_batch)
123+
.context("extracting time range from merge output")?;
124+
let service_names = extract_service_names(&sorted_batch)
125+
.context("extracting service names from merge output")?;
126+
127+
let mut low_cardinality_tags = std::collections::HashMap::new();
128+
if !service_names.is_empty() {
129+
low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names);
130+
}
131+
117132
let size_bytes = write_parquet_file(&sorted_batch, &output_path, props)?;
118133

119134
outputs.push(MergeOutputFile {
@@ -122,6 +137,9 @@ pub fn write_merge_outputs(
122137
size_bytes,
123138
row_keys_proto,
124139
zonemap_regexes,
140+
metric_names,
141+
time_range,
142+
low_cardinality_tags,
125143
});
126144
}
127145

quickwit/quickwit-parquet-engine/src/storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
//! Storage layer for Parquet files.
1616
1717
mod config;
18-
mod split_writer;
18+
pub(crate) mod split_writer;
1919
mod writer;
2020

2121
pub use config::{Compression, ParquetWriterConfig};

quickwit/quickwit-parquet-engine/src/storage/split_writer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl ParquetSplitWriter {
170170
}
171171

172172
/// Extracts the time range (min/max timestamp_secs) from a RecordBatch.
173-
fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteError> {
173+
pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteError> {
174174
let timestamp_idx = batch
175175
.schema()
176176
.index_of("timestamp_secs")
@@ -194,7 +194,7 @@ fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteErro
194194
}
195195

196196
/// Extracts distinct metric names from a RecordBatch.
197-
fn extract_metric_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
197+
pub(crate) fn extract_metric_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
198198
let metric_idx = batch
199199
.schema()
200200
.index_of("metric_name")
@@ -227,7 +227,7 @@ fn extract_metric_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetW
227227
}
228228

229229
/// Extracts distinct service names from a RecordBatch.
230-
fn extract_service_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
230+
pub(crate) fn extract_service_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
231231
let service_idx = match batch.schema().index_of("service").ok() {
232232
Some(idx) => idx,
233233
None => return Ok(HashSet::new()),

0 commit comments

Comments (0)