Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions quickwit/quickwit-parquet-engine/src/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ struct InputMetadata {
}

/// Result of a single output file from the merge.
///
/// Contains both physical metadata (file size, row count) and per-output
/// logical metadata (metric names, tags, time range) extracted from the
/// actual rows in this output file. When the merge produces multiple
/// outputs, each has metadata reflecting only its own rows.
pub struct MergeOutputFile {
/// Path to the output Parquet file.
pub path: PathBuf,
Expand All @@ -85,6 +90,16 @@ pub struct MergeOutputFile {

/// Per-column zonemap regex strings.
pub zonemap_regexes: std::collections::HashMap<String, String>,

/// Distinct metric names in this output file.
pub metric_names: std::collections::HashSet<String>,

/// Time range covered by rows in this output file.
pub time_range: crate::split::TimeRange,

/// Low-cardinality tag values extracted from this output file's rows.
/// Currently tracks "service" to match the ingest path.
pub low_cardinality_tags: std::collections::HashMap<String, std::collections::HashSet<String>>,
}

/// Merge N sorted Parquet files into M sorted output files.
Expand Down
78 changes: 77 additions & 1 deletion quickwit/quickwit-parquet-engine/src/merge/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,86 @@ fn test_merge_multiple_outputs() {
let total_rows: usize = outputs.iter().map(|o| o.num_rows).sum();
assert_eq!(total_rows, 6);

// Each output should have row keys.
// Each output should have row keys and per-output metadata.
let mut all_metric_names: std::collections::HashSet<String> = std::collections::HashSet::new();
for output in &outputs {
assert!(output.row_keys_proto.is_some());
// Each output has its own metric names (subset of all inputs).
assert!(!output.metric_names.is_empty());
all_metric_names.extend(output.metric_names.iter().cloned());
// Time range should be valid.
assert!(output.time_range.start_secs <= output.time_range.end_secs);
}
// The union of all output metric names should be the full set.
assert!(all_metric_names.contains("alpha"));
assert!(all_metric_names.contains("beta"));
assert!(all_metric_names.contains("gamma"));
}

/// Verifies that per-output metadata (metric_names, time_range) is computed
/// from the actual rows in each output file, not aggregated from all inputs.
#[test]
fn test_merge_per_output_metadata_from_actual_rows() {
    let tmp = TempDir::new().unwrap();

    // First input: metric "cpu" with rows at t=100 and t=200 (tsid 1).
    let cpu_input = write_test_split(
        tmp.path(),
        "in1.parquet",
        &["cpu", "cpu"],
        &[100, 200],
        &[1.0, 2.0],
        &[1, 1],
    );
    // Second input: metric "mem" with rows at t=300 and t=400 (tsid 2).
    let mem_input = write_test_split(
        tmp.path(),
        "in2.parquet",
        &["mem", "mem"],
        &[300, 400],
        &[3.0, 4.0],
        &[2, 2],
    );

    // Case 1: merging into a single output must surface every metric and
    // the combined time range of both inputs.
    let single_dir = tmp.path().join("output");
    std::fs::create_dir_all(&single_dir).unwrap();
    let single_config = MergeConfig {
        num_outputs: 1,
        writer_config: ParquetWriterConfig::default(),
    };
    let merged = merge_sorted_parquet_files(
        &[cpu_input.clone(), mem_input.clone()],
        &single_dir,
        &single_config,
    )
    .unwrap();
    assert_eq!(merged.len(), 1);
    let only = &merged[0];
    assert!(only.metric_names.contains("cpu"));
    assert!(only.metric_names.contains("mem"));
    assert_eq!(only.time_range.start_secs, 100);
    assert_eq!(only.time_range.end_secs, 401); // end is exclusive

    // Case 2: after the sorted merge, "cpu" (tsid=1) and "mem" (tsid=2)
    // land in separate outputs, so each output should carry exactly one
    // metric and metadata reflecting only its own rows.
    let split_dir = tmp.path().join("output2");
    std::fs::create_dir_all(&split_dir).unwrap();
    let split_config = MergeConfig {
        num_outputs: 2,
        writer_config: ParquetWriterConfig::default(),
    };
    let halves =
        merge_sorted_parquet_files(&[cpu_input, mem_input], &split_dir, &split_config).unwrap();
    assert_eq!(halves.len(), 2);

    let (first, second) = (&halves[0], &halves[1]);
    assert_eq!(first.metric_names.len(), 1);
    assert_eq!(second.metric_names.len(), 1);
    assert_ne!(first.metric_names, second.metric_names);

    // Each output's time range must at least be internally consistent
    // (start not past end), covering only that output's rows.
    assert!(first.time_range.start_secs <= first.time_range.end_secs);
    assert!(second.time_range.start_secs <= second.time_range.end_secs);
}

#[test]
Expand Down
20 changes: 20 additions & 0 deletions quickwit/quickwit-parquet-engine/src/merge/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ use super::{InputMetadata, MergeConfig, MergeOutputFile};
use crate::row_keys;
use crate::sort_fields::parse_sort_fields;
use crate::sorted_series::SORTED_SERIES_COLUMN;
use crate::split::TAG_SERVICE;
use crate::storage::split_writer::{
extract_metric_names, extract_service_names, extract_time_range,
};
use crate::storage::{
PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON,
PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START,
Expand Down Expand Up @@ -114,6 +118,19 @@ pub fn write_merge_outputs(
let output_filename = format!("merge_output_{}.parquet", Ulid::new());
let output_path = output_dir.join(&output_filename);

// Extract per-output logical metadata from the actual rows.
let metric_names = extract_metric_names(&sorted_batch)
.context("extracting metric names from merge output")?;
let time_range =
extract_time_range(&sorted_batch).context("extracting time range from merge output")?;
let service_names = extract_service_names(&sorted_batch)
.context("extracting service names from merge output")?;

let mut low_cardinality_tags = std::collections::HashMap::new();
if !service_names.is_empty() {
low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names);
}

let size_bytes = write_parquet_file(&sorted_batch, &output_path, props)?;

outputs.push(MergeOutputFile {
Expand All @@ -122,6 +139,9 @@ pub fn write_merge_outputs(
size_bytes,
row_keys_proto,
zonemap_regexes,
metric_names,
time_range,
low_cardinality_tags,
});
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-parquet-engine/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! Storage layer for Parquet files.

mod config;
mod split_writer;
pub(crate) mod split_writer;
mod writer;

pub use config::{Compression, ParquetWriterConfig};
Expand Down
10 changes: 7 additions & 3 deletions quickwit/quickwit-parquet-engine/src/storage/split_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ParquetSplitWriter {
}

/// Extracts the time range (min/max timestamp_secs) from a RecordBatch.
fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteError> {
pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteError> {
let timestamp_idx = batch
.schema()
.index_of("timestamp_secs")
Expand All @@ -194,7 +194,9 @@ fn extract_time_range(batch: &RecordBatch) -> Result<TimeRange, ParquetWriteErro
}

/// Extracts distinct metric names from a RecordBatch.
fn extract_metric_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
pub(crate) fn extract_metric_names(
batch: &RecordBatch,
) -> Result<HashSet<String>, ParquetWriteError> {
let metric_idx = batch
.schema()
.index_of("metric_name")
Expand Down Expand Up @@ -227,7 +229,9 @@ fn extract_metric_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetW
}

/// Extracts distinct service names from a RecordBatch.
fn extract_service_names(batch: &RecordBatch) -> Result<HashSet<String>, ParquetWriteError> {
pub(crate) fn extract_service_names(
batch: &RecordBatch,
) -> Result<HashSet<String>, ParquetWriteError> {
let service_idx = match batch.schema().index_of("service").ok() {
Some(idx) => idx,
None => return Ok(HashSet::new()),
Expand Down
Loading