Skip to content

Commit a255ae3

Browse files
g-talbotclaude
andcommitted
fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers
Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 61ad48c commit a255ae3

3 files changed

Lines changed: 254 additions & 32 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/streaming.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1893,6 +1893,17 @@ mod tests {
18931893
outputs[0].num_row_groups, 2,
18941894
"MergeOutputFile.num_row_groups should match physical row group count",
18951895
);
1896+
1897+
// F2 chunk-level verification: confirm each output RG actually
1898+
// carries a single distinct metric_name (PA-1 + PA-3 read
1899+
// straight off the column-chunk statistics).
1900+
assert_unique_rg_prefix_keys(
1901+
reader.metadata(),
1902+
"metric_name|-timestamp_secs/V2",
1903+
1,
1904+
"test_multi_rg_metric_aligned_input_produces_multi_rg_output output",
1905+
)
1906+
.expect("streaming engine output must satisfy PA-1 + PA-3 on metric_name");
18961907
}
18971908

18981909
/// Regression for Codex P2 on PR-6410: a streaming merge output
@@ -2068,6 +2079,20 @@ mod tests {
20682079
"three distinct (metric_name, service) pairs must produce three output RGs",
20692080
);
20702081
assert_eq!(outputs[0].num_row_groups, 3);
2082+
2083+
// F2 chunk-level verification: counting RGs and stamping a KV
2084+
// is not enough — the OUTPUT's row groups must actually be
2085+
// aligned on the composite (metric_name, service) prefix.
2086+
// `assert_unique_rg_prefix_keys` enforces PA-1 (intra-RG
2087+
// constancy) + PA-3 (inter-RG uniqueness) by reading the
2088+
// chunk-level statistics.
2089+
assert_unique_rg_prefix_keys(
2090+
reader.metadata(),
2091+
"metric_name|service|-timestamp_secs/V2",
2092+
2,
2093+
"test_streaming_merge_with_prefix_len_two output",
2094+
)
2095+
.expect("streaming engine output must satisfy PA-1 + PA-3 on the prefix columns");
20712096
}
20722097

20732098
/// Regression for Codex finding #1 on PR-6410: when one input
@@ -2784,6 +2809,16 @@ mod tests {
27842809
(third_block - 1.0).abs() < 1e-9,
27852810
"third output RG should be 'dev' (marker 1.0), got {third_block}",
27862811
);
2812+
2813+
// F2 chunk-level verification: each output RG must be aligned
2814+
// on (metric_name, -env). PA-1 + PA-3 read from chunk stats.
2815+
assert_unique_rg_prefix_keys(
2816+
reader.metadata(),
2817+
"metric_name|-env|-timestamp_secs/V2",
2818+
2,
2819+
"test_streaming_merge_with_desc_prefix_col output",
2820+
)
2821+
.expect("DESC prefix output must satisfy PA-1 + PA-3");
27872822
}
27882823

