Skip to content

Commit 70d4d44

Browse files
g-talbotclaude
andcommitted
fix(merge): consumer honors SS-3 (move F12 forward from #6426 to #6425)
Previously the F12 fix — "consumer side honors SS-3 missing prefix columns" — lived in the hardening PR (#6426). At the #6425 isolation level, the legacy adapter records `None` for a prefix column absent from the parquet schema and stamps `rg_partition_prefix_len = target_prefix_len` on the output, but the reader's `find_prefix_parquet_col_indices` bails on any missing column. So #6425 + #6424 alone would produce a legacy-adapter file that the streaming-merge reader rejects mid-merge — i.e. a known- incoherent intermediate stack state. Move F12 into this PR so the adapter and reader agree at the same slice: - `find_prefix_parquet_col_indices` now returns `Result<Vec<Option<PrefixColumn>>>`. `Some(_)` when the column is present in the parquet schema; `None` per SS-3 when the column is named in `qh.sort_fields` but absent from the schema. - `extract_rg_composite_prefix_key` skips `None` slots entirely (no ordinal byte, no value bytes for that column). The trailing `u8(prefix_len)` sentinel introduced in the storekey refactor keeps the resulting key well-formed across present/absent columns. - Callers that index into `prefix_cols` updated to use `.as_ref().expect(…)` where they assume presence. Existing SS-3 test `test_missing_prefix_col_treated_as_null_satisfies_alignment` in `legacy_adapter.rs` gets an `assert_unique_rg_prefix_keys` call verifying the adapter's output is consumable by the reader — pins the "stack-coherent at #6425" property the F12 hop establishes. Also incidental nightly-fmt cleanups in `sorted_series::append_prefix_col_to_key` and the two-input fixture in `test_all_null_prefix_rg_groups_into_separate_region_sorted_last`. The hardening PR (#6426) will be re-cascaded to drop the now- duplicated F12 hunks (keeping its F8 adapter-rejects-unsorted + F2 verifier-strength changes intact). 485 lib tests pass on this slice; workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8d055e0 commit 70d4d44

4 files changed

Lines changed: 108 additions & 56 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/streaming.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2250,10 +2250,8 @@ mod tests {
22502250
1,
22512251
);
22522252
let bytes_b = make_nullable_prefix_input_single_rg(&[None, None, None], 1);
2253-
let inputs: Vec<Box<dyn ColumnPageStream>> = vec![
2254-
open_stream(bytes_a).await,
2255-
open_stream(bytes_b).await,
2256-
];
2253+
let inputs: Vec<Box<dyn ColumnPageStream>> =
2254+
vec![open_stream(bytes_a).await, open_stream(bytes_b).await];
22572255

22582256
let tmp = TempDir::new().expect("tmpdir");
22592257
let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1))
@@ -2278,15 +2276,21 @@ mod tests {
22782276
use arrow::array::StringArray;
22792277
let combined = read_output_to_record_batch(&outputs[0].path);
22802278
let mn_idx = combined.schema().index_of("metric_name").expect("mn col");
2281-
let arr = combined.column(mn_idx).as_any().downcast_ref::<StringArray>();
2279+
let arr = combined
2280+
.column(mn_idx)
2281+
.as_any()
2282+
.downcast_ref::<StringArray>();
22822283
let arr = arr.expect("metric_name should decode as StringArray");
22832284
// First 3 rows are the non-null region, last 3 are all-null.
22842285
for i in 0..3 {
22852286
assert!(arr.is_valid(i), "row {i} should be non-null");
22862287
assert_eq!(arr.value(i), "cpu.usage");
22872288
}
22882289
for i in 3..6 {
2289-
assert!(arr.is_null(i), "row {i} should be null (all-null region sorts last)");
2290+
assert!(
2291+
arr.is_null(i),
2292+
"row {i} should be null (all-null region sorts last)"
2293+
);
22902294
}
22912295
}
22922296

@@ -2690,7 +2694,7 @@ mod tests {
26902694
Bytes::from(buf)
26912695
}
26922696

