Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 113 additions & 0 deletions quickwit/quickwit-dst/src/invariants/merge_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Shared merge policy invariant checks.
//!
//! These pure functions are the single source of truth for merge operation
//! validity, used by both Stateright models and production code.

/// MP-1: all splits in a merge operation must share the same `num_merge_ops`.
///
/// If splits from different levels are merged, the output gets stamped with
/// `max(levels) + 1`, prematurely maturing lower-level data and breaking
/// the bounded write amplification guarantee.
pub fn all_same_merge_level(num_merge_ops: &[u32]) -> bool {
match num_merge_ops.first() {
None => true,
Some(&first) => num_merge_ops.iter().all(|&n| n == first),
}
}

/// MP-2: every merge operation must have at least 2 input splits.
///
/// Merging a single split is a no-op that wastes I/O. Merging zero splits
/// is nonsensical.
pub fn has_minimum_splits(count: usize) -> bool {
count >= 2
}

/// MP-3: all splits in a merge operation must share the same compaction scope.
///
/// The scope is defined by `(sort_fields, window_start, window_duration)`.
/// The merge engine validates that all inputs agree on these; a policy bug
/// that groups incompatible splits will cause the merge to fail.
///
/// `index_uid` and `partition_id` are also part of the scope but are
/// typically enforced by the grouping layer before the policy runs.
pub fn all_same_compaction_scope(
sort_fields: &[&str],
windows: &[(i64, i64)], // (start, duration) pairs
) -> bool {
let same_sort = match sort_fields.first() {
None => true,
Some(&first) => sort_fields.iter().all(|&s| s == first),
};
let same_window = match windows.first() {
None => true,
Some(&first) => windows.iter().all(|w| *w == first),
};
same_sort && same_window
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_all_same_merge_level() {
assert!(all_same_merge_level(&[]));
assert!(all_same_merge_level(&[0]));
assert!(all_same_merge_level(&[2, 2, 2]));
assert!(!all_same_merge_level(&[0, 1]));
assert!(!all_same_merge_level(&[0, 0, 1]));
}

#[test]
fn test_has_minimum_splits() {
assert!(!has_minimum_splits(0));
assert!(!has_minimum_splits(1));
assert!(has_minimum_splits(2));
assert!(has_minimum_splits(100));
}

#[test]
fn test_all_same_compaction_scope() {
// Empty is vacuously true.
assert!(all_same_compaction_scope(&[], &[]));

// Same scope.
assert!(all_same_compaction_scope(
&["a|b|ts/V2", "a|b|ts/V2"],
&[(0, 3600), (0, 3600)],
));

// Different sort fields.
assert!(!all_same_compaction_scope(
&["a|b|ts/V2", "a|ts/V2"],
&[(0, 3600), (0, 3600)],
));

// Same start, different duration.
assert!(!all_same_compaction_scope(
&["a|b|ts/V2", "a|b|ts/V2"],
&[(0, 900), (0, 1800)],
));

// Different start.
assert!(!all_same_compaction_scope(
&["a|b|ts/V2", "a|b|ts/V2"],
&[(0, 3600), (3600, 3600)],
));
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-dst/src/invariants/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! No external dependencies — only `std`.
mod check;
pub mod merge_policy;
pub mod recorder;
pub mod registry;
pub mod sort;
Expand Down
19 changes: 19 additions & 0 deletions quickwit/quickwit-dst/src/invariants/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ pub enum InvariantId {
DM4,
/// DM-5: timeseries_id persists through compaction without recomputation
DM5,

/// MP-1: all splits in a merge operation have the same num_merge_ops level
MP1,
/// MP-2: every merge operation has at least 2 input splits
MP2,
/// MP-3: all splits in a merge operation share the same compaction scope
MP3,
}

impl InvariantId {
Expand Down Expand Up @@ -106,6 +113,10 @@ impl InvariantId {
Self::DM3 => "DM-3",
Self::DM4 => "DM-4",
Self::DM5 => "DM-5",

Self::MP1 => "MP-1",
Self::MP2 => "MP-2",
Self::MP3 => "MP-3",
}
}

Expand Down Expand Up @@ -136,6 +147,10 @@ impl InvariantId {
Self::DM3 => "no interpolation — only ingested points",
Self::DM4 => "deterministic TSID from tags",
Self::DM5 => "TSID persists through compaction",

Self::MP1 => "merge op splits share num_merge_ops level",
Self::MP2 => "merge op has at least 2 splits",
Self::MP3 => "merge op splits share compaction scope",
}
}
}
Expand All @@ -157,6 +172,7 @@ mod tests {
assert_eq!(InvariantId::CS3.to_string(), "CS-3");
assert_eq!(InvariantId::MC4.to_string(), "MC-4");
assert_eq!(InvariantId::DM5.to_string(), "DM-5");
assert_eq!(InvariantId::MP1.to_string(), "MP-1");
}

#[test]
Expand All @@ -182,6 +198,9 @@ mod tests {
InvariantId::DM3,
InvariantId::DM4,
InvariantId::DM5,
InvariantId::MP1,
InvariantId::MP2,
InvariantId::MP3,
];
for id in all {
assert!(!id.description().is_empty(), "{} has empty description", id);
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-parquet-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ulid = { workspace = true }

[dev-dependencies]
proptest = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
tempfile = { workspace = true }

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-parquet-engine/src/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! file has non-overlapping key ranges.

mod merge_order;
pub mod policy;
mod schema;
mod writer;

Expand Down
Loading
Loading