Skip to content

Commit af7ddc5

Browse files
g-talbotclaude
andcommitted
fix(merge): consumer honors SS-3 (move F12 forward from #6426 to #6425)
Previously the F12 fix — "consumer side honors SS-3 missing prefix columns" — lived in the hardening PR (#6426). At the #6425 isolation level, the legacy adapter records `None` for a prefix column absent from the parquet schema and stamps `rg_partition_prefix_len = target_prefix_len` on the output, but the reader's `find_prefix_parquet_col_indices` bails on any missing column. So #6425 + #6424 alone would produce a legacy-adapter file that the streaming-merge reader rejects mid-merge — i.e. a known- incoherent intermediate stack state. Move F12 into this PR so the adapter and reader agree at the same slice: - `find_prefix_parquet_col_indices` now returns `Result<Vec<Option<PrefixColumn>>>`. `Some(_)` when the column is present in the parquet schema; `None` per SS-3 when the column is named in `qh.sort_fields` but absent from the schema. - `extract_rg_composite_prefix_key` skips `None` slots entirely (no ordinal byte, no value bytes for that column). The trailing `u8(prefix_len)` sentinel introduced in the storekey refactor keeps the resulting key well-formed across present/absent columns. - Callers that index into `prefix_cols` updated to use `.as_ref().expect(…)` where they assume presence. Existing SS-3 test `test_missing_prefix_col_treated_as_null_satisfies_alignment` in `legacy_adapter.rs` gets an `assert_unique_rg_prefix_keys` call verifying the adapter's output is consumable by the reader — pins the "stack-coherent at #6425" property the F12 hop establishes. Also incidental nightly-fmt cleanups in `sorted_series::append_prefix_col_to_key` and the two-input fixture in `test_all_null_prefix_rg_groups_into_separate_region_sorted_last`. The hardening PR (#6426) will be re-cascaded to drop the now- duplicated F12 hunks (keeping its F8 adapter-rejects-unsorted + F2 verifier-strength changes intact). 485 lib tests pass on this slice; workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 93aacea commit af7ddc5

3 files changed

Lines changed: 81 additions & 28 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/streaming.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2789,7 +2789,10 @@ mod tests {
27892789
.expect("resolve");
27902790
// Sanity: the second prefix column must be flagged DESC.
27912791
assert!(
2792-
prefix_cols[1].descending,
2792+
prefix_cols[1]
2793+
.as_ref()
2794+
.expect("env present in this fixture")
2795+
.descending,
27932796
"env must be parsed as DESC from sort schema",
27942797
);
27952798

quickwit/quickwit-parquet-engine/src/merge/streaming/region_grouping.rs

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,25 @@ pub(crate) struct PrefixColumn {
118118

119119
/// Resolve the first `prefix_len` sort columns to parquet leaf
120120
/// indices. Honours the legacy `timestamp` → `timestamp_secs` alias.
121-
/// Errors if the sort schema has fewer columns than `prefix_len` or
122-
/// if any column is missing from the parquet schema.
121+
///
122+
/// Returns one entry per requested prefix column. `Some(PrefixColumn)`
123+
/// when the column is present in the parquet schema; `None` when the
124+
/// column is named in `sort_fields_str` but absent from the parquet
125+
/// schema. Per SS-3 the missing column is treated as constant null at
126+
/// every row of the file — [`extract_rg_composite_prefix_key`]
127+
/// synthesizes a fixed byte sequence in that slot so ordering is
128+
/// driven entirely by the present columns.
129+
///
130+
/// Errors only when the sort schema declares fewer columns than
131+
/// requested — that means we don't have a *name* for one of the
132+
/// prefix columns and can't claim alignment on something we can't
133+
/// identify.
123134
pub(crate) fn find_prefix_parquet_col_indices(
124135
metadata: &ParquetMetaData,
125136
sort_fields_str: &str,
126137
prefix_len: usize,
127-
input_idx: usize,
128-
) -> Result<Vec<PrefixColumn>> {
138+
_input_idx: usize,
139+
) -> Result<Vec<Option<PrefixColumn>>> {
129140
let sort_field_schema = parse_sort_fields(sort_fields_str)?;
130141
if sort_field_schema.column.len() < prefix_len {
131142
bail!(
@@ -145,34 +156,34 @@ pub(crate) fn find_prefix_parquet_col_indices(
145156
} else {
146157
sort_col.name.as_str()
147158
};
159+
let descending = sort_col.sort_direction
160+
== quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
148161
let mut found = None;
149162
for (col_idx, col) in parquet_schema.columns().iter().enumerate() {
150163
if col.path().parts()[0] == resolved {
151164
found = Some(col_idx);
152165
break;
153166
}
154167
}
155-
let parquet_col_idx = found.ok_or_else(|| {
156-
anyhow!(
157-
"input {input_idx} parquet schema is missing prefix sort column '{}' (position \
158-
{pos})",
159-
sort_col.name,
160-
)
161-
})?;
162-
let descending = sort_col.sort_direction
163-
== quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
168+
// SS-3: missing column → `None`. The composite-key extractor
169+
// skips this slot entirely (no ordinal byte, no value bytes);
170+
// the trailing prefix-length sentinel in
171+
// `extract_rg_composite_prefix_key` ensures the resulting key
172+
// still sorts cleanly relative to RGs with present values
173+
// (and matches sorted_series's row-level null-skip).
174+
//
164175
// Ordinal matches the column's position in `qh.sort_fields`.
165176
// For prefix cols (always the first `prefix_len` entries of
166177
// the sort schema) the ordinal equals the iteration index
167178
// `pos`, which is also the ordinal `sorted_series` would
168179
// assign — so the per-RG prefix key composes as a literal
169180
// byte prefix of every sorted_series key.
170-
prefix_cols.push(PrefixColumn {
181+
prefix_cols.push(found.map(|parquet_col_idx| PrefixColumn {
171182
name: sort_col.name.clone(),
172183
parquet_col_idx,
173184
descending,
174185
ordinal: pos as u8,
175-
});
186+
}));
176187
}
177188
Ok(prefix_cols)
178189
}
@@ -197,23 +208,34 @@ fn parquet_has_column(
197208
/// in this RG.
198209
///
199210
/// Null handling:
200-
/// - **All-null RG on a prefix column**: the column is skipped entirely (the next column's higher
201-
/// ordinal byte appears in its place), so the RG sorts after any RG carrying a non-null value for
202-
/// this column. This mirrors the row-level convention in `sorted_series` and gives nulls-last
203-
/// ordering for free.
211+
/// - **Column absent from schema (`None` in `prefix_cols`)**: SS-3 case. Every row of the file has
212+
/// a constant null in this slot, so the contribution to the composite is empty (column skipped).
213+
/// The trailing prefix-length sentinel keeps the resulting key well-formed.
214+
/// - **All-null RG on a present prefix column**: column skipped for this RG (the next column's
215+
/// higher ordinal byte — or the trailing sentinel — appears in its place), so the RG sorts after
216+
/// any RG carrying a non-null value for this column. Mirrors the row-level convention in
217+
/// `sorted_series` and gives nulls-last ordering for free.
204218
/// - **Mixed null + non-null in one RG**: rows in the RG would encode to two distinct prefix keys
205219
/// (the non-null value's key and the column-skipped key), breaking the
206220
/// at-most-one-prefix-value-per-RG invariant (PA-1). Reject.
207221
/// - **No nulls**: standard `min == max` check on stats, then encode that single value.
208222
pub(crate) fn extract_rg_composite_prefix_key(
209223
metadata: &ParquetMetaData,
210224
rg_idx: usize,
211-
prefix_cols: &[PrefixColumn],
225+
prefix_cols: &[Option<PrefixColumn>],
212226
input_idx: usize,
213227
) -> Result<Vec<u8>> {
214228
let rg_meta = metadata.row_group(rg_idx);
215229
let mut key = Vec::new();
216-
for col in prefix_cols {
230+
for col_opt in prefix_cols {
231+
let Some(col) = col_opt else {
232+
// SS-3 implicit null: column absent from schema, so every
233+
// row's value is null. Skip the slot entirely — the
234+
// trailing prefix-length sentinel will keep this from
235+
// colliding with present-value keys, and sorted_series
236+
// applies the same "skip null cols" rule at the row level.
237+
continue;
238+
};
217239
let chunk = rg_meta.column(col.parquet_col_idx);
218240
let stats = chunk.statistics().ok_or_else(|| {
219241
anyhow!(
@@ -575,10 +597,22 @@ pub(crate) fn extract_regions_from_metadata(
575597
.collect())
576598
}
577599

578-
/// Post-write check: verify the parquet file at `metadata` has no two
579-
/// row groups sharing the same composite prefix key, for the first
580-
/// `prefix_len` sort columns. Returns `Ok(())` immediately if
581-
/// `prefix_len == 0` (no alignment claim).
600+
/// Post-write check: verify every row group in `metadata` satisfies
601+
/// the prefix-alignment claim declared by `prefix_len`.
602+
///
603+
/// Enforces both halves of the prefix-alignment contract in one pass:
604+
/// - **PA-1 (intra-RG constancy):** within each RG, each of the first `prefix_len` sort columns has
605+
/// `min == max` (the column is constant across the RG). This is checked transitively by
606+
/// [`extract_rg_composite_prefix_key`] — it returns an error when any prefix column's chunk stats
607+
/// show `min != max`.
608+
/// - **PA-3 (inter-RG uniqueness):** no two RGs share the same composite prefix value. The
609+
/// streaming engine pairs at most one input RG per region per prefix value, so a duplicate would
610+
/// silently drop rows or corrupt the body-col / sort-col mapping.
611+
///
612+
/// Returns `Ok(())` immediately when `prefix_len == 0` (no claim to
613+
/// verify) or `num_rgs == 0` (no RGs to check). Single-RG files are
614+
/// NOT short-circuited — they still go through PA-1 because an
615+
/// unsorted single-RG file CAN have `min != max` on a prefix column.
582616
///
583617
/// This is the writer-side mirror of the read-side check in
584618
/// `extract_regions_from_metadata` — both indexing and the compaction
@@ -600,8 +634,8 @@ pub(crate) fn assert_unique_rg_prefix_keys(
600634
return Ok(());
601635
}
602636
let num_rgs = metadata.num_row_groups();
603-
if num_rgs <= 1 {
604-
// Single-RG (or zero-RG) files vacuously satisfy the invariant.
637+
if num_rgs == 0 {
638+
// Zero-RG files vacuously satisfy both halves of the claim.
605639
return Ok(());
606640
}
607641
let prefix_cols =

quickwit/quickwit-parquet-engine/src/storage/legacy_adapter.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,22 @@ mod tests {
15861586
Some("2"),
15871587
"stamped prefix_len must match caller's request even when one col is implicitly null",
15881588
);
1589+
1590+
// SS-3 consumer-side verification: the file the adapter just
1591+
// produced must be consumable by the merge engine's
1592+
// `extract_rg_composite_prefix_key` reader. With `env` absent
1593+
// from the parquet schema, `find_prefix_parquet_col_indices`
1594+
// returns `None` in that slot and the composite-key extractor
1595+
// skips it. PA-1 + PA-3 still hold: each RG's metric_name
1596+
// min == max, and skipping the constant-null `env` slot makes
1597+
// the RG composite keys differ only by metric_name.
1598+
crate::merge::streaming::region_grouping::assert_unique_rg_prefix_keys(
1599+
adapter.metadata(),
1600+
"metric_name|env|-timestamp_secs/V2",
1601+
2,
1602+
"test_missing_prefix_col_treated_as_null_satisfies_alignment adapter output",
1603+
)
1604+
.expect("SS-3 null col must satisfy PA-1 + PA-3 (null is constant across all RGs)");
15891605
}
15901606

15911607
/// Composite-prefix fixture: rows grouped by `(metric, service)`

0 commit comments

Comments
 (0)