2693-
/// End-to-end regression for DESC prefix columns. Three RGs with
2697+
/// End-to-end regression for DESC prefix columns. Three RGs with
26942698
/// the same metric_name (ASC) and distinct `env` values; sort
26952699
/// schema declares env DESC. The input file must itself be
26962700
/// DESC-sorted on env (RGs in physical order staging → prod →
@@ -2785,7 +2789,10 @@ mod tests {
27852789
.expect("resolve");
27862790
// Sanity: the second prefix column must be flagged DESC.
27872791
assert!(
2788-
prefix_cols[1].descending,
2792+
prefix_cols[1]
2793+
.as_ref()
2794+
.expect("env present in this fixture")
2795+
.descending,
27892796
"env must be parsed as DESC from sort schema",
27902797
);
27912798

quickwit/quickwit-parquet-engine/src/merge/streaming/region_grouping.rs

Lines changed: 73 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,25 @@ pub(crate) struct PrefixColumn {
118118

119119
/// Resolve the first `prefix_len` sort columns to parquet leaf
120120
/// indices. Honours the legacy `timestamp` → `timestamp_secs` alias.
121-
/// Errors if the sort schema has fewer columns than `prefix_len` or
122-
/// if any column is missing from the parquet schema.
121+
///
122+
/// Returns one entry per requested prefix column. `Some(PrefixColumn)`
123+
/// when the column is present in the parquet schema; `None` when the
124+
/// column is named in `sort_fields_str` but absent from the parquet
125+
/// schema. Per SS-3 the missing column is treated as constant null at
126+
/// every row of the file — [`extract_rg_composite_prefix_key`]
127+
/// synthesizes a fixed byte sequence in that slot so ordering is
128+
/// driven entirely by the present columns.
129+
///
130+
/// Errors only when the sort schema declares fewer columns than
131+
/// requested — that means we don't have a *name* for one of the
132+
/// prefix columns and can't claim alignment on something we can't
133+
/// identify.
123134
pub(crate) fn find_prefix_parquet_col_indices(
124135
metadata: &ParquetMetaData,
125136
sort_fields_str: &str,
126137
prefix_len: usize,
127-
input_idx: usize,
128-
) -> Result<Vec<PrefixColumn>> {
138+
_input_idx: usize,
139+
) -> Result<Vec<Option<PrefixColumn>>> {
129140
let sort_field_schema = parse_sort_fields(sort_fields_str)?;
130141
if sort_field_schema.column.len() < prefix_len {
131142
bail!(
@@ -145,34 +156,34 @@ pub(crate) fn find_prefix_parquet_col_indices(
145156
} else {
146157
sort_col.name.as_str()
147158
};
159+
let descending = sort_col.sort_direction
160+
== quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
148161
let mut found = None;
149162
for (col_idx, col) in parquet_schema.columns().iter().enumerate() {
150163
if col.path().parts()[0] == resolved {
151164
found = Some(col_idx);
152165
break;
153166
}
154167
}
155-
let parquet_col_idx = found.ok_or_else(|| {
156-
anyhow!(
157-
"input {input_idx} parquet schema is missing prefix sort column '{}' (position \
158-
{pos})",
159-
sort_col.name,
160-
)
161-
})?;
162-
let descending = sort_col.sort_direction
163-
== quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
168+
// SS-3: missing column → `None`. The composite-key extractor
169+
// skips this slot entirely (no ordinal byte, no value bytes);
170+
// the trailing prefix-length sentinel in
171+
// `extract_rg_composite_prefix_key` ensures the resulting key
172+
// still sorts cleanly relative to RGs with present values
173+
// (and matches sorted_series's row-level null-skip).
174+
//
164175
// Ordinal matches the column's position in `qh.sort_fields`.
165176
// For prefix cols (always the first `prefix_len` entries of
166177
// the sort schema) the ordinal equals the iteration index
167178
// `pos`, which is also the ordinal `sorted_series` would
168179
// assign — so the per-RG prefix key composes as a literal
169180
// byte prefix of every sorted_series key.
170-
prefix_cols.push(PrefixColumn {
181+
prefix_cols.push(found.map(|parquet_col_idx| PrefixColumn {
171182
name: sort_col.name.clone(),
172183
parquet_col_idx,
173184
descending,
174185
ordinal: pos as u8,
175-
});
186+
}));
176187
}
177188
Ok(prefix_cols)
178189
}
@@ -197,23 +208,34 @@ fn parquet_has_column(
197208
/// in this RG.
198209
///
199210
/// Null handling:
200-
/// - **All-null RG on a prefix column**: the column is skipped entirely (the next column's higher
201-
/// ordinal byte appears in its place), so the RG sorts after any RG carrying a non-null value
202-
/// for this column. This mirrors the row-level convention in `sorted_series` and gives
203-
/// nulls-last ordering for free.
211+
/// - **Column absent from schema (`None` in `prefix_cols`)**: SS-3 case. Every row of the file has
212+
/// a constant null in this slot, so the contribution to the composite is empty (column skipped).
213+
/// The trailing prefix-length sentinel keeps the resulting key well-formed.
214+
/// - **All-null RG on a present prefix column**: column skipped for this RG (the next column's
215+
/// higher ordinal byte — or the trailing sentinel — appears in its place), so the RG sorts after
216+
/// any RG carrying a non-null value for this column. Mirrors the row-level convention in
217+
/// `sorted_series` and gives nulls-last ordering for free.
204218
/// - **Mixed null + non-null in one RG**: rows in the RG would encode to two distinct prefix keys
205219
/// (the non-null value's key and the column-skipped key), breaking the
206220
/// at-most-one-prefix-value-per-RG invariant (PA-1). Reject.
207221
/// - **No nulls**: standard `min == max` check on stats, then encode that single value.
208222
pub(crate) fn extract_rg_composite_prefix_key(
209223
metadata: &ParquetMetaData,
210224
rg_idx: usize,
211-
prefix_cols: &[PrefixColumn],
225+
prefix_cols: &[Option<PrefixColumn>],
212226
input_idx: usize,
213227
) -> Result<Vec<u8>> {
214228
let rg_meta = metadata.row_group(rg_idx);
215229
let mut key = Vec::new();
216-
for col in prefix_cols {
230+
for col_opt in prefix_cols {
231+
let Some(col) = col_opt else {
232+
// SS-3 implicit null: column absent from schema, so every
233+
// row's value is null. Skip the slot entirely — the
234+
// trailing prefix-length sentinel will keep this from
235+
// colliding with present-value keys, and sorted_series
236+
// applies the same "skip null cols" rule at the row level.
237+
continue;
238+
};
217239
let chunk = rg_meta.column(col.parquet_col_idx);
218240
let stats = chunk.statistics().ok_or_else(|| {
219241
anyhow!(
@@ -245,9 +267,8 @@ pub(crate) fn extract_rg_composite_prefix_key(
245267
bail!(
246268
"input {input_idx} rg {rg_idx} col '{}' is NOT prefix-aligned: contains \
247269
{null_count} nulls plus {} non-null values. PA-1 requires each row group to \
248-
carry a single prefix value; rows with null on this column encode to a \
249-
different prefix key (with the column skipped) than rows with the non-null \
250-
value.",
270+
carry a single prefix value; rows with null on this column encode to a different \
271+
prefix key (with the column skipped) than rows with the non-null value.",
251272
col.name,
252273
num_values - null_count,
253274
);
@@ -259,22 +280,17 @@ pub(crate) fn extract_rg_composite_prefix_key(
259280
// Trailing prefix-length sentinel: an additional `u8(prefix_len)`
260281
// ordinal byte that does two things at once:
261282
//
262-
// 1. **Forces nulls-last ordering across RGs.** For prefix_len=1
263-
// an all-null RG produces an empty per-column body and would
264-
// otherwise lex-sort *before* any non-null RG. With the
265-
// sentinel, the all-null key becomes `[prefix_len]` and the
266-
// non-null key becomes `[ord(0), storekey(value), ..., prefix_len]`.
267-
// The non-null key starts with `ord(0) = 0x00`, smaller than
268-
// `prefix_len >= 1`, so non-null RGs sort first — matching
269-
// `sorted_series`'s row-level nulls-last convention via the
270-
// same "the next ordinal byte appears in the skipped slot"
283+
// 1. **Forces nulls-last ordering across RGs.** For prefix_len=1 an all-null RG produces an
284+
// empty per-column body and would otherwise lex-sort *before* any non-null RG. With the
285+
// sentinel, the all-null key becomes `[prefix_len]` and the non-null key becomes `[ord(0),
286+
// storekey(value), ..., prefix_len]`. The non-null key starts with `ord(0) = 0x00`, smaller
287+
// than `prefix_len >= 1`, so non-null RGs sort first — matching `sorted_series`'s row-level
288+
// nulls-last convention via the same "the next ordinal byte appears in the skipped slot"
271289
// mechanism.
272-
// 2. **Preserves the "literal prefix of sorted_series" property.**
273-
// The byte we append is exactly what `sorted_series` writes
274-
// right after the prefix columns: the ordinal of the next
275-
// sort-schema column (`u8(prefix_len)`). So the per-RG key
276-
// remains a byte-for-byte prefix of every row's
277-
// `sorted_series` value in that RG.
290+
// 2. **Preserves the "literal prefix of sorted_series" property.** The byte we append is
291+
// exactly what `sorted_series` writes right after the prefix columns: the ordinal of the
292+
// next sort-schema column (`u8(prefix_len)`). So the per-RG key remains a byte-for-byte
293+
// prefix of every row's `sorted_series` value in that RG.
278294
storekey::encode(&mut key, &(prefix_cols.len() as u8))
279295
.map_err(|e| anyhow!("storekey encode prefix-length sentinel: {}", e))?;
280296

@@ -581,10 +597,22 @@ pub(crate) fn extract_regions_from_metadata(
581597
.collect())
582598
}
583599

584-
/// Post-write check: verify the parquet file at `metadata` has no two
585-
/// row groups sharing the same composite prefix key, for the first
586-
/// `prefix_len` sort columns. Returns `Ok(())` immediately if
587-
/// `prefix_len == 0` (no alignment claim).
600+
/// Post-write check: verify every row group in `metadata` satisfies
601+
/// the prefix-alignment claim declared by `prefix_len`.
602+
///
603+
/// Enforces both halves of the prefix-alignment contract in one pass:
604+
/// - **PA-1 (intra-RG constancy):** within each RG, each of the first `prefix_len` sort columns has
605+
/// `min == max` (the column is constant across the RG). This is checked transitively by
606+
/// [`extract_rg_composite_prefix_key`] — it returns an error when any prefix column's chunk stats
607+
/// show `min != max`.
608+
/// - **PA-3 (inter-RG uniqueness):** no two RGs share the same composite prefix value. The
609+
/// streaming engine pairs at most one input RG per region per prefix value, so a duplicate would
610+
/// silently drop rows or corrupt the body-col / sort-col mapping.
611+
///
612+
/// Returns `Ok(())` immediately when `prefix_len == 0` (no claim to
613+
/// verify) or `num_rgs == 0` (no RGs to check). Single-RG files are
614+
/// NOT short-circuited — they still go through PA-1 because an
615+
/// unsorted single-RG file CAN have `min != max` on a prefix column.
588616
///
589617
/// This is the writer-side mirror of the read-side check in
590618
/// `extract_regions_from_metadata` — both indexing and the compaction
@@ -606,8 +634,8 @@ pub(crate) fn assert_unique_rg_prefix_keys(
606634
return Ok(());
607635
}
608636
let num_rgs = metadata.num_row_groups();
609-
if num_rgs <= 1 {
610-
// Single-RG (or zero-RG) files vacuously satisfy the invariant.
637+
if num_rgs == 0 {
638+
// Zero-RG files vacuously satisfy both halves of the claim.
611639
return Ok(());
612640
}
613641
let prefix_cols =

quickwit/quickwit-parquet-engine/src/sorted_series/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,10 @@ pub(crate) fn append_prefix_col_to_key<T>(
286286
value: &T,
287287
descending: bool,
288288
) -> Result<()>
289-
where T: ?Sized + storekey::Encode {
290-
storekey::encode(&mut *buf, &ordinal)
291-
.map_err(|e| anyhow!("storekey encode ordinal: {}", e))?;
289+
where
290+
T: ?Sized + storekey::Encode,
291+
{
292+
storekey::encode(&mut *buf, &ordinal).map_err(|e| anyhow!("storekey encode ordinal: {}", e))?;
292293
let value_start = buf.len();
293294
storekey::encode(&mut *buf, value).map_err(|e| anyhow!("storekey encode value: {}", e))?;
294295
if descending {

quickwit/quickwit-parquet-engine/src/storage/legacy_adapter.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,22 @@ mod tests {
15861586
Some("2"),
15871587
"stamped prefix_len must match caller's request even when one col is implicitly null",
15881588
);
1589+
1590+
// SS-3 consumer-side verification: the file the adapter just
1591+
// produced must be consumable by the merge engine's
1592+
// `extract_rg_composite_prefix_key` reader. With `env` absent
1593+
// from the parquet schema, `find_prefix_parquet_col_indices`
1594+
// returns `None` in that slot and the composite-key extractor
1595+
// skips it. PA-1 + PA-3 still hold: each RG's metric_name
1596+
// min == max, and skipping the constant-null `env` slot makes
1597+
// the RG composite keys differ only by metric_name.
1598+
crate::merge::streaming::region_grouping::assert_unique_rg_prefix_keys(
1599+
adapter.metadata(),
1600+
"metric_name|env|-timestamp_secs/V2",
1601+
2,
1602+
"test_missing_prefix_col_treated_as_null_satisfies_alignment adapter output",
1603+
)
1604+
.expect("SS-3 null col must satisfy PA-1 + PA-3 (null is constant across all RGs)");
15891605
}
15901606

15911607
/// Composite-prefix fixture: rows grouped by `(metric, service)`

0 commit comments

Comments
 (0)