Skip to content

Commit 6a8d88b

Browse files
g-talbot and claude committed
fix: add partition_id to CompactionScope, rebase on #6340
Now that `ParquetSplitMetadata` has `partition_id` (from Matt's PR #6340), include it in `CompactionScope` so that splits with different partitions are never merged together. Adds 2 new scope tests for partition isolation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d66196e commit 6a8d88b

2 files changed

Lines changed: 58 additions & 5 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/policy/const_write_amplification.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -369,6 +369,7 @@ mod tests {
369369
kind: ParquetSplitKind::Metrics,
370370
split_id: ParquetSplitId::new(split_id),
371371
index_uid: "test-index:001".to_string(),
372+
partition_id: 0,
372373
time_range: TimeRange::new(1000, 2000),
373374
num_rows: 1000,
374375
size_bytes,
@@ -710,6 +711,7 @@ mod tests {
710711
kind: ParquetSplitKind::Metrics,
711712
split_id: ParquetSplitId::new(split_id),
712713
index_uid: "test:prop".to_string(),
714+
partition_id: 0,
713715
time_range: TimeRange::new(1000, 2000),
714716
num_rows,
715717
size_bytes,

quickwit/quickwit-parquet-engine/src/merge/policy/scope.rs

Lines changed: 56 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -15,11 +15,11 @@
1515
//! Compaction scope grouping for Parquet splits.
1616
//!
1717
//! Splits are eligible for merging only when they share the same compaction
18-
//! scope. A scope captures the key dimensions that must match: index, sort
19-
//! schema, and time window.
18+
//! scope. A scope captures the key dimensions that must match: index,
19+
//! partition, sort schema, and time window.
2020
//!
21-
//! Future extensions (Phase 3+) will add `source_id` and `partition_id` to
22-
//! the scope when those fields are populated on `ParquetSplitMetadata`.
21+
//! Future extensions (Phase 3+) will add `source_id` to the scope when
22+
//! that field is populated on `ParquetSplitMetadata`.
2323
2424
use std::collections::HashMap;
2525

@@ -38,6 +38,9 @@ use crate::split::ParquetSplitMetadata;
3838
pub struct CompactionScope {
3939
/// Index unique identifier (includes incarnation).
4040
pub index_uid: String,
41+
/// Partition ID computed from the index's routing expression.
42+
/// Splits with different partition IDs must not be merged.
43+
pub partition_id: u64,
4144
/// Husky-style sort schema string (e.g., "metric_name|host|timestamp/V2").
4245
pub sort_fields: String,
4346
/// Window start in epoch seconds.
@@ -53,6 +56,7 @@ impl CompactionScope {
5356
let window = split.window.as_ref()?;
5457
Some(Self {
5558
index_uid: split.index_uid.clone(),
59+
partition_id: split.partition_id,
5660
sort_fields: split.sort_fields.clone(),
5761
window_start_secs: window.start,
5862
})
@@ -89,10 +93,21 @@ mod tests {
8993
sort_fields: &str,
9094
window_start: Option<i64>,
9195
window_duration: u32,
96+
) -> ParquetSplitMetadata {
97+
test_split_with_partition(index_uid, 0, sort_fields, window_start, window_duration)
98+
}
99+
100+
fn test_split_with_partition(
101+
index_uid: &str,
102+
partition_id: u64,
103+
sort_fields: &str,
104+
window_start: Option<i64>,
105+
window_duration: u32,
92106
) -> ParquetSplitMetadata {
93107
let mut builder = ParquetSplitMetadata::metrics_builder()
94108
.split_id(ParquetSplitId::generate(ParquetSplitKind::Metrics))
95109
.index_uid(index_uid)
110+
.partition_id(partition_id)
96111
.time_range(TimeRange::new(1000, 2000))
97112
.sort_fields(sort_fields);
98113

@@ -210,11 +225,13 @@ mod tests {
210225

211226
let scope_a = CompactionScope {
212227
index_uid: "idx:001".to_string(),
228+
partition_id: 0,
213229
sort_fields: "metric_name|host|timestamp/V2".to_string(),
214230
window_start_secs: 1000,
215231
};
216232
let scope_b = CompactionScope {
217233
index_uid: "idx:001".to_string(),
234+
partition_id: 0,
218235
sort_fields: "metric_name|host|timestamp/V2".to_string(),
219236
window_start_secs: 4600,
220237
};
@@ -223,6 +240,33 @@ mod tests {
223240
assert_eq!(result[&scope_b].len(), 2);
224241
}
225242

243+
#[test]
244+
fn test_different_partition_id() {
245+
let sort = "metric_name|host|timestamp/V2";
246+
let splits = vec![
247+
test_split_with_partition("idx:001", 1, sort, Some(1000), 3600),
248+
test_split_with_partition("idx:001", 2, sort, Some(1000), 3600),
249+
];
250+
let result = group_by_compaction_scope(splits);
251+
assert!(
252+
result.is_empty(),
253+
"different partition_ids should not be grouped"
254+
);
255+
}
256+
257+
#[test]
258+
fn test_same_partition_id() {
259+
let sort = "metric_name|host|timestamp/V2";
260+
let splits = vec![
261+
test_split_with_partition("idx:001", 42, sort, Some(1000), 3600),
262+
test_split_with_partition("idx:001", 42, sort, Some(1000), 3600),
263+
];
264+
let result = group_by_compaction_scope(splits);
265+
assert_eq!(result.len(), 1);
266+
let group = result.values().next().unwrap();
267+
assert_eq!(group.len(), 2);
268+
}
269+
226270
#[test]
227271
fn test_from_split_returns_none_for_no_window() {
228272
let split = test_split("idx:001", "metric_name|host|timestamp/V2", None, 0);
@@ -231,9 +275,16 @@ mod tests {
231275

232276
#[test]
233277
fn test_from_split_returns_scope() {
234-
let split = test_split("idx:001", "metric_name|host|timestamp/V2", Some(7200), 3600);
278+
let split = test_split_with_partition(
279+
"idx:001",
280+
7,
281+
"metric_name|host|timestamp/V2",
282+
Some(7200),
283+
3600,
284+
);
235285
let scope = CompactionScope::from_split(&split).unwrap();
236286
assert_eq!(scope.index_uid, "idx:001");
287+
assert_eq!(scope.partition_id, 7);
237288
assert_eq!(scope.sort_fields, "metric_name|host|timestamp/V2");
238289
assert_eq!(scope.window_start_secs, 7200);
239290
}

0 commit comments

Comments
 (0)