Commit 6fdced2

g-talbot and claude committed
fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers (#6426)
Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge, but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere), so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end.

Known limitation: cross-file SS-3 (where some inputs have a sort column and others don't) uses `[0x00, 0x00]` for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests:

- streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files; they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
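The F12 behaviour and its known limitation can be illustrated with a minimal sketch. Everything here is hypothetical: `encode_bytes` is an assumed order-preserving encoding (escape `0x00`, terminate with `[0x00, 0x00]`) chosen only because it maps an empty input to `[0x00, 0x00]` as the message describes; it is not the engine's `encode_byte_array_prefix`, and `composite_key` is not `extract_rg_composite_prefix_key`.

```rust
/// Hypothetical order-preserving encoding (an assumption for this sketch):
/// escape each 0x00 byte as [0x00, 0xFF] and terminate with [0x00, 0x00].
/// An empty input therefore encodes to exactly [0x00, 0x00].
fn encode_bytes(value: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(value.len() + 2);
    for &b in value {
        out.push(b);
        if b == 0x00 {
            out.push(0xFF);
        }
    }
    out.extend_from_slice(&[0x00, 0x00]);
    out
}

/// Builds a composite prefix key from per-column slots.
/// `None` models a prefix column that is absent from the file (SS-3):
/// it contributes the constant encoded-empty marker.
fn composite_key(slots: &[Option<&str>]) -> Vec<u8> {
    let mut key = Vec::new();
    for slot in slots {
        match slot {
            Some(v) => key.extend(encode_bytes(v.as_bytes())),
            None => key.extend(encode_bytes(&[])),
        }
    }
    key
}

fn main() {
    // Single file missing `service`: both RGs carry the same null marker,
    // so ordering is decided by `metric_name` alone.
    let rg0 = composite_key(&[Some("cpu.usage"), None]);
    let rg1 = composite_key(&[Some("memory.used"), None]);
    assert!(rg0 < rg1);
    assert!(rg0.ends_with(&[0x00, 0x00]));

    // Cross-file case: the null marker sorts BEFORE a real value,
    // which is the "nulls sort last" (SS-2) weakness noted above.
    let with_null = composite_key(&[Some("cpu.usage"), None]);
    let with_value = composite_key(&[Some("cpu.usage"), Some("api")]);
    assert!(with_null < with_value);
}
```

Under this assumed encoding, a leading `0xff` sentinel for the null slot would sort after any slot whose first byte is below `0xff`, which is the direction the message's suggested fix points in.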
1 parent 0c7de76 commit 6fdced2

2 files changed

Lines changed: 167 additions & 3 deletions


quickwit/quickwit-parquet-engine/src/merge/streaming.rs

Lines changed: 35 additions & 0 deletions
@@ -1893,6 +1893,17 @@ mod tests {
             outputs[0].num_row_groups, 2,
             "MergeOutputFile.num_row_groups should match physical row group count",
         );
+
+        // F2 chunk-level verification: confirm each output RG actually
+        // carries a single distinct metric_name (PA-1 + PA-3 read
+        // straight off the column-chunk statistics).
+        assert_unique_rg_prefix_keys(
+            reader.metadata(),
+            "metric_name|-timestamp_secs/V2",
+            1,
+            "test_multi_rg_metric_aligned_input_produces_multi_rg_output output",
+        )
+        .expect("streaming engine output must satisfy PA-1 + PA-3 on metric_name");
     }

     /// Regression for Codex P2 on PR-6410: a streaming merge output
@@ -2081,6 +2092,20 @@ mod tests {
             "three distinct (metric_name, service) pairs must produce three output RGs",
         );
         assert_eq!(outputs[0].num_row_groups, 3);
+
+        // F2 chunk-level verification: counting RGs and stamping a KV
+        // is not enough; the OUTPUT's row groups must actually be
+        // aligned on the composite (metric_name, service) prefix.
+        // `assert_unique_rg_prefix_keys` enforces PA-1 (intra-RG
+        // constancy) + PA-3 (inter-RG uniqueness) by reading the
+        // chunk-level statistics.
+        assert_unique_rg_prefix_keys(
+            reader.metadata(),
+            "metric_name|service|-timestamp_secs/V2",
+            2,
+            "test_streaming_merge_with_prefix_len_two output",
+        )
+        .expect("streaming engine output must satisfy PA-1 + PA-3 on the prefix columns");
     }

     /// Regression for Codex finding #1 on PR-6410: when one input
@@ -2768,6 +2793,16 @@ mod tests {
             (third_block - 1.0).abs() < 1e-9,
             "third output RG should be 'dev' (marker 1.0), got {third_block}",
         );
+
+        // F2 chunk-level verification: each output RG must be aligned
+        // on (metric_name, -env). PA-1 + PA-3 read from chunk stats.
+        assert_unique_rg_prefix_keys(
+            reader.metadata(),
+            "metric_name|-env|-timestamp_secs/V2",
+            2,
+            "test_streaming_merge_with_desc_prefix_col output",
+        )
+        .expect("DESC prefix output must satisfy PA-1 + PA-3");
     }

     /// Regression for the composite-key encoding when ASC and DESC
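
The chunk-level check these three tests now call can be sketched as follows. This is a simplified illustration under stated assumptions, not the body of `assert_unique_rg_prefix_keys`: `ChunkStats`, `chunk`, and `check_prefix_alignment` are hypothetical names, and the statistics are modelled as plain byte vectors rather than read from Parquet metadata.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for one prefix column's chunk-level min/max
/// statistics within a single row group (not the engine's real type).
#[derive(Debug, Clone)]
struct ChunkStats {
    min: Vec<u8>,
    max: Vec<u8>,
}

/// Checks PA-1 (every prefix column is constant within a row group,
/// i.e. min == max) and PA-3 (no two row groups share a composite key).
/// Each inner Vec holds the prefix-column stats for one row group.
fn check_prefix_alignment(row_groups: &[Vec<ChunkStats>]) -> Result<(), String> {
    let mut seen: HashSet<Vec<Vec<u8>>> = HashSet::new();
    for (rg_idx, prefix_cols) in row_groups.iter().enumerate() {
        // PA-1 runs for every row group, including a lone one.
        for (col_idx, stats) in prefix_cols.iter().enumerate() {
            if stats.min != stats.max {
                return Err(format!(
                    "PA-1 violated: row group {rg_idx}, prefix column {col_idx} has min != max"
                ));
            }
        }
        // PA-3: with min == max established, the mins form the composite key.
        let key: Vec<Vec<u8>> = prefix_cols.iter().map(|s| s.min.clone()).collect();
        if !seen.insert(key) {
            return Err(format!(
                "PA-3 violated: row group {rg_idx} repeats an earlier composite prefix key"
            ));
        }
    }
    Ok(())
}

/// Helper: stats for a chunk whose values range from `min` to `max`.
fn chunk(min: &str, max: &str) -> ChunkStats {
    ChunkStats { min: min.as_bytes().to_vec(), max: max.as_bytes().to_vec() }
}

fn main() {
    // Two RGs aligned on (metric_name, service): passes.
    let aligned = vec![
        vec![chunk("cpu.usage", "cpu.usage"), chunk("api", "api")],
        vec![chunk("cpu.usage", "cpu.usage"), chunk("db", "db")],
    ];
    assert!(check_prefix_alignment(&aligned).is_ok());

    // A single RG spanning two metric names still fails PA-1.
    let single_rg = vec![vec![chunk("cpu.usage", "memory.used")]];
    assert!(check_prefix_alignment(&single_rg).is_err());

    // Two RGs with the same key fail PA-3.
    let duplicated = vec![
        vec![chunk("cpu.usage", "cpu.usage")],
        vec![chunk("cpu.usage", "cpu.usage")],
    ];
    assert!(check_prefix_alignment(&duplicated).is_err());
}
```

The single-RG case is the reason the commit message says the verifier no longer short-circuits on single-RG files: counting row groups alone cannot see a `min != max` chunk.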

quickwit/quickwit-parquet-engine/src/storage/legacy_adapter.rs

Lines changed: 132 additions & 3 deletions
@@ -57,6 +57,7 @@
 // deprecated items at module scope keeps that lookup direct.
 #![allow(deprecated)]

+use std::collections::HashMap;
 use std::io;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
@@ -141,6 +142,26 @@ pub enum LegacyAdapterError {
          enough sort information to safely synthesize prefix-aligned row groups)"
     )]
     PrefixUnresolvable { target: u32, reason: String },
+
+    /// The legacy file's rows are not sorted by its declared sort schema
+    /// (SS-1 violation): two row regions in the file carry the same
+    /// composite prefix value with other prefix values in between. The
+    /// adapter walks rows in physical order and emits one RG per
+    /// prefix-value run, so an unsorted input produces multiple RGs
+    /// sharing a prefix key, which violates PA-3 (per-input uniqueness).
+    /// Bail upfront instead of producing a file the downstream merge
+    /// engine will reject mid-merge.
+    #[error(
+        "legacy input is not sorted by its declared sort schema: rows at offset {first_offset} \
+         and offset {second_offset} share composite prefix value (target_prefix_len = {target}). \
+         The adapter relies on the file being sorted per SS-1; an unsorted file would synthesize \
+         multiple row groups with the same prefix key (PA-3 violation)."
+    )]
+    InputNotSorted {
+        target: u32,
+        first_offset: usize,
+        second_offset: usize,
+    },
 }

 /// 4 GiB upper bound on the input file size we will buffer into RAM.
@@ -305,7 +326,7 @@ fn reencode_prefix_aligned(
     let slices = if consolidated_batch.num_rows() == 0 {
         Vec::new()
     } else {
-        compute_prefix_value_slices(&consolidated_batch, &prefix_col_indices)?
+        compute_prefix_value_slices(&consolidated_batch, &prefix_col_indices, target_prefix_len)?
     };
     let kv_with_prefix = inject_prefix_len_kv(original_kv, target_prefix_len);
     let props = build_writer_properties(
@@ -404,9 +425,19 @@ fn resolve_prefix_sort_cols(
 /// constant and contributes no transitions to the composite key —
 /// equivalent to skipping it, but kept explicit so the resulting
 /// alignment claim matches the caller's requested `target_prefix_len`.
+///
+/// Detects SS-1 violations (unsorted input) up-front: each emitted
+/// slice's composite prefix-value bytes must be unique. If two
+/// non-adjacent slices carry the same prefix value (e.g., rows
+/// `[A,A,B,B,A,A]`), the input is not sorted by its declared sort
+/// schema, so we'd synthesize a file with two RGs sharing the prefix,
+/// a PA-3 violation the downstream merge engine would reject
+/// mid-merge. Bailing here with `InputNotSorted` keeps that bad file
+/// from ever landing on disk.
 fn compute_prefix_value_slices(
     batch: &RecordBatch,
     prefix_col_indices: &[Option<usize>],
+    target_prefix_len: u32,
 ) -> Result<Vec<(usize, usize)>, LegacyAdapterError> {
     let n = batch.num_rows();
     let cols: Vec<ArrayRef> = prefix_col_indices
@@ -428,15 +459,35 @@ fn compute_prefix_value_slices(
     if n_rows == 0 {
         return Ok(Vec::new());
     }
+    // Track each emitted slice's starting prefix-value bytes; any
+    // repeat signals SS-1 violation on the input.
+    let mut seen: HashMap<Vec<u8>, usize> = HashMap::new();
     let mut slices = Vec::new();
     let mut start = 0;
+    let record_slice = |slices: &mut Vec<(usize, usize)>,
+                        seen: &mut HashMap<Vec<u8>, usize>,
+                        slice_start: usize,
+                        slice_len: usize|
+     -> Result<(), LegacyAdapterError> {
+        let key = rows.row(slice_start).as_ref().to_vec();
+        if let Some(&first_offset) = seen.get(&key) {
+            return Err(LegacyAdapterError::InputNotSorted {
+                target: target_prefix_len,
+                first_offset,
+                second_offset: slice_start,
+            });
+        }
+        seen.insert(key, slice_start);
+        slices.push((slice_start, slice_len));
+        Ok(())
+    };
     for i in 1..n_rows {
         if rows.row(i) != rows.row(i - 1) {
-            slices.push((start, i - start));
+            record_slice(&mut slices, &mut seen, start, i - start)?;
             start = i;
         }
     }
-    slices.push((start, n_rows - start));
+    record_slice(&mut slices, &mut seen, start, n_rows - start)?;
     Ok(slices)
 }

@@ -1363,6 +1414,19 @@ mod tests {
             Some("1"),
             "re-encoded file must declare rg_partition_prefix_len=1",
         );
+
+        // F9 chunk-level verification: the count + KV checks above
+        // would still pass if `compute_prefix_value_slices` had an
+        // off-by-one in its boundary detection. PA-1 + PA-3 on chunk
+        // statistics nail down that each RG's metric_name column is
+        // actually constant and no two RGs share a value.
+        crate::merge::streaming::region_grouping::assert_unique_rg_prefix_keys(
+            adapter.metadata(),
+            "metric_name|-timestamp_secs/V2",
+            1,
+            "test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg adapter output",
+        )
+        .expect("adapter output must satisfy PA-1 + PA-3 on metric_name");
     }

     /// Single-metric legacy file: only one prefix value, so the
@@ -1660,6 +1724,19 @@ mod tests {
             Some("2"),
             "stamped prefix_len must match caller's request",
         );
+
+        // F9 chunk-level verification: a `compute_prefix_value_slices`
+        // bug splitting on only the first prefix col (or off by one)
+        // would still yield 4 RGs of [20,20,20,20] but with the wrong
+        // CONTENTS. PA-1 + PA-3 on the composite (metric, service)
+        // key verify content alignment directly.
+        crate::merge::streaming::region_grouping::assert_unique_rg_prefix_keys(
+            adapter.metadata(),
+            "metric_name|service|-timestamp_secs/V2",
+            2,
+            "test_target_prefix_len_two_splits_by_metric_and_service adapter output",
+        )
+        .expect("composite prefix output must satisfy PA-1 + PA-3");
     }

     /// SS-3: a sort column named in `qh.sort_fields` but missing from
@@ -1732,6 +1809,58 @@ mod tests {
         .expect("SS-3 null col must satisfy PA-1 + PA-3 (null is constant across all RGs)");
     }

+    /// F8 regression: an unsorted legacy input (rows
+    /// `[A,A,B,B,A,A]` on `metric_name`) violates SS-1. Walking
+    /// row-by-row to find prefix transitions would emit three slices
+    /// (`A`, `B`, `A`) and synthesize a file with two RGs sharing the
+    /// prefix value `A`, violating PA-3. The downstream streaming
+    /// merge engine would catch this later, but only once the bad
+    /// file had been built and possibly archived. The adapter must
+    /// bail upfront with `InputNotSorted` so no PA-3-violating file
+    /// ever lands on disk.
+    #[tokio::test]
+    async fn test_unsorted_legacy_input_rejected_by_adapter() {
+        // metric_name in row order: cpu.usage, memory.used, cpu.usage.
+        // That's an SS-1 violation under sort schema `metric_name ASC`.
+        let bad_metrics = [
+            ("cpu.usage", 20usize),
+            ("memory.used", 20),
+            ("cpu.usage", 20),
+        ];
+        let bytes =
+            write_sorted_multi_rg_legacy_file(&bad_metrics, "metric_name|-timestamp_secs/V2", 20);
+
+        let source = CountingInMemorySource::new(bytes);
+        let result = LegacyInputAdapter::try_open(source, dummy_path(), 1).await;
+        let Err(err) = result else {
+            panic!(
+                "unsorted legacy input must surface as InputNotSorted, got Ok(...); the adapter \
+                 would have written a PA-3-violating file"
+            );
+        };
+        match err {
+            LegacyAdapterError::InputNotSorted {
+                target,
+                first_offset,
+                second_offset,
+            } => {
+                assert_eq!(target, 1);
+                // First `cpu.usage` run is at offset 0; second is at
+                // offset 40 (after the 20-row `cpu.usage` then 20-row
+                // `memory.used` runs).
+                assert_eq!(
+                    first_offset, 0,
+                    "first duplicate prefix offset should point at the first cpu.usage run",
+                );
+                assert_eq!(
+                    second_offset, 40,
+                    "second duplicate prefix offset should point at the second cpu.usage run",
+                );
+            }
+            other => panic!("expected InputNotSorted, got: {other}"),
+        }
+    }
+
     /// Composite-prefix fixture: rows grouped by `(metric, service)`
     /// in the order supplied. Used by the prefix_len=2 test to verify
     /// transitions on the second prefix column trigger RG splits.
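
The F8 check exercised by `test_unsorted_legacy_input_rejected_by_adapter` can be reduced to a standalone sketch. This is an illustration under simplifying assumptions, not the adapter's code: keys are plain `&str` values rather than encoded row bytes, and `NotSorted` and `prefix_runs` are hypothetical names standing in for `InputNotSorted` and `compute_prefix_value_slices`.

```rust
use std::collections::HashMap;

/// Hypothetical error mirroring the offsets carried by `InputNotSorted`.
#[derive(Debug, PartialEq, Eq)]
struct NotSorted {
    first_offset: usize,
    second_offset: usize,
}

/// Splits `keys` into (start, len) runs of equal adjacent values and
/// rejects the input if any run's key was already seen in an earlier run.
fn prefix_runs(keys: &[&str]) -> Result<Vec<(usize, usize)>, NotSorted> {
    let mut seen: HashMap<&str, usize> = HashMap::new();
    let mut slices = Vec::new();
    if keys.is_empty() {
        return Ok(slices);
    }
    let mut start = 0;
    for i in 1..=keys.len() {
        // A run closes at a value transition or at the end of the input.
        if i == keys.len() || keys[i] != keys[i - 1] {
            // `insert` returns the previous offset if the key was present.
            if let Some(first_offset) = seen.insert(keys[start], start) {
                return Err(NotSorted { first_offset, second_offset: start });
            }
            slices.push((start, i - start));
            start = i;
        }
    }
    Ok(slices)
}

fn main() {
    // Sorted input: one run per distinct key.
    assert_eq!(prefix_runs(&["A", "A", "B", "B"]), Ok(vec![(0, 2), (2, 2)]));

    // The [A,A,B,B,A,A] shape from the commit message: the second `A`
    // run starts at offset 4 and collides with the first run at offset 0.
    assert_eq!(
        prefix_runs(&["A", "A", "B", "B", "A", "A"]),
        Err(NotSorted { first_offset: 0, second_offset: 4 })
    );
}
```

Note that this sketch only flags a key that reappears in a later run; it does not compare the order of distinct runs against each other.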
