Skip to content

Commit f0dd5cf

Browse files
g-talbot and claude committed
feat: add MP-1/MP-2/MP-3 runtime invariant checks for merge operations
Add three merge-policy invariants to the shared verification layer (quickwit-dst), enforced with check_invariant! in ParquetMergeOperation::new():

- MP-1: all splits share the same num_merge_ops level
- MP-2: merge op has at least 2 input splits
- MP-3: all splits share the same compaction scope (sort_fields + window)

Shared pure functions in quickwit_dst::invariants::merge_policy are the single source of truth, usable by Stateright models and production code. Debug builds panic on violation; all builds emit metrics via the invariant recorder.

Tests were written first: the 4 should_panic tests failed before the checks were added and pass after. There is also 1 positive test, plus 3 unit tests for the shared functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 841b853 commit f0dd5cf

4 files changed

Lines changed: 265 additions & 0 deletions

File tree

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Shared merge policy invariant checks.
16+
//!
17+
//! These pure functions are the single source of truth for merge operation
18+
//! validity, used by both Stateright models and production code.
19+
20+
/// MP-1: reports whether every split in a merge operation sits at the same
/// `num_merge_ops` level.
///
/// If splits from different levels are merged, the output gets stamped with
/// `max(levels) + 1`, prematurely maturing lower-level data and breaking
/// the bounded write amplification guarantee.
///
/// # Arguments
///
/// * `num_merge_ops` - the `num_merge_ops` value of each input split.
///
/// # Returns
///
/// `true` when all entries are equal. An empty or single-element slice is
/// vacuously homogeneous and therefore also yields `true`.
#[must_use]
pub fn all_same_merge_level(num_merge_ops: &[u32]) -> bool {
    match num_merge_ops.split_first() {
        // Every remaining level has to match the first one.
        Some((first, rest)) => rest.iter().all(|level| level == first),
        // No splits at all: nothing can disagree.
        None => true,
    }
}
31+
32+
/// MP-2: reports whether a merge operation has enough input splits.
///
/// Every merge operation must have at least 2 input splits: merging a single
/// split is a no-op that wastes I/O, and merging zero splits is nonsensical.
///
/// # Arguments
///
/// * `count` - number of input splits in the merge operation.
///
/// # Returns
///
/// `true` when `count` is 2 or more.
#[must_use]
pub fn has_minimum_splits(count: usize) -> bool {
    /// Smallest number of inputs for which a merge does useful work.
    const MIN_INPUT_SPLITS: usize = 2;

    count >= MIN_INPUT_SPLITS
}
39+
40+
/// MP-3: reports whether all splits in a merge operation share the same
/// compaction scope.
///
/// The scope is defined by `(sort_fields, window_start, window_duration)`.
/// The merge engine validates that all inputs agree on these; a policy bug
/// that groups incompatible splits will cause the merge to fail.
///
/// `index_uid` and `partition_id` are also part of the scope but are
/// typically enforced by the grouping layer before the policy runs.
///
/// # Arguments
///
/// * `sort_fields` - the sort-fields string of each input split.
/// * `windows` - the `(start, duration)` window of each input split.
///
/// The two slices are checked independently of each other; this function
/// does not require them to have the same length.
///
/// # Returns
///
/// `true` when every entry of `sort_fields` is equal and every entry of
/// `windows` is equal. Empty slices are vacuously homogeneous.
#[must_use]
pub fn all_same_compaction_scope(
    sort_fields: &[&str],
    windows: &[(i64, i64)], // (start, duration) pairs
) -> bool {
    /// Returns `true` when every item equals the first one (or there are none).
    fn all_equal<T: PartialEq>(items: &[T]) -> bool {
        match items.split_first() {
            Some((first, rest)) => rest.iter().all(|item| item == first),
            None => true,
        }
    }

    all_equal(sort_fields) && all_equal(windows)
}
62+
63+
#[cfg(test)]
mod tests {
    //! Unit tests for the shared merge-policy predicates.

    use super::*;

    /// MP-1: empty and uniform inputs pass, any differing level fails.
    #[test]
    fn test_all_same_merge_level() {
        assert!(all_same_merge_level(&[]));
        assert!(all_same_merge_level(&[0]));
        assert!(all_same_merge_level(&[2, 2, 2]));
        assert!(!all_same_merge_level(&[0, 1]));
        assert!(!all_same_merge_level(&[0, 0, 1]));
    }

    /// MP-2: fewer than two splits is rejected.
    #[test]
    fn test_has_minimum_splits() {
        assert!(!has_minimum_splits(0));
        assert!(!has_minimum_splits(1));
        assert!(has_minimum_splits(2));
        assert!(has_minimum_splits(100));
    }

