Commit 0c7de76

g-talbot and claude committed
feat(legacy-adapter): prefix-aware output with caller-supplied target_prefix_len (#6425)
* feat(legacy-adapter): synthesize prefix-aligned row groups

The legacy adapter previously consolidated multi-RG legacy inputs into a single oversized row group and left `rg_partition_prefix_len` at the original's (typically `0`). The streaming merge engine then sent these single-RG/prefix=0 inputs through the new sub-region splitting path — correct, but it forfeits the prefix-aware fast path for outputs derived from legacy inputs and gives up the row-group pruning that prefix alignment enables.

After consolidating, the adapter now slices the resulting record batch at first-sort-col transitions (typically `metric_name`) and emits one parquet row group per slice, stamping the re-encoded file with `qh.rg_partition_prefix_len = 1`. The merge engine then reads it through the prefix-aware fast path: one region per metric_name, the existing duplicate-prefix invariant on read validates uniqueness.

Fallback: if the original file has no `qh.sort_fields` KV, the sort-fields string fails to parse, the first column can't be resolved in the arrow schema, or the consolidated batch is empty, the adapter reverts to a single-RG re-encode without claiming any prefix alignment. That input still works — the engine's prefix_len=0 sub-region splitting path picks it up. This keeps the adapter robust for files written by very early versions of the indexer that may pre-date the standard KV layout.

Implementation: `reencode_prefix_aligned` replaces `reencode_as_single_row_group` and either dispatches to the new multi-RG writer or to the legacy single-RG writer based on whether the first sort col is resolvable. `RowConverter` handles the prefix-value equality check uniformly across dictionary, utf8, and primitive types. The KV injection helper replaces (rather than appends) any existing `qh.rg_partition_prefix_len` so re-runs and files mistakenly carrying a stale value still land at the freshly synthesized prefix.

Tests:
- `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg` — 3 metrics × 40 rows, multi-RG input → 3 prefix-aligned output RGs and `qh.rg_partition_prefix_len = 1` KV.
- `test_legacy_input_single_metric_yields_one_rg_with_prefix_kv` — one metric → one RG, prefix KV still stamped (vacuously aligned).
- `test_legacy_input_without_sort_fields_falls_back_to_single_rg` — fallback path preserved when sort-fields KV is missing.
- All existing tests pass unchanged (they use empty KVs or unparseable sort-fields strings, both of which exercise the fallback path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(legacy-adapter): parameterize on target_prefix_len with composite-prefix support

`LegacyInputAdapter::try_open` now takes `target_prefix_len: u32` chosen by the caller, matching the merge plan's consensus prefix length. The adapter slices the consolidated batch at every transition of the first N sort columns (composite key, via `RowConverter` over all N fields) and emits one output row group per slice, stamping the output with `qh.rg_partition_prefix_len = target_prefix_len`. With `target_prefix_len = 0` the adapter takes the original single-RG passthrough path with no prefix-alignment claim.

A sort column that is named in `qh.sort_fields` but missing from the file's arrow schema is treated as implicitly null at every row per SS-3. A constantly-null column trivially satisfies alignment on that column (null == null) and contributes no transitions, so the split boundaries are driven by the columns that are present. This matches the merge engine's compaction-time treatment of missing columns and keeps a legacy file with an evolved schema usable as a prefix-aligned input.

`PrefixUnresolvable` now fires only on cases where the file doesn't advertise enough sort *names* to honor the request:
- `qh.sort_fields` absent or unparseable
- `qh.sort_fields` declares fewer sort columns than `target_prefix_len`

A column missing from the arrow schema no longer counts as unresolvable; the adapter materialises a `NullArray` of the batch's length in that slot and proceeds.

Tests:
- `test_target_prefix_len_zero_passes_through_as_single_rg` — explicit N=0 fallback, no prefix KV stamped.
- `test_target_prefix_len_two_splits_by_metric_and_service` — composite prefix (`metric_name`, `service`) → 4 RGs, KV declares prefix_len=2.
- `test_target_prefix_len_one_without_sort_fields_returns_unresolvable` — no `qh.sort_fields` KV → `PrefixUnresolvable`.
- `test_target_prefix_len_exceeds_declared_sort_cols_returns_unresolvable` — sort schema declares 2 cols, caller asks 3 → `PrefixUnresolvable`.
- `test_missing_prefix_col_treated_as_null_satisfies_alignment` — sort schema declares `metric_name|env|-timestamp_secs` but `env` is absent from the arrow schema → no error, only metric_name transitions split RGs, KV still stamps prefix_len=2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(legacy_adapter): note where reader-side SS-3 handling lands

Codex P2 on PR #6425: the adapter records `None` for missing prefix columns and stamps `rg_partition_prefix_len = target_prefix_len` anyway. In isolation that produces a file with an advertised prefix the current reader (`find_prefix_parquet_col_indices` on the #6425 state) bails on.

The reader-side fix — returning `Vec<Option<PrefixColumn>>` and synthesizing a constant `[0x00, 0x00]` byte for `None` slots — lands in PR #6426 (the hardening slice, F12 from the adversarial review). The only caller of this adapter is `execute_merge_operation`, introduced in PR #6423 which sits above #6426 in the stack, so no production caller can produce a missing-column prefix until the reader fix is in place.

Adding the in-code pointer so a future reader bisecting the stack doesn't have to trace the relationship from scratch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge): consumer honors SS-3 (move F12 forward from #6426 to #6425)

Previously the F12 fix — "consumer side honors SS-3 missing prefix columns" — lived in the hardening PR (#6426). At the #6425 isolation level, the legacy adapter records `None` for a prefix column absent from the parquet schema and stamps `rg_partition_prefix_len = target_prefix_len` on the output, but the reader's `find_prefix_parquet_col_indices` bails on any missing column. So #6425 + #6424 alone would produce a legacy-adapter file that the streaming-merge reader rejects mid-merge — i.e. a known-incoherent intermediate stack state.

Move F12 into this PR so the adapter and reader agree at the same slice:
- `find_prefix_parquet_col_indices` now returns `Result<Vec<Option<PrefixColumn>>>`. `Some(_)` when the column is present in the parquet schema; `None` per SS-3 when the column is named in `qh.sort_fields` but absent from the schema.
- `extract_rg_composite_prefix_key` skips `None` slots entirely (no ordinal byte, no value bytes for that column). The trailing `u8(prefix_len)` sentinel introduced in the storekey refactor keeps the resulting key well-formed across present/absent columns.
- Callers that index into `prefix_cols` updated to use `.as_ref().expect(…)` where they assume presence.

Existing SS-3 test `test_missing_prefix_col_treated_as_null_satisfies_alignment` in `legacy_adapter.rs` gets an `assert_unique_rg_prefix_keys` call verifying the adapter's output is consumable by the reader — pins the "stack-coherent at #6425" property the F12 hop establishes.

Also incidental nightly-fmt cleanups in `sorted_series::append_prefix_col_to_key` and the two-input fixture in `test_all_null_prefix_rg_groups_into_separate_region_sorted_last`.

The hardening PR (#6426) will be re-cascaded to drop the now-duplicated F12 hunks (keeping its F8 adapter-rejects-unsorted + F2 verifier-strength changes intact).

485 lib tests pass on this slice; workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(legacy-adapter): strip stale rg_partition_prefix_len when target=0

Codex P2 on PR #6425: when the legacy adapter is called with `target_prefix_len == 0` it consolidates the input into a single RG, but the previous version preserved the input's footer KVs unchanged. If the input itself already carried a stale nonzero `qh.rg_partition_prefix_len` claim (e.g., a prefix-aware split being re-encoded through the legacy fallback path), the single-RG output would still advertise that claim. Downstream metadata extraction would take the prefix-aware path against an RG carrying multiple first-prefix values — failing the PA-1 min/max alignment check on read despite the caller explicitly asking for the legacy path.

Strip `PARQUET_META_RG_PARTITION_PREFIX_LEN` from `original_kv` in the `target_prefix_len == 0` branch. Absence of the KV is the legacy convention for "no alignment claim", matching the existing `test_target_prefix_len_zero_passes_through_as_single_rg` test's `prefix_kv.is_none()` assertion.

New regression test `test_target_prefix_len_zero_strips_stale_prefix_kv_from_input`: inputs a 2-RG file with `qh.rg_partition_prefix_len = "1"` AND opens through adapter with `target_prefix_len = 0`; asserts the re-encoded output has no prefix KV. Pre-fix this test caught the leak; post-fix the stale value is dropped.

487 lib tests pass on the slice; clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
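
The slicing and KV-stamping behaviour described in the message above can be sketched roughly as follows. This is a minimal illustration, not the adapter's code: the commit says the adapter uses Arrow's `RowConverter` over the first N sort columns, whereas this sketch assumes plain vectors of optional strings. `Row`, `split_at_prefix_transitions`, and `stamp_prefix_len` are hypothetical names; only the KV key `qh.rg_partition_prefix_len` comes from the commit message.

```rust
use std::ops::Range;

/// One row's sort-column values, in sort order (hypothetical stand-in for
/// Arrow arrays). `None` is a null; a column missing from the file's schema
/// is modelled as `None` on every row, following the SS-3 rule above.
type Row = Vec<Option<String>>;

/// KV key named in the commit message.
const PREFIX_LEN_KEY: &str = "qh.rg_partition_prefix_len";

/// Split already-sorted rows into contiguous ranges, starting a new range
/// whenever any of the first `prefix_len` values differs from the previous
/// row. Assumes every row has at least `prefix_len` entries.
/// With `prefix_len == 0` all rows stay in a single range.
fn split_at_prefix_transitions(rows: &[Row], prefix_len: usize) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..rows.len() {
        // `None == None` holds, so a constantly-null column adds no boundary.
        if rows[i][..prefix_len] != rows[i - 1][..prefix_len] {
            ranges.push(start..i);
            start = i;
        }
    }
    if !rows.is_empty() {
        ranges.push(start..rows.len());
    }
    ranges
}

/// Replace (never append) the prefix-length entry. With a target of 0 the
/// entry is dropped, so no alignment claim is left behind.
fn stamp_prefix_len(kvs: &mut Vec<(String, String)>, target_prefix_len: u32) {
    kvs.retain(|(k, _)| k.as_str() != PREFIX_LEN_KEY);
    if target_prefix_len > 0 {
        kvs.push((PREFIX_LEN_KEY.to_string(), target_prefix_len.to_string()));
    }
}
```

Each returned range would correspond to one output row group. Because a stale entry is removed before the new one is pushed, repeated calls never leave two competing claims in the footer.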
1 parent 1e627a8 commit 0c7de76

3 files changed

Lines changed: 966 additions & 53 deletions

quickwit/quickwit-parquet-engine/src/merge/streaming.rs

Lines changed: 4 additions & 1 deletion
@@ -2789,7 +2789,10 @@ mod tests {
             .expect("resolve");
         // Sanity: the second prefix column must be flagged DESC.
         assert!(
-            prefix_cols[1].descending,
+            prefix_cols[1]
+                .as_ref()
+                .expect("env present in this fixture")
+                .descending,
             "env must be parsed as DESC from sort schema",
         );

quickwit/quickwit-parquet-engine/src/merge/streaming/region_grouping.rs

Lines changed: 61 additions & 27 deletions
@@ -118,14 +118,25 @@ pub(crate) struct PrefixColumn {
 
 /// Resolve the first `prefix_len` sort columns to parquet leaf
 /// indices. Honours the legacy `timestamp` → `timestamp_secs` alias.
-/// Errors if the sort schema has fewer columns than `prefix_len` or
-/// if any column is missing from the parquet schema.
+///
+/// Returns one entry per requested prefix column. `Some(PrefixColumn)`
+/// when the column is present in the parquet schema; `None` when the
+/// column is named in `sort_fields_str` but absent from the parquet
+/// schema. Per SS-3 the missing column is treated as constant null at
+/// every row of the file — [`extract_rg_composite_prefix_key`]
+/// synthesizes a fixed byte sequence in that slot so ordering is
+/// driven entirely by the present columns.
+///
+/// Errors only when the sort schema declares fewer columns than
+/// requested — that means we don't have a *name* for one of the
+/// prefix columns and can't claim alignment on something we can't
+/// identify.
 pub(crate) fn find_prefix_parquet_col_indices(
     metadata: &ParquetMetaData,
     sort_fields_str: &str,
     prefix_len: usize,
-    input_idx: usize,
-) -> Result<Vec<PrefixColumn>> {
+    _input_idx: usize,
+) -> Result<Vec<Option<PrefixColumn>>> {
     let sort_field_schema = parse_sort_fields(sort_fields_str)?;
     if sort_field_schema.column.len() < prefix_len {
         bail!(
@@ -145,34 +156,34 @@ pub(crate) fn find_prefix_parquet_col_indices(
         } else {
             sort_col.name.as_str()
         };
+        let descending = sort_col.sort_direction
+            == quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
         let mut found = None;
         for (col_idx, col) in parquet_schema.columns().iter().enumerate() {
             if col.path().parts()[0] == resolved {
                 found = Some(col_idx);
                 break;
             }
         }
-        let parquet_col_idx = found.ok_or_else(|| {
-            anyhow!(
-                "input {input_idx} parquet schema is missing prefix sort column '{}' (position \
-                 {pos})",
-                sort_col.name,
-            )
-        })?;
-        let descending = sort_col.sort_direction
-            == quickwit_proto::sortschema::SortColumnDirection::SortDirectionDescending as i32;
+        // SS-3: missing column → `None`. The composite-key extractor
+        // skips this slot entirely (no ordinal byte, no value bytes);
+        // the trailing prefix-length sentinel in
+        // `extract_rg_composite_prefix_key` ensures the resulting key
+        // still sorts cleanly relative to RGs with present values
+        // (and matches sorted_series's row-level null-skip).
+        //
         // Ordinal matches the column's position in `qh.sort_fields`.
         // For prefix cols (always the first `prefix_len` entries of
         // the sort schema) the ordinal equals the iteration index
         // `pos`, which is also the ordinal `sorted_series` would
         // assign — so the per-RG prefix key composes as a literal
         // byte prefix of every sorted_series key.
-        prefix_cols.push(PrefixColumn {
+        prefix_cols.push(found.map(|parquet_col_idx| PrefixColumn {
             name: sort_col.name.clone(),
             parquet_col_idx,
             descending,
             ordinal: pos as u8,
-        });
+        }));
     }
     Ok(prefix_cols)
 }
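
The changed contract of `find_prefix_parquet_col_indices` (an error only when a prefix column has no name, `None` when a named column is absent from the schema) can be illustrated with a small standalone sketch. It assumes the sort schema and the parquet schema are both given as plain name lists, and it omits the `timestamp` alias and the sort direction. `resolve_prefix_columns` and this simplified `PrefixColumn` are hypothetical, not the crate's types.

```rust
/// Simplified, hypothetical counterpart of the crate's `PrefixColumn`.
#[derive(Debug, PartialEq)]
struct PrefixColumn {
    name: String,
    /// Index of the column in the (flat) schema.
    col_idx: usize,
    /// Position of the column in the sort schema.
    ordinal: u8,
}

/// Resolve the first `prefix_len` sort columns against a schema.
/// - `Err` when the sort schema declares fewer than `prefix_len` names.
/// - `Some(..)` for a column found in the schema.
/// - `None` for a column that is named but absent (implicit null).
fn resolve_prefix_columns(
    sort_names: &[&str],
    schema_cols: &[&str],
    prefix_len: usize,
) -> Result<Vec<Option<PrefixColumn>>, String> {
    if sort_names.len() < prefix_len {
        return Err(format!(
            "sort schema declares {} columns, {} requested",
            sort_names.len(),
            prefix_len
        ));
    }
    Ok(sort_names[..prefix_len]
        .iter()
        .enumerate()
        .map(|(pos, name)| {
            schema_cols
                .iter()
                .position(|c| c == name)
                .map(|col_idx| PrefixColumn {
                    name: (*name).to_string(),
                    col_idx,
                    ordinal: pos as u8,
                })
        })
        .collect())
}
```

Callers that know a column must exist in their fixture can then use `.as_ref().expect(...)` on the slot, as the test change in `streaming.rs` does.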
@@ -197,23 +208,34 @@ fn parquet_has_column(
 /// in this RG.
 ///
 /// Null handling:
-/// - **All-null RG on a prefix column**: the column is skipped entirely (the next column's higher
-///   ordinal byte appears in its place), so the RG sorts after any RG carrying a non-null value for
-///   this column. This mirrors the row-level convention in `sorted_series` and gives nulls-last
-///   ordering for free.
+/// - **Column absent from schema (`None` in `prefix_cols`)**: SS-3 case. Every row of the file has
+///   a constant null in this slot, so the contribution to the composite is empty (column skipped).
+///   The trailing prefix-length sentinel keeps the resulting key well-formed.
+/// - **All-null RG on a present prefix column**: column skipped for this RG (the next column's
+///   higher ordinal byte — or the trailing sentinel — appears in its place), so the RG sorts after
+///   any RG carrying a non-null value for this column. Mirrors the row-level convention in
+///   `sorted_series` and gives nulls-last ordering for free.
 /// - **Mixed null + non-null in one RG**: rows in the RG would encode to two distinct prefix keys
 ///   (the non-null value's key and the column-skipped key), breaking the
 ///   at-most-one-prefix-value-per-RG invariant (PA-1). Reject.
 /// - **No nulls**: standard `min == max` check on stats, then encode that single value.
 pub(crate) fn extract_rg_composite_prefix_key(
     metadata: &ParquetMetaData,
     rg_idx: usize,
-    prefix_cols: &[PrefixColumn],
+    prefix_cols: &[Option<PrefixColumn>],
     input_idx: usize,
 ) -> Result<Vec<u8>> {
     let rg_meta = metadata.row_group(rg_idx);
     let mut key = Vec::new();
-    for col in prefix_cols {
+    for col_opt in prefix_cols {
+        let Some(col) = col_opt else {
+            // SS-3 implicit null: column absent from schema, so every
+            // row's value is null. Skip the slot entirely — the
+            // trailing prefix-length sentinel will keep this from
+            // colliding with present-value keys, and sorted_series
+            // applies the same "skip null cols" rule at the row level.
+            continue;
+        };
         let chunk = rg_meta.column(col.parquet_col_idx);
         let stats = chunk.statistics().ok_or_else(|| {
             anyhow!(
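
The null-skipping rule documented in this hunk is easier to see on concrete bytes. The sketch below is an assumption-laden simplification: it encodes each present value as an ordinal byte, the UTF-8 bytes, and a `0x00` terminator, then appends the prefix length as the trailing sentinel. The actual value encoding (the commit mentions a storekey refactor) and DESC handling are not shown in the diff and are not reproduced here. `composite_prefix_key` is a hypothetical name.

```rust
/// Build a composite prefix key from per-column values.
/// `None` means "skip this slot": either the column is absent from the
/// schema (SS-3) or the row group is all-null on it.
///
/// Simplified encoding, assumed for illustration only:
///   [ordinal][value bytes][0x00] for each present column, then [prefix_len].
/// Values are assumed not to contain a 0x00 byte.
fn composite_prefix_key(values: &[Option<&str>]) -> Vec<u8> {
    let mut key = Vec::new();
    for (ordinal, value) in values.iter().enumerate() {
        let Some(v) = value else {
            // Skipped slot: no ordinal byte, no value bytes.
            continue;
        };
        key.push(ordinal as u8);
        key.extend_from_slice(v.as_bytes());
        key.push(0x00);
    }
    // Trailing sentinel: greater than every column ordinal, so a skipped
    // last column still sorts after a present one.
    key.push(values.len() as u8);
    key
}
```

With two prefix columns, a key whose first column is present starts with ordinal `0`, while a key whose first column is skipped starts with `1` (the next ordinal) or `2` (the sentinel). Byte-wise comparison therefore puts the skipped case last, which is the nulls-last ordering the doc comment describes.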
@@ -575,10 +597,22 @@ pub(crate) fn extract_regions_from_metadata(
         .collect())
 }
 
-/// Post-write check: verify the parquet file at `metadata` has no two
-/// row groups sharing the same composite prefix key, for the first
-/// `prefix_len` sort columns. Returns `Ok(())` immediately if
-/// `prefix_len == 0` (no alignment claim).
+/// Post-write check: verify every row group in `metadata` satisfies
+/// the prefix-alignment claim declared by `prefix_len`.
+///
+/// Enforces both halves of the prefix-alignment contract in one pass:
+/// - **PA-1 (intra-RG constancy):** within each RG, each of the first `prefix_len` sort columns has
+///   `min == max` (the column is constant across the RG). This is checked transitively by
+///   [`extract_rg_composite_prefix_key`] — it returns an error when any prefix column's chunk stats
+///   show `min != max`.
+/// - **PA-3 (inter-RG uniqueness):** no two RGs share the same composite prefix value. The
+///   streaming engine pairs at most one input RG per region per prefix value, so a duplicate would
+///   silently drop rows or corrupt the body-col / sort-col mapping.
+///
+/// Returns `Ok(())` immediately when `prefix_len == 0` (no claim to
+/// verify) or `num_rgs == 0` (no RGs to check). Single-RG files are
+/// NOT short-circuited — they still go through PA-1 because an
+/// unsorted single-RG file CAN have `min != max` on a prefix column.
 ///
 /// This is the writer-side mirror of the read-side check in
 /// `extract_regions_from_metadata` — both indexing and the compaction
@@ -600,8 +634,8 @@ pub(crate) fn assert_unique_rg_prefix_keys(
         return Ok(());
     }
     let num_rgs = metadata.num_row_groups();
-    if num_rgs <= 1 {
-        // Single-RG (or zero-RG) files vacuously satisfy the invariant.
+    if num_rgs == 0 {
+        // Zero-RG files vacuously satisfy both halves of the claim.
         return Ok(());
     }
     let prefix_cols =
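
The reason the `num_rgs <= 1` short-circuit had to go shows up in a small model of the two checks. This sketch assumes each row group exposes string min/max statistics per prefix column. `RowGroupStats` and `check_prefix_alignment` are hypothetical stand-ins for the parquet metadata and for `assert_unique_rg_prefix_keys`, and nulls are left out.

```rust
use std::collections::HashSet;

/// Hypothetical per-row-group statistics: (min, max) for each prefix
/// column, in sort order.
struct RowGroupStats {
    prefix_min_max: Vec<(String, String)>,
}

/// Check both halves of the prefix-alignment claim:
/// PA-1: every prefix column is constant within a row group (min == max).
/// PA-3: no two row groups share the same composite prefix value.
fn check_prefix_alignment(row_groups: &[RowGroupStats], prefix_len: usize) -> Result<(), String> {
    // No claim to verify, or nothing to verify it against.
    if prefix_len == 0 || row_groups.is_empty() {
        return Ok(());
    }
    let mut seen: HashSet<Vec<&str>> = HashSet::new();
    for (rg_idx, rg) in row_groups.iter().enumerate() {
        if rg.prefix_min_max.len() < prefix_len {
            return Err(format!("row group {rg_idx} lacks stats for a prefix column"));
        }
        let mut key = Vec::with_capacity(prefix_len);
        for (col_idx, (min, max)) in rg.prefix_min_max[..prefix_len].iter().enumerate() {
            if min != max {
                return Err(format!(
                    "PA-1 violated: row group {rg_idx}, prefix column {col_idx} has min != max"
                ));
            }
            key.push(min.as_str());
        }
        if !seen.insert(key) {
            return Err(format!(
                "PA-3 violated: row group {rg_idx} repeats an earlier prefix key"
            ));
        }
    }
    Ok(())
}
```

A single row group whose first prefix column spans `cpu` to `mem` fails the PA-1 branch here, whereas a `<= 1` early return would have accepted it.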
