Skip to content

Commit d12a52a

Browse files
g-talbotclaude
andcommitted
fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers
Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent af7ddc5 commit d12a52a

2 files changed

Lines changed: 167 additions & 3 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/streaming.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1893,6 +1893,17 @@ mod tests {
18931893
outputs[0].num_row_groups, 2,
18941894
"MergeOutputFile.num_row_groups should match physical row group count",
18951895
);
1896+
1897+
// F2 chunk-level verification: confirm each output RG actually
1898+
// carries a single distinct metric_name (PA-1 + PA-3 read
1899+
// straight off the column-chunk statistics).
1900+
assert_unique_rg_prefix_keys(
1901+
reader.metadata(),
1902+
"metric_name|-timestamp_secs/V2",
1903+
1,
1904+
"test_multi_rg_metric_aligned_input_produces_multi_rg_output output",
1905+
)
1906+
.expect("streaming engine output must satisfy PA-1 + PA-3 on metric_name");
18961907
}
18971908

18981909
/// Regression for Codex P2 on PR-6410: a streaming merge output
@@ -2081,6 +2092,20 @@ mod tests {
20812092
"three distinct (metric_name, service) pairs must produce three output RGs",
20822093
);
20832094
assert_eq!(outputs[0].num_row_groups, 3);
2095+
2096+
// F2 chunk-level verification: counting RGs and stamping a KV
2097+
// is not enough — the OUTPUT's row groups must actually be
2098+
// aligned on the composite (metric_name, service) prefix.
2099+
// `assert_unique_rg_prefix_keys` enforces PA-1 (intra-RG
2100+
// constancy) + PA-3 (inter-RG uniqueness) by reading the
2101+
// chunk-level statistics.
2102+
assert_unique_rg_prefix_keys(
2103+
reader.metadata(),
2104+
"metric_name|service|-timestamp_secs/V2",
2105+
2,
2106+
"test_streaming_merge_with_prefix_len_two output",
2107+
)
2108+
.expect("streaming engine output must satisfy PA-1 + PA-3 on the prefix columns");
20842109
}
20852110

20862111
/// Regression for Codex finding #1 on PR-6410: when one input
@@ -2768,6 +2793,16 @@ mod tests {
27682793
(third_block - 1.0).abs() < 1e-9,
27692794
"third output RG should be 'dev' (marker 1.0), got {third_block}",
27702795
);
2796+
2797+
// F2 chunk-level verification: each output RG must be aligned
2798+
// on (metric_name, -env). PA-1 + PA-3 read from chunk stats.
2799+
assert_unique_rg_prefix_keys(
2800+
reader.metadata(),
2801+
"metric_name|-env|-timestamp_secs/V2",
2802+
2,
2803+
"test_streaming_merge_with_desc_prefix_col output",
2804+
)
2805+
.expect("DESC prefix output must satisfy PA-1 + PA-3");
27712806
}
27722807

27732808
/// Regression for the composite-key encoding when ASC and DESC

quickwit/quickwit-parquet-engine/src/storage/legacy_adapter.rs

Lines changed: 132 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
// deprecated items at module scope keeps that lookup direct.
5858
#![allow(deprecated)]
5959

60+
use std::collections::HashMap;
6061
use std::io;
6162
use std::ops::Range;
6263
use std::path::{Path, PathBuf};
@@ -141,6 +142,26 @@ pub enum LegacyAdapterError {
141142
enough sort information to safely synthesize prefix-aligned row groups)"
142143
)]
143144
PrefixUnresolvable { target: u32, reason: String },
145+
146+
/// The legacy file's rows are not sorted by its declared sort schema
147+
/// (SS-1 violation): two row regions in the file carry the same
148+
/// composite prefix value with other prefix values in between. The
149+
/// adapter walks rows in physical order and emits one RG per
150+
/// prefix-value run, so an unsorted input produces multiple RGs
151+
/// sharing a prefix key — which violates PA-3 (per-input uniqueness).
152+
/// Bail upfront instead of producing a file the downstream merge
153+
/// engine will reject mid-merge.
154+
#[error(
155+
"legacy input is not sorted by its declared sort schema: rows at offset {first_offset} \
156+
and offset {second_offset} share composite prefix value (target_prefix_len = {target}). \
157+
The adapter relies on the file being sorted per SS-1; an unsorted file would synthesize \
158+
multiple row groups with the same prefix key (PA-3 violation)."
159+
)]
160+
InputNotSorted {
161+
target: u32,
162+
first_offset: usize,
163+
second_offset: usize,
164+
},
144165
}
145166