27892824
/// Regression for the composite-key encoding when ASC and DESC
@@ -2805,7 +2840,10 @@ mod tests {
28052840
.expect("resolve");
28062841
// Sanity: the second prefix column must be flagged DESC.
28072842
assert!(
2808-
prefix_cols[1].descending,
2843+
prefix_cols[1]
2844+
.as_ref()
2845+
.expect("env present in this fixture")
2846+
.descending,
28092847
"env must be parsed as DESC from sort schema",
28102848
);
28112849

quickwit/quickwit-parquet-engine/src/merge/streaming/region_grouping.rs

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,25 @@ pub(crate) struct PrefixColumn {
112112

113113
/// Resolve the first `prefix_len` sort columns to parquet leaf
114114
/// indices. Honours the legacy `timestamp` → `timestamp_secs` alias.
115-
/// Errors if the sort schema has fewer columns than `prefix_len` or
116-
/// if any column is missing from the parquet schema.
115+
///
116+
/// Returns one entry per requested prefix column. `Some(PrefixColumn)`
117+
/// when the column is present in the parquet schema; `None` when the
118+
/// column is named in `sort_fields_str` but absent from the parquet
119+
/// schema. Per SS-3 the missing column is treated as constant null at
120+
/// every row of the file — [`extract_rg_composite_prefix_key`]
121+
/// synthesizes a fixed byte sequence in that slot so ordering is
122+
/// driven entirely by the present columns.
123+
///
124+
/// Errors only when the sort schema declares fewer columns than
125+
/// requested — that means we don't have a *name* for one of the
126+
/// prefix columns and can't claim alignment on something we can't
127+
/// identify.
117128
pub(crate) fn find_prefix_parquet_col_indices(
118129
metadata: &ParquetMetaData,
119130
sort_fields_str: &str,
120131
prefix_len: usize,
121-
input_idx: usize,
122-
) -> Result<Vec<PrefixColumn>> {
132+
_input_idx: usize,
133+
) -> Result<Vec<Option<PrefixColumn>>> {
123134
let sort_field_schema = parse_sort_fields(sort_fields_str)?;
124135
if sort_field_schema.column.len() < prefix_len {
125136
bail!(
@@ -129,7 +140,7 @@ pub(crate) fn find_prefix_parquet_col_indices(
129140
}
130141
let parquet_schema = metadata.file_metadata().schema_descr();
131142
let mut prefix_cols = Vec::with_capacity(prefix_len);
132-
for (pos, sort_col) in sort_field_schema.column.iter().take(prefix_len).enumerate() {
143+
for sort_col in sort_field_schema.column.iter().take(prefix_len) {
133144
// Apply the same `timestamp` / `timestamp_secs` alias the rest
134145
// of the engine uses.
135146
let resolved = if is_timestamp_column_name(&sort_col.name)
@@ -139,27 +150,23 @@ pub(crate) fn find_prefix_parquet_col_indices(
139150
} else {
140151
sort_col.name.as_str()
141152
};
153+
let descending = sort_col.sort_direction
154+
== quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
142155
let mut found = None;
143156
for (col_idx, col) in parquet_schema.columns().iter().enumerate() {
144157
if col.path().parts()[0] == resolved {
145158
found = Some(col_idx);
146159
break;
147160
}
148161
}
149-
let parquet_col_idx = found.ok_or_else(|| {
150-
anyhow!(
151-
"input {input_idx} parquet schema is missing prefix sort column '{}' (position \
152-
{pos})",
153-
sort_col.name,
154-
)
155-
})?;
156-
let descending = sort_col.sort_direction
157-
== quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
158-
prefix_cols.push(PrefixColumn {
162+
// SS-3: missing column → None. Caller treats it as constant
163+
// null at every row, which trivially satisfies alignment on
164+
// that column.
165+
prefix_cols.push(found.map(|parquet_col_idx| PrefixColumn {
159166
name: sort_col.name.clone(),
160167
parquet_col_idx,
161168
descending,
162-
});
169+
}));
163170
}
164171
Ok(prefix_cols)
165172
}
@@ -179,19 +186,36 @@ fn parquet_has_column(
179186
/// prefix column's value bytes in declared order, with each column's
180187
/// encoding chosen so that lexicographic order on the composite
181188
/// matches the sort schema's order across the prefix columns. Each
182-
/// column is required to be **constant within the RG** — either
183-
/// `min == max` on the non-null cells with zero nulls, or all rows
184-
/// null. A mix of nulls and non-nulls in the same RG breaks the
185-
/// at-most-one-prefix-value-per-RG invariant (PA-1) and is rejected.
189+
/// present column is required to be **constant within the RG** —
190+
/// either `min == max` on the non-null cells with zero nulls, or
191+
/// all rows null. A mix of nulls and non-nulls in the same RG
192+
/// breaks the at-most-one-prefix-value-per-RG invariant (PA-1) and
193+
/// is rejected by [`extract_aligned_prefix_value`].
194+
///
195+
/// A `None` slot in `prefix_cols` represents an SS-3 case: the
196+
/// column is declared in `qh.sort_fields` but absent from the
197+
/// parquet schema. Per SS-3 every row's value in that column is
198+
/// implicitly null. Since the value is constant across all RGs in
199+
/// the file, we contribute a fixed byte sequence (the encoded
200+
/// empty value) in that slot — ordering on this column does no
201+
/// work, and ordering on the other prefix columns picks the region
202+
/// boundaries.
186203
pub(crate) fn extract_rg_composite_prefix_key(
187204
metadata: &ParquetMetaData,
188205
rg_idx: usize,
189-
prefix_cols: &[PrefixColumn],
206+
prefix_cols: &[Option<PrefixColumn>],
190207
input_idx: usize,
191208
) -> Result<Vec<u8>> {
192209
let rg_meta = metadata.row_group(rg_idx);
193210
let mut key = Vec::new();
194-
for col in prefix_cols {
211+
for col_opt in prefix_cols {
212+
let Some(col) = col_opt else {
213+
// SS-3 implicit null: constant for every RG, so any fixed
214+
// marker works. Use the encoded empty byte string so the
215+
// contribution is byte-recognizable in dumps.
216+
key.extend_from_slice(&encode_byte_array_prefix(&[]));
217+
continue;
218+
};
195219
let chunk = rg_meta.column(col.parquet_col_idx);
196220
let stats = chunk.statistics().ok_or_else(|| {
197221
anyhow!(
@@ -569,10 +593,22 @@ pub(crate) fn extract_regions_from_metadata(
569593
.collect())
570594
}
571595

572-
/// Post-write check: verify the parquet file at `metadata` has no two
573-
/// row groups sharing the same composite prefix key, for the first
574-
/// `prefix_len` sort columns. Returns `Ok(())` immediately if
575-
/// `prefix_len == 0` (no alignment claim).
596+
/// Post-write check: verify every row group in `metadata` satisfies
597+
/// the prefix-alignment claim declared by `prefix_len`.
598+
///
599+
/// Enforces both halves of the prefix-alignment contract in one pass:
600+
/// - **PA-1 (intra-RG constancy):** within each RG, each of the first `prefix_len` sort columns has
601+
/// `min == max` (the column is constant across the RG). This is checked transitively by
602+
/// [`extract_rg_composite_prefix_key`] — it returns an error when any prefix column's chunk stats
603+
/// show `min != max`.
604+
/// - **PA-3 (inter-RG uniqueness):** no two RGs share the same composite prefix value. The
605+
/// streaming engine pairs at most one input RG per region per prefix value, so a duplicate would
606+
/// silently drop rows or corrupt the body-col / sort-col mapping.
607+
///
608+
/// Returns `Ok(())` immediately when `prefix_len == 0` (no claim to
609+
/// verify) or `num_rgs == 0` (no RGs to check). Single-RG files are
610+
/// NOT short-circuited — they still go through PA-1 because an
611+
/// unsorted single-RG file CAN have `min != max` on a prefix column.
576612
///
577613
/// This is the writer-side mirror of the read-side check in
578614
/// `extract_regions_from_metadata` — both indexing and the compaction
@@ -594,8 +630,8 @@ pub(crate) fn assert_unique_rg_prefix_keys(
594630
return Ok(());
595631
}
596632
let num_rgs = metadata.num_row_groups();
597-
if num_rgs <= 1 {
598-
// Single-RG (or zero-RG) files vacuously satisfy the invariant.
633+
if num_rgs == 0 {
634+
// Zero-RG files vacuously satisfy both halves of the claim.
599635
return Ok(());
600636
}
601637
let prefix_cols =

0 commit comments

Comments
 (0)