Commit 7190d9b

g-talbot and claude authored
feat: add Parquet merge policy for compaction (Phase 2) (#6351)
* feat: add Parquet merge policy for compaction (Phase 2)

  Adds a constant write amplification merge policy for Parquet splits, adapted from the existing ConstWriteAmplificationMergePolicy but using byte size instead of document count as the primary size metric.

  This is Phase 2 of the Parquet compaction project: the decision layer that determines which splits to merge within each compaction scope.

  Key components:
  - ParquetMergePolicy trait mirroring the MergePolicy interface
  - CompactionScope grouping by (index_uid, sort_fields, window_start)
  - ConstWriteAmplificationParquetMergePolicy with bounded write amp
  - finalize_operations() for cold window compaction
  - 33 tests: unit, proptest (MC-CONSERVE/LEVEL/WA/IDEMPOTENT), simulation

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: add partition_id to CompactionScope, rebase on #6340

  Now that ParquetSplitMetadata has partition_id (from Matt's PR #6340), include it in CompactionScope so splits with different partitions are never merged together. Adds 2 new scope tests for partition isolation.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: rustfmt nightly comment wrapping and import ordering

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: finalize_operations must respect MC-LEVEL invariant

  finalize_operations() was running single_merge_operation over all young splits without grouping by num_merge_ops level. This could merge level-0 and level-1 splits together, stamping the output with max(levels) + 1 and prematurely maturing lower-level data.

  Fix: group by num_merge_ops in finalize just like operations() does, then apply the lower merge factor within each level independently.

  Added test_finalize_respects_mc_level_invariant (unit) and proptest_finalize_respects_mc_level (property test); both caught the bug before the fix.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: include window_duration_secs in CompactionScope

  CompactionScope only used window_start_secs, so splits with the same start but different durations (e.g. after a window config change) would be grouped together. The merge engine requires all inputs to agree on both window_start and window_duration, so merging across durations would fail validation.

  Added test_different_window_duration which caught the bug before the fix.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add MP-1/MP-2/MP-3 runtime invariant checks for merge operations

  Add three merge-policy invariants to the shared verification layer (quickwit-dst) with check_invariant! enforcement in ParquetMergeOperation::new():
  - MP-1: all splits share the same num_merge_ops level
  - MP-2: merge op has at least 2 input splits
  - MP-3: all splits share the same compaction scope (sort_fields + window)

  Shared pure functions in quickwit_dst::invariants::merge_policy are the single source of truth, usable by Stateright models and production code. Debug builds panic on violation; all builds emit metrics via the invariant recorder.

  Tests written first (4 should_panic tests failed before adding checks, pass after). Plus 1 positive test and 3 unit tests for shared functions.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: nightly rustfmt (merge imports, unwrap short line, trailing newline)

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
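The grouping rule described in the commit message (never merge across compaction scopes, and never merge across `num_merge_ops` levels, even in finalize) can be illustrated with a minimal sketch. The policy source itself is not shown on this page, so `ScopeKey`, `SplitSketch`, `group_for_finalize`, the `partition_id` type, and the threshold handling below are all hypothetical stand-ins, not the actual `CompactionScope` or `finalize_operations()` code.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the compaction scope key. The field list follows
/// the commit message; the concrete types are assumptions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ScopeKey {
    pub index_uid: String,
    pub partition_id: u64,
    pub sort_fields: String,
    pub window_start_secs: i64,
    pub window_duration_secs: i64,
}

/// Hypothetical, minimal view of a split.
#[derive(Debug, Clone)]
pub struct SplitSketch {
    pub split_id: String,
    pub scope: ScopeKey,
    pub num_merge_ops: u32,
}

/// Buckets splits by (scope, merge level) and keeps only buckets that reach
/// the merge factor. Each returned bucket is one candidate merge operation.
pub fn group_for_finalize(
    splits: Vec<SplitSketch>,
    min_merge_factor: usize,
) -> Vec<Vec<SplitSketch>> {
    let mut groups: BTreeMap<(ScopeKey, u32), Vec<SplitSketch>> = BTreeMap::new();
    for split in splits {
        let key = (split.scope.clone(), split.num_merge_ops);
        groups.entry(key).or_default().push(split);
    }
    // A merge of fewer than two splits is a no-op, so never go below 2.
    let threshold = min_merge_factor.max(2);
    groups
        .into_values()
        .filter(|group| group.len() >= threshold)
        .collect()
}
```

Because the level is part of the bucket key, a level-0 split and a level-1 split can never land in the same candidate operation, which is the property the MC-LEVEL fix restores.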
1 parent 43553d3 commit 7190d9b

9 files changed

Lines changed: 1695 additions & 0 deletions

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.
Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Shared merge policy invariant checks.
+//!
+//! These pure functions are the single source of truth for merge operation
+//! validity, used by both Stateright models and production code.
+
+/// MP-1: all splits in a merge operation must share the same `num_merge_ops`.
+///
+/// If splits from different levels are merged, the output gets stamped with
+/// `max(levels) + 1`, prematurely maturing lower-level data and breaking
+/// the bounded write amplification guarantee.
+pub fn all_same_merge_level(num_merge_ops: &[u32]) -> bool {
+    match num_merge_ops.first() {
+        None => true,
+        Some(&first) => num_merge_ops.iter().all(|&n| n == first),
+    }
+}
+
+/// MP-2: every merge operation must have at least 2 input splits.
+///
+/// Merging a single split is a no-op that wastes I/O. Merging zero splits
+/// is nonsensical.
+pub fn has_minimum_splits(count: usize) -> bool {
+    count >= 2
+}
+
+/// MP-3: all splits in a merge operation must share the same compaction scope.
+///
+/// The scope is defined by `(sort_fields, window_start, window_duration)`.
+/// The merge engine validates that all inputs agree on these; a policy bug
+/// that groups incompatible splits will cause the merge to fail.
+///
+/// `index_uid` and `partition_id` are also part of the scope but are
+/// typically enforced by the grouping layer before the policy runs.
+pub fn all_same_compaction_scope(
+    sort_fields: &[&str],
+    windows: &[(i64, i64)], // (start, duration) pairs
+) -> bool {
+    let same_sort = match sort_fields.first() {
+        None => true,
+        Some(&first) => sort_fields.iter().all(|&s| s == first),
+    };
+    let same_window = match windows.first() {
+        None => true,
+        Some(&first) => windows.iter().all(|w| *w == first),
+    };
+    same_sort && same_window
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_all_same_merge_level() {
+        assert!(all_same_merge_level(&[]));
+        assert!(all_same_merge_level(&[0]));
+        assert!(all_same_merge_level(&[2, 2, 2]));
+        assert!(!all_same_merge_level(&[0, 1]));
+        assert!(!all_same_merge_level(&[0, 0, 1]));
+    }
+
+    #[test]
+    fn test_has_minimum_splits() {
+        assert!(!has_minimum_splits(0));
+        assert!(!has_minimum_splits(1));
+        assert!(has_minimum_splits(2));
+        assert!(has_minimum_splits(100));
+    }
+
+    #[test]
+    fn test_all_same_compaction_scope() {
+        // Empty is vacuously true.
+        assert!(all_same_compaction_scope(&[], &[]));
+
+        // Same scope.
+        assert!(all_same_compaction_scope(
+            &["a|b|ts/V2", "a|b|ts/V2"],
+            &[(0, 3600), (0, 3600)],
+        ));
+
+        // Different sort fields.
+        assert!(!all_same_compaction_scope(
+            &["a|b|ts/V2", "a|ts/V2"],
+            &[(0, 3600), (0, 3600)],
+        ));
+
+        // Same start, different duration.
+        assert!(!all_same_compaction_scope(
+            &["a|b|ts/V2", "a|b|ts/V2"],
+            &[(0, 900), (0, 1800)],
+        ));
+
+        // Different start.
+        assert!(!all_same_compaction_scope(
+            &["a|b|ts/V2", "a|b|ts/V2"],
+            &[(0, 3600), (3600, 3600)],
+        ));
+    }
+}
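As a rough illustration of how a caller might combine the three checks above before building a merge operation, here is a self-contained sketch. It does not reproduce the `check_invariant!` enforcement mentioned in the commit message; `Violation`, `SplitView`, `all_equal`, and `validate_merge_inputs` are hypothetical names, and the order in which the checks run is an arbitrary choice made for this example.

```rust
/// Which merge-policy invariant a candidate operation violates.
#[derive(Debug, PartialEq, Eq)]
pub enum Violation {
    MixedMergeLevels, // MP-1
    TooFewSplits,     // MP-2
    MixedScope,       // MP-3
}

/// Hypothetical, minimal view of a split; the real metadata type is not
/// shown in this diff.
#[derive(Debug, Clone, Copy)]
pub struct SplitView<'a> {
    pub num_merge_ops: u32,
    pub sort_fields: &'a str,
    pub window: (i64, i64), // (start, duration)
}

/// True when every element equals its neighbor (vacuously true for 0 or 1
/// items), which is equivalent to "all elements equal the first".
fn all_equal<T: PartialEq>(items: &[T]) -> bool {
    items.windows(2).all(|pair| pair[0] == pair[1])
}

/// Returns the first violated invariant, or `Ok(())` if the inputs are valid.
pub fn validate_merge_inputs(splits: &[SplitView<'_>]) -> Result<(), Violation> {
    // MP-2: at least two inputs.
    if splits.len() < 2 {
        return Err(Violation::TooFewSplits);
    }
    // MP-1: a single merge level.
    let levels: Vec<u32> = splits.iter().map(|s| s.num_merge_ops).collect();
    if !all_equal(&levels) {
        return Err(Violation::MixedMergeLevels);
    }
    // MP-3: same sort fields and same (start, duration) window.
    let scopes: Vec<(&str, (i64, i64))> =
        splits.iter().map(|s| (s.sort_fields, s.window)).collect();
    if !all_equal(&scopes) {
        return Err(Violation::MixedScope);
    }
    Ok(())
}
```

Returning a `Result` instead of panicking is a design choice for this sketch only; per the commit message, the production path panics in debug builds and records metrics in all builds.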

quickwit/quickwit-dst/src/invariants/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@
 //! No external dependencies — only `std`.

 mod check;
+pub mod merge_policy;
 pub mod recorder;
 pub mod registry;
 pub mod sort;

quickwit/quickwit-dst/src/invariants/registry.rs

Lines changed: 19 additions & 0 deletions
@@ -74,6 +74,13 @@ pub enum InvariantId {
     DM4,
     /// DM-5: timeseries_id persists through compaction without recomputation
     DM5,
+
+    /// MP-1: all splits in a merge operation have the same num_merge_ops level
+    MP1,
+    /// MP-2: every merge operation has at least 2 input splits
+    MP2,
+    /// MP-3: all splits in a merge operation share the same compaction scope
+    MP3,
 }

 impl InvariantId {
@@ -106,6 +113,10 @@ impl InvariantId {
             Self::DM3 => "DM-3",
             Self::DM4 => "DM-4",
             Self::DM5 => "DM-5",
+
+            Self::MP1 => "MP-1",
+            Self::MP2 => "MP-2",
+            Self::MP3 => "MP-3",
         }
     }

@@ -136,6 +147,10 @@ impl InvariantId {
             Self::DM3 => "no interpolation — only ingested points",
             Self::DM4 => "deterministic TSID from tags",
             Self::DM5 => "TSID persists through compaction",
+
+            Self::MP1 => "merge op splits share num_merge_ops level",
+            Self::MP2 => "merge op has at least 2 splits",
+            Self::MP3 => "merge op splits share compaction scope",
         }
     }
 }
@@ -157,6 +172,7 @@ mod tests {
         assert_eq!(InvariantId::CS3.to_string(), "CS-3");
         assert_eq!(InvariantId::MC4.to_string(), "MC-4");
         assert_eq!(InvariantId::DM5.to_string(), "DM-5");
+        assert_eq!(InvariantId::MP1.to_string(), "MP-1");
     }

     #[test]
@@ -182,6 +198,9 @@ mod tests {
             InvariantId::DM3,
             InvariantId::DM4,
             InvariantId::DM5,
+            InvariantId::MP1,
+            InvariantId::MP2,
+            InvariantId::MP3,
         ];
         for id in all {
             assert!(!id.description().is_empty(), "{} has empty description", id);
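The registry hunks above show only fragments of `InvariantId`. A reduced, self-contained sketch of the pattern they suggest, restricted to the three new variants, might look like the following. The method name `name()` and the `Display` implementation delegating to it are assumptions; the diff only shows that `to_string()` yields `"MP-1"` and that a `description()` method exists.

```rust
use std::fmt;

/// Reduced copy of the registry enum, limited to the variants added here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvariantId {
    MP1,
    MP2,
    MP3,
}

impl InvariantId {
    /// Short identifier. The method name is an assumption; the diff does not
    /// show the signature of the function containing this match.
    pub fn name(self) -> &'static str {
        match self {
            Self::MP1 => "MP-1",
            Self::MP2 => "MP-2",
            Self::MP3 => "MP-3",
        }
    }

    /// One-line description, with strings copied from the diff.
    pub fn description(self) -> &'static str {
        match self {
            Self::MP1 => "merge op splits share num_merge_ops level",
            Self::MP2 => "merge op has at least 2 splits",
            Self::MP3 => "merge op splits share compaction scope",
        }
    }
}

impl fmt::Display for InvariantId {
    /// Assumed to delegate to the short identifier, which is consistent with
    /// the `to_string()` assertions in the registry tests.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
```

Keeping both matches exhaustive means that adding a variant without an identifier or description fails to compile, which complements the non-empty description test in the diff.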

quickwit/quickwit-parquet-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ ulid = { workspace = true }

 [dev-dependencies]
 proptest = { workspace = true }
+rand = { workspace = true }
 regex = { workspace = true }
 tempfile = { workspace = true }

quickwit/quickwit-parquet-engine/src/merge/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@
 //! file has non-overlapping key ranges.

 mod merge_order;
+pub mod policy;
 mod schema;
 mod writer;
