feat(merge): adversarial-review test coverage (F4/F5/F7) + F14 sub-region engine fix#6428
Merged
g-talbot merged 3 commits intoMay 19, 2026
Conversation
2a3f09a to
edd45a6
Compare
6ed25aa to
c990bfa
Compare
34d634f to
a6c72d5
Compare
c990bfa to
3b90477
Compare
a6c72d5 to
2efe6c2
Compare
3b90477 to
814a8c7
Compare
2efe6c2 to
5f04604
Compare
814a8c7 to
435a4b5
Compare
5f04604 to
6b31ac0
Compare
154defd to
112ca1f
Compare
6b31ac0 to
9047531
Compare
112ca1f to
bc4bb77
Compare
af2b07c to
ebce99f
Compare
bc4bb77 to
1ea081c
Compare
ebce99f to
d9aa050
Compare
1ea081c to
a5d1096
Compare
d9aa050 to
c067600
Compare
a5d1096 to
95a662e
Compare
c067600 to
062eb87
Compare
95a662e to
dc1438f
Compare
062eb87 to
93bccc4
Compare
dc1438f to
292d1f9
Compare
93bccc4 to
65f92dc
Compare
292d1f9 to
6d142b2
Compare
65f92dc to
3c60e96
Compare
6d142b2 to
4013bda
Compare
f29f6d4 to
c3e91f8
Compare
09002dd to
d887b5b
Compare
c3e91f8 to
e5618e9
Compare
d887b5b to
7c0e1a2
Compare
Contributor
Author
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7c0e1a2892
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
7c0e1a2 to
82cd92e
Compare
g-talbot
added a commit
that referenced
this pull request
May 18, 2026
…plit Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot
added a commit
that referenced
this pull request
May 18, 2026
The streaming Parquet merge stack landing in #6424–#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot
added a commit
that referenced
this pull request
May 18, 2026
…plit Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
15fb382 to
0690fe1
Compare
8 tasks
g-talbot
added a commit
that referenced
this pull request
May 18, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge): legacy-prefix promotion path + schema-evolution body cols Two adversarial-review follow-ups grouped because they share the streaming engine's input-routing and union-schema seams. ## (b) Legacy-prefix promotion A new operation type pairs a prefix_len=0 split with prefix_len>0 peers in one merge, so legacy splits can be folded into prefix- aligned buckets instead of aging out via retention. Adds: - `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality unchanged. - `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion target; `None` is the default regular-merge form. - `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality check in promotion mode. The output prefix_len still comes from the writer's KV stamp via `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1). - `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam PR-7 will wire from above. Tests: - `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`, `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`. - `test_mixed_prefix_ok_skips_input_equality_check`. - `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1. - `test_executor_mismatched_sources_count_bails`. ## F6 + F13: Schema evolution for body columns The merger now supports MC-4 across heterogeneous body-col schemas: - F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array flavour merge cleanly; before this they hit a "type conflict" at alignment time. - F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the union to be nullable when a List col is present in only some inputs; before this the writer rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 = element present) for nullable outer; non-nullable path unchanged. Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B with the same sort schema but different body cols (Utf8 vs Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32 A-only, Int64 B-only, common Float64). The merge produces the union schema; per-row rendering via `render_cell` matches across flavour boundaries; List cells from B render as nulls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(merge-executor): route promotion merges through execute_merge_operation Codex P1 on PR #6423: the executor unconditionally called the in-memory `merge_sorted_parquet_files` path, which routes through `extract_and_validate_input_metadata` and bails on mixed `qh.rg_partition_prefix_len` before any output is produced. So a real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with `target_prefix_len_override = Some(1)` — failed before reaching the downstream `mixed_prefix_ok` plumbing in `merge_parquet_split_metadata`. The escape hatch existed but was unreachable for actual promotion inputs. Fix: branch in the executor's handle on `target_prefix_len_override.is_some()`. Promotion merges go through the engine's streaming entry point `quickwit_parquet_engine::merge::execute_merge_operation`, which opens each below-target input via `LegacyInputAdapter` and each at-target input directly. The streaming merge then sees a homogeneous stream advertising `prefix_len = target` on every input. Regular (non-promotion) merges keep the in-memory path. `execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>` parallel to `op.splits` — the engine deliberately doesn't depend on `quickwit-storage` (would invert layering and pull cloud SDKs into a pure parquet library). So this commit adds `LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by `tokio::fs::File`, one instance per downloaded split, each bound to its scratch-directory path. The `path: &Path` argument on the trait surface is ignored — the downloader has already resolved each split to a concrete local file before the executor runs. Coverage: - Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end` already exercises `execute_merge_operation` with a `prefix_len = 0` + `prefix_len = 1` pair, verifying the output advertises `prefix_len = 1` and passes PA-1 + PA-3 on the composite key. That's now the same code path the in-tree executor takes. - Module doc on the executor rewritten to spell out which path runs when. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track legacy promotion planner gap as GAP-011 The streaming Parquet merge stack landing in #6424–#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track download-vs-streaming merge executor gap as GAP-012 The Parquet streaming merge engine is built around `RemoteByteSource` and was designed to pull pages directly from object storage — two GETs per input, overlap fetch with merge, no scratch disk. The production actor pipeline doesn't take that path: a downloader actor materializes every input on local disk first, and the executor wraps the local files in a `LocalFileByteSource` to feed `execute_merge_operation` (or just calls the in-memory `merge_sorted_parquet_files` path). The streaming engine's central design benefit is unused. This isn't a correctness bug — both paths give the same result. It's a perf/architecture gap: every merge pays 2× I/O per input (network → scratch + scratch → merger), serializes phases (`max(input download time)` first-byte latency), and consumes scratch disk that scales with concurrent merges. Tracking as GAP-012 so we pick it up at the right time. The gap doc walks four options (stream-directly with download fallback, stream- by-default with circuit breaker, eliminate in-memory path only, stream-directly for promotion merges only) and the trade-offs between them — including the mid-merge retry surface, which is the main reason download-first is the current default. Surfaced during PR #6423 code walkthrough. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Base automatically changed from
gtt/legacy-promotion-and-schema-evo
to
gtt/parquet-streaming-base
May 18, 2026 18:15
Three test additions plus one engine fix surfaced by the F4 tests.
The existing MS-7 test proved the per-input page-cache bound for one
input and one region. F4 extends the coverage:
- `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈
{3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going
from 1 input to 8 must not push the peak up.
- `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0
multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input
row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak
grew ~9× when rows grew 10×.**
Tests serialize via `ms7_serial_lock` because
`PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests
would pollute each other's readings.
Parquet streams emit pages in column-major order (all of col 0,
then all of col 1, ...). The old sub-region-outer / col-inner
ordering meant that while processing sub-region 0's col K, the
stream emitted cols 0..K-1's remaining pages first to reach col K —
those skipped pages got cached under their own col_idx for later
sub-regions to consume, and the cache scaled with input row count.
Fix: new `process_split_region_col_outer` function for the
`needs_split` path. Cols iterate in the outer loop, sub-regions in
the inner. Each parquet col chunk is fully consumed from the stream
across all sub-regions before col K+1 starts. Cache for col K is
empty before col K+1's pages arrive.
Mechanics: pre-determine writer assignments for the region's
sub-regions (a top-level region's sub-regions may span multiple
output writers; consecutive sub-regions on the same writer get
coalesced into one combined Region so each writer holds one RG
concurrently — RGs on the same writer are sequential, so coalescing
keeps the parquet writer's single-active-RG constraint intact).
Single-region path stays on the existing `process_region`.
`prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3,
per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and
asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within
each output), MS-3 (num_row_groups matches footer), PA-1+PA-3
(`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len ==
KV) on every generated case. 32 cases capped to keep runtime under
a second.
Fixture: `make_prefix_len_one_input` writes one RG per
`(metric_name, rows)` entry by calling `writer.flush()` between
batches. `sorted_series` encodes
`metric_base + row_offset_within_metric`, mirroring production's
storekey property that different metric_names produce
non-overlapping `sorted_series` byte ranges.
Plus a focused unit test `test_f5_single_input_two_metrics_minimal`
that pins one specific case for fast iteration.
`test_f7_production_shape_multi_input_multi_rg_multi_output`: 5
inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1.
Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5
cross-output sorted_series monotonicity, CS-1) — the corner the
adversarial review flagged as "untested production case".
MS-5 is "across adjacent outputs, sorted_series is monotone
non-decreasing." A single metric CAN span outputs (the engine
splits at sorted_series transitions inside an overflowing region),
so the cross-output invariant is sorted_series monotonicity, not
"each metric in one output."
- `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass.
- `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`.
- `cargo doc --no-deps -p quickwit-parquet-engine` warning-free.
- `cargo fmt --all -- --check` (nightly via PATH override).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Newer nightly rustfmt (2026-05-17) flags the extra blank line that crept into the test module between the F4 fixture helper and the "Heterogeneous-output regressions" section header. Single-line gap is what nightly fmt wants. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…plit Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
0690fe1 to
4ab52a4
Compare
rishabh
approved these changes
May 19, 2026
g-talbot
added a commit
that referenced
this pull request
May 20, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge): legacy-prefix promotion path + schema-evolution body cols Two adversarial-review follow-ups grouped because they share the streaming engine's input-routing and union-schema seams. ## (b) Legacy-prefix promotion A new operation type pairs a prefix_len=0 split with prefix_len>0 peers in one merge, so legacy splits can be folded into prefix- aligned buckets instead of aging out via retention. Adds: - `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality unchanged. - `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion target; `None` is the default regular-merge form. - `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality check in promotion mode. The output prefix_len still comes from the writer's KV stamp via `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1). - `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam PR-7 will wire from above. Tests: - `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`, `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`. - `test_mixed_prefix_ok_skips_input_equality_check`. - `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1. - `test_executor_mismatched_sources_count_bails`. ## F6 + F13: Schema evolution for body columns The merger now supports MC-4 across heterogeneous body-col schemas: - F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array flavour merge cleanly; before this they hit a "type conflict" at alignment time. - F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the union to be nullable when a List col is present in only some inputs; before this the writer rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 = element present) for nullable outer; non-nullable path unchanged. Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B with the same sort schema but different body cols (Utf8 vs Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32 A-only, Int64 B-only, common Float64). The merge produces the union schema; per-row rendering via `render_cell` matches across flavour boundaries; List cells from B render as nulls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(merge-executor): route promotion merges through execute_merge_operation Codex P1 on PR #6423: the executor unconditionally called the in-memory `merge_sorted_parquet_files` path, which routes through `extract_and_validate_input_metadata` and bails on mixed `qh.rg_partition_prefix_len` before any output is produced. So a real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with `target_prefix_len_override = Some(1)` — failed before reaching the downstream `mixed_prefix_ok` plumbing in `merge_parquet_split_metadata`. The escape hatch existed but was unreachable for actual promotion inputs. Fix: branch in the executor's handle on `target_prefix_len_override.is_some()`. Promotion merges go through the engine's streaming entry point `quickwit_parquet_engine::merge::execute_merge_operation`, which opens each below-target input via `LegacyInputAdapter` and each at-target input directly. The streaming merge then sees a homogeneous stream advertising `prefix_len = target` on every input. Regular (non-promotion) merges keep the in-memory path. `execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>` parallel to `op.splits` — the engine deliberately doesn't depend on `quickwit-storage` (would invert layering and pull cloud SDKs into a pure parquet library). So this commit adds `LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by `tokio::fs::File`, one instance per downloaded split, each bound to its scratch-directory path. The `path: &Path` argument on the trait surface is ignored — the downloader has already resolved each split to a concrete local file before the executor runs. Coverage: - Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end` already exercises `execute_merge_operation` with a `prefix_len = 0` + `prefix_len = 1` pair, verifying the output advertises `prefix_len = 1` and passes PA-1 + PA-3 on the composite key. That's now the same code path the in-tree executor takes. - Module doc on the executor rewritten to spell out which path runs when. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track legacy promotion planner gap as GAP-011 The streaming Parquet merge stack landing in #6424–#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track download-vs-streaming merge executor gap as GAP-012 The Parquet streaming merge engine is built around `RemoteByteSource` and was designed to pull pages directly from object storage — two GETs per input, overlap fetch with merge, no scratch disk. The production actor pipeline doesn't take that path: a downloader actor materializes every input on local disk first, and the executor wraps the local files in a `LocalFileByteSource` to feed `execute_merge_operation` (or just calls the in-memory `merge_sorted_parquet_files` path). The streaming engine's central design benefit is unused. This isn't a correctness bug — both paths give the same result. It's a perf/architecture gap: every merge pays 2× I/O per input (network → scratch + scratch → merger), serializes phases (`max(input download time)` first-byte latency), and consumes scratch disk that scales with concurrent merges. Tracking as GAP-012 so we pick it up at the right time. The gap doc walks four options (stream-directly with download fallback, stream- by-default with circuit breaker, eliminate in-memory path only, stream-directly for promotion merges only) and the trade-offs between them — including the mid-merge retry surface, which is the main reason download-first is the current default. Surfaced during PR #6423 code walkthrough. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot
added a commit
that referenced
this pull request
May 20, 2026
…gion engine fix (#6428) * feat(merge): close F4/F5/F7/F14 from the adversarial review Three test additions plus one engine fix surfaced by the F4 tests. The existing MS-7 test proved the per-input page-cache bound for one input and one region. F4 extends the coverage: - `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈ {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going from 1 input to 8 must not push the peak up. - `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0 multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak grew ~9× when rows grew 10×.** Tests serialize via `ms7_serial_lock` because `PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests would pollute each other's readings. Parquet streams emit pages in column-major order (all of col 0, then all of col 1, ...). The old sub-region-outer / col-inner ordering meant that while processing sub-region 0's col K, the stream emitted cols 0..K-1's remaining pages first to reach col K — those skipped pages got cached under their own col_idx for later sub-regions to consume, and the cache scaled with input row count. Fix: new `process_split_region_col_outer` function for the `needs_split` path. Cols iterate in the outer loop, sub-regions in the inner. Each parquet col chunk is fully consumed from the stream across all sub-regions before col K+1 starts. Cache for col K is empty before col K+1's pages arrive. Mechanics: pre-determine writer assignments for the region's sub-regions (a top-level region's sub-regions may span multiple output writers; consecutive sub-regions on the same writer get coalesced into one combined Region so each writer holds one RG concurrently — RGs on the same writer are sequential, so coalescing keeps the parquet writer's single-active-RG constraint intact). Single-region path stays on the existing `process_region`. `prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3, per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within each output), MS-3 (num_row_groups matches footer), PA-1+PA-3 (`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len == KV) on every generated case. 32 cases capped to keep runtime under a second. Fixture: `make_prefix_len_one_input` writes one RG per `(metric_name, rows)` entry by calling `writer.flush()` between batches. `sorted_series` encodes `metric_base + row_offset_within_metric`, mirroring production's storekey property that different metric_names produce non-overlapping `sorted_series` byte ranges. Plus a focused unit test `test_f5_single_input_two_metrics_minimal` that pins one specific case for fast iteration. `test_f7_production_shape_multi_input_multi_rg_multi_output`: 5 inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1. Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5 cross-output sorted_series monotonicity, CS-1) — the corner the adversarial review flagged as "untested production case". MS-5 is "across adjacent outputs, sorted_series is monotone non-decreasing." A single metric CAN span outputs (the engine splits at sorted_series transitions inside an overflowing region), so the cross-output invariant is sorted_series monotonicity, not "each metric in one output." - `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass. - `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`. - `cargo doc --no-deps -p quickwit-parquet-engine` warning-free. - `cargo fmt --all -- --check` (nightly via PATH override). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(streaming): drop stray blank line before tests section header Newer nightly rustfmt (2026-05-17) flags the extra blank line that crept into the test module between the F4 fixture helper and the "Heterogeneous-output regressions" section header. Single-line gap is what nightly fmt wants. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(streaming): roll over chunk-assignment before first chunk after split Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
6 tasks
g-talbot
added a commit
that referenced
this pull request
May 20, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge): legacy-prefix promotion path + schema-evolution body cols Two adversarial-review follow-ups grouped because they share the streaming engine's input-routing and union-schema seams. ## (b) Legacy-prefix promotion A new operation type pairs a prefix_len=0 split with prefix_len>0 peers in one merge, so legacy splits can be folded into prefix- aligned buckets instead of aging out via retention. Adds: - `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality unchanged. - `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion target; `None` is the default regular-merge form. - `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality check in promotion mode. The output prefix_len still comes from the writer's KV stamp via `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1). - `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam PR-7 will wire from above. Tests: - `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`, `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`. - `test_mixed_prefix_ok_skips_input_equality_check`. - `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1. - `test_executor_mismatched_sources_count_bails`. ## F6 + F13: Schema evolution for body columns The merger now supports MC-4 across heterogeneous body-col schemas: - F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array flavour merge cleanly; before this they hit a "type conflict" at alignment time. - F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the union to be nullable when a List col is present in only some inputs; before this the writer rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 = element present) for nullable outer; non-nullable path unchanged. Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B with the same sort schema but different body cols (Utf8 vs Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32 A-only, Int64 B-only, common Float64). The merge produces the union schema; per-row rendering via `render_cell` matches across flavour boundaries; List cells from B render as nulls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(merge-executor): route promotion merges through execute_merge_operation Codex P1 on PR #6423: the executor unconditionally called the in-memory `merge_sorted_parquet_files` path, which routes through `extract_and_validate_input_metadata` and bails on mixed `qh.rg_partition_prefix_len` before any output is produced. So a real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with `target_prefix_len_override = Some(1)` — failed before reaching the downstream `mixed_prefix_ok` plumbing in `merge_parquet_split_metadata`. The escape hatch existed but was unreachable for actual promotion inputs. Fix: branch in the executor's handle on `target_prefix_len_override.is_some()`. Promotion merges go through the engine's streaming entry point `quickwit_parquet_engine::merge::execute_merge_operation`, which opens each below-target input via `LegacyInputAdapter` and each at-target input directly. The streaming merge then sees a homogeneous stream advertising `prefix_len = target` on every input. Regular (non-promotion) merges keep the in-memory path. `execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>` parallel to `op.splits` — the engine deliberately doesn't depend on `quickwit-storage` (would invert layering and pull cloud SDKs into a pure parquet library). So this commit adds `LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by `tokio::fs::File`, one instance per downloaded split, each bound to its scratch-directory path. The `path: &Path` argument on the trait surface is ignored — the downloader has already resolved each split to a concrete local file before the executor runs. Coverage: - Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end` already exercises `execute_merge_operation` with a `prefix_len = 0` + `prefix_len = 1` pair, verifying the output advertises `prefix_len = 1` and passes PA-1 + PA-3 on the composite key. That's now the same code path the in-tree executor takes. - Module doc on the executor rewritten to spell out which path runs when. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track legacy promotion planner gap as GAP-011 The streaming Parquet merge stack landing in #6424–#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track download-vs-streaming merge executor gap as GAP-012 The Parquet streaming merge engine is built around `RemoteByteSource` and was designed to pull pages directly from object storage — two GETs per input, overlap fetch with merge, no scratch disk. The production actor pipeline doesn't take that path: a downloader actor materializes every input on local disk first, and the executor wraps the local files in a `LocalFileByteSource` to feed `execute_merge_operation` (or just calls the in-memory `merge_sorted_parquet_files` path). The streaming engine's central design benefit is unused. This isn't a correctness bug — both paths give the same result. It's a perf/architecture gap: every merge pays 2× I/O per input (network → scratch + scratch → merger), serializes phases (`max(input download time)` first-byte latency), and consumes scratch disk that scales with concurrent merges. Tracking as GAP-012 so we pick it up at the right time. The gap doc walks four options (stream-directly with download fallback, stream- by-default with circuit breaker, eliminate in-memory path only, stream-directly for promotion merges only) and the trade-offs between them — including the mid-merge retry surface, which is the main reason download-first is the current default. Surfaced during PR #6423 code walkthrough. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot
added a commit
that referenced
this pull request
May 20, 2026
…gion engine fix (#6428) * feat(merge): close F4/F5/F7/F14 from the adversarial review Three test additions plus one engine fix surfaced by the F4 tests. The existing MS-7 test proved the per-input page-cache bound for one input and one region. F4 extends the coverage: - `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈ {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going from 1 input to 8 must not push the peak up. - `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0 multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak grew ~9× when rows grew 10×.** Tests serialize via `ms7_serial_lock` because `PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests would pollute each other's readings. Parquet streams emit pages in column-major order (all of col 0, then all of col 1, ...). The old sub-region-outer / col-inner ordering meant that while processing sub-region 0's col K, the stream emitted cols 0..K-1's remaining pages first to reach col K — those skipped pages got cached under their own col_idx for later sub-regions to consume, and the cache scaled with input row count. Fix: new `process_split_region_col_outer` function for the `needs_split` path. Cols iterate in the outer loop, sub-regions in the inner. Each parquet col chunk is fully consumed from the stream across all sub-regions before col K+1 starts. Cache for col K is empty before col K+1's pages arrive. Mechanics: pre-determine writer assignments for the region's sub-regions (a top-level region's sub-regions may span multiple output writers; consecutive sub-regions on the same writer get coalesced into one combined Region so each writer holds one RG concurrently — RGs on the same writer are sequential, so coalescing keeps the parquet writer's single-active-RG constraint intact). Single-region path stays on the existing `process_region`. `prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3, per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within each output), MS-3 (num_row_groups matches footer), PA-1+PA-3 (`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len == KV) on every generated case. 32 cases capped to keep runtime under a second. Fixture: `make_prefix_len_one_input` writes one RG per `(metric_name, rows)` entry by calling `writer.flush()` between batches. `sorted_series` encodes `metric_base + row_offset_within_metric`, mirroring production's storekey property that different metric_names produce non-overlapping `sorted_series` byte ranges. Plus a focused unit test `test_f5_single_input_two_metrics_minimal` that pins one specific case for fast iteration. `test_f7_production_shape_multi_input_multi_rg_multi_output`: 5 inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1. Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5 cross-output sorted_series monotonicity, CS-1) — the corner the adversarial review flagged as "untested production case". MS-5 is "across adjacent outputs, sorted_series is monotone non-decreasing." A single metric CAN span outputs (the engine splits at sorted_series transitions inside an overflowing region), so the cross-output invariant is sorted_series monotonicity, not "each metric in one output." - `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass. - `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`. - `cargo doc --no-deps -p quickwit-parquet-engine` warning-free. - `cargo fmt --all -- --check` (nightly via PATH override). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(streaming): drop stray blank line before tests section header Newer nightly rustfmt (2026-05-17) flags the extra blank line that crept into the test module between the F4 fixture helper and the "Heterogeneous-output regressions" section header. Single-line gap is what nightly fmt wants. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(streaming): roll over chunk-assignment before first chunk after split Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot
added a commit
that referenced
this pull request
May 21, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge): legacy-prefix promotion path + schema-evolution body cols Two adversarial-review follow-ups grouped because they share the streaming engine's input-routing and union-schema seams. ## (b) Legacy-prefix promotion A new operation type pairs a prefix_len=0 split with prefix_len>0 peers in one merge, so legacy splits can be folded into prefix- aligned buckets instead of aging out via retention. Adds: - `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality unchanged. - `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion target; `None` is the default regular-merge form. - `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality check in promotion mode. The output prefix_len still comes from the writer's KV stamp via `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1). - `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam PR-7 will wire from above. Tests: - `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`, `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`. - `test_mixed_prefix_ok_skips_input_equality_check`. - `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1. - `test_executor_mismatched_sources_count_bails`. ## F6 + F13: Schema evolution for body columns The merger now supports MC-4 across heterogeneous body-col schemas: - F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array flavour merge cleanly; before this they hit a "type conflict" at alignment time. - F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the union to be nullable when a List col is present in only some inputs; before this the writer rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 = element present) for nullable outer; non-nullable path unchanged. Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B with the same sort schema but different body cols (Utf8 vs Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32 A-only, Int64 B-only, common Float64). The merge produces the union schema; per-row rendering via `render_cell` matches across flavour boundaries; List cells from B render as nulls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(merge-executor): route promotion merges through execute_merge_operation Codex P1 on PR #6423: the executor unconditionally called the in-memory `merge_sorted_parquet_files` path, which routes through `extract_and_validate_input_metadata` and bails on mixed `qh.rg_partition_prefix_len` before any output is produced. So a real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with `target_prefix_len_override = Some(1)` — failed before reaching the downstream `mixed_prefix_ok` plumbing in `merge_parquet_split_metadata`. The escape hatch existed but was unreachable for actual promotion inputs. Fix: branch in the executor's handle on `target_prefix_len_override.is_some()`. Promotion merges go through the engine's streaming entry point `quickwit_parquet_engine::merge::execute_merge_operation`, which opens each below-target input via `LegacyInputAdapter` and each at-target input directly. The streaming merge then sees a homogeneous stream advertising `prefix_len = target` on every input. Regular (non-promotion) merges keep the in-memory path. `execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>` parallel to `op.splits` — the engine deliberately doesn't depend on `quickwit-storage` (would invert layering and pull cloud SDKs into a pure parquet library). So this commit adds `LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by `tokio::fs::File`, one instance per downloaded split, each bound to its scratch-directory path. The `path: &Path` argument on the trait surface is ignored — the downloader has already resolved each split to a concrete local file before the executor runs. Coverage: - Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end` already exercises `execute_merge_operation` with a `prefix_len = 0` + `prefix_len = 1` pair, verifying the output advertises `prefix_len = 1` and passes PA-1 + PA-3 on the composite key. That's now the same code path the in-tree executor takes. - Module doc on the executor rewritten to spell out which path runs when. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track legacy promotion planner gap as GAP-011 The streaming Parquet merge stack landing in #6424–#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track download-vs-streaming merge executor gap as GAP-012 The Parquet streaming merge engine is built around `RemoteByteSource` and was designed to pull pages directly from object storage — two GETs per input, overlap fetch with merge, no scratch disk. The production actor pipeline doesn't take that path: a downloader actor materializes every input on local disk first, and the executor wraps the local files in a `LocalFileByteSource` to feed `execute_merge_operation` (or just calls the in-memory `merge_sorted_parquet_files` path). The streaming engine's central design benefit is unused. This isn't a correctness bug — both paths give the same result. It's a perf/architecture gap: every merge pays 2× I/O per input (network → scratch + scratch → merger), serializes phases (`max(input download time)` first-byte latency), and consumes scratch disk that scales with concurrent merges. Tracking as GAP-012 so we pick it up at the right time. The gap doc walks four options (stream-directly with download fallback, stream- by-default with circuit breaker, eliminate in-memory path only, stream-directly for promotion merges only) and the trade-offs between them — including the mid-merge retry surface, which is the main reason download-first is the current default. Surfaced during PR #6423 code walkthrough. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot
added a commit
that referenced
this pull request
May 21, 2026
…gion engine fix (#6428) * feat(merge): close F4/F5/F7/F14 from the adversarial review Three test additions plus one engine fix surfaced by the F4 tests. The existing MS-7 test proved the per-input page-cache bound for one input and one region. F4 extends the coverage: - `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈ {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going from 1 input to 8 must not push the peak up. - `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0 multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak grew ~9× when rows grew 10×.** Tests serialize via `ms7_serial_lock` because `PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests would pollute each other's readings. Parquet streams emit pages in column-major order (all of col 0, then all of col 1, ...). The old sub-region-outer / col-inner ordering meant that while processing sub-region 0's col K, the stream emitted cols 0..K-1's remaining pages first to reach col K — those skipped pages got cached under their own col_idx for later sub-regions to consume, and the cache scaled with input row count. Fix: new `process_split_region_col_outer` function for the `needs_split` path. Cols iterate in the outer loop, sub-regions in the inner. Each parquet col chunk is fully consumed from the stream across all sub-regions before col K+1 starts. Cache for col K is empty before col K+1's pages arrive. Mechanics: pre-determine writer assignments for the region's sub-regions (a top-level region's sub-regions may span multiple output writers; consecutive sub-regions on the same writer get coalesced into one combined Region so each writer holds one RG concurrently — RGs on the same writer are sequential, so coalescing keeps the parquet writer's single-active-RG constraint intact). Single-region path stays on the existing `process_region`. `prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3, per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within each output), MS-3 (num_row_groups matches footer), PA-1+PA-3 (`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len == KV) on every generated case. 32 cases capped to keep runtime under a second. Fixture: `make_prefix_len_one_input` writes one RG per `(metric_name, rows)` entry by calling `writer.flush()` between batches. `sorted_series` encodes `metric_base + row_offset_within_metric`, mirroring production's storekey property that different metric_names produce non-overlapping `sorted_series` byte ranges. Plus a focused unit test `test_f5_single_input_two_metrics_minimal` that pins one specific case for fast iteration. `test_f7_production_shape_multi_input_multi_rg_multi_output`: 5 inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1. Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5 cross-output sorted_series monotonicity, CS-1) — the corner the adversarial review flagged as "untested production case". MS-5 is "across adjacent outputs, sorted_series is monotone non-decreasing." A single metric CAN span outputs (the engine splits at sorted_series transitions inside an overflowing region), so the cross-output invariant is sorted_series monotonicity, not "each metric in one output." - `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass. - `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`. - `cargo doc --no-deps -p quickwit-parquet-engine` warning-free. - `cargo fmt --all -- --check` (nightly via PATH override). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(streaming): drop stray blank line before tests section header Newer nightly rustfmt (2026-05-17) flags the extra blank line that crept into the test module between the F4 fixture helper and the "Heterogeneous-output regressions" section header. Single-line gap is what nightly fmt wants. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(streaming): roll over chunk-assignment before first chunk after split Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Stacked on #6423. Closes the remaining open code findings from the adversarial review (
.planning/reviews/2026-05-13-adversarial-review-parquet-merger.md).Summary
Three test additions plus one engine fix surfaced by the F4 tests:
F4: MS-7 generalized
test_ms7_per_input_bound_across_num_inputssweepsnum_inputs ∈ {1, 3, 8}×rows_per_input ∈ {3 000, 30 000}and asserts the per-input peak stays bounded.test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rowsruns the prefix_len=0 multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input row count.The sub-region test surfaced F14 — without the engine fix, the sub-region path's peak grew ~9× when rows grew 10×.
F14: invert col/sub-region loop nesting
Parquet streams emit pages in column-major order (all of col 0, then all of col 1, ...). The old sub-region-outer / col-inner ordering meant that while processing sub-region 0's col K, the stream emitted cols 0..K-1's remaining pages first to reach col K — those skipped pages got cached under their own col_idx for later sub-regions to consume. The cache scaled with input row count.
Fix: new
process_split_region_col_outerfunction for theneeds_splitpath. Cols iterate in the outer loop, sub-regions in the inner. Each parquet col chunk is fully consumed from the stream across all sub-regions before col K+1 starts. Single-region path stays on the existingprocess_regionunchanged.Mechanics: pre-determine writer assignments for the region's sub-regions (sub-regions can span multiple output writers; consecutive sub-regions on the same writer get coalesced into one combined Region so each writer holds one concurrent RG — RGs on the same writer are sequential per parquet's single-active-RG constraint).
F5: prefix-aware proptest
prop_merge_prefix_aligned_streamingsweeps(num_inputs ∈ 1..=3, per-input RG specs, num_outputs ∈ 1..=3)with prefix_len=1. Asserts on every generated case:MergeOutputFile.num_row_groupsmatches footer.assert_unique_rg_prefix_keyspasses.MergeOutputFile.output_rg_partition_prefix_lenmatches on-disk KV.32 cases capped to keep runtime tight. Fixture honors the storekey property: different metric_names produce non-overlapping
sorted_seriesbyte ranges, same(metric, row_offset)across inputs gets the samesorted_series(the realistic "same series in multiple splits" case).F7: production-shape integration test
test_f7_production_shape_multi_input_multi_rg_multi_output: 5 inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1, with overlap (every metric appears in 2-3 of the 5 inputs). Asserts MC-1, MS-3, PA-1+PA-3, MS-5 (cross-output sorted_series monotonicity — a single metric CAN span outputs when its region overflows the per-output budget; the cross-output invariant is sorted_series monotonicity, not "each metric in one output"), and CS-1.Test plan
cargo test -p quickwit-parquet-engine --lib— 498 unit tests pass (was 495 on feat(merge): legacy promotion path + body-col schema evolution #6423; added test_ms7_per_input_bound_across_num_inputs, test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows, prop_merge_prefix_aligned_streaming, test_f5_single_input_two_metrics_minimal, test_f7_production_shape_multi_input_multi_rg_multi_output).cargo clippy -p quickwit-parquet-engine --tests --all-featureswith-Dwarnings.cargo fmt --all -- --check(nightly).cargo doc --no-deps -p quickwit-parquet-enginewarning-free.Status of the adversarial review
After this PR, the remaining open work from the original review is just the spec docs (PA/MS/CS supplement to ADR-002). All code findings (F1, F2, F4, F6–F14) are closed across PRs #6424, #6425, #6426, #6423, and this one.
🤖 Generated with Claude Code