Commit d388bc9
feat(streaming-merge): per-region engine + multi-output sorted_series splitting (#6424)
* feat: per-merge-region streaming engine — multi-RG inputs + outputs (PR-6c.2)
Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region
loop. Unlocks multi-RG metric-aligned input support and produces
multi-RG output naturally — one output row group per merge region
(typically one per metric_name when `rg_partition_prefix_len == 1`).
Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge
region has AT MOST one row group per input. That single invariant
unlocks the restructure:
1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read
each RG's metric_name min stat (must equal max — verifies
metric-alignment). Group RGs across inputs by prefix_key. Sort
regions by prefix_key. For `prefix_len == 0` (single-RG inputs
only, validated earlier), one region covers everything.
2. Assign regions to output files by cumulative row count. The
caller's `num_outputs` is preserved as an upper bound. Each output
file gets a contiguous slice of the region list, so output files
have non-overlapping key ranges.
3. Per-region processing: for each region, advance contributing
inputs' decoders through their RGs (drain sort cols of that RG,
then stream body cols via the existing page-bounded
BodyColOutputPageAssembler). Each region becomes one output RG in
the current writer; when the assignment moves to a new output
file, close the previous writer and open a new one.
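Steps 1 and 2 can be sketched as follows. This is a minimal illustration, not the engine's code: `RgMeta`, `Region`, `group_regions`, and `assign_outputs` are hypothetical names, the prefix key is treated as an opaque byte string, and the per-output row target is assumed to be `ceil(total_rows / num_outputs)`.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-row-group metadata: source input, encoded prefix key,
/// and row count.
#[derive(Debug, Clone)]
struct RgMeta {
    input_idx: usize,
    prefix_key: Vec<u8>,
    num_rows: u64,
}

/// Hypothetical merge region: every row group that shares one prefix key.
#[derive(Debug)]
struct Region {
    prefix_key: Vec<u8>,
    rgs: Vec<RgMeta>,
    total_rows: u64,
}

/// Step 1: group row groups across inputs by prefix key. Iterating a
/// BTreeMap yields the regions already sorted by key.
fn group_regions(rgs: &[RgMeta]) -> Vec<Region> {
    let mut by_key: BTreeMap<Vec<u8>, Vec<RgMeta>> = BTreeMap::new();
    for rg in rgs {
        by_key.entry(rg.prefix_key.clone()).or_default().push(rg.clone());
    }
    by_key
        .into_iter()
        .map(|(prefix_key, rgs)| {
            let total_rows: u64 = rgs.iter().map(|rg| rg.num_rows).sum();
            Region { prefix_key, rgs, total_rows }
        })
        .collect()
}

/// Step 2: hand each output a contiguous slice of the sorted region list,
/// moving on once the current output has reached its row target.
/// Returns one output index per region and never exceeds `num_outputs`.
fn assign_outputs(regions: &[Region], num_outputs: usize) -> Vec<usize> {
    let total: u64 = regions.iter().map(|r| r.total_rows).sum();
    let n = num_outputs.max(1) as u64;
    let target = ((total + n - 1) / n).max(1); // ceil(total / n), at least 1
    let mut assignment = Vec::with_capacity(regions.len());
    let (mut out, mut rows_in_out) = (0usize, 0u64);
    for region in regions {
        if rows_in_out >= target && out + 1 < num_outputs {
            out += 1;
            rows_in_out = 0;
        }
        assignment.push(out);
        rows_in_out += region.total_rows;
    }
    assignment
}
```

Because regions are only ever appended to the current output or the next one, each output holds a contiguous key range by construction.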
The streaming body-col mechanism from PR-6b.2
(`arrow::compute::interleave` + `handle.block_on`-driven decoder) is
unchanged; it just runs over smaller row ranges (one region instead
of one whole output).
PR-6b.2's check that rejected any multi-RG input is replaced with:
reject only `prefix_len == 0` AND multi-RG (those still need PR-5's
`LegacyInputAdapter`). Multi-RG metric-aligned inputs are now
accepted natively.
PR-6b.2 optimised the per-output schema based on per-output sort col
data (drop all-null cols, re-dict-encode low-cardinality strings).
With per-region streaming we don't know each region's content until
we drain it, so PR-6c.2 declares the writer's schema as the full
union schema and leaves output strings as Utf8. Per-output dict
re-encoding can be reintroduced later by tracking cardinality during
the streaming pass.
Tests:
- All 9 PR-6b.2 tests still pass (single-RG input regression,
behaviour preserved).
- New test_multi_rg_metric_aligned_input_produces_multi_rg_output:
feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 =
cpu.usage, RG 1 = memory.used); the streaming engine accepts it
and produces a 2-RG output (one RG per metric_name region).
- Renamed test_multi_rg_input_rejected →
test_legacy_multi_rg_input_rejected to reflect the new rejection
scope (only prefix_len == 0 multi-RG is rejected; metric-aligned
is accepted).
10/10 streaming tests pass. Clippy, doc, machete, fmt all clean.
Follow-ups:
1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality
during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained
M:N control when callers want more outputs than regions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(streaming): reject duplicate-prefix RGs + use escape encoding
Two P1 bugs flagged by Codex on PR-6c.2 (#6410):
1. **Duplicate input row groups silently dropped.** When one input
contained two RGs with the same composite prefix key,
`process_region` overwrote `sort_col_batches[input_idx]` while
`Region::total_rows` still counted both — losing rows and
misaligning the body-col / sort-col mapping. Now enforce
at-most-one-RG-per-input-per-prefix as a strong invariant at three
sites: the merge read path (`extract_regions_from_metadata`), the
streaming merge output finalize, and the indexing writer
(`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`).
The new `assert_unique_rg_prefix_keys` helper is shared.
2. **Byte-array prefix encoding broke lex order across lengths.**
The 4-byte length prefix made `"b"` sort before `"aa"`, violating
the declared ASC order. Switched to byte-stuffed escape encoding
(`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves
single-column lex order AND retains unambiguous concatenation for
composite keys (the terminator is the smallest 2-byte sequence
under escaping, so shorter values still sort before longer ones
with the same prefix).
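The ordering difference between the two encodings can be shown with a small sketch. The function names are hypothetical, and the length prefix is assumed to be big-endian (the ordering problem is the same for little-endian).

```rust
/// The replaced scheme: a 4-byte length (big-endian assumed here) followed
/// by the raw bytes. The length is compared before the content.
fn encode_length_prefixed(value: &[u8]) -> Vec<u8> {
    let mut out = (value.len() as u32).to_be_bytes().to_vec();
    out.extend_from_slice(value);
    out
}

/// Byte-stuffed escape encoding: every 0x00 in the value becomes
/// 0x00 0x01, and the value is closed with the terminator 0x00 0x00.
fn encode_escaped(value: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(value.len() + 2);
    for &b in value {
        out.push(b);
        if b == 0x00 {
            out.push(0x01);
        }
    }
    out.extend_from_slice(&[0x00, 0x00]);
    out
}
```

With the length prefix, `"b"` encodes to `[0, 0, 0, 1, b'b']` and sorts before `"aa"` (`[0, 0, 0, 2, ...]`). With escaping, `"aa"` encodes to `[b'a', b'a', 0, 0]` and sorts before `"b"` (`[b'b', 0, 0]`), matching plain lexicographic order.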
Tests:
- `test_byte_array_prefix_preserves_lex_order_across_lengths` —
`"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer,
null-byte escaping preserves order.
- `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` —
end-to-end bail with clear error.
- `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned`
+ the `write_to_file` and single-RG positive counterparts.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat(streaming): split regions at sorted_series for prefix_len=0 multi-output
When inputs declare rg_partition_prefix_len = 0 (legacy single-RG)
and the caller asks for num_outputs > 1, the engine subdivides the
single region at sorted_series transitions in the merge order so it
can honor the output count. A single sorted_series run is never
broken; if one run exceeds the remaining budget the whole run lands
in one output anyway. The output inherits the input's
rg_partition_prefix_len (=0) — the engine does not synthesize a
prefix it can't unconditionally guarantee.
Also handles the giant-single-metric case (prefix_len=0, one
metric_name, num_outputs > 1): sorted_series transitions still
split the merge order even though there are no metric_name
transitions to drive a prefix synthesis.
Implementation:
- New `split_region_at_sorted_series` in region_grouping: walks the merge order and splits at
sorted_series transitions when accumulated rows reach the target budget.
- Main engine loop: when num_outputs > current_output_idx + 1 AND region's rows exceed the
remaining budget, drain sort cols for the region, compute merge order, call
split_region_at_sorted_series, process sub-regions.
- Per-col page cache + cursor keyed by col_idx so the body-col path can read pages once and re-use
them across sub-regions within the same top-level region. Resets between top-level regions
(different RGs).
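A minimal sketch of the splitting rule, assuming the merge order has already been reduced to one series identifier per row. The function name, the `u32` identifiers, and the separate `first_budget` / `target` parameters are illustrative assumptions, not the signature of `split_region_at_sorted_series`.

```rust
use std::ops::Range;

/// Splits rows (in merge order) into sub-regions. A cut is made only at a
/// series transition, and only once the current sub-region has accumulated
/// at least its budget. The first sub-region uses `first_budget` (what is
/// left in the current output); later ones use the full `target`.
fn split_at_series_transitions(
    series_in_merge_order: &[u32],
    first_budget: usize,
    target: usize,
) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    let mut budget = first_budget;
    for i in 1..series_in_merge_order.len() {
        let at_transition = series_in_merge_order[i] != series_in_merge_order[i - 1];
        if at_transition && i - start >= budget {
            ranges.push(start..i);
            start = i;
            budget = target;
        }
    }
    // Whatever is left (possibly one oversized run) becomes the last piece.
    if start < series_in_merge_order.len() {
        ranges.push(start..series_in_merge_order.len());
    }
    ranges
}
```

A run is never broken because cuts are only considered where adjacent rows differ in series; a single run longer than the budget therefore stays in one sub-region.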
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* docs(streaming): correct 'crash' → 'bail' in MS-2 doc comments
The MS-2 validation path returns `Err` via `bail!()` (anyhow), not a
panic / abort. Five doc-comment / inline-comment sites described the
failure as "the engine would crash mid-merge" — overstated. Callers
get a `Result::Err` propagated up the spawn_blocking task and the
`streaming_merge_sorted_parquet_files` return.
Sites updated:
- `region_grouping.rs` module doc.
- `validate_region_order_matches_physical_rg_order` doc.
- streaming.rs MS-2 validation call-site comment.
- Test docstrings for `test_streaming_merge_with_desc_prefix_col` and
`test_ms2_region_order_disagrees_with_physical_rg_order_rejected`.
No behaviour change. 477 lib tests pass; clippy + nightly fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* docs(streaming): fix wrong adapter type name + explain rejection intent
Two sites referenced a non-existent `LegacyMultiRGAdapter` — the
actual type, introduced in PR-5 (#6408), is `LegacyInputAdapter`
in `storage::legacy_adapter`. Fixed both references.
Also expanded the rejection-block comment to make the *intent* of
the guard explicit: it catches caller bugs (wiring a raw legacy
multi-RG `StreamingParquetReader` straight into the streaming
merge), not a degraded-input fallback. Production code routes
legacy splits through `merge::execute_merge_operation` which
wraps them in `LegacyInputAdapter` first.
No behaviour change. Targeted test passes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(body_assembler): tighten output-iter termination + assert invariant
adamtobey nit on PR #6424: `rows_emitted >= expected_rows` accepts
`emitted > expected` as a normal termination condition, which would
actually be a real accounting bug. The math rules `>` out by
construction — `page_size = remaining.min(OUTPUT_PAGE_ROWS)` where
`remaining = expected_rows - rows_emitted`, so each
`rows_emitted += page_size` keeps `rows_emitted ≤ expected_rows`.
Two changes:
- Termination becomes `rows_emitted == expected_rows` so we don't
silently accept an overshoot.
- `debug_assert!(rows_emitted <= expected_rows, …)` at the top of
`next()` documents the invariant and surfaces a regression loudly
(panic in debug + tests) instead of silently terminating one
iteration too late.
No behaviour change in the happy path; bugs that would have produced
`>` now fail tests instead of producing wrong output.
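The tightened termination can be sketched with a toy iterator that yields only page sizes. `PageSizes` is a hypothetical stand-in for the assembler's iterator, and the value of `OUTPUT_PAGE_ROWS` is assumed for illustration.

```rust
/// Assumed page size for this sketch; the real constant's value may differ.
const OUTPUT_PAGE_ROWS: usize = 1024;

/// Hypothetical stand-in for the body-col output iterator: it yields the
/// row count of each output page instead of the page itself.
struct PageSizes {
    expected_rows: usize,
    rows_emitted: usize,
}

impl Iterator for PageSizes {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        // Invariant: an overshoot is an accounting bug, so fail loudly in
        // debug builds and tests rather than treating it as "done".
        debug_assert!(
            self.rows_emitted <= self.expected_rows,
            "emitted {} rows but expected at most {}",
            self.rows_emitted,
            self.expected_rows
        );
        // Exact equality is the only normal termination condition.
        if self.rows_emitted == self.expected_rows {
            return None;
        }
        let remaining = self.expected_rows - self.rows_emitted;
        let page_size = remaining.min(OUTPUT_PAGE_ROWS);
        self.rows_emitted += page_size;
        Some(page_size)
    }
}
```

Since `page_size` is capped by `remaining`, each step keeps `rows_emitted <= expected_rows`, and the final page is simply the shorter remainder.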
477 lib tests pass; clippy + nightly fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(streaming): recompute split budget across the output-rollover boundary
Codex P1 finding on PR #6424: when a top-level region exactly fills
the current output (so `remaining_in_current == 0`) and the next
prefix-aligned region needs splitting, the split's first-sub-region
budget was the stale zero remainder of the about-to-be-finalized
output. `split_region_at_sorted_series` therefore cut after the
first sorted_series run, producing a tiny leftover plus a large
continuation that both inherited the parent region's prefix key.
The sub-region loop then rolled over to a fresh output and wrote
both pieces there, tripping the PA-3 duplicate-prefix-RG check in
`finalize_output`.
Fix: detect the rollover at decision time and compute
`effective_first_target` / `effective_outputs_remaining` against the
*next* output's empty budget. With the fix, the example above just
chooses `needs_split = false` (region fits the fresh output's full
target), processes the region whole, and rolls over cleanly.
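The decision can be sketched as below. This is a hedged reconstruction from the description above: `decide_split` and `SplitDecision` are hypothetical, and the fields only loosely correspond to `effective_first_target` / `effective_outputs_remaining`.

```rust
/// Hypothetical result of the split decision for one top-level region.
#[derive(Debug, PartialEq)]
struct SplitDecision {
    needs_split: bool,
    /// Row budget for the first sub-region.
    first_target: u64,
    /// Outputs available to this region, counting the one it starts in.
    outputs_remaining: usize,
}

fn decide_split(
    region_rows: u64,
    remaining_in_current: u64,
    target: u64,
    current_output_idx: usize,
    num_outputs: usize,
) -> SplitDecision {
    let outputs_after_current = num_outputs.saturating_sub(current_output_idx + 1);
    // The current output is exactly full and another one exists: the region
    // will start in the next output, so budget against that empty output.
    let rollover = remaining_in_current == 0 && outputs_after_current > 0;
    let (first_target, outputs_remaining) = if rollover {
        (target, outputs_after_current)
    } else {
        (remaining_in_current, outputs_after_current + 1)
    };
    SplitDecision {
        needs_split: outputs_remaining > 1 && region_rows > first_target,
        first_target,
        outputs_remaining,
    }
}
```

For the scenario above (50-row regions, `num_outputs = 3`, target 50, output 0 just filled), the sketch yields `needs_split = false` with a first target of 50, whereas comparing against the stale remainder of 0 would have requested a split.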
Regression test `test_region_exactly_fills_output_does_not_split_next_aligned_region`
exercises the exact scenario Codex described: three 50-row RGs with
distinct (metric, service) prefixes, `num_outputs = 3`, target = 50.
Pre-fix, the merge bailed with PA-3 on output 1; post-fix, three
clean outputs each with one unique prefix key. Verified by reading
each output's parquet metadata back through
`assert_unique_rg_prefix_keys`.
478 lib tests pass (477 prior + 1 new); clippy + nightly fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(streaming): reject null-mixed + all-null prefix RGs
Codex P1 on PR #6424: `extract_aligned_prefix_value` decided
prefix alignment purely from `min` / `max` statistics. Parquet
records those over non-null cells only, with `null_count` reported
separately, so two real failure modes slipped through:
1. **Mixed null + non-null.** A row group with `N` nulls plus a
single non-null cell `"x"` reports `min == max == "x"` and the
`min == max` check silently accepted it — but two distinct
prefix keys (null and `"x"`) lived in that RG, breaking the
at-most-one-prefix-value-per-RG invariant (PA-1).
2. **All-null RG.** Parquet records no `min` / `max` for an
all-null chunk, so the legacy check bailed with the misleading "no
min in stats" error. Logically the RG carries one prefix value
(null) and is aligned — but supporting it cleanly requires a
null marker in the composite-key encoding that agrees with
SS-2's "nulls last" rule. `encode_byte_array_prefix(&[])` puts
nulls *first*; coordinating that with SS-2 is a follow-up.
Fix: read `null_count_opt()` from stats and `num_values()` from
the column-chunk metadata. Bail explicitly in both cases — mixed
with a PA-1 message naming the (nulls, non-null) split, all-null
with a clear "not yet supported" pointer.
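The null check can be sketched over plain integers, assuming `num_values` counts nulls as well as non-null cells. `check_prefix_nulls`, its error strings, and the treatment of a missing `null_count` as an error are assumptions for illustration.

```rust
/// Hypothetical guard run before the `min == max` comparison.
/// `null_count` mirrors an optional statistic; `num_values` is the total
/// cell count of the column chunk, nulls included.
fn check_prefix_nulls(null_count: Option<u64>, num_values: u64) -> Result<(), String> {
    // Assumption: without a null count the alignment cannot be verified.
    let nulls = null_count.ok_or_else(|| "null_count missing from stats".to_string())?;
    if nulls == 0 {
        return Ok(());
    }
    if nulls >= num_values {
        return Err("all-null prefix row group is not yet supported".to_string());
    }
    Err(format!(
        "PA-1 violation: prefix column mixes {} nulls with {} non-null values",
        nulls,
        num_values - nulls
    ))
}
```

Only a chunk with zero nulls proceeds to the `min == max` comparison, so a row group with three `"cpu.usage"` cells and one null is rejected even though its statistics report `min == max`.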
Two regression tests in `streaming.rs::tests`:
- `test_mixed_null_and_value_prefix_rg_rejected`: 1 RG, 3 cells
`"cpu.usage"` + 1 null. Asserts PA-1 bail.
- `test_all_null_prefix_rg_rejected`: 1 RG, 3 nulls. Asserts the
"all-null … not yet supported" bail.
480 lib tests pass (+2 new); workspace clippy + nightly fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* refactor(streaming): share storekey prefix encoding with sorted_series
The per-RG composite prefix key now uses the same storekey-based
encoding as `sorted_series` — same `(ordinal, value)` layout, same
direction-inversion, same null-skip pattern — so a per-RG prefix
key is a literal byte prefix of every `sorted_series` value emitted
by rows in that RG.
Why: the prior byte-stuffed escape encoding had no in-line way to
represent an all-null prefix RG (an empty marker would lex-sort
before any present-value key, conflicting with SS-2 nulls-last).
With the shared encoding, an all-null column is skipped entirely
and the next column's higher ordinal byte appears in its place,
giving nulls-last ordering for free — the same trick already proven
in `sorted_series::encode_row_key`.
Per-column logic now goes through one helper:
`crate::sorted_series::append_prefix_col_to_key(buf, ord, val, desc)`
shared between `sorted_series` (per-row keys) and
`merge::streaming::region_grouping` (per-RG keys). It writes
`storekey(ord) || storekey(val)` and inverts only the value bytes
for DESC columns. `sorted_series::encode_row_key` was refactored
to call the helper; the open-coded inline encoding is gone.
Trailing **prefix-length sentinel**: each per-RG key ends with a
`u8(prefix_len)` ordinal byte. This handles the prefix_len=1
edge case where an all-null RG's empty body would otherwise lex-sort
*before* any non-null RG — with the sentinel, the all-null key
becomes `[prefix_len]` and non-null keys still start with `ord(0)`
(< prefix_len), so non-null sorts first. The sentinel is also what
`sorted_series` writes immediately after the prefix cols, so the
literal-prefix property is preserved.
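The ordering effect of null-skipping plus the sentinel can be illustrated with a simplified stand-in encoding. This sketch does not use `storekey`: values are written as raw bytes with a `0x00` terminator (assuming no embedded zero bytes), DESC inversion is omitted, and `encode_rg_prefix_key` is a hypothetical name.

```rust
/// Simplified per-RG key: for each non-null prefix column write its ordinal
/// byte and its value; skip null columns entirely; finish with the
/// prefix-length sentinel byte.
fn encode_rg_prefix_key(cols: &[Option<&str>]) -> Vec<u8> {
    let mut key = Vec::new();
    for (ord, col) in cols.iter().enumerate() {
        if let Some(value) = col {
            key.push(ord as u8);
            key.extend_from_slice(value.as_bytes());
            key.push(0x00); // stand-in terminator, not the storekey layout
        }
        // Null column: nothing is written, so the next ordinal (or the
        // sentinel) appears where this column's ordinal would have been.
    }
    key.push(cols.len() as u8); // trailing prefix-length sentinel
    key
}
```

With `prefix_len = 1`, the all-null key is `[1]` while any non-null key starts with `0`, so non-null regions sort first. Without the sentinel the all-null key would be empty and would sort before everything.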
Null handling in `extract_rg_composite_prefix_key`:
- **All-null RG**: column skipped, RG groups into its own region (after non-null regions).
- **Mixed null + non-null**: rejected as a PA-1 violation (rows in the same RG would encode to
two distinct prefix keys; producer is supposed to start a new RG at the null/non-null
transition).
- **Otherwise**: standard `min == max` check, then the type-dispatched storekey encoding via the
helper.
Removed:
- `extract_aligned_prefix_value` (replaced by `encode_prefix_col_value` which calls the helper).
- `encode_byte_array_prefix` (byte-stuffed escape, no longer used).
- `invert_for_descending` (the helper handles inversion per-column).
- `test_invert_for_descending_reverses_lex_order` and
`test_byte_array_prefix_preserves_lex_order_across_lengths` (byte-level tests of the removed
encoding; semantic properties remain enforced by `storekey`'s own tests plus the higher-level
prefix tests).
Replaced `test_all_null_prefix_rg_rejected` with
`test_all_null_prefix_rg_groups_into_separate_region_sorted_last`:
builds two inputs (one with `metric_name = "cpu.usage"`, one with
`metric_name = NULL`) and verifies the merged output has two RGs
with the all-null region in RG 1 (sorted after the non-null
region) — pinning the nulls-last ordering that the sentinel
encoding produces.
Updated `test_extract_rg_composite_prefix_key_two_byte_array_cols`
for the new byte layout (`storekey(ord) || storekey(val)` per col
plus the trailing sentinel byte).
`PrefixColumn` gains an `ordinal: u8` field, populated from each
column's position in `qh.sort_fields` so it matches the ordinal
`sorted_series` would assign.
478 lib tests pass; workspace clippy + nightly fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* style(streaming): re-fmt to latest nightly rustfmt
CI's nightly rustfmt (1.9.0-nightly 2026-05-17) wrapped a handful
of comment / bail!-message / where-clause / vec! literal lines
slightly differently than my local nightly at commit time
(1.9.0-nightly 2026-05-11). Re-formatting all three affected files
catches the drift in this commit so CI Lints stops complaining;
local nightly is now updated to match CI.
No behaviour change. 478 lib tests still pass on the slice.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(merge): preserve metastore rg_partition_prefix_len from writer's KV stamp
The streaming merge engine produces sort-prefix-aligned multi-RG output
and stamps `qh.rg_partition_prefix_len = input_meta.rg_partition_prefix_len`
in the file's KV (verified by `assert_unique_rg_prefix_keys` before close).
`merge_parquet_split_metadata` then ran after and unconditionally demoted
to 0 whenever `output.num_row_groups > 1` — breaking CS-1 (metastore must
mirror on-disk KV) for every multi-RG streaming-engine output. Aligned
splits got tagged 0 in the metastore on every merge and leaked out of
the prefix-aligned compaction bucket on the next pass.
Carry the value the writer actually stamped via a new
`MergeOutputFile.output_rg_partition_prefix_len` field, then propagate
it as-is in metadata aggregation. Both engines populate the field:
- Legacy `merge/writer.rs` reports its demoted value (row-count-driven
RG boundaries can't honor prefix alignment, so it stamps 0 on multi-RG).
- Streaming `merge/streaming/output.rs` reports the inputs' prefix
unchanged (it splits at prefix transitions and the writer verifies).
CS-1 holds by construction — same source of truth, no re-derivation.
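The before/after of the metadata aggregation can be sketched as follows. `OutputFileSketch` is a hypothetical, pared-down stand-in for `MergeOutputFile`, the `u32` type is assumed, and both functions are illustrations rather than the actual `merge_parquet_split_metadata` logic.

```rust
/// Hypothetical stand-in carrying only the two fields relevant here.
struct OutputFileSketch {
    num_row_groups: usize,
    /// The value the writer actually stamped in the file's KV metadata.
    output_rg_partition_prefix_len: u32,
}

/// Old behaviour as described above: re-derive the value and demote to 0
/// for any multi-RG output, regardless of what the writer stamped.
fn metastore_prefix_len_before(input_prefix_len: u32, out: &OutputFileSketch) -> u32 {
    if out.num_row_groups > 1 { 0 } else { input_prefix_len }
}

/// New behaviour: propagate the writer's stamped value unchanged.
fn metastore_prefix_len_after(out: &OutputFileSketch) -> u32 {
    out.output_rg_partition_prefix_len
}
```

For a three-RG streaming output stamped with prefix length 2, the old path records 0 and the new path records 2; a legacy writer that stamped 0 on multi-RG output still yields 0.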
Tests:
- `test_output_prefix_len_demoted_when_multi_rg` is renamed to
`test_output_prefix_len_carries_writers_value_when_demoted` and now
asserts that the metastore mirrors the writer's reported value.
- New `test_output_prefix_len_preserved_on_multi_rg_streaming_engine`
asserts that a multi-RG streaming output (writer reports prefix_len=2)
keeps the prefix in the metastore — the regression case for F1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1436be7 commit d388bc9
10 files changed
Lines changed: 4058 additions & 1760 deletions
File tree
- quickwit/quickwit-parquet-engine/src
- merge
- streaming
- sorted_series
- storage