    /// MP-3: sort fields, window start and window duration must all agree.
    #[test]
    fn test_all_same_compaction_scope() {
        // Empty is vacuously true.
        assert!(all_same_compaction_scope(&[], &[]));

        // Same scope.
        assert!(all_same_compaction_scope(
            &["a|b|ts/V2", "a|b|ts/V2"],
            &[(0, 3600), (0, 3600)],
        ));

        // Different sort fields.
        assert!(!all_same_compaction_scope(
            &["a|b|ts/V2", "a|ts/V2"],
            &[(0, 3600), (0, 3600)],
        ));

        // Same start, different duration.
        assert!(!all_same_compaction_scope(
            &["a|b|ts/V2", "a|b|ts/V2"],
            &[(0, 900), (0, 1800)],
        ));

        // Different start.
        assert!(!all_same_compaction_scope(
            &["a|b|ts/V2", "a|b|ts/V2"],
            &[(0, 3600), (3600, 3600)],
        ));
    }
}

quickwit/quickwit-dst/src/invariants/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
//! No external dependencies — only `std`.
2222
2323
mod check;
24+
pub mod merge_policy;
2425
pub mod recorder;
2526
pub mod registry;
2627
pub mod sort;

quickwit/quickwit-dst/src/invariants/registry.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ pub enum InvariantId {
7474
DM4,
7575
/// DM-5: timeseries_id persists through compaction without recomputation
7676
DM5,
77+
78+
/// MP-1: all splits in a merge operation have the same num_merge_ops level
79+
MP1,
80+
/// MP-2: every merge operation has at least 2 input splits
81+
MP2,
82+
/// MP-3: all splits in a merge operation share the same compaction scope
83+
MP3,
7784
}
7885

7986
impl InvariantId {
@@ -106,6 +113,10 @@ impl InvariantId {
106113
Self::DM3 => "DM-3",
107114
Self::DM4 => "DM-4",
108115
Self::DM5 => "DM-5",
116+
117+
Self::MP1 => "MP-1",
118+
Self::MP2 => "MP-2",
119+
Self::MP3 => "MP-3",
109120
}
110121
}
111122

@@ -136,6 +147,10 @@ impl InvariantId {
136147
Self::DM3 => "no interpolation — only ingested points",
137148
Self::DM4 => "deterministic TSID from tags",
138149
Self::DM5 => "TSID persists through compaction",
150+
151+
Self::MP1 => "merge op splits share num_merge_ops level",
152+
Self::MP2 => "merge op has at least 2 splits",
153+
Self::MP3 => "merge op splits share compaction scope",
139154
}
140155
}
141156
}
@@ -157,6 +172,7 @@ mod tests {
157172
assert_eq!(InvariantId::CS3.to_string(), "CS-3");
158173
assert_eq!(InvariantId::MC4.to_string(), "MC-4");
159174
assert_eq!(InvariantId::DM5.to_string(), "DM-5");
175+
assert_eq!(InvariantId::MP1.to_string(), "MP-1");
160176
}
161177