146167
/// 4 GiB upper bound on the input file size we will buffer into RAM.
@@ -290,7 +311,7 @@ fn reencode_prefix_aligned(
290311
let slices = if consolidated_batch.num_rows() == 0 {
291312
Vec::new()
292313
} else {
293-
compute_prefix_value_slices(&consolidated_batch, &prefix_col_indices)?
314+
compute_prefix_value_slices(&consolidated_batch, &prefix_col_indices, target_prefix_len)?
294315
};
295316
let kv_with_prefix = inject_prefix_len_kv(original_kv, target_prefix_len);
296317
let props = build_writer_properties(
@@ -389,9 +410,19 @@ fn resolve_prefix_sort_cols(
389410
/// constant and contributes no transitions to the composite key —
390411
/// equivalent to skipping it, but kept explicit so the resulting
391412
/// alignment claim matches the caller's requested `target_prefix_len`.
413+
///
414+
/// Detects SS-1 violations (unsorted input) up-front: each emitted
415+
/// slice's composite prefix-value bytes must be unique. If two
416+
/// non-adjacent slices carry the same prefix value (e.g., rows
417+
/// `[A,A,B,B,A,A]`), the input is not sorted by its declared sort
418+
/// schema, so we'd synthesize a file with two RGs sharing the prefix
419+
/// — a PA-3 violation the downstream merge engine would reject
420+
/// mid-merge. Bailing here with `InputNotSorted` keeps that bad file
421+
/// from ever landing on disk.
392422
fn compute_prefix_value_slices(
393423
batch: &RecordBatch,
394424
prefix_col_indices: &[Option<usize>],
425+
target_prefix_len: u32,
395426
) -> Result<Vec<(usize, usize)>, LegacyAdapterError> {
396427
let n = batch.num_rows();
397428
let cols: Vec<ArrayRef> = prefix_col_indices
@@ -413,15 +444,35 @@ fn compute_prefix_value_slices(
413444
if n_rows == 0 {
414445
return Ok(Vec::new());
415446
}
447+
// Track each emitted slice's starting prefix-value bytes; any
448+
// repeat signals SS-1 violation on the input.
449+
let mut seen: HashMap<Vec<u8>, usize> = HashMap::new();
416450
let mut slices = Vec::new();
417451
let mut start = 0;
452+
let record_slice = |slices: &mut Vec<(usize, usize)>,
453+
seen: &mut HashMap<Vec<u8>, usize>,
454+
slice_start: usize,
455+
slice_len: usize|
456+
-> Result<(), LegacyAdapterError> {
457+
let key = rows.row(slice_start).as_ref().to_vec();
458+
if let Some(&first_offset) = seen.get(&key) {
459+
return Err(LegacyAdapterError::InputNotSorted {
460+
target: target_prefix_len,
461+
first_offset,
462+
second_offset: slice_start,
463+
});
464+
}
465+
seen.insert(key, slice_start);
466+
slices.push((slice_start, slice_len));
467+
Ok(())
468+
};
418469
for i in 1..n_rows {
419470
if rows.row(i) != rows.row(i - 1) {
420-
slices.push((start, i - start));
471+
record_slice(&mut slices, &mut seen, start, i - start)?;
421472
start = i;
422473
}
423474
}
424-
slices.push((start, n_rows - start));
475+
record_slice(&mut slices, &mut seen, start, n_rows - start)?;
425476
Ok(slices)
426477
}
427478

@@ -1348,6 +1399,19 @@ mod tests {
13481399
Some("1"),
13491400
"re-encoded file must declare rg_partition_prefix_len=1",
13501401
);
1402+
1403+
// F9 chunk-level verification: the count + KV checks above
1404+
// would still pass if `compute_prefix_value_slices` had an
1405+
// off-by-one in its boundary detection. PA-1 + PA-3 on chunk
1406+
// statistics nail down that each RG's metric_name column is
1407+
// actually constant and no two RGs share a value.
1408+
crate::merge::streaming::region_grouping::assert_unique_rg_prefix_keys(
1409+
adapter.metadata(),
1410+
"metric_name|-timestamp_secs/V2",
1411+
1,
1412+
"test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg adapter output",
1413+
)
1414+
.expect("adapter output must satisfy PA-1 + PA-3 on metric_name");
13511415
}
13521416

13531417
/// Single-metric legacy file: only one prefix value, so the
@@ -1532,6 +1596,19 @@ mod tests {
15321596
Some("2"),
15331597
"stamped prefix_len must match caller's request",
15341598
);
1599+
1600+
// F9 chunk-level verification: a `compute_prefix_value_slices`
1601+
// bug splitting on only the first prefix col (or off by one)
1602+
// would still yield 4 RGs of [20,20,20,20] but with the wrong
1603+
// CONTENTS. PA-1 + PA-3 on the composite (metric, service)
1604+
// composite key verifies content alignment directly.
1605+
crate::merge::streaming::region_grouping::assert_unique_rg_prefix_keys(
1606+
adapter.metadata(),
1607+
"metric_name|service|-timestamp_secs/V2",
1608+
2,
1609+
"test_target_prefix_len_two_splits_by_metric_and_service adapter output",
1610+
)
1611+
.expect("composite prefix output must satisfy PA-1 + PA-3");
15351612
}
15361613

15371614
/// SS-3: a sort column named in `qh.sort_fields` but missing from
@@ -1604,6 +1681,58 @@ mod tests {
16041681
.expect("SS-3 null col must satisfy PA-1 + PA-3 (null is constant across all RGs)");
16051682
}
16061683

1684+
/// F8 regression: an unsorted legacy input (rows
1685+
/// `[A,A,B,B,A,A]` on `metric_name`) violates SS-1. Walking
1686+
/// row-by-row to find prefix transitions would emit three slices —
1687+
/// `A`, `B`, `A` — and synthesize a file with two RGs sharing the
1688+
/// prefix value `A`, violating PA-3. The downstream streaming
1689+
/// merge engine would catch this later, but only once the bad
1690+
/// file had been built and possibly archived. The adapter must
1691+
/// bail upfront with `InputNotSorted` so no PA-3-violating file
1692+
/// ever lands on disk.
1693+
#[tokio::test]
1694+
async fn test_unsorted_legacy_input_rejected_by_adapter() {
1695+
// metric_name in row order: cpu.usage, memory.used, cpu.usage.
1696+
// That's an SS-1 violation under sort schema `metric_name ASC`.
1697+
let bad_metrics = [
1698+
("cpu.usage", 20usize),
1699+
("memory.used", 20),
1700+
("cpu.usage", 20),
1701+
];
1702+
let bytes =
1703+
write_sorted_multi_rg_legacy_file(&bad_metrics, "metric_name|-timestamp_secs/V2", 20);
1704+
1705+
let source = CountingInMemorySource::new(bytes);
1706+
let result = LegacyInputAdapter::try_open(source, dummy_path(), 1).await;
1707+
let Err(err) = result else {
1708+
panic!(
1709+
"unsorted legacy input must surface as InputNotSorted, got Ok(...) — the adapter \
1710+
would have written a PA-3-violating file"
1711+
);
1712+
};
1713+
match err {
1714+
LegacyAdapterError::InputNotSorted {
1715+
target,
1716+
first_offset,
1717+
second_offset,
1718+
} => {
1719+
assert_eq!(target, 1);
1720+
// First `cpu.usage` run is at offset 0; second is at
1721+
// offset 40 (after the 20-row `cpu.usage` then 20-row
1722+
// `memory.used` runs).
1723+
assert_eq!(
1724+
first_offset, 0,
1725+
"first duplicate prefix offset should point at the first cpu.usage run",
1726+
);
1727+
assert_eq!(
1728+
second_offset, 40,
1729+
"second duplicate prefix offset should point at the second cpu.usage run",
1730+
);
1731+
}
1732+
other => panic!("expected InputNotSorted, got: {other}"),
1733+
}
1734+
}
1735+
16071736
/// Composite-prefix fixture: rows grouped by `(metric, service)`
16081737
/// in the order supplied. Used by the prefix_len=2 test to verify
16091738
/// transitions on the second prefix column trigger RG splits.

0 commit comments

Comments
 (0)