diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index c167860fe5a..c2dc2215b22 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -70,6 +70,11 @@ struct InputMetadata { } /// Result of a single output file from the merge. +/// +/// Contains both physical metadata (file size, row count) and per-output +/// logical metadata (metric names, tags, time range) extracted from the +/// actual rows in this output file. When the merge produces multiple +/// outputs, each has metadata reflecting only its own rows. pub struct MergeOutputFile { /// Path to the output Parquet file. pub path: PathBuf, @@ -85,6 +90,16 @@ pub struct MergeOutputFile { /// Per-column zonemap regex strings. pub zonemap_regexes: std::collections::HashMap, + + /// Distinct metric names in this output file. + pub metric_names: std::collections::HashSet, + + /// Time range covered by rows in this output file. + pub time_range: crate::split::TimeRange, + + /// Low-cardinality tag values extracted from this output file's rows. + /// Currently tracks "service" to match the ingest path. + pub low_cardinality_tags: std::collections::HashMap>, } /// Merge N sorted Parquet files into M sorted output files. diff --git a/quickwit/quickwit-parquet-engine/src/merge/tests.rs b/quickwit/quickwit-parquet-engine/src/merge/tests.rs index fbf8933fe51..385a42d40eb 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/tests.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/tests.rs @@ -274,10 +274,86 @@ fn test_merge_multiple_outputs() { let total_rows: usize = outputs.iter().map(|o| o.num_rows).sum(); assert_eq!(total_rows, 6); - // Each output should have row keys. + // Each output should have row keys and per-output metadata. + let mut all_metric_names: std::collections::HashSet = std::collections::HashSet::new(); for output in &outputs { assert!(output.row_keys_proto.is_some()); + // Each output has its own metric names (subset of all inputs). + assert!(!output.metric_names.is_empty()); + all_metric_names.extend(output.metric_names.iter().cloned()); + // Time range should be valid. + assert!(output.time_range.start_secs <= output.time_range.end_secs); } + // The union of all output metric names should be the full set. + assert!(all_metric_names.contains("alpha")); + assert!(all_metric_names.contains("beta")); + assert!(all_metric_names.contains("gamma")); +} + +/// Verifies that per-output metadata (metric_names, time_range) is computed +/// from the actual rows in each output file, not aggregated from all inputs. +#[test] +fn test_merge_per_output_metadata_from_actual_rows() { + let dir = TempDir::new().unwrap(); + + // Input 1: metric "cpu" at timestamps 100, 200 + let input1 = write_test_split( + dir.path(), + "in1.parquet", + &["cpu", "cpu"], + &[100, 200], + &[1.0, 2.0], + &[1, 1], + ); + // Input 2: metric "mem" at timestamps 300, 400 + let input2 = write_test_split( + dir.path(), + "in2.parquet", + &["mem", "mem"], + &[300, 400], + &[3.0, 4.0], + &[2, 2], + ); + + let output_dir = dir.path().join("output"); + std::fs::create_dir_all(&output_dir).unwrap(); + + // Single output: should contain all metrics and full time range. + let config = MergeConfig { + num_outputs: 1, + writer_config: ParquetWriterConfig::default(), + }; + let outputs = + merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config) + .unwrap(); + assert_eq!(outputs.len(), 1); + let output = &outputs[0]; + assert!(output.metric_names.contains("cpu")); + assert!(output.metric_names.contains("mem")); + assert_eq!(output.time_range.start_secs, 100); + assert_eq!(output.time_range.end_secs, 401); // end is exclusive + + // Two outputs: each should have only its own metrics and time range. + let output_dir2 = dir.path().join("output2"); + std::fs::create_dir_all(&output_dir2).unwrap(); + let config2 = MergeConfig { + num_outputs: 2, + writer_config: ParquetWriterConfig::default(), + }; + let outputs2 = merge_sorted_parquet_files(&[input1, input2], &output_dir2, &config2).unwrap(); + assert_eq!(outputs2.len(), 2); + + // After sorted merge, "cpu" (sorted_series for tsid=1) and "mem" (tsid=2) + // are in separate outputs. Each output should have only one metric. + let output_a = &outputs2[0]; + let output_b = &outputs2[1]; + assert_eq!(output_a.metric_names.len(), 1); + assert_eq!(output_b.metric_names.len(), 1); + assert_ne!(output_a.metric_names, output_b.metric_names); + + // Time ranges should be disjoint or specific to each output's rows. + assert!(output_a.time_range.start_secs <= output_a.time_range.end_secs); + assert!(output_b.time_range.start_secs <= output_b.time_range.end_secs); } #[test] diff --git a/quickwit/quickwit-parquet-engine/src/merge/writer.rs b/quickwit/quickwit-parquet-engine/src/merge/writer.rs index 60879f7ddf6..44cb85d37ec 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/writer.rs @@ -39,6 +39,10 @@ use super::{InputMetadata, MergeConfig, MergeOutputFile}; use crate::row_keys; use crate::sort_fields::parse_sort_fields; use crate::sorted_series::SORTED_SERIES_COLUMN; +use crate::split::TAG_SERVICE; +use crate::storage::split_writer::{ + extract_metric_names, extract_service_names, extract_time_range, +}; use crate::storage::{ PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, @@ -114,6 +118,19 @@ pub fn write_merge_outputs( let output_filename = format!("merge_output_{}.parquet", Ulid::new()); let output_path = output_dir.join(&output_filename); + // Extract per-output logical metadata from the actual rows. + let metric_names = extract_metric_names(&sorted_batch) + .context("extracting metric names from merge output")?; + let time_range = + extract_time_range(&sorted_batch).context("extracting time range from merge output")?; + let service_names = extract_service_names(&sorted_batch) + .context("extracting service names from merge output")?; + + let mut low_cardinality_tags = std::collections::HashMap::new(); + if !service_names.is_empty() { + low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names); + } + let size_bytes = write_parquet_file(&sorted_batch, &output_path, props)?; outputs.push(MergeOutputFile { @@ -122,6 +139,9 @@ pub fn write_merge_outputs( size_bytes, row_keys_proto, zonemap_regexes, + metric_names, + time_range, + low_cardinality_tags, }); } diff --git a/quickwit/quickwit-parquet-engine/src/storage/mod.rs b/quickwit/quickwit-parquet-engine/src/storage/mod.rs index 59e3059422a..8b87063c25e 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/mod.rs @@ -15,7 +15,7 @@ //! Storage layer for Parquet files. mod config; -mod split_writer; +pub(crate) mod split_writer; mod writer; pub use config::{Compression, ParquetWriterConfig}; diff --git a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs index 100e46bddd1..3776e18df8e 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs @@ -170,7 +170,7 @@ impl ParquetSplitWriter { } /// Extracts the time range (min/max timestamp_secs) from a RecordBatch. -fn extract_time_range(batch: &RecordBatch) -> Result { +pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result { let timestamp_idx = batch .schema() .index_of("timestamp_secs") @@ -194,69 +194,86 @@ fn extract_time_range(batch: &RecordBatch) -> Result Result, ParquetWriteError> { +pub(crate) fn extract_metric_names( + batch: &RecordBatch, +) -> Result, ParquetWriteError> { let metric_idx = batch .schema() .index_of("metric_name") .map_err(|_| ParquetWriteError::SchemaValidation("missing metric_name column".into()))?; - let metric_col = batch.column(metric_idx); - let mut names = HashSet::new(); - - // The column is Dictionary(Int32, Utf8) - if let Some(dict_array) = metric_col - .as_any() - .downcast_ref::>() - { - let values = dict_array.values(); - if let Some(string_values) = values.as_any().downcast_ref::() { - // Get all dictionary values that are actually used - for i in 0..dict_array.len() { - if !dict_array.is_null(i) - && let Ok(key) = dict_array.keys().value(i).try_into() - { - let key: usize = key; - if key < string_values.len() && !string_values.is_null(key) { - names.insert(string_values.value(key).to_string()); - } - } - } - } - } - - Ok(names) + extract_distinct_strings(batch.column(metric_idx)) } /// Extracts distinct service names from a RecordBatch. -fn extract_service_names(batch: &RecordBatch) -> Result, ParquetWriteError> { +pub(crate) fn extract_service_names( + batch: &RecordBatch, +) -> Result, ParquetWriteError> { let service_idx = match batch.schema().index_of("service").ok() { Some(idx) => idx, None => return Ok(HashSet::new()), }; - let service_col = batch.column(service_idx); - let mut names = HashSet::new(); + extract_distinct_strings(batch.column(service_idx)) +} - // The column is Dictionary(Int32, Utf8) - if let Some(dict_array) = service_col +/// Extracts distinct non-null string values from a column. +/// +/// Handles both Dictionary(Int32, Utf8) encoding (common at ingest) and +/// plain Utf8/LargeUtf8 (possible after optimize_output_batch in the merge +/// path when cardinality is too high for dictionary encoding). +fn extract_distinct_strings( + col: &dyn arrow::array::Array, +) -> Result, ParquetWriteError> { + let mut values = HashSet::new(); + + // Try Dictionary(Int32, Utf8) first — the common case at ingest. + if let Some(dict_array) = col .as_any() .downcast_ref::>() + && let Some(string_values) = dict_array + .values() + .as_any() + .downcast_ref::() { - let values = dict_array.values(); - if let Some(string_values) = values.as_any().downcast_ref::() { - // Get all dictionary values that are actually used - for i in 0..dict_array.len() { - if !dict_array.is_null(i) - && let Ok(key) = dict_array.keys().value(i).try_into() - { - let key: usize = key; - if key < string_values.len() && !string_values.is_null(key) { - names.insert(string_values.value(key).to_string()); - } + for i in 0..dict_array.len() { + if !dict_array.is_null(i) + && let Ok(key) = dict_array.keys().value(i).try_into() + { + let key: usize = key; + if key < string_values.len() && !string_values.is_null(key) { + values.insert(string_values.value(key).to_string()); } } } + return Ok(values); + } + + // Fall back to plain Utf8 (after optimize_output_batch strips dictionary + // encoding for high-cardinality columns). + if let Some(string_array) = col.as_any().downcast_ref::() { + for i in 0..string_array.len() { + if !string_array.is_null(i) { + values.insert(string_array.value(i).to_string()); + } + } + return Ok(values); + } + + // LargeUtf8 variant. + if let Some(string_array) = col + .as_any() + .downcast_ref::() + { + for i in 0..string_array.len() { + if !string_array.is_null(i) { + values.insert(string_array.value(i).to_string()); + } + } + return Ok(values); } - Ok(names) + // Unrecognized column type — return empty rather than error, since the + // column may legitimately be a type we don't extract strings from. + Ok(values) } #[cfg(test)]