162178
#[test]
@@ -182,6 +198,9 @@ mod tests {
182198
InvariantId::DM3,
183199
InvariantId::DM4,
184200
InvariantId::DM5,
201+
InvariantId::MP1,
202+
InvariantId::MP2,
203+
InvariantId::MP3,
185204
];
186205
for id in all {
187206
assert!(!id.description().is_empty(), "{} has empty description", id);

quickwit/quickwit-parquet-engine/src/merge/policy/mod.rs

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,49 @@ impl ParquetMergeOperation {
5050
///
5151
/// Generates a fresh split ID for the merged output. The `kind` is inferred
5252
/// from the first split (all splits in a merge share the same kind).
53+
///
54+
/// # Invariant checks (debug builds panic, all builds emit metrics)
55+
///
56+
/// - **MP-1**: all splits share the same `num_merge_ops` level
57+
/// - **MP-2**: at least 2 input splits
58+
/// - **MP-3**: all splits share the same compaction scope (sort_fields + window)
5359
pub fn new(splits: Vec<ParquetSplitMetadata>) -> Self {
60+
use quickwit_dst::check_invariant;
61+
use quickwit_dst::invariants::InvariantId;
62+
use quickwit_dst::invariants::merge_policy;
63+
64+
// MP-2: minimum split count.
65+
check_invariant!(
66+
InvariantId::MP2,
67+
merge_policy::has_minimum_splits(splits.len()),
68+
": got {} splits",
69+
splits.len()
70+
);
71+
72+
// MP-1: level homogeneity.
73+
let levels: Vec<u32> = splits.iter().map(|s| s.num_merge_ops).collect();
74+
check_invariant!(
75+
InvariantId::MP1,
76+
merge_policy::all_same_merge_level(&levels),
77+
": levels={:?}",
78+
levels
79+
);
80+
81+
// MP-3: scope homogeneity (sort_fields + window).
82+
let sort_fields_vec: Vec<&str> =
83+
splits.iter().map(|s| s.sort_fields.as_str()).collect();
84+
let windows: Vec<(i64, i64)> = splits
85+
.iter()
86+
.map(|s| match &s.window {
87+
Some(w) => (w.start, w.end - w.start),
88+
None => (0, 0),
89+
})
90+
.collect();
91+
check_invariant!(
92+
InvariantId::MP3,
93+
merge_policy::all_same_compaction_scope(&sort_fields_vec, &windows)
94+
);
95+
5496
let kind = splits
5597
.first()
5698
.map(|s| s.kind)
@@ -123,3 +165,93 @@ pub trait ParquetMergePolicy: Send + Sync + fmt::Debug {
123165
) {
124166
}
125167
}
168+
169+
#[cfg(test)]
mod tests {
    //! Tests for the MP-1/MP-2/MP-3 invariant checks performed by
    //! `ParquetMergeOperation::new`.
    //!
    //! The invariant checks are documented to panic only in debug builds, so
    //! the `should_panic` tests are ignored when `debug_assertions` is off
    //! (e.g. `cargo test --release`) instead of failing there.

    use std::collections::HashSet;
    use std::time::SystemTime;

    use super::*;
    use crate::split::{ParquetSplitId, ParquetSplitKind, TimeRange};

    /// Builds a metrics split whose only varying properties are the ones the
    /// merge-policy invariants look at.
    ///
    /// `window` is given as `(start, duration)` and stored as the range
    /// `start..start + duration`; every other field gets a fixed test value.
    fn make_split(
        split_id: &str,
        num_merge_ops: u32,
        sort_fields: &str,
        window: Option<(i64, i64)>,
    ) -> ParquetSplitMetadata {
        ParquetSplitMetadata {
            kind: ParquetSplitKind::Metrics,
            split_id: ParquetSplitId::new(split_id),
            index_uid: "test:001".to_string(),
            partition_id: 0,
            time_range: TimeRange::new(1000, 2000),
            num_rows: 100,
            size_bytes: 1_000_000,
            metric_names: HashSet::new(),
            low_cardinality_tags: Default::default(),
            high_cardinality_tag_keys: Default::default(),
            created_at: SystemTime::now(),
            parquet_file: format!("{split_id}.parquet"),
            window: window.map(|(start, dur)| start..start + dur),
            sort_fields: sort_fields.to_string(),
            num_merge_ops,
            row_keys_proto: None,
            zonemap_regexes: Default::default(),
        }
    }

    #[test]
    #[cfg_attr(not(debug_assertions), ignore = "invariants only panic in debug builds")]
    #[should_panic(expected = "MP-1 violated")]
    fn test_mp1_mixed_merge_levels_panics() {
        // MP-1: constructing a merge op with splits at different levels
        // must panic in debug builds.
        let splits = vec![
            make_split("l0", 0, "a|ts/V2", Some((0, 3600))),
            make_split("l1", 1, "a|ts/V2", Some((0, 3600))),
        ];
        ParquetMergeOperation::new(splits);
    }

    #[test]
    #[cfg_attr(not(debug_assertions), ignore = "invariants only panic in debug builds")]
    #[should_panic(expected = "MP-2 violated")]
    fn test_mp2_single_split_panics() {
        // MP-2: constructing a merge op with < 2 splits must panic.
        let splits = vec![make_split("s0", 0, "a|ts/V2", Some((0, 3600)))];
        ParquetMergeOperation::new(splits);
    }

    #[test]
    #[cfg_attr(not(debug_assertions), ignore = "invariants only panic in debug builds")]
    #[should_panic(expected = "MP-3 violated")]
    fn test_mp3_mixed_sort_fields_panics() {
        // MP-3: constructing a merge op with different sort_fields must panic.
        let splits = vec![
            make_split("s0", 0, "a|b|ts/V2", Some((0, 3600))),
            make_split("s1", 0, "a|ts/V2", Some((0, 3600))),
        ];
        ParquetMergeOperation::new(splits);
    }

    #[test]
    #[cfg_attr(not(debug_assertions), ignore = "invariants only panic in debug builds")]
    #[should_panic(expected = "MP-3 violated")]
    fn test_mp3_mixed_window_duration_panics() {
        // MP-3: same start but different duration must panic.
        let splits = vec![
            make_split("s0", 0, "a|ts/V2", Some((0, 900))),
            make_split("s1", 0, "a|ts/V2", Some((0, 1800))),
        ];
        ParquetMergeOperation::new(splits);
    }

    #[test]
    fn test_valid_merge_operation_succeeds() {
        // A well-formed merge op should not panic.
        let splits = vec![
            make_split("s0", 0, "a|ts/V2", Some((0, 3600))),
            make_split("s1", 0, "a|ts/V2", Some((0, 3600))),
        ];
        let op = ParquetMergeOperation::new(splits);
        assert_eq!(op.splits.len(), 2);
    }
}
257+

0 commit comments

Comments
 (0)