diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index f2528361840..9d72f8bbdc4 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -8915,6 +8915,7 @@ dependencies = [ "quickwit-common", "quickwit-dst", "quickwit-proto", + "rand 0.10.1", "regex", "serde", "serde_json", diff --git a/quickwit/quickwit-dst/src/invariants/merge_policy.rs b/quickwit/quickwit-dst/src/invariants/merge_policy.rs new file mode 100644 index 00000000000..202d66f3aae --- /dev/null +++ b/quickwit/quickwit-dst/src/invariants/merge_policy.rs @@ -0,0 +1,113 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Shared merge policy invariant checks. +//! +//! These pure functions are the single source of truth for merge operation +//! validity, used by both Stateright models and production code. + +/// MP-1: all splits in a merge operation must share the same `num_merge_ops`. +/// +/// If splits from different levels are merged, the output gets stamped with +/// `max(levels) + 1`, prematurely maturing lower-level data and breaking +/// the bounded write amplification guarantee. +pub fn all_same_merge_level(num_merge_ops: &[u32]) -> bool { + match num_merge_ops.first() { + None => true, + Some(&first) => num_merge_ops.iter().all(|&n| n == first), + } +} + +/// MP-2: every merge operation must have at least 2 input splits. +/// +/// Merging a single split is a no-op that wastes I/O. Merging zero splits +/// is nonsensical. 
+pub fn has_minimum_splits(count: usize) -> bool { + count >= 2 +} + +/// MP-3: all splits in a merge operation must share the same compaction scope. +/// +/// The scope is defined by `(sort_fields, window_start, window_duration)`. +/// The merge engine validates that all inputs agree on these; a policy bug +/// that groups incompatible splits will cause the merge to fail. +/// +/// `index_uid` and `partition_id` are also part of the scope but are +/// typically enforced by the grouping layer before the policy runs. +pub fn all_same_compaction_scope( + sort_fields: &[&str], + windows: &[(i64, i64)], // (start, duration) pairs +) -> bool { + let same_sort = match sort_fields.first() { + None => true, + Some(&first) => sort_fields.iter().all(|&s| s == first), + }; + let same_window = match windows.first() { + None => true, + Some(&first) => windows.iter().all(|w| *w == first), + }; + same_sort && same_window +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_all_same_merge_level() { + assert!(all_same_merge_level(&[])); + assert!(all_same_merge_level(&[0])); + assert!(all_same_merge_level(&[2, 2, 2])); + assert!(!all_same_merge_level(&[0, 1])); + assert!(!all_same_merge_level(&[0, 0, 1])); + } + + #[test] + fn test_has_minimum_splits() { + assert!(!has_minimum_splits(0)); + assert!(!has_minimum_splits(1)); + assert!(has_minimum_splits(2)); + assert!(has_minimum_splits(100)); + } + + #[test] + fn test_all_same_compaction_scope() { + // Empty is vacuously true. + assert!(all_same_compaction_scope(&[], &[])); + + // Same scope. + assert!(all_same_compaction_scope( + &["a|b|ts/V2", "a|b|ts/V2"], + &[(0, 3600), (0, 3600)], + )); + + // Different sort fields. + assert!(!all_same_compaction_scope( + &["a|b|ts/V2", "a|ts/V2"], + &[(0, 3600), (0, 3600)], + )); + + // Same start, different duration. + assert!(!all_same_compaction_scope( + &["a|b|ts/V2", "a|b|ts/V2"], + &[(0, 900), (0, 1800)], + )); + + // Different start. 
+ assert!(!all_same_compaction_scope( + &["a|b|ts/V2", "a|b|ts/V2"], + &[(0, 3600), (3600, 3600)], + )); + } +} diff --git a/quickwit/quickwit-dst/src/invariants/mod.rs b/quickwit/quickwit-dst/src/invariants/mod.rs index 88a731eb730..94c2158260e 100644 --- a/quickwit/quickwit-dst/src/invariants/mod.rs +++ b/quickwit/quickwit-dst/src/invariants/mod.rs @@ -21,6 +21,7 @@ //! No external dependencies — only `std`. mod check; +pub mod merge_policy; pub mod recorder; pub mod registry; pub mod sort; diff --git a/quickwit/quickwit-dst/src/invariants/registry.rs b/quickwit/quickwit-dst/src/invariants/registry.rs index afb1838f361..138015dad0c 100644 --- a/quickwit/quickwit-dst/src/invariants/registry.rs +++ b/quickwit/quickwit-dst/src/invariants/registry.rs @@ -74,6 +74,13 @@ pub enum InvariantId { DM4, /// DM-5: timeseries_id persists through compaction without recomputation DM5, + + /// MP-1: all splits in a merge operation have the same num_merge_ops level + MP1, + /// MP-2: every merge operation has at least 2 input splits + MP2, + /// MP-3: all splits in a merge operation share the same compaction scope + MP3, } impl InvariantId { @@ -106,6 +113,10 @@ impl InvariantId { Self::DM3 => "DM-3", Self::DM4 => "DM-4", Self::DM5 => "DM-5", + + Self::MP1 => "MP-1", + Self::MP2 => "MP-2", + Self::MP3 => "MP-3", } } @@ -136,6 +147,10 @@ impl InvariantId { Self::DM3 => "no interpolation — only ingested points", Self::DM4 => "deterministic TSID from tags", Self::DM5 => "TSID persists through compaction", + + Self::MP1 => "merge op splits share num_merge_ops level", + Self::MP2 => "merge op has at least 2 splits", + Self::MP3 => "merge op splits share compaction scope", } } } @@ -157,6 +172,7 @@ mod tests { assert_eq!(InvariantId::CS3.to_string(), "CS-3"); assert_eq!(InvariantId::MC4.to_string(), "MC-4"); assert_eq!(InvariantId::DM5.to_string(), "DM-5"); + assert_eq!(InvariantId::MP1.to_string(), "MP-1"); } #[test] @@ -182,6 +198,9 @@ mod tests { InvariantId::DM3, InvariantId::DM4, 
InvariantId::DM5, + InvariantId::MP1, + InvariantId::MP2, + InvariantId::MP3, ]; for id in all { assert!(!id.description().is_empty(), "{} has empty description", id); diff --git a/quickwit/quickwit-parquet-engine/Cargo.toml b/quickwit/quickwit-parquet-engine/Cargo.toml index 6d89fdad824..d4893c8287e 100644 --- a/quickwit/quickwit-parquet-engine/Cargo.toml +++ b/quickwit/quickwit-parquet-engine/Cargo.toml @@ -30,6 +30,7 @@ ulid = { workspace = true } [dev-dependencies] proptest = { workspace = true } +rand = { workspace = true } regex = { workspace = true } tempfile = { workspace = true } diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index 7295d41183d..c167860fe5a 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -21,6 +21,7 @@ //! file has non-overlapping key ranges. mod merge_order; +pub mod policy; mod schema; mod writer; diff --git a/quickwit/quickwit-parquet-engine/src/merge/policy/const_write_amplification.rs b/quickwit/quickwit-parquet-engine/src/merge/policy/const_write_amplification.rs new file mode 100644 index 00000000000..60c9b48b1e0 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/merge/policy/const_write_amplification.rs @@ -0,0 +1,989 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Constant write amplification merge policy for Parquet splits. 
+//! +//! Adapted from `quickwit_indexing::merge_policy::ConstWriteAmplificationMergePolicy` +//! but using byte size instead of document count as the primary size metric. +//! Parquet rows vary wildly in size depending on tag density, so byte size is +//! the meaningful convergence target. +//! +//! # Algorithm +//! +//! 1. Separate mature splits from immature. +//! 2. Group immature splits by `num_merge_ops` (the merge level). +//! 3. Within each level, sort by creation time (oldest first) and greedily accumulate splits until +//! `merge_factor` or `target_split_size_bytes`. +//! 4. Each merge operation contains splits from exactly one level. +//! +//! This bounds write amplification: each byte is rewritten at most +//! `max_merge_ops` times across the split's lifetime. + +use std::collections::HashMap; +use std::ops::RangeInclusive; +use std::time::{Duration, SystemTime}; + +use tracing::info; + +use super::{ParquetMergeOperation, ParquetMergePolicy, ParquetSplitMaturity}; +use crate::split::ParquetSplitMetadata; + +/// Smallest number of splits in a finalize merge. +const FINALIZE_MIN_MERGE_FACTOR: usize = 2; + +/// Configuration for the constant write amplification Parquet merge policy. +#[derive(Debug, Clone)] +pub struct ParquetMergePolicyConfig { + /// Minimum number of splits to trigger a merge. + pub merge_factor: usize, + /// Maximum number of splits in a single merge operation. + pub max_merge_factor: usize, + /// Maximum number of merge operations a split can undergo before becoming + /// mature. Bounds total write amplification. + pub max_merge_ops: u32, + /// Target size for merged output splits in bytes. When accumulated bytes + /// reach this threshold, a merge is triggered even if `merge_factor` is + /// not reached. + pub target_split_size_bytes: u64, + /// Duration after creation when a split becomes mature regardless of size + /// or merge count. 
+ pub maturation_period: Duration, + /// Maximum number of merge operations emitted by `finalize_operations`. + /// Set to 0 to disable finalization. + pub max_finalize_merge_operations: usize, +} + +impl Default for ParquetMergePolicyConfig { + fn default() -> Self { + Self { + merge_factor: 10, + max_merge_factor: 12, + max_merge_ops: 4, + target_split_size_bytes: 256 * 1024 * 1024, // 256 MiB + maturation_period: Duration::from_secs(48 * 3600), // 48 hours + max_finalize_merge_operations: 3, + } + } +} + +/// Constant write amplification merge policy for Parquet splits. +/// +/// Only splits with the same `num_merge_ops` level are merged together. +/// After sorting by creation date, splits are greedily accumulated until +/// reaching `max_merge_factor` or `target_split_size_bytes`. +#[derive(Debug, Clone)] +pub struct ConstWriteAmplificationParquetMergePolicy { + config: ParquetMergePolicyConfig, +} + +impl ConstWriteAmplificationParquetMergePolicy { + /// Create a new policy with the given configuration. + pub fn new(config: ParquetMergePolicyConfig) -> Self { + Self { config } + } + + /// Returns the merge factor range for normal operations. + fn merge_factor_range(&self) -> RangeInclusive { + self.config.merge_factor..=self.config.max_merge_factor + } + + /// Check if a split is mature based on its metadata. + fn is_split_mature(&self, split: &ParquetSplitMetadata) -> bool { + if split.num_merge_ops >= self.config.max_merge_ops { + return true; + } + if split.size_bytes >= self.config.target_split_size_bytes { + return true; + } + let elapsed = split.created_at.elapsed().unwrap_or(Duration::ZERO); + elapsed >= self.config.maturation_period + } + + /// Try to build a single merge operation from the front of a sorted split + /// list. Returns `None` if there aren't enough splits to merge. + /// + /// Assumes `splits` are sorted by creation time (oldest first). 
+ fn single_merge_operation( + &self, + splits: &mut Vec, + merge_factor_range: RangeInclusive, + ) -> Option { + let mut num_splits_in_merge = 0; + let mut total_bytes_in_merge: u64 = 0; + + for split in splits.iter().take(*merge_factor_range.end()) { + total_bytes_in_merge += split.size_bytes; + num_splits_in_merge += 1; + if total_bytes_in_merge >= self.config.target_split_size_bytes { + break; + } + } + + // Not enough to merge: fewer than merge_factor and total bytes under target. + if total_bytes_in_merge < self.config.target_split_size_bytes + && num_splits_in_merge < *merge_factor_range.start() + { + return None; + } + + debug_assert!( + num_splits_in_merge >= 2, + "merge operations must contain at least 2 splits" + ); + let splits_in_merge: Vec = + splits.drain(0..num_splits_in_merge).collect(); + Some(ParquetMergeOperation::new(splits_in_merge)) + } + + /// Build all merge operations within a single `num_merge_ops` level. + fn merge_operations_within_level( + &self, + splits: &mut Vec, + ) -> Vec { + sort_splits_oldest_first(splits); + + let mut operations = Vec::new(); + while let Some(op) = self.single_merge_operation(splits, self.merge_factor_range()) { + operations.push(op); + } + operations + } +} + +/// Sort splits by creation time (oldest first), then by split ID for +/// determinism. Uses seconds-since-epoch to avoid `SystemTime` comparison +/// issues across platforms. +fn sort_splits_oldest_first(splits: &mut [ParquetSplitMetadata]) { + splits.sort_by(|a, b| { + created_at_secs(a) + .cmp(&created_at_secs(b)) + .then_with(|| a.split_id.as_str().cmp(b.split_id.as_str())) + }); +} + +/// Sort splits by creation time (newest first), then by split ID. +fn sort_splits_newest_first(splits: &mut [ParquetSplitMetadata]) { + splits.sort_by(|a, b| { + created_at_secs(a) + .cmp(&created_at_secs(b)) + .reverse() + .then_with(|| a.split_id.as_str().cmp(b.split_id.as_str())) + }); +} + +/// Extract seconds since epoch from a split's `created_at` field. 
+fn created_at_secs(split: &ParquetSplitMetadata) -> u64 { + split + .created_at + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_secs() +} + +impl ParquetMergePolicy for ConstWriteAmplificationParquetMergePolicy { + fn operations(&self, splits: &mut Vec) -> Vec { + let mut group_by_level: HashMap> = HashMap::new(); + let mut mature_splits = Vec::new(); + + for split in splits.drain(..) { + if self.is_split_mature(&split) { + mature_splits.push(split); + } else { + group_by_level + .entry(split.num_merge_ops) + .or_default() + .push(split); + } + } + + // Mature splits go back into the vec untouched. + splits.extend(mature_splits); + + let mut merge_operations = Vec::new(); + for level_splits in group_by_level.values_mut() { + let ops = self.merge_operations_within_level(level_splits); + merge_operations.extend(ops); + // Un-merged splits at this level go back into the vec. + splits.append(level_splits); + } + + merge_operations + } + + fn finalize_operations( + &self, + splits: &mut Vec, + ) -> Vec { + if self.config.max_finalize_merge_operations == 0 { + return Vec::new(); + } + + // Separate mature splits — don't touch them. + let mut group_by_level: HashMap> = HashMap::new(); + let mut mature_splits = Vec::new(); + for split in splits.drain(..) { + if self.is_split_mature(&split) { + mature_splits.push(split); + } else { + group_by_level + .entry(split.num_merge_ops) + .or_default() + .push(split); + } + } + splits.extend(mature_splits); + + let min_merge_factor = FINALIZE_MIN_MERGE_FACTOR.min(self.config.max_merge_factor); + let merge_factor_range = min_merge_factor..=self.config.max_merge_factor; + + // Within each level, sort youngest/smallest first. If we limit the + // number of finalize merges, we focus on the young/small ones for + // maximum compaction. 
+ let mut merge_operations = Vec::new(); + for level_splits in group_by_level.values_mut() { + sort_splits_newest_first(level_splits); + while merge_operations.len() < self.config.max_finalize_merge_operations { + if let Some(op) = + self.single_merge_operation(level_splits, merge_factor_range.clone()) + { + merge_operations.push(op); + } else { + break; + } + } + // Un-merged splits at this level go back. + splits.append(level_splits); + } + + let num_splits_per_op: Vec = + merge_operations.iter().map(|op| op.splits.len()).collect(); + let bytes_per_op: Vec = merge_operations + .iter() + .map(|op| op.total_size_bytes()) + .collect(); + info!( + num_splits_per_op = ?num_splits_per_op, + bytes_per_op = ?bytes_per_op, + "finalize merge operations" + ); + + merge_operations + } + + fn split_maturity(&self, size_bytes: u64, num_merge_ops: u32) -> ParquetSplitMaturity { + if num_merge_ops >= self.config.max_merge_ops { + return ParquetSplitMaturity::Mature; + } + if size_bytes >= self.config.target_split_size_bytes { + return ParquetSplitMaturity::Mature; + } + ParquetSplitMaturity::Immature { + maturation_period: self.config.maturation_period, + } + } + + #[cfg(test)] + fn check_is_valid( + &self, + merge_op: &ParquetMergeOperation, + _remaining_splits: &[ParquetSplitMetadata], + ) { + use std::collections::HashSet; + + // Must not exceed max_merge_factor. + assert!(merge_op.splits.len() <= self.config.max_merge_factor); + + // If fewer than merge_factor, total bytes must have reached target. 
+ if merge_op.splits.len() < self.config.merge_factor { + let total_bytes: u64 = merge_op.splits.iter().map(|s| s.size_bytes).sum(); + let last_split_bytes = merge_op.splits.last().unwrap().size_bytes; + assert!( + total_bytes >= self.config.target_split_size_bytes, + "under-factor merge must be size-triggered: total_bytes={total_bytes}, \ + target={target}", + target = self.config.target_split_size_bytes, + ); + assert!( + total_bytes - last_split_bytes < self.config.target_split_size_bytes, + "should not have accumulated beyond target before adding last split" + ); + } + + // MC-LEVEL: all splits must have the same num_merge_ops. + let levels: HashSet = merge_op.splits.iter().map(|s| s.num_merge_ops).collect(); + assert_eq!( + levels.len(), + 1, + "all splits in a merge must be at the same level" + ); + + // MC-WA: no split should have reached max_merge_ops. + let level = *levels.iter().next().unwrap(); + assert!( + level < self.config.max_merge_ops, + "mature splits (level {level} >= max_merge_ops {}) should not be merged", + self.config.max_merge_ops + ); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::time::{Duration, SystemTime}; + + use proptest::prelude::*; + use rand::seq::SliceRandom; + + use super::*; + use crate::split::{ParquetSplitId, ParquetSplitKind, TimeRange}; + + /// Create a policy suitable for testing (small values for fast iteration). + fn test_policy() -> ConstWriteAmplificationParquetMergePolicy { + let config = ParquetMergePolicyConfig { + merge_factor: 3, + max_merge_factor: 5, + max_merge_ops: 3, + target_split_size_bytes: 256 * 1024 * 1024, // 256 MiB + maturation_period: Duration::from_secs(3600), + max_finalize_merge_operations: 3, + }; + ConstWriteAmplificationParquetMergePolicy::new(config) + } + + /// Build a test split with explicit parameters. 
+ fn make_split( + split_id: &str, + size_bytes: u64, + num_merge_ops: u32, + created_at: SystemTime, + ) -> ParquetSplitMetadata { + ParquetSplitMetadata { + kind: ParquetSplitKind::Metrics, + split_id: ParquetSplitId::new(split_id), + index_uid: "test-index:001".to_string(), + partition_id: 0, + time_range: TimeRange::new(1000, 2000), + num_rows: 1000, + size_bytes, + metric_names: HashSet::new(), + low_cardinality_tags: Default::default(), + high_cardinality_tag_keys: Default::default(), + created_at, + parquet_file: format!("{split_id}.parquet"), + window: Some(0..3600), + sort_fields: "metric_name|host|timestamp/V2".to_string(), + num_merge_ops, + row_keys_proto: None, + zonemap_regexes: Default::default(), + } + } + + fn now() -> SystemTime { + SystemTime::now() + } + + fn secs_ago(secs: u64) -> SystemTime { + SystemTime::now() - Duration::from_secs(secs) + } + + // ── Unit Tests ────────────────────────────────────────────────── + + #[test] + fn test_empty_input() { + let policy = test_policy(); + let mut splits = Vec::new(); + let ops = policy.operations(&mut splits); + assert!(ops.is_empty()); + assert!(splits.is_empty()); + } + + #[test] + fn test_single_split() { + let policy = test_policy(); + let mut splits = vec![make_split("s0", 1_000_000, 0, now())]; + let ops = policy.operations(&mut splits); + assert!(ops.is_empty()); + assert_eq!(splits.len(), 1); + } + + #[test] + fn test_two_splits_below_merge_factor() { + let policy = test_policy(); // merge_factor = 3 + let mut splits = vec![ + make_split("s0", 1_000_000, 0, now()), + make_split("s1", 1_000_000, 0, now()), + ]; + let ops = policy.operations(&mut splits); + assert!(ops.is_empty(), "2 splits < merge_factor=3 should not merge"); + assert_eq!(splits.len(), 2); + } + + #[test] + fn test_all_mature_by_merge_ops() { + let policy = test_policy(); // max_merge_ops = 3 + let mut splits = vec![ + make_split("s0", 1_000_000, 3, now()), + make_split("s1", 1_000_000, 3, now()), + make_split("s2", 
1_000_000, 3, now()), + ]; + let ops = policy.operations(&mut splits); + assert!(ops.is_empty(), "mature splits should not be merged"); + assert_eq!(splits.len(), 3); + } + + #[test] + fn test_all_mature_by_size() { + let policy = test_policy(); // target = 256 MiB + let big = 300 * 1024 * 1024; // 300 MiB > target + let mut splits = vec![ + make_split("s0", big, 0, now()), + make_split("s1", big, 0, now()), + make_split("s2", big, 0, now()), + ]; + let ops = policy.operations(&mut splits); + assert!(ops.is_empty(), "size-mature splits should not be merged"); + assert_eq!(splits.len(), 3); + } + + #[test] + fn test_exactly_merge_factor() { + let policy = test_policy(); // merge_factor = 3 + let mut splits = vec![ + make_split("s0", 1_000_000, 0, secs_ago(30)), + make_split("s1", 1_000_000, 0, secs_ago(20)), + make_split("s2", 1_000_000, 0, secs_ago(10)), + ]; + let ops = policy.operations(&mut splits); + assert_eq!(ops.len(), 1); + assert_eq!(ops[0].splits.len(), 3); + assert!(splits.is_empty(), "all splits consumed"); + } + + #[test] + fn test_more_than_max_merge_factor() { + let policy = test_policy(); // max_merge_factor = 5, merge_factor = 3 + let mut splits: Vec = (0..8) + .map(|i| make_split(&format!("s{i}"), 1_000_000, 0, secs_ago(80 - i * 10))) + .collect(); + let ops = policy.operations(&mut splits); + // First op takes max_merge_factor=5, then 3 remain >= merge_factor=3 -> second op. + assert_eq!(ops.len(), 2); + assert_eq!(ops[0].splits.len(), 5); + assert_eq!(ops[1].splits.len(), 3); + assert!(splits.is_empty()); + } + + #[test] + fn test_first_op_capped_at_max_merge_factor() { + let policy = test_policy(); // merge_factor = 3, max_merge_factor = 5 + let mut splits: Vec = (0..6) + .map(|i| make_split(&format!("s{i}"), 1_000_000, 0, secs_ago(60 - i * 10))) + .collect(); + let ops = policy.operations(&mut splits); + // First op takes max_merge_factor=5, then 1 remains < merge_factor=3. 
+ assert_eq!(ops.len(), 1); + assert_eq!(ops[0].splits.len(), 5); + assert_eq!(splits.len(), 1, "1 leftover below merge_factor"); + } + + #[test] + fn test_two_operations_at_same_level() { + let policy = test_policy(); // merge_factor = 3, max_merge_factor = 5 + let mut splits: Vec = (0..10) + .map(|i| make_split(&format!("s{i:02}"), 1_000_000, 0, secs_ago(100 - i * 10))) + .collect(); + let ops = policy.operations(&mut splits); + assert_eq!( + ops.len(), + 2, + "10 splits should produce 2 ops (5+5 or 5+3+leftover)" + ); + let total_consumed: usize = ops.iter().map(|op| op.splits.len()).sum(); + assert_eq!(total_consumed + splits.len(), 10); + } + + #[test] + fn test_mixed_levels() { + let policy = test_policy(); // merge_factor = 3 + let mut splits = vec![ + // Level 0: 3 splits + make_split("l0_s0", 1_000_000, 0, secs_ago(30)), + make_split("l0_s1", 1_000_000, 0, secs_ago(20)), + make_split("l0_s2", 1_000_000, 0, secs_ago(10)), + // Level 1: 3 splits + make_split("l1_s0", 1_000_000, 1, secs_ago(30)), + make_split("l1_s1", 1_000_000, 1, secs_ago(20)), + make_split("l1_s2", 1_000_000, 1, secs_ago(10)), + ]; + let ops = policy.operations(&mut splits); + assert_eq!(ops.len(), 2, "one op per level"); + assert!(splits.is_empty()); + + // Verify each op has homogeneous levels. + for op in &ops { + let levels: HashSet = op.splits.iter().map(|s| s.num_merge_ops).collect(); + assert_eq!(levels.len(), 1); + } + } + + #[test] + fn test_size_triggered_merge() { + let policy = test_policy(); // merge_factor = 3, target = 256 MiB + let big = 100 * 1024 * 1024; // 100 MiB each + let mut splits = vec![ + make_split("s0", big, 0, secs_ago(30)), + make_split("s1", big, 0, secs_ago(20)), + make_split("s2", big, 0, secs_ago(10)), // total 300 MiB > target + ]; + let ops = policy.operations(&mut splits); + // 3 splits >= merge_factor=3, so this merges regardless of size. + // But also total_bytes >= target, so size-triggered too. 
+ assert_eq!(ops.len(), 1); + assert_eq!(ops[0].splits.len(), 3); + } + + #[test] + fn test_size_triggered_below_merge_factor() { + // Size trigger can cause a merge with fewer than merge_factor splits. + let config = ParquetMergePolicyConfig { + merge_factor: 10, + max_merge_factor: 12, + max_merge_ops: 4, + target_split_size_bytes: 100 * 1024 * 1024, // 100 MiB + maturation_period: Duration::from_secs(48 * 3600), + max_finalize_merge_operations: 0, + }; + let policy = ConstWriteAmplificationParquetMergePolicy::new(config); + + let big = 60 * 1024 * 1024; // 60 MiB each + let mut splits = vec![ + make_split("s0", big, 0, secs_ago(30)), + make_split("s1", big, 0, secs_ago(20)), + // 2 splits, total 120 MiB > target 100 MiB + ]; + let ops = policy.operations(&mut splits); + assert_eq!(ops.len(), 1, "size-triggered merge with 2 splits"); + assert_eq!(ops[0].splits.len(), 2); + } + + #[test] + fn test_oldest_first_ordering() { + let policy = test_policy(); + let mut splits = vec![ + make_split("newest", 1_000_000, 0, secs_ago(10)), + make_split("middle", 1_000_000, 0, secs_ago(50)), + make_split("oldest", 1_000_000, 0, secs_ago(100)), + ]; + let ops = policy.operations(&mut splits); + assert_eq!(ops.len(), 1); + let ids: Vec<&str> = ops[0].splits.iter().map(|s| s.split_id.as_str()).collect(); + assert_eq!(ids, &["oldest", "middle", "newest"]); + } + + #[test] + fn test_split_maturity_by_merge_ops() { + let policy = test_policy(); // max_merge_ops = 3 + assert_eq!( + policy.split_maturity(1_000_000, 3), + ParquetSplitMaturity::Mature, + ); + assert_eq!( + policy.split_maturity(1_000_000, 4), + ParquetSplitMaturity::Mature, + ); + } + + #[test] + fn test_split_maturity_by_size() { + let policy = test_policy(); // target = 256 MiB + assert_eq!( + policy.split_maturity(300 * 1024 * 1024, 0), + ParquetSplitMaturity::Mature, + ); + } + + #[test] + fn test_split_maturity_immature() { + let policy = test_policy(); + assert_eq!( + policy.split_maturity(1_000_000, 0), + 
ParquetSplitMaturity::Immature { + maturation_period: Duration::from_secs(3600) + }, + ); + } + + #[test] + fn test_finalize_merges_below_merge_factor() { + let policy = test_policy(); // merge_factor=3, finalize min=2 + let mut splits = vec![ + make_split("s0", 1_000_000, 0, now()), + make_split("s1", 1_000_000, 0, now()), + ]; + // Normal operations should not merge 2 splits (below merge_factor=3). + let ops = policy.operations(&mut splits); + assert!(ops.is_empty()); + assert_eq!(splits.len(), 2); + + // But finalize should merge them (min_merge_factor=2). + let ops = policy.finalize_operations(&mut splits); + assert_eq!(ops.len(), 1); + assert_eq!(ops[0].splits.len(), 2); + assert!(splits.is_empty()); + } + + #[test] + fn test_finalize_disabled() { + let config = ParquetMergePolicyConfig { + max_finalize_merge_operations: 0, + ..test_policy().config.clone() + }; + let policy = ConstWriteAmplificationParquetMergePolicy::new(config); + let mut splits = vec![ + make_split("s0", 1_000_000, 0, now()), + make_split("s1", 1_000_000, 0, now()), + ]; + let ops = policy.finalize_operations(&mut splits); + assert!(ops.is_empty()); + assert_eq!(splits.len(), 2); + } + + #[test] + fn test_finalize_respects_max_operations() { + let config = ParquetMergePolicyConfig { + merge_factor: 3, + max_merge_factor: 3, + max_finalize_merge_operations: 1, + ..test_policy().config.clone() + }; + let policy = ConstWriteAmplificationParquetMergePolicy::new(config); + let mut splits: Vec = (0..6) + .map(|i| make_split(&format!("s{i}"), 1_000_000, 0, secs_ago(i * 10))) + .collect(); + + let ops = policy.finalize_operations(&mut splits); + assert_eq!(ops.len(), 1, "limited to max_finalize_merge_operations=1"); + assert_eq!(splits.len(), 3); + } + + #[test] + fn test_mature_splits_excluded_from_finalize() { + let policy = test_policy(); // max_merge_ops = 3 + let mut splits = vec![ + make_split("mature1", 1_000_000, 3, now()), + make_split("mature2", 1_000_000, 3, now()), + 
make_split("young1", 1_000_000, 0, now()), + make_split("young2", 1_000_000, 0, now()), + ]; + let ops = policy.finalize_operations(&mut splits); + assert_eq!(ops.len(), 1); + assert_eq!(ops[0].splits.len(), 2); + // Only young splits should be in the merge. + for split in &ops[0].splits { + assert!(split.num_merge_ops < 3); + } + // Mature splits remain. + assert_eq!(splits.len(), 2); + assert!(splits.iter().all(|s| s.num_merge_ops >= 3)); + } + + #[test] + fn test_finalize_respects_mc_level_invariant() { + // Bug: finalize_operations() did not group by num_merge_ops level, + // so splits from different levels could be merged together. This + // violates MC-LEVEL and causes the merged output to be stamped with + // max(num_merge_ops) + 1, prematurely maturing lower-level data. + let policy = test_policy(); // merge_factor=3, max_merge_ops=3 + let mut splits = vec![ + // Two level-0 splits + make_split("l0_a", 1_000_000, 0, now()), + make_split("l0_b", 1_000_000, 0, now()), + // One level-1 split + make_split("l1_a", 1_000_000, 1, now()), + ]; + let ops = policy.finalize_operations(&mut splits); + + // MC-LEVEL: every operation must contain splits from exactly one level. + for op in &ops { + let levels: HashSet = op.splits.iter().map(|s| s.num_merge_ops).collect(); + assert_eq!( + levels.len(), + 1, + "finalize produced a merge mixing levels: {:?}", + levels + ); + } + } + + // ── Property Tests ────────────────────────────────────────────── + + prop_compose! 
{ + fn parquet_split_strategy()( + num_merge_ops in 0u32..5u32, + size_bytes in 1u64..500_000_000u64, + num_rows in 1u64..10_000_000u64, + created_secs_ago in 0u64..200u64, + split_ord in 0u32..1_000_000u32, + ) -> ParquetSplitMetadata { + let split_id = format!("prop_split_{split_ord:06}_{num_merge_ops}"); + let created_at = SystemTime::now() - Duration::from_secs(created_secs_ago); + ParquetSplitMetadata { + kind: ParquetSplitKind::Metrics, + split_id: ParquetSplitId::new(split_id), + index_uid: "test:prop".to_string(), + partition_id: 0, + time_range: TimeRange::new(1000, 2000), + num_rows, + size_bytes, + metric_names: HashSet::new(), + low_cardinality_tags: Default::default(), + high_cardinality_tag_keys: Default::default(), + created_at, + parquet_file: String::new(), + window: Some(0..3600), + sort_fields: "metric_name|host|timestamp/V2".to_string(), + num_merge_ops, + row_keys_proto: None, + zonemap_regexes: Default::default(), + } + } + } + + /// Compute a checksum for a set of operations that is independent of + /// operation order (but depends on which splits are in which op). + fn operations_checksum(ops: &[ParquetMergeOperation]) -> u64 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut checksum = 0u64; + for op in ops { + let mut op_hash = 0u64; + for split in &op.splits { + let mut h = DefaultHasher::new(); + split.split_id.as_str().hash(&mut h); + op_hash ^= h.finish(); + } + let mut h = DefaultHasher::new(); + h.write_u64(op_hash); + checksum ^= h.finish(); + } + checksum + } + + proptest! { + #[test] + fn proptest_merge_policy_invariants( + mut splits in prop::collection::vec(parquet_split_strategy(), 0..80) + ) { + let policy = test_policy(); + + let original_total_bytes: u64 = splits.iter().map(|s| s.size_bytes).sum(); + let original_count = splits.len(); + + // MC-IDEMPOTENT / MC-ORDER-INDEPENDENT: shuffle produces same result. 
+            let mut cloned = splits.clone();
+            cloned.shuffle(&mut rand::rng());
+
+            let ops = policy.operations(&mut splits);
+            let ops_shuffled = policy.operations(&mut cloned);
+
+            prop_assert_eq!(
+                operations_checksum(&ops),
+                operations_checksum(&ops_shuffled),
+                "policy must be order-independent"
+            );
+
+            // MC-CONSERVE: total bytes in ops + remaining = original.
+            let ops_bytes: u64 = ops.iter()
+                .flat_map(|op| op.splits.iter())
+                .map(|s| s.size_bytes)
+                .sum();
+            let remaining_bytes: u64 = splits.iter().map(|s| s.size_bytes).sum();
+            prop_assert_eq!(
+                ops_bytes + remaining_bytes,
+                original_total_bytes,
+                "byte conservation violated"
+            );
+
+            // Count conservation.
+            let ops_count: usize = ops.iter().map(|op| op.splits.len()).sum();
+            prop_assert_eq!(
+                ops_count + splits.len(),
+                original_count,
+                "split count conservation violated"
+            );
+
+            for op in &ops {
+                // Each op has >= 2 splits.
+                prop_assert!(
+                    op.splits.len() >= 2,
+                    "merge op must have >= 2 splits, got {}",
+                    op.splits.len()
+                );
+
+                // MC-LEVEL: all splits in op have same num_merge_ops.
+                let levels: HashSet<u32> = op.splits.iter().map(|s| s.num_merge_ops).collect();
+                prop_assert_eq!(
+                    levels.len(), 1,
+                    "all splits in merge must be same level, got {:?}", levels
+                );
+
+                // MC-WA: no mature splits in ops.
+                for split in &op.splits {
+                    prop_assert!(
+                        !policy.is_split_mature(split),
+                        "mature split {} should not be in merge",
+                        split.split_id
+                    );
+                }
+
+                // Validate policy-specific invariants.
+                policy.check_is_valid(op, &splits);
+            }
+        }
+    }
+
+    proptest! {
+        #[test]
+        fn proptest_finalize_respects_mc_level(
+            mut splits in prop::collection::vec(parquet_split_strategy(), 0..80)
+        ) {
+            let policy = test_policy();
+            let original_count = splits.len();
+            let original_total_bytes: u64 = splits.iter().map(|s| s.size_bytes).sum();
+
+            let ops = policy.finalize_operations(&mut splits);
+
+            // MC-CONSERVE for finalize.
+            let ops_bytes: u64 = ops.iter()
+                .flat_map(|op| op.splits.iter())
+                .map(|s| s.size_bytes)
+                .sum();
+            let remaining_bytes: u64 = splits.iter().map(|s| s.size_bytes).sum();
+            prop_assert_eq!(
+                ops_bytes + remaining_bytes,
+                original_total_bytes,
+                "finalize byte conservation violated"
+            );
+
+            let ops_count: usize = ops.iter().map(|op| op.splits.len()).sum();
+            prop_assert_eq!(
+                ops_count + splits.len(),
+                original_count,
+                "finalize split count conservation violated"
+            );
+
+            for op in &ops {
+                prop_assert!(
+                    op.splits.len() >= 2,
+                    "finalize merge op must have >= 2 splits"
+                );
+
+                // MC-LEVEL: all splits in op have same num_merge_ops.
+                let levels: HashSet<u32> = op.splits.iter().map(|s| s.num_merge_ops).collect();
+                prop_assert_eq!(
+                    levels.len(), 1,
+                    "finalize mixed levels: {:?}", levels
+                );
+
+                // MC-WA: no mature splits.
+                for split in &op.splits {
+                    prop_assert!(
+                        !policy.is_split_mature(split),
+                        "mature split in finalize merge"
+                    );
+                }
+            }
+        }
+    }
+
+    // ── Simulation Test ─────────────────────────────────────────────
+
+    #[test]
+    fn test_simulation_convergence() {
+        let config = ParquetMergePolicyConfig {
+            merge_factor: 3,
+            max_merge_factor: 5,
+            max_merge_ops: 3,
+            target_split_size_bytes: 256 * 1024 * 1024,
+            maturation_period: Duration::from_secs(999_999), // never time-expire
+            max_finalize_merge_operations: 0,
+        };
+        let policy = ConstWriteAmplificationParquetMergePolicy::new(config);
+
+        // Simulate 200 ingested splits of 10 MiB each.
+        let mut all_splits: Vec<ParquetSplitMetadata> = (0..200)
+            .map(|i| {
+                make_split(
+                    &format!("ingest_{i:03}"),
+                    10 * 1024 * 1024,
+                    0,
+                    secs_ago(200 - i),
+                )
+            })
+            .collect();
+
+        let max_rounds = 20;
+        for round in 0..max_rounds {
+            let ops = policy.operations(&mut all_splits);
+            if ops.is_empty() {
+                break;
+            }
+
+            // Simulate merges: create output splits with incremented merge ops.
+            for op in &ops {
+                let total_bytes: u64 = op.splits.iter().map(|s| s.size_bytes).sum();
+                let total_rows: u64 = op.splits.iter().map(|s| s.num_rows).sum();
+                let max_merge_ops = op.splits.iter().map(|s| s.num_merge_ops).max().unwrap();
+                let merged = make_split(
+                    &format!("merged_r{round}_{}", op.merge_split_id),
+                    total_bytes,
+                    max_merge_ops + 1,
+                    now(),
+                );
+                // Override the auto-generated fields.
+                let mut merged = merged;
+                merged.num_rows = total_rows;
+                all_splits.push(merged);
+            }
+        }
+
+        // Verify bounded write amplification.
+        for split in &all_splits {
+            assert!(
+                split.num_merge_ops <= policy.config.max_merge_ops,
+                "split {} has num_merge_ops={}, exceeds max={}",
+                split.split_id,
+                split.num_merge_ops,
+                policy.config.max_merge_ops,
+            );
+        }
+
+        // Verify convergence: at each non-mature level, fewer than merge_factor splits.
+        let mut level_counts: HashMap<u32, usize> = HashMap::new();
+        for split in &all_splits {
+            if split.num_merge_ops < policy.config.max_merge_ops {
+                *level_counts.entry(split.num_merge_ops).or_default() += 1;
+            }
+        }
+        for (level, count) in &level_counts {
+            assert!(
+                *count < policy.config.merge_factor,
+                "level {level} has {count} splits, should be < merge_factor={}",
+                policy.config.merge_factor,
+            );
+        }
+    }
+}
diff --git a/quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs
new file mode 100644
index 00000000000..84a2f8c662c
--- /dev/null
+++ b/quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs
@@ -0,0 +1,254 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Parquet merge policy for compaction.
+//!
+//! Decides which Parquet splits within a compaction scope should be merged.
+//! This is a pure computational module with no I/O or actor dependencies.
+//!
+//! The trait mirrors the shape of `quickwit_indexing::MergePolicy` but operates
+//! on [`ParquetSplitMetadata`] instead of Tantivy's `SplitMetadata`. When
+//! Nadav's `quickwit-compaction` crate adds Parquet support, the
+//! `CompactionPlanner` can call into this trait directly.
+
+pub mod const_write_amplification;
+pub mod scope;
+
+use std::fmt;
+use std::time::Duration;
+
+pub use const_write_amplification::{
+    ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig,
+};
+pub use scope::{CompactionScope, group_by_compaction_scope};
+
+use crate::split::{ParquetSplitId, ParquetSplitKind, ParquetSplitMetadata};
+
+/// A merge operation to be executed: merge the input splits into one output.
+#[derive(Debug, Clone)]
+pub struct ParquetMergeOperation {
+    /// New split ID for the merged output.
+    pub merge_split_id: ParquetSplitId,
+    /// The input splits being merged (all from the same compaction scope and
+    /// `num_merge_ops` level).
+    pub splits: Vec<ParquetSplitMetadata>,
+}
+
+impl ParquetMergeOperation {
+    /// Create a new merge operation consuming the given splits.
+    ///
+    /// Generates a fresh split ID for the merged output. The `kind` is inferred
+    /// from the first split (all splits in a merge share the same kind).
+    ///
+    /// # Invariant checks (debug builds panic, all builds emit metrics)
+    ///
+    /// - **MP-1**: all splits share the same `num_merge_ops` level
+    /// - **MP-2**: at least 2 input splits
+    /// - **MP-3**: all splits share the same compaction scope (sort_fields + window)
+    pub fn new(splits: Vec<ParquetSplitMetadata>) -> Self {
+        use quickwit_dst::check_invariant;
+        use quickwit_dst::invariants::{InvariantId, merge_policy};
+
+        // MP-2: minimum split count.
+        check_invariant!(
+            InvariantId::MP2,
+            merge_policy::has_minimum_splits(splits.len()),
+            ": got {} splits",
+            splits.len()
+        );
+
+        // MP-1: level homogeneity.
+        let levels: Vec<u32> = splits.iter().map(|s| s.num_merge_ops).collect();
+        check_invariant!(
+            InvariantId::MP1,
+            merge_policy::all_same_merge_level(&levels),
+            ": levels={:?}",
+            levels
+        );
+
+        // MP-3: scope homogeneity (sort_fields + window).
+        let sort_fields_vec: Vec<&str> = splits.iter().map(|s| s.sort_fields.as_str()).collect();
+        let windows: Vec<(i64, i64)> = splits
+            .iter()
+            .map(|s| match &s.window {
+                Some(w) => (w.start, w.end - w.start),
+                None => (0, 0),
+            })
+            .collect();
+        check_invariant!(
+            InvariantId::MP3,
+            merge_policy::all_same_compaction_scope(&sort_fields_vec, &windows)
+        );
+
+        let kind = splits
+            .first()
+            .map(|s| s.kind)
+            .unwrap_or(ParquetSplitKind::Metrics);
+        Self {
+            merge_split_id: ParquetSplitId::generate(kind),
+            splits,
+        }
+    }
+
+    /// Returns the input splits as a slice.
+    pub fn splits_as_slice(&self) -> &[ParquetSplitMetadata] {
+        &self.splits
+    }
+
+    /// Total size in bytes across all input splits.
+    pub fn total_size_bytes(&self) -> u64 {
+        self.splits.iter().map(|s| s.size_bytes).sum()
+    }
+
+    /// Total number of rows across all input splits.
+    pub fn total_num_rows(&self) -> u64 {
+        self.splits.iter().map(|s| s.num_rows).sum()
+    }
+}
+
+/// Whether a split is eligible for further merging.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ParquetSplitMaturity {
+    /// Split will not undergo further merges.
+    Mature,
+    /// Split can still be merged. After `maturation_period` elapses from
+    /// creation, the split becomes mature regardless of size.
+    Immature { maturation_period: Duration },
+}
+
+/// Decides which Parquet splits to merge within a single compaction scope.
+///
+/// # Contract
+///
+/// - The caller passes all immature splits for one compaction scope in `splits`.
+/// - `operations()` drains splits that participate in merges from the vec.
+/// - Remaining splits (not in any operation) stay in the vec.
+/// - Invariant: `splits_before = splits_in_ops + splits_remaining`.
+///
+/// This contract mirrors `quickwit_indexing::MergePolicy` for consistency.
+pub trait ParquetMergePolicy: Send + Sync + fmt::Debug {
+    /// Returns merge operations for the given splits.
+    fn operations(&self, splits: &mut Vec<ParquetSplitMetadata>) -> Vec<ParquetMergeOperation>;
+
+    /// Like `operations` but with a lower merge factor for finalization of
+    /// cold windows. Called by the planner when a window has received no new
+    /// splits for a configured duration.
+    fn finalize_operations(
+        &self,
+        _splits: &mut Vec<ParquetSplitMetadata>,
+    ) -> Vec<ParquetMergeOperation> {
+        Vec::new()
+    }
+
+    /// Determines whether a split is mature (no further merges needed).
+    fn split_maturity(&self, size_bytes: u64, num_merge_ops: u32) -> ParquetSplitMaturity;
+
+    /// Validate invariants of a merge operation (for property testing).
+ #[cfg(test)] + fn check_is_valid( + &self, + _merge_op: &ParquetMergeOperation, + _remaining_splits: &[ParquetSplitMetadata], + ) { + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::time::SystemTime; + + use super::*; + use crate::split::{ParquetSplitId, ParquetSplitKind, TimeRange}; + + fn make_split( + split_id: &str, + num_merge_ops: u32, + sort_fields: &str, + window: Option<(i64, i64)>, + ) -> ParquetSplitMetadata { + ParquetSplitMetadata { + kind: ParquetSplitKind::Metrics, + split_id: ParquetSplitId::new(split_id), + index_uid: "test:001".to_string(), + partition_id: 0, + time_range: TimeRange::new(1000, 2000), + num_rows: 100, + size_bytes: 1_000_000, + metric_names: HashSet::new(), + low_cardinality_tags: Default::default(), + high_cardinality_tag_keys: Default::default(), + created_at: SystemTime::now(), + parquet_file: format!("{split_id}.parquet"), + window: window.map(|(start, dur)| start..start + dur), + sort_fields: sort_fields.to_string(), + num_merge_ops, + row_keys_proto: None, + zonemap_regexes: Default::default(), + } + } + + #[test] + #[should_panic(expected = "MP-1 violated")] + fn test_mp1_mixed_merge_levels_panics() { + // MP-1: constructing a merge op with splits at different levels + // must panic in debug builds. + let splits = vec![ + make_split("l0", 0, "a|ts/V2", Some((0, 3600))), + make_split("l1", 1, "a|ts/V2", Some((0, 3600))), + ]; + ParquetMergeOperation::new(splits); + } + + #[test] + #[should_panic(expected = "MP-2 violated")] + fn test_mp2_single_split_panics() { + // MP-2: constructing a merge op with < 2 splits must panic. + let splits = vec![make_split("s0", 0, "a|ts/V2", Some((0, 3600)))]; + ParquetMergeOperation::new(splits); + } + + #[test] + #[should_panic(expected = "MP-3 violated")] + fn test_mp3_mixed_sort_fields_panics() { + // MP-3: constructing a merge op with different sort_fields must panic. 
+ let splits = vec![ + make_split("s0", 0, "a|b|ts/V2", Some((0, 3600))), + make_split("s1", 0, "a|ts/V2", Some((0, 3600))), + ]; + ParquetMergeOperation::new(splits); + } + + #[test] + #[should_panic(expected = "MP-3 violated")] + fn test_mp3_mixed_window_duration_panics() { + // MP-3: same start but different duration must panic. + let splits = vec![ + make_split("s0", 0, "a|ts/V2", Some((0, 900))), + make_split("s1", 0, "a|ts/V2", Some((0, 1800))), + ]; + ParquetMergeOperation::new(splits); + } + + #[test] + fn test_valid_merge_operation_succeeds() { + // A well-formed merge op should not panic. + let splits = vec![ + make_split("s0", 0, "a|ts/V2", Some((0, 3600))), + make_split("s1", 0, "a|ts/V2", Some((0, 3600))), + ]; + let op = ParquetMergeOperation::new(splits); + assert_eq!(op.splits.len(), 2); + } +} diff --git a/quickwit/quickwit-parquet-engine/src/merge/policy/scope.rs b/quickwit/quickwit-parquet-engine/src/merge/policy/scope.rs new file mode 100644 index 00000000000..1824bee2a4a --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/merge/policy/scope.rs @@ -0,0 +1,316 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Compaction scope grouping for Parquet splits. +//! +//! Splits are eligible for merging only when they share the same compaction +//! scope. A scope captures the key dimensions that must match: index, +//! partition, sort schema, and time window. +//! +//! 
Future extensions (Phase 3+) will add `source_id` to the scope when
+//! that field is populated on `ParquetSplitMetadata`.

+use std::collections::HashMap;
+
+use crate::split::ParquetSplitMetadata;
+
+/// The compaction scope key.
+///
+/// Splits sharing the same scope are candidates for merging.
+/// Splits with different scopes must **never** be merged together.
+///
+/// # Invariant MC-SCOPE
+///
+/// All splits in a [`super::ParquetMergeOperation`] must share the same
+/// `CompactionScope`.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct CompactionScope {
+    /// Index unique identifier (includes incarnation).
+    pub index_uid: String,
+    /// Partition ID computed from the index's routing expression.
+    /// Splits with different partition IDs must not be merged.
+    pub partition_id: u64,
+    /// Husky-style sort schema string (e.g., "metric_name|host|timestamp/V2").
+    pub sort_fields: String,
+    /// Window start in epoch seconds.
+    pub window_start_secs: i64,
+    /// Window duration in seconds. The merge engine requires all inputs to
+    /// agree on both start and duration, so a config change that alters the
+    /// window size must not cause old and new splits to be merged.
+    pub window_duration_secs: i64,
+}
+
+impl CompactionScope {
+    /// Extract compaction scope from split metadata.
+    ///
+    /// Returns `None` if the split has no window (pre-Phase-31 splits that
+    /// cannot participate in compaction).
+    pub fn from_split(split: &ParquetSplitMetadata) -> Option<Self> {
+        let window = split.window.as_ref()?;
+        Some(Self {
+            index_uid: split.index_uid.clone(),
+            partition_id: split.partition_id,
+            sort_fields: split.sort_fields.clone(),
+            window_start_secs: window.start,
+            window_duration_secs: window.end - window.start,
+        })
+    }
+}
+
+/// Group splits by compaction scope.
+///
+/// Returns only groups with >= 2 splits, since single-split groups cannot
+/// produce a merge operation. Splits without a window (pre-Phase-31) are
+/// silently excluded.
+pub fn group_by_compaction_scope(
+    splits: Vec<ParquetSplitMetadata>,
+) -> HashMap<CompactionScope, Vec<ParquetSplitMetadata>> {
+    let mut groups: HashMap<CompactionScope, Vec<ParquetSplitMetadata>> = HashMap::new();
+    for split in splits {
+        if let Some(scope) = CompactionScope::from_split(&split) {
+            groups.entry(scope).or_default().push(split);
+        }
+    }
+    groups.retain(|_scope, splits| splits.len() >= 2);
+    groups
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::split::{ParquetSplitId, ParquetSplitKind, ParquetSplitMetadata, TimeRange};
+
+    /// Build a test split with the given scope fields.
+    fn test_split(
+        index_uid: &str,
+        sort_fields: &str,
+        window_start: Option<i64>,
+        window_duration: u32,
+    ) -> ParquetSplitMetadata {
+        test_split_with_partition(index_uid, 0, sort_fields, window_start, window_duration)
+    }
+
+    fn test_split_with_partition(
+        index_uid: &str,
+        partition_id: u64,
+        sort_fields: &str,
+        window_start: Option<i64>,
+        window_duration: u32,
+    ) -> ParquetSplitMetadata {
+        let mut builder = ParquetSplitMetadata::metrics_builder()
+            .split_id(ParquetSplitId::generate(ParquetSplitKind::Metrics))
+            .index_uid(index_uid)
+            .partition_id(partition_id)
+            .time_range(TimeRange::new(1000, 2000))
+            .sort_fields(sort_fields);
+
+        if let Some(start) = window_start {
+            builder = builder
+                .window_start_secs(start)
+                .window_duration_secs(window_duration);
+        }
+
+        builder.build()
+    }
+
+    #[test]
+    fn test_empty_input() {
+        let result = group_by_compaction_scope(vec![]);
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_single_split_filtered() {
+        let splits = vec![test_split(
+            "idx:001",
+            "metric_name|host|timestamp/V2",
+            Some(1000),
+            3600,
+        )];
+        let result = group_by_compaction_scope(splits);
+        assert!(result.is_empty(), "single-split groups should be filtered");
+    }
+
+    #[test]
+    fn test_two_splits_same_scope() {
+        let splits = vec![
+            test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600),
+            test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600),
+        ];
+        let result = group_by_compaction_scope(splits);
+
assert_eq!(result.len(), 1); + let group = result.values().next().unwrap(); + assert_eq!(group.len(), 2); + } + + #[test] + fn test_different_index_uid() { + let splits = vec![ + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + test_split("idx:002", "metric_name|host|timestamp/V2", Some(1000), 3600), + ]; + let result = group_by_compaction_scope(splits); + // Each group has only 1 split, so both are filtered. + assert!(result.is_empty()); + } + + #[test] + fn test_different_sort_fields() { + let splits = vec![ + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + test_split("idx:001", "metric_name|timestamp/V2", Some(1000), 3600), + ]; + let result = group_by_compaction_scope(splits); + assert!(result.is_empty()); + } + + #[test] + fn test_different_window_duration() { + // Bug: only window_start was in the scope key, so splits with the + // same start but different durations (e.g. after a config change) + // would be grouped together. The merge engine requires all inputs + // to agree on both window_start and window_duration. 
+ let sort = "metric_name|host|timestamp/V2"; + let splits = vec![ + test_split("idx:001", sort, Some(0), 900), // 0..900 + test_split("idx:001", sort, Some(0), 1800), // 0..1800 + ]; + let result = group_by_compaction_scope(splits); + assert!( + result.is_empty(), + "different window durations must not be grouped together" + ); + } + + #[test] + fn test_different_window_start() { + let splits = vec![ + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + test_split("idx:001", "metric_name|host|timestamp/V2", Some(4600), 3600), + ]; + let result = group_by_compaction_scope(splits); + assert!(result.is_empty()); + } + + #[test] + fn test_no_window_excluded() { + let splits = vec![ + test_split("idx:001", "metric_name|host|timestamp/V2", None, 0), + test_split("idx:001", "metric_name|host|timestamp/V2", None, 0), + ]; + let result = group_by_compaction_scope(splits); + assert!( + result.is_empty(), + "splits without window should be excluded" + ); + } + + #[test] + fn test_mixed_with_and_without_window() { + let splits = vec![ + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + test_split("idx:001", "metric_name|host|timestamp/V2", None, 0), + ]; + let result = group_by_compaction_scope(splits); + assert_eq!(result.len(), 1); + let group = result.values().next().unwrap(); + assert_eq!(group.len(), 2, "only windowed splits should be grouped"); + } + + #[test] + fn test_multiple_scopes() { + let splits = vec![ + // Scope A: 3 splits + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + test_split("idx:001", "metric_name|host|timestamp/V2", Some(1000), 3600), + // Scope B: 2 splits (different window) + test_split("idx:001", "metric_name|host|timestamp/V2", Some(4600), 3600), + test_split("idx:001", "metric_name|host|timestamp/V2", Some(4600), 3600), 
+ // Scope C: 1 split (filtered) + test_split("idx:002", "metric_name|host|timestamp/V2", Some(1000), 3600), + ]; + let result = group_by_compaction_scope(splits); + assert_eq!(result.len(), 2, "should have 2 groups (scope C filtered)"); + + let scope_a = CompactionScope { + index_uid: "idx:001".to_string(), + partition_id: 0, + sort_fields: "metric_name|host|timestamp/V2".to_string(), + window_start_secs: 1000, + window_duration_secs: 3600, + }; + let scope_b = CompactionScope { + index_uid: "idx:001".to_string(), + partition_id: 0, + sort_fields: "metric_name|host|timestamp/V2".to_string(), + window_start_secs: 4600, + window_duration_secs: 3600, + }; + + assert_eq!(result[&scope_a].len(), 3); + assert_eq!(result[&scope_b].len(), 2); + } + + #[test] + fn test_different_partition_id() { + let sort = "metric_name|host|timestamp/V2"; + let splits = vec![ + test_split_with_partition("idx:001", 1, sort, Some(1000), 3600), + test_split_with_partition("idx:001", 2, sort, Some(1000), 3600), + ]; + let result = group_by_compaction_scope(splits); + assert!( + result.is_empty(), + "different partition_ids should not be grouped" + ); + } + + #[test] + fn test_same_partition_id() { + let sort = "metric_name|host|timestamp/V2"; + let splits = vec![ + test_split_with_partition("idx:001", 42, sort, Some(1000), 3600), + test_split_with_partition("idx:001", 42, sort, Some(1000), 3600), + ]; + let result = group_by_compaction_scope(splits); + assert_eq!(result.len(), 1); + let group = result.values().next().unwrap(); + assert_eq!(group.len(), 2); + } + + #[test] + fn test_from_split_returns_none_for_no_window() { + let split = test_split("idx:001", "metric_name|host|timestamp/V2", None, 0); + assert!(CompactionScope::from_split(&split).is_none()); + } + + #[test] + fn test_from_split_returns_scope() { + let split = test_split_with_partition( + "idx:001", + 7, + "metric_name|host|timestamp/V2", + Some(7200), + 3600, + ); + let scope = CompactionScope::from_split(&split).unwrap(); + 
assert_eq!(scope.index_uid, "idx:001"); + assert_eq!(scope.partition_id, 7); + assert_eq!(scope.sort_fields, "metric_name|host|timestamp/V2"); + assert_eq!(scope.window_start_secs, 7200); + assert_eq!(scope.window_duration_secs, 3600); + } +}