Skip to content

Commit 61ad48c

Browse files
g-talbot and claude committed
fix(merge): preserve metastore rg_partition_prefix_len from writer's KV stamp
The streaming merge engine produces sort-prefix-aligned multi-RG output and stamps `qh.rg_partition_prefix_len = input_meta.rg_partition_prefix_len` in the file's KV (verified by `assert_unique_rg_prefix_keys` before close). `merge_parquet_split_metadata` then ran afterwards and unconditionally demoted the value to 0 whenever `output.num_row_groups > 1` — breaking CS-1 (metastore must mirror on-disk KV) for every multi-RG streaming-engine output. Aligned splits got tagged 0 in the metastore on every merge and leaked out of the prefix-aligned compaction bucket on the next pass.

Carry the value the writer actually stamped via a new `MergeOutputFile.output_rg_partition_prefix_len` field, then propagate it as-is in metadata aggregation.

Both engines populate the field:
- Legacy `merge/writer.rs` reports its demoted value (row-count-driven RG boundaries can't honor prefix alignment, so it stamps 0 on multi-RG).
- Streaming `merge/streaming/output.rs` reports the inputs' prefix unchanged (it splits at prefix transitions and the writer verifies).

CS-1 holds by construction — same source of truth, no re-derivation.

Tests:
- `test_output_prefix_len_demoted_when_multi_rg` → renamed to `test_output_prefix_len_carries_writers_value_when_demoted`; now asserts that the metastore mirrors the writer's reported value.
- New `test_output_prefix_len_preserved_on_multi_rg_streaming_engine` asserts that a multi-RG streaming output (writer reports prefix_len=2) keeps the prefix in the metastore — the regression case for F1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dc7730d commit 61ad48c

4 files changed

Lines changed: 92 additions & 34 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs

Lines changed: 70 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -119,22 +119,18 @@ pub fn merge_parquet_split_metadata(
119119
let split_id = ParquetSplitId::generate(first.kind);
120120
let parquet_file = format!("{split_id}.parquet");
121121

122-
// `rg_partition_prefix_len` propagation rule: a single-row-group
123-
// output vacuously satisfies any prefix claim (no boundary to
124-
// misalign), so we keep the inputs' prefix. Multi-RG output with
125-
// arbitrary row-count-driven boundaries (the only kind the current
126-
// merge writer can produce) cannot honor a non-zero claim and must
127-
// reset to 0. PR-6 (streaming column-major merge engine) will
128-
// produce sort-prefix-aligned multi-RG output and propagate the
129-
// prefix unconditionally.
130-
//
131-
// This must agree with the value the writer embeds in the file's
132-
// `qh.rg_partition_prefix_len` KV — see `write_merge_outputs`.
133-
let output_prefix_len = if output.num_row_groups <= 1 {
134-
first.rg_partition_prefix_len
135-
} else {
136-
0
137-
};
122+
// CS-1: the metastore-recorded `rg_partition_prefix_len` must equal
123+
// the value the writer embedded in the file's
124+
// `qh.rg_partition_prefix_len` KV. Each writer makes its own
125+
// decision (the legacy `merge/writer.rs` demotes to 0 on multi-RG
126+
// output because its boundaries are row-count-driven; the streaming
127+
// writer propagates the inputs' prefix unchanged because it splits
128+
// at prefix transitions and verifies via `assert_unique_rg_prefix_keys`)
129+
// and reports it via `MergeOutputFile.output_rg_partition_prefix_len`.
130+
// We propagate that one source of truth — re-deriving here from
131+
// `num_row_groups` would silently diverge from the streaming
132+
// engine's prefix-aligned multi-RG output.
133+
let output_prefix_len = output.output_rg_partition_prefix_len;
138134

139135
// Data-dependent fields come from the MergeOutputFile (extracted from
140136
// this output's actual rows during the merge write pass).
@@ -212,11 +208,30 @@ mod tests {
212208
num_row_groups: usize,
213209
time_range: (u64, u64),
214210
metric_names: &[&str],
211+
) -> MergeOutputFile {
212+
make_output_full_with_prefix(
213+
num_rows,
214+
size_bytes,
215+
num_row_groups,
216+
0,
217+
time_range,
218+
metric_names,
219+
)
220+
}
221+
222+
fn make_output_full_with_prefix(
223+
num_rows: usize,
224+
size_bytes: u64,
225+
num_row_groups: usize,
226+
output_rg_partition_prefix_len: u32,
227+
time_range: (u64, u64),
228+
metric_names: &[&str],
215229
) -> MergeOutputFile {
216230
MergeOutputFile {
217231
path: PathBuf::from("/tmp/merged.parquet"),
218232
num_rows,
219233
num_row_groups,
234+
output_rg_partition_prefix_len,
220235
size_bytes,
221236
row_keys_proto: Some(vec![0x08, 0x01]),
222237
zonemap_regexes: HashMap::from([("metric_name".to_string(), "cpu\\..*".to_string())]),
@@ -412,38 +427,66 @@ mod tests {
412427
}
413428

414429
#[test]
415-
fn test_output_prefix_len_demoted_when_multi_rg() {
416-
// The current merge writer rolls over RGs at row count, not at
417-
// sort-prefix transitions. When the output ends up with > 1 RG,
418-
// the boundaries are at arbitrary places and the inputs' prefix
419-
// claim cannot be honored — the output's prefix must be 0.
430+
fn test_output_prefix_len_carries_writers_value_when_demoted() {
431+
// CS-1: the metastore-recorded value must match the writer's
432+
// KV stamp. Legacy `merge/writer.rs` demotes to 0 when its
433+
// row-count-driven RG layout produces multi-RG output and
434+
// reports that demoted value on the `MergeOutputFile`. The
435+
// metastore aggregator must propagate it as-is (NOT re-derive
436+
// from inputs) so the metastore agrees with the file's KV.
420437
let mut s0 = make_test_split("s0", (1000, 2000), 0);
421438
let mut s1 = make_test_split("s1", (1000, 2000), 0);
422439
s0.rg_partition_prefix_len = 3;
423440
s1.rg_partition_prefix_len = 3;
424441

425-
let output = make_output_full(200, 9000, 2, (1000, 2000), &["cpu.usage"]);
442+
// num_row_groups = 2 + writer reports demoted prefix_len = 0
443+
// (the legacy writer's choice for a row-count-driven multi-RG).
444+
let output = make_output_full_with_prefix(200, 9000, 2, 0, (1000, 2000), &["cpu.usage"]);
426445
let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap();
427446
assert_eq!(result.rg_partition_prefix_len, 0);
428447
}
429448

430449
#[test]
431450
fn test_output_prefix_len_preserved_when_single_rg() {
432451
// A single-RG output vacuously satisfies any prefix alignment
433-
// claim (one RG, no boundary to misalign). Propagate the inputs'
434-
// prefix so the merge output stays in the same compaction bucket
435-
// as the inputs, instead of leaking into the prefix=0 bucket on
436-
// every merge.
452+
// claim (one RG, no boundary to misalign). The writer reports
453+
// the inputs' prefix; aggregator propagates it.
437454
let mut s0 = make_test_split("s0", (1000, 2000), 0);
438455
let mut s1 = make_test_split("s1", (1000, 2000), 0);
439456
s0.rg_partition_prefix_len = 3;
440457
s1.rg_partition_prefix_len = 3;
441458

442-
let output = make_output_full(200, 9000, 1, (1000, 2000), &["cpu.usage"]);
459+
let output = make_output_full_with_prefix(200, 9000, 1, 3, (1000, 2000), &["cpu.usage"]);
443460
let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap();
444461
assert_eq!(result.rg_partition_prefix_len, 3);
445462
}
446463

464+
#[test]
465+
fn test_output_prefix_len_preserved_on_multi_rg_streaming_engine() {
466+
// CS-1 regression for F1: the streaming engine produces
467+
// sort-prefix-aligned multi-RG output and reports the inputs'
468+
// prefix unchanged via `MergeOutputFile.output_rg_partition_prefix_len`.
469+
// Before this fix, `merge_parquet_split_metadata` would
470+
// unconditionally demote to 0 whenever `num_row_groups > 1`,
471+
// breaking CS-1 (metastore disagreed with the file's KV) and
472+
// leaking aligned outputs into the unaligned compaction bucket
473+
// on every subsequent merge.
474+
let mut s0 = make_test_split("s0", (1000, 2000), 0);
475+
let mut s1 = make_test_split("s1", (1000, 2000), 0);
476+
s0.rg_partition_prefix_len = 2;
477+
s1.rg_partition_prefix_len = 2;
478+
479+
// num_row_groups = 3 (multi-RG) AND writer reports prefix_len = 2
480+
// (the streaming engine's stamp because it verified alignment).
481+
let output = make_output_full_with_prefix(300, 12000, 3, 2, (1000, 2000), &["cpu.usage"]);
482+
let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap();
483+
assert_eq!(
484+
result.rg_partition_prefix_len, 2,
485+
"metastore must mirror the writer's KV (CS-1); multi-RG aligned output keeps its \
486+
prefix claim"
487+
);
488+
}
489+
447490
#[test]
448491
fn test_fresh_split_id_generated() {
449492
let inputs = vec![

quickwit/quickwit-parquet-engine/src/merge/mod.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,22 @@ pub struct MergeOutputFile {
9595
/// Number of rows in this output file.
9696
pub num_rows: usize,
9797

98-
/// Number of row groups the writer produced for this file. Used by
99-
/// `merge_parquet_split_metadata` to decide whether the input prefix
100-
/// alignment claim (`rg_partition_prefix_len`) can be propagated to
101-
/// the output: a single-RG file vacuously satisfies any claim, so
102-
/// we keep the inputs' prefix; a multi-RG file with arbitrary
103-
/// boundaries (the only kind the current writer can produce) must
104-
/// reset the claim to 0.
98+
/// Number of row groups the writer produced for this file.
10599
pub num_row_groups: usize,
106100

101+
/// `qh.rg_partition_prefix_len` value the writer embedded in this
102+
/// file's KV metadata. The legacy `merge/writer.rs` writer demotes
103+
/// to 0 when it produces multi-RG output (its RG boundaries are
104+
/// row-count-driven, not prefix-aligned). The streaming writer
105+
/// (`merge/streaming/output.rs`) propagates the inputs' prefix
106+
/// unchanged because it splits at prefix transitions AND
107+
/// `assert_unique_rg_prefix_keys` verifies the file. Carrying the
108+
/// value here lets `merge_parquet_split_metadata` (CS-1: metastore
109+
/// == KV) propagate it directly to `ParquetSplitMetadata` instead
110+
/// of re-deriving — preventing the metastore from disagreeing with
111+
/// the on-disk KV when both engines coexist.
112+
pub output_rg_partition_prefix_len: u32,
113+
107114
/// File size in bytes.
108115
pub size_bytes: u64,
109116

quickwit/quickwit-parquet-engine/src/merge/streaming/output.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,13 @@ pub(crate) fn finalize_output(
312312
path: output_path,
313313
num_rows: accumulator.num_rows,
314314
num_row_groups,
315+
// The streaming engine stamps `input_meta.rg_partition_prefix_len`
316+
// unconditionally in the file's KV (see
317+
// `open_output_writer_for_streaming`) and verifies the claim with
318+
// `assert_unique_rg_prefix_keys`. CS-1 requires the metastore-
319+
// recorded value to match the on-disk KV, so propagate the same
320+
// value here.
321+
output_rg_partition_prefix_len: input_meta.rg_partition_prefix_len,
315322
size_bytes,
316323
row_keys_proto,
317324
zonemap_regexes,

quickwit/quickwit-parquet-engine/src/merge/writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ pub fn write_merge_outputs(
169169
path: output_path,
170170
num_rows: sorted_batch.num_rows(),
171171
num_row_groups: written.num_row_groups,
172+
output_rg_partition_prefix_len: output_prefix_len,
172173
size_bytes: written.size_bytes,
173174
row_keys_proto,
174175
zonemap_regexes,

0 commit comments

Comments
 (0)