Skip to content

feat(merge): adversarial-review test coverage (F4/F5/F7) + F14 sub-region engine fix#6428

Merged
g-talbot merged 3 commits into
gtt/parquet-streaming-basefrom
gtt/adversarial-review-test-coverage
May 19, 2026
Merged

feat(merge): adversarial-review test coverage (F4/F5/F7) + F14 sub-region engine fix#6428
g-talbot merged 3 commits into
gtt/parquet-streaming-basefrom
gtt/adversarial-review-test-coverage

Conversation

@g-talbot
Copy link
Copy Markdown
Contributor

Stacked on #6423. Closes the remaining open code findings from the adversarial review (.planning/reviews/2026-05-13-adversarial-review-parquet-merger.md).

Summary

Three test additions plus one engine fix surfaced by the F4 tests:

  • F4 — MS-7 generalized to multi-input × sub-region.
  • F14 — sub-region path inverted col/sub-region loop nesting to keep page cache bounded.
  • F5 — prefix-aware proptest over the streaming engine.
  • F7 — production-shape integration test (5 inputs × ~4 RGs × 4 outputs × prefix_len=1).

F4: MS-7 generalized

  • test_ms7_per_input_bound_across_num_inputs sweeps num_inputs ∈ {1, 3, 8} × rows_per_input ∈ {3 000, 30 000} and asserts the per-input peak stays bounded.
  • test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows runs the prefix_len=0 multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input row count.

The sub-region test surfaced F14 — without the engine fix, the sub-region path's peak grew ~9× when rows grew 10×.

F14: invert col/sub-region loop nesting

Parquet streams emit pages in column-major order (all of col 0, then all of col 1, ...). The old sub-region-outer / col-inner ordering meant that while processing sub-region 0's col K, the stream emitted cols 0..K-1's remaining pages first to reach col K — those skipped pages got cached under their own col_idx for later sub-regions to consume. The cache scaled with input row count.

Fix: new process_split_region_col_outer function for the needs_split path. Cols iterate in the outer loop, sub-regions in the inner. Each parquet col chunk is fully consumed from the stream across all sub-regions before col K+1 starts. Single-region path stays on the existing process_region unchanged.

Mechanics: pre-determine writer assignments for the region's sub-regions (sub-regions can span multiple output writers; consecutive sub-regions on the same writer get coalesced into one combined Region so each writer holds one concurrent RG — RGs on the same writer are sequential per parquet's single-active-RG constraint).

F5: prefix-aware proptest

prop_merge_prefix_aligned_streaming sweeps (num_inputs ∈ 1..=3, per-input RG specs, num_outputs ∈ 1..=3) with prefix_len=1. Asserts on every generated case:

  • MC-1: rows preserved.
  • MC-3: sorted_series monotone within each output.
  • MS-3: MergeOutputFile.num_row_groups matches footer.
  • PA-1+PA-3: assert_unique_rg_prefix_keys passes.
  • CS-1: MergeOutputFile.output_rg_partition_prefix_len matches on-disk KV.

32 cases capped to keep runtime tight. Fixture honors the storekey property: different metric_names produce non-overlapping sorted_series byte ranges, same (metric, row_offset) across inputs gets the same sorted_series (the realistic "same series in multiple splits" case).

F7: production-shape integration test

test_f7_production_shape_multi_input_multi_rg_multi_output: 5 inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1, with overlap (every metric appears in 2-3 of the 5 inputs). Asserts MC-1, MS-3, PA-1+PA-3, MS-5 (cross-output sorted_series monotonicity — a single metric CAN span outputs when its region overflows the per-output budget; the cross-output invariant is sorted_series monotonicity, not "each metric in one output"), and CS-1.

Test plan

  • cargo test -p quickwit-parquet-engine --lib — 498 unit tests pass (was 495 on feat(merge): legacy promotion path + body-col schema evolution #6423; added test_ms7_per_input_bound_across_num_inputs, test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows, prop_merge_prefix_aligned_streaming, test_f5_single_input_two_metrics_minimal, test_f7_production_shape_multi_input_multi_rg_multi_output).
  • cargo clippy -p quickwit-parquet-engine --tests --all-features with -Dwarnings.
  • cargo fmt --all -- --check (nightly).
  • cargo doc --no-deps -p quickwit-parquet-engine warning-free.

Status of the adversarial review

After this PR, the remaining open work from the original review is just the spec docs (PA/MS/CS supplement to ADR-002). All code findings (F1, F2, F4, F6–F14) are closed across PRs #6424, #6425, #6426, #6423, and this one.

🤖 Generated with Claude Code

@g-talbot g-talbot requested a review from a team as a code owner May 13, 2026 18:00
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 2a3f09a to edd45a6 Compare May 13, 2026 18:11
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch 2 times, most recently from 6ed25aa to c990bfa Compare May 14, 2026 13:48
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 34d634f to a6c72d5 Compare May 14, 2026 14:23
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from c990bfa to 3b90477 Compare May 14, 2026 14:23
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from a6c72d5 to 2efe6c2 Compare May 14, 2026 14:49
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 3b90477 to 814a8c7 Compare May 14, 2026 14:49
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 2efe6c2 to 5f04604 Compare May 14, 2026 17:16
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 814a8c7 to 435a4b5 Compare May 14, 2026 17:16
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 5f04604 to 6b31ac0 Compare May 14, 2026 18:10
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch 2 times, most recently from 154defd to 112ca1f Compare May 14, 2026 19:55
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 6b31ac0 to 9047531 Compare May 14, 2026 19:55
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 112ca1f to bc4bb77 Compare May 15, 2026 14:03
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch 2 times, most recently from af2b07c to ebce99f Compare May 15, 2026 17:53
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from bc4bb77 to 1ea081c Compare May 15, 2026 17:53
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from ebce99f to d9aa050 Compare May 15, 2026 21:36
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 1ea081c to a5d1096 Compare May 15, 2026 21:36
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from d9aa050 to c067600 Compare May 15, 2026 21:45
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from a5d1096 to 95a662e Compare May 15, 2026 21:45
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from c067600 to 062eb87 Compare May 15, 2026 21:53
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 95a662e to dc1438f Compare May 15, 2026 21:53
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 062eb87 to 93bccc4 Compare May 18, 2026 11:43
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from dc1438f to 292d1f9 Compare May 18, 2026 11:43
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 93bccc4 to 65f92dc Compare May 18, 2026 12:34
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 292d1f9 to 6d142b2 Compare May 18, 2026 12:34
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from 65f92dc to 3c60e96 Compare May 18, 2026 12:49
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 6d142b2 to 4013bda Compare May 18, 2026 12:49
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from f29f6d4 to c3e91f8 Compare May 18, 2026 13:25
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 09002dd to d887b5b Compare May 18, 2026 13:25
@g-talbot g-talbot force-pushed the gtt/legacy-promotion-and-schema-evo branch from c3e91f8 to e5618e9 Compare May 18, 2026 14:04
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from d887b5b to 7c0e1a2 Compare May 18, 2026 14:04
@g-talbot
Copy link
Copy Markdown
Contributor Author

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7c0e1a2892

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 7c0e1a2 to 82cd92e Compare May 18, 2026 14:25
g-talbot added a commit that referenced this pull request May 18, 2026
…plit

Codex P1 on PR #6428: the previous "Recompute split budget after
rolling over" fix (commit 56e773f, #6424) handled the split
*decision* but not the split *assignment*. When the previous region
fills the current output exactly and the next region enters the
`needs_split` path, the chunk-assignment loop in
`process_split_region_col_outer`'s setup initializes from the stale
`current_output_idx` / `current_output_rows`. Its inner
`needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
so the first iteration cannot roll over: the first sub-region is
appended to the already-full output and only the second one
advances. Output K ends up at 2× target while subsequent outputs
are short or empty.

Fix: initialize `active_output_idx` / `active_rows` from the
`will_roll_over` case before the loop. The inner `needs_new_writer`
check then works for both the first and subsequent iterations (on
the first iteration `active_rows = 0 < target` so it correctly
doesn't re-roll). The `can_reuse_current` check in the writer-
materialization loop already handles "first chunk's output_idx
doesn't match current_writer" by finalizing the current output
(which is correct: it's full, close it) and opening a fresh writer
at the next index.

Regression test
`test_split_chunk_assignment_rolls_over_before_first_chunk`:
prefix_len=1, two metrics of 200 + 400 rows = 600 total,
`num_outputs = 3` → `target_per_output = 200`. Region A fills
output 0 exactly; region B needs splitting. Pre-fix the merge
produced 2 outputs of 400 + 200 (output 0 overfilled, output 2
empty); post-fix it produces 3 outputs of ~200 rows each.

502 lib tests pass (+1); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot added a commit that referenced this pull request May 18, 2026
The streaming Parquet merge stack landing in #6424#6428 ships the
full legacy-promotion *mechanism* (engine + adapter + executor
wiring) but not the planner-level *trigger*. In production today,
`MergePolicyState::record_split` buckets by
`CompactionScope::from_split` which includes
`rg_partition_prefix_len`, so legacy (prefix=0) and aligned
(prefix>0) splits are separated before `ParquetMergePolicy::operations`
runs. The policy only emits `ParquetMergeOperation::new`; a
repo-wide search finds `promote_legacy` only in tests. Legacy
splits therefore never migrate without an explicit trigger.

Tracking this as GAP-011 so we pick it up at the right time. The
gap doc walks three resolution options (merge buckets in the scope
key, dedicated promotion pass, or hybrid prefer-multi-input-promotion)
and the cost trade-offs between them, so the eventual implementation
PR has a starting point.

Raised by Codex review comment id 4311184497 on PR #6423.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot added a commit that referenced this pull request May 18, 2026
…plit

Codex P1 on PR #6428: the previous "Recompute split budget after
rolling over" fix (commit 56e773f, #6424) handled the split
*decision* but not the split *assignment*. When the previous region
fills the current output exactly and the next region enters the
`needs_split` path, the chunk-assignment loop in
`process_split_region_col_outer`'s setup initializes from the stale
`current_output_idx` / `current_output_rows`. Its inner
`needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
so the first iteration cannot roll over: the first sub-region is
appended to the already-full output and only the second one
advances. Output K ends up at 2× target while subsequent outputs
are short or empty.

Fix: initialize `active_output_idx` / `active_rows` from the
`will_roll_over` case before the loop. The inner `needs_new_writer`
check then works for both the first and subsequent iterations (on
the first iteration `active_rows = 0 < target` so it correctly
doesn't re-roll). The `can_reuse_current` check in the writer-
materialization loop already handles "first chunk's output_idx
doesn't match current_writer" by finalizing the current output
(which is correct: it's full, close it) and opening a fresh writer
at the next index.

Regression test
`test_split_chunk_assignment_rolls_over_before_first_chunk`:
prefix_len=1, two metrics of 200 + 400 rows = 600 total,
`num_outputs = 3` → `target_per_output = 200`. Region A fills
output 0 exactly; region B needs splitting. Pre-fix the merge
produced 2 outputs of 400 + 200 (output 0 overfilled, output 2
empty); post-fix it produces 3 outputs of ~200 rows each.

502 lib tests pass (+1); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 15fb382 to 0690fe1 Compare May 18, 2026 15:07
g-talbot added a commit that referenced this pull request May 18, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers

Three adversarial-review findings on the prefix/RG machinery, bundled
because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.**
The adapter walked rows in physical order and emitted one RG per
prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`)
produced a 3-RG file where two RGs shared prefix `A`, violating PA-3.
The streaming merge engine would later reject it mid-merge — but only
after a quietly-bad file had been built. Now `compute_prefix_value_slices`
tracks each slice's composite prefix-value bytes and bails with
`LegacyAdapterError::InputNotSorted` on duplicates, surfacing the
SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while
wiring F2's chunk-level verifier into the SS-3 test).** The adapter
implements SS-3 correctly (missing-from-schema → synthesized NullArray
during slice computation, file stamps `prefix_len = N`). The streaming
engine's reader did not: `find_prefix_parquet_col_indices` hard-required
every named prefix column to be physically present, so a file the
adapter produced from an SS-3 input was unreadable by the merge engine.
Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>`
and `extract_rg_composite_prefix_key` emits a constant null marker
(`encode_byte_array_prefix(&[])`) for None slots. The column contributes
no cross-RG ordering signal (constant everywhere) so region boundaries
are driven entirely by the present columns. Both halves of SS-3 now
agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort
column and others don't — uses [0x00, 0x00] for the null contribution,
which sorts BEFORE non-null per the encoded-empty-string convention.
That weakly violates SS-2 (nulls sort last). Single-file SS-3 is
correct because every RG in such a file contributes the same constant.
If cross-file SS-3 becomes a production scenario, the encoding needs
a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming
tests.** Tests asserting `num_row_groups == N` + KV stamped to N would
have passed even with an off-by-one in slice-boundary detection or
column-content scrambling. The verifier reads chunk-level statistics
directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness)
on the composite key. Wired into six tests:
- streaming engine: `test_streaming_merge_with_prefix_len_two`,
  `test_multi_rg_metric_aligned_input_produces_multi_rg_output`,
  `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`,
  `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`,
  `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now
  passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on
single-RG files — they still go through PA-1 because an unsorted
single-RG file CAN have `min != max` on a prefix column.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(merge): legacy-prefix promotion path + schema-evolution body cols

Two adversarial-review follow-ups grouped because they share the
streaming engine's input-routing and union-schema seams.

## (b) Legacy-prefix promotion

A new operation type pairs a prefix_len=0 split with prefix_len>0
peers in one merge, so legacy splits can be folded into prefix-
aligned buckets instead of aging out via retention. Adds:

- `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed
  `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality
  unchanged.
- `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion
  target; `None` is the default regular-merge form.
- `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality
  check in promotion mode. The output prefix_len still comes from the writer's KV stamp via
  `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1).
- `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as
  either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or
  `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam
  PR-7 will wire from above.

Tests:
- `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`,
  `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`.
- `test_mixed_prefix_ok_skips_input_equality_check`.
- `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output
  passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1.
- `test_executor_mismatched_sources_count_bails`.

## F6 + F13: Schema evolution for body columns

The merger now supports MC-4 across heterogeneous body-col schemas:

- F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous
  to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array
  flavour merge cleanly; before this they hit a "type conflict" at alignment time.
- F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from
  `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the
  union to be nullable when a List col is present in only some inputs; before this the writer
  rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 =
  element present) for nullable outer; non-nullable path unchanged.

Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B
with the same sort schema but different body cols (Utf8 vs
Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32
A-only, Int64 B-only, common Float64). The merge produces the
union schema; per-row rendering via `render_cell` matches across
flavour boundaries; List cells from B render as nulls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt

Same nightly-rustfmt drift as the storekey commit on #6424
(local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok`
binding and the `merge_parquet_split_metadata` call now fit
single-line under the newer width heuristics. No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge-executor): route promotion merges through execute_merge_operation

Codex P1 on PR #6423: the executor unconditionally called the
in-memory `merge_sorted_parquet_files` path, which routes through
`extract_and_validate_input_metadata` and bails on mixed
`qh.rg_partition_prefix_len` before any output is produced. So a
real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with
`target_prefix_len_override = Some(1)` — failed before reaching the
downstream `mixed_prefix_ok` plumbing in
`merge_parquet_split_metadata`. The escape hatch existed but was
unreachable for actual promotion inputs.

Fix: branch in the executor's handle on
`target_prefix_len_override.is_some()`. Promotion merges go through
the engine's streaming entry point
`quickwit_parquet_engine::merge::execute_merge_operation`, which
opens each below-target input via `LegacyInputAdapter` and each
at-target input directly. The streaming merge then sees a
homogeneous stream advertising `prefix_len = target` on every
input. Regular (non-promotion) merges keep the in-memory path.

`execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>`
parallel to `op.splits` — the engine deliberately doesn't depend
on `quickwit-storage` (would invert layering and pull cloud SDKs
into a pure parquet library). So this commit adds
`LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by
`tokio::fs::File`, one instance per downloaded split, each bound
to its scratch-directory path. The `path: &Path` argument on the
trait surface is ignored — the downloader has already resolved
each split to a concrete local file before the executor runs.

Coverage:
- Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end`
  already exercises `execute_merge_operation` with a
  `prefix_len = 0` + `prefix_len = 1` pair, verifying the output
  advertises `prefix_len = 1` and passes PA-1 + PA-3 on the
  composite key. That's now the same code path the in-tree
  executor takes.
- Module doc on the executor rewritten to spell out which path runs
  when.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track legacy promotion planner gap as GAP-011

The streaming Parquet merge stack landing in #6424#6428 ships the
full legacy-promotion *mechanism* (engine + adapter + executor
wiring) but not the planner-level *trigger*. In production today,
`MergePolicyState::record_split` buckets by
`CompactionScope::from_split` which includes
`rg_partition_prefix_len`, so legacy (prefix=0) and aligned
(prefix>0) splits are separated before `ParquetMergePolicy::operations`
runs. The policy only emits `ParquetMergeOperation::new`; a
repo-wide search finds `promote_legacy` only in tests. Legacy
splits therefore never migrate without an explicit trigger.

Tracking this as GAP-011 so we pick it up at the right time. The
gap doc walks three resolution options (merge buckets in the scope
key, dedicated promotion pass, or hybrid prefer-multi-input-promotion)
and the cost trade-offs between them, so the eventual implementation
PR has a starting point.

Raised by Codex review comment id 4311184497 on PR #6423.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track download-vs-streaming merge executor gap as GAP-012

The Parquet streaming merge engine is built around `RemoteByteSource`
and was designed to pull pages directly from object storage — two
GETs per input, overlap fetch with merge, no scratch disk. The
production actor pipeline doesn't take that path: a downloader actor
materializes every input on local disk first, and the executor wraps
the local files in a `LocalFileByteSource` to feed `execute_merge_operation`
(or just calls the in-memory `merge_sorted_parquet_files` path). The
streaming engine's central design benefit is unused.

This isn't a correctness bug — both paths give the same result. It's
a perf/architecture gap: every merge pays 2× I/O per input
(network → scratch + scratch → merger), serializes phases
(`max(input download time)` first-byte latency), and consumes scratch
disk that scales with concurrent merges.

Tracking as GAP-012 so we pick it up at the right time. The gap doc
walks four options (stream-directly with download fallback, stream-
by-default with circuit breaker, eliminate in-memory path only,
stream-directly for promotion merges only) and the trade-offs between
them — including the mid-merge retry surface, which is the main
reason download-first is the current default.

Surfaced during PR #6423 code walkthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Base automatically changed from gtt/legacy-promotion-and-schema-evo to gtt/parquet-streaming-base May 18, 2026 18:15
@g-talbot g-talbot requested a review from a team as a code owner May 18, 2026 18:15
g-talbot and others added 3 commits May 18, 2026 14:16
Three test additions plus one engine fix surfaced by the F4 tests.

The existing MS-7 test proved the per-input page-cache bound for one
input and one region. F4 extends the coverage:

- `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈
  {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going
  from 1 input to 8 must not push the peak up.
- `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0
  multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input
  row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak
  grew ~9× when rows grew 10×.**

Tests serialize via `ms7_serial_lock` because
`PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests
would pollute each other's readings.

Parquet streams emit pages in column-major order (all of col 0,
then all of col 1, ...). The old sub-region-outer / col-inner
ordering meant that while processing sub-region 0's col K, the
stream emitted cols 0..K-1's remaining pages first to reach col K —
those skipped pages got cached under their own col_idx for later
sub-regions to consume, and the cache scaled with input row count.

Fix: new `process_split_region_col_outer` function for the
`needs_split` path. Cols iterate in the outer loop, sub-regions in
the inner. Each parquet col chunk is fully consumed from the stream
across all sub-regions before col K+1 starts. Cache for col K is
empty before col K+1's pages arrive.

Mechanics: pre-determine writer assignments for the region's
sub-regions (a top-level region's sub-regions may span multiple
output writers; consecutive sub-regions on the same writer get
coalesced into one combined Region so each writer holds one RG
concurrently — RGs on the same writer are sequential, so coalescing
keeps the parquet writer's single-active-RG constraint intact).
Single-region path stays on the existing `process_region`.

`prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3,
per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and
asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within
each output), MS-3 (num_row_groups matches footer), PA-1+PA-3
(`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len ==
KV) on every generated case. 32 cases capped to keep runtime under
a second.

Fixture: `make_prefix_len_one_input` writes one RG per
`(metric_name, rows)` entry by calling `writer.flush()` between
batches. `sorted_series` encodes
`metric_base + row_offset_within_metric`, mirroring production's
storekey property that different metric_names produce
non-overlapping `sorted_series` byte ranges.

Plus a focused unit test `test_f5_single_input_two_metrics_minimal`
that pins one specific case for fast iteration.

`test_f7_production_shape_multi_input_multi_rg_multi_output`: 5
inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1.
Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5
cross-output sorted_series monotonicity, CS-1) — the corner the
adversarial review flagged as "untested production case".

MS-5 is "across adjacent outputs, sorted_series is monotone
non-decreasing." A single metric CAN span outputs (the engine
splits at sorted_series transitions inside an overflowing region),
so the cross-output invariant is sorted_series monotonicity, not
"each metric in one output."

- `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass.
- `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`.
- `cargo doc --no-deps -p quickwit-parquet-engine` warning-free.
- `cargo fmt --all -- --check` (nightly via PATH override).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Newer nightly rustfmt (2026-05-17) flags the extra blank line that
crept into the test module between the F4 fixture helper and the
"Heterogeneous-output regressions" section header. Single-line
gap is what nightly fmt wants.

No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…plit

Codex P1 on PR #6428: the previous "Recompute split budget after
rolling over" fix (commit 56e773f, #6424) handled the split
*decision* but not the split *assignment*. When the previous region
fills the current output exactly and the next region enters the
`needs_split` path, the chunk-assignment loop in
`process_split_region_col_outer`'s setup initializes from the stale
`current_output_idx` / `current_output_rows`. Its inner
`needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
so the first iteration cannot roll over: the first sub-region is
appended to the already-full output and only the second one
advances. Output K ends up at 2× target while subsequent outputs
are short or empty.

Fix: initialize `active_output_idx` / `active_rows` from the
`will_roll_over` case before the loop. The inner `needs_new_writer`
check then works for both the first and subsequent iterations (on
the first iteration `active_rows = 0 < target` so it correctly
doesn't re-roll). The `can_reuse_current` check in the writer-
materialization loop already handles "first chunk's output_idx
doesn't match current_writer" by finalizing the current output
(which is correct: it's full, close it) and opening a fresh writer
at the next index.

Regression test
`test_split_chunk_assignment_rolls_over_before_first_chunk`:
prefix_len=1, two metrics of 200 + 400 rows = 600 total,
`num_outputs = 3` → `target_per_output = 200`. Region A fills
output 0 exactly; region B needs splitting. Pre-fix the merge
produced 2 outputs of 400 + 200 (output 0 overfilled, output 2
empty); post-fix it produces 3 outputs of ~200 rows each.

502 lib tests pass (+1); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 0690fe1 to 4ab52a4 Compare May 18, 2026 18:17
@g-talbot g-talbot requested a review from adamtobey May 18, 2026 18:23
@g-talbot g-talbot merged commit 4597b7a into gtt/parquet-streaming-base May 19, 2026
5 checks passed
@g-talbot g-talbot deleted the gtt/adversarial-review-test-coverage branch May 19, 2026 08:41
g-talbot added a commit that referenced this pull request May 20, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers

Three adversarial-review findings on the prefix/RG machinery, bundled
because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.**
The adapter walked rows in physical order and emitted one RG per
prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`)
produced a 3-RG file where two RGs shared prefix `A`, violating PA-3.
The streaming merge engine would later reject it mid-merge — but only
after a quietly-bad file had been built. Now `compute_prefix_value_slices`
tracks each slice's composite prefix-value bytes and bails with
`LegacyAdapterError::InputNotSorted` on duplicates, surfacing the
SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while
wiring F2's chunk-level verifier into the SS-3 test).** The adapter
implements SS-3 correctly (missing-from-schema → synthesized NullArray
during slice computation, file stamps `prefix_len = N`). The streaming
engine's reader did not: `find_prefix_parquet_col_indices` hard-required
every named prefix column to be physically present, so a file the
adapter produced from an SS-3 input was unreadable by the merge engine.
Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>`
and `extract_rg_composite_prefix_key` emits a constant null marker
(`encode_byte_array_prefix(&[])`) for None slots. The column contributes
no cross-RG ordering signal (constant everywhere) so region boundaries
are driven entirely by the present columns. Both halves of SS-3 now
agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort
column and others don't — uses [0x00, 0x00] for the null contribution,
which sorts BEFORE non-null per the encoded-empty-string convention.
That weakly violates SS-2 (nulls sort last). Single-file SS-3 is
correct because every RG in such a file contributes the same constant.
If cross-file SS-3 becomes a production scenario, the encoding needs
a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming
tests.** Tests asserting `num_row_groups == N` + KV stamped to N would
have passed even with an off-by-one in slice-boundary detection or
column-content scrambling. The verifier reads chunk-level statistics
directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness)
on the composite key. Wired into six tests:
- streaming engine: `test_streaming_merge_with_prefix_len_two`,
  `test_multi_rg_metric_aligned_input_produces_multi_rg_output`,
  `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`,
  `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`,
  `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now
  passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on
single-RG files — they still go through PA-1 because an unsorted
single-RG file CAN have `min != max` on a prefix column.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(merge): legacy-prefix promotion path + schema-evolution body cols

Two adversarial-review follow-ups grouped because they share the
streaming engine's input-routing and union-schema seams.

## (b) Legacy-prefix promotion

A new operation type pairs a prefix_len=0 split with prefix_len>0
peers in one merge, so legacy splits can be folded into prefix-
aligned buckets instead of aging out via retention. Adds:

- `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed
  `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality
  unchanged.
- `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion
  target; `None` is the default regular-merge form.
- `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality
  check in promotion mode. The output prefix_len still comes from the writer's KV stamp via
  `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1).
- `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as
  either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or
  `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam
  PR-7 will wire from above.

Tests:
- `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`,
  `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`.
- `test_mixed_prefix_ok_skips_input_equality_check`.
- `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output
  passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1.
- `test_executor_mismatched_sources_count_bails`.

## F6 + F13: Schema evolution for body columns

The merger now supports MC-4 across heterogeneous body-col schemas:

- F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous
  to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array
  flavour merge cleanly; before this they hit a "type conflict" at alignment time.
- F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from
  `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the
  union to be nullable when a List col is present in only some inputs; before this the writer
  rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 =
  element present) for nullable outer; non-nullable path unchanged.

Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B
with the same sort schema but different body cols (Utf8 vs
Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32
A-only, Int64 B-only, common Float64). The merge produces the
union schema; per-row rendering via `render_cell` matches across
flavour boundaries; List cells from B render as nulls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt

Same nightly-rustfmt drift as the storekey commit on #6424
(local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok`
binding and the `merge_parquet_split_metadata` call now fit
single-line under the newer width heuristics. No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge-executor): route promotion merges through execute_merge_operation

Codex P1 on PR #6423: the executor unconditionally called the
in-memory `merge_sorted_parquet_files` path, which routes through
`extract_and_validate_input_metadata` and bails on mixed
`qh.rg_partition_prefix_len` before any output is produced. So a
real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with
`target_prefix_len_override = Some(1)` — failed before reaching the
downstream `mixed_prefix_ok` plumbing in
`merge_parquet_split_metadata`. The escape hatch existed but was
unreachable for actual promotion inputs.

Fix: branch in the executor's handle on
`target_prefix_len_override.is_some()`. Promotion merges go through
the engine's streaming entry point
`quickwit_parquet_engine::merge::execute_merge_operation`, which
opens each below-target input via `LegacyInputAdapter` and each
at-target input directly. The streaming merge then sees a
homogeneous stream advertising `prefix_len = target` on every
input. Regular (non-promotion) merges keep the in-memory path.

`execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>`
parallel to `op.splits` — the engine deliberately doesn't depend
on `quickwit-storage` (would invert layering and pull cloud SDKs
into a pure parquet library). So this commit adds
`LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by
`tokio::fs::File`, one instance per downloaded split, each bound
to its scratch-directory path. The `path: &Path` argument on the
trait surface is ignored — the downloader has already resolved
each split to a concrete local file before the executor runs.

Coverage:
- Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end`
  already exercises `execute_merge_operation` with a
  `prefix_len = 0` + `prefix_len = 1` pair, verifying the output
  advertises `prefix_len = 1` and passes PA-1 + PA-3 on the
  composite key. That's now the same code path the in-tree
  executor takes.
- Module doc on the executor rewritten to spell out which path runs
  when.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track legacy promotion planner gap as GAP-011

The streaming Parquet merge stack landing in #6424#6428 ships the
full legacy-promotion *mechanism* (engine + adapter + executor
wiring) but not the planner-level *trigger*. In production today,
`MergePolicyState::record_split` buckets by
`CompactionScope::from_split` which includes
`rg_partition_prefix_len`, so legacy (prefix=0) and aligned
(prefix>0) splits are separated before `ParquetMergePolicy::operations`
runs. The policy only emits `ParquetMergeOperation::new`; a
repo-wide search finds `promote_legacy` only in tests. Legacy
splits therefore never migrate without an explicit trigger.

Tracking this as GAP-011 so we pick it up at the right time. The
gap doc walks three resolution options (merge buckets in the scope
key, dedicated promotion pass, or hybrid prefer-multi-input-promotion)
and the cost trade-offs between them, so the eventual implementation
PR has a starting point.

Raised by Codex review comment id 4311184497 on PR #6423.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track download-vs-streaming merge executor gap as GAP-012

The Parquet streaming merge engine is built around `RemoteByteSource`
and was designed to pull pages directly from object storage — two
GETs per input, overlap fetch with merge, no scratch disk. The
production actor pipeline doesn't take that path: a downloader actor
materializes every input on local disk first, and the executor wraps
the local files in a `LocalFileByteSource` to feed `execute_merge_operation`
(or just calls the in-memory `merge_sorted_parquet_files` path). The
streaming engine's central design benefit is unused.

This isn't a correctness bug — both paths give the same result. It's
a perf/architecture gap: every merge pays 2× I/O per input
(network → scratch + scratch → merger), serializes phases
(`max(input download time)` first-byte latency), and consumes scratch
disk that scales with concurrent merges.

Tracking as GAP-012 so we pick it up at the right time. The gap doc
walks four options (stream-directly with download fallback, stream-
by-default with circuit breaker, eliminate in-memory path only,
stream-directly for promotion merges only) and the trade-offs between
them — including the mid-merge retry surface, which is the main
reason download-first is the current default.

Surfaced during PR #6423 code walkthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot added a commit that referenced this pull request May 20, 2026
…gion engine fix (#6428)

* feat(merge): close F4/F5/F7/F14 from the adversarial review

Three test additions plus one engine fix surfaced by the F4 tests.

The existing MS-7 test proved the per-input page-cache bound for one
input and one region. F4 extends the coverage:

- `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈
  {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going
  from 1 input to 8 must not push the peak up.
- `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0
  multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input
  row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak
  grew ~9× when rows grew 10×.**

Tests serialize via `ms7_serial_lock` because
`PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests
would pollute each other's readings.

Parquet streams emit pages in column-major order (all of col 0,
then all of col 1, ...). The old sub-region-outer / col-inner
ordering meant that while processing sub-region 0's col K, the
stream emitted cols 0..K-1's remaining pages first to reach col K —
those skipped pages got cached under their own col_idx for later
sub-regions to consume, and the cache scaled with input row count.

Fix: new `process_split_region_col_outer` function for the
`needs_split` path. Cols iterate in the outer loop, sub-regions in
the inner. Each parquet col chunk is fully consumed from the stream
across all sub-regions before col K+1 starts. Cache for col K is
empty before col K+1's pages arrive.

Mechanics: pre-determine writer assignments for the region's
sub-regions (a top-level region's sub-regions may span multiple
output writers; consecutive sub-regions on the same writer get
coalesced into one combined Region so each writer holds one RG
concurrently — RGs on the same writer are sequential, so coalescing
keeps the parquet writer's single-active-RG constraint intact).
Single-region path stays on the existing `process_region`.

`prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3,
per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and
asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within
each output), MS-3 (num_row_groups matches footer), PA-1+PA-3
(`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len ==
KV) on every generated case. 32 cases capped to keep runtime under
a second.

Fixture: `make_prefix_len_one_input` writes one RG per
`(metric_name, rows)` entry by calling `writer.flush()` between
batches. `sorted_series` encodes
`metric_base + row_offset_within_metric`, mirroring production's
storekey property that different metric_names produce
non-overlapping `sorted_series` byte ranges.

Plus a focused unit test `test_f5_single_input_two_metrics_minimal`
that pins one specific case for fast iteration.

`test_f7_production_shape_multi_input_multi_rg_multi_output`: 5
inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1.
Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5
cross-output sorted_series monotonicity, CS-1) — the corner the
adversarial review flagged as "untested production case".

MS-5 is "across adjacent outputs, sorted_series is monotone
non-decreasing." A single metric CAN span outputs (the engine
splits at sorted_series transitions inside an overflowing region),
so the cross-output invariant is sorted_series monotonicity, not
"each metric in one output."

- `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass.
- `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`.
- `cargo doc --no-deps -p quickwit-parquet-engine` warning-free.
- `cargo fmt --all -- --check` (nightly via PATH override).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(streaming): drop stray blank line before tests section header

Newer nightly rustfmt (2026-05-17) flags the extra blank line that
crept into the test module between the F4 fixture helper and the
"Heterogeneous-output regressions" section header. Single-line
gap is what nightly fmt wants.

No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): roll over chunk-assignment before first chunk after split

Codex P1 on PR #6428: the previous "Recompute split budget after
rolling over" fix (commit 56e773f, #6424) handled the split
*decision* but not the split *assignment*. When the previous region
fills the current output exactly and the next region enters the
`needs_split` path, the chunk-assignment loop in
`process_split_region_col_outer`'s setup initializes from the stale
`current_output_idx` / `current_output_rows`. Its inner
`needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
so the first iteration cannot roll over: the first sub-region is
appended to the already-full output and only the second one
advances. Output K ends up at 2× target while subsequent outputs
are short or empty.

Fix: initialize `active_output_idx` / `active_rows` from the
`will_roll_over` case before the loop. The inner `needs_new_writer`
check then works for both the first and subsequent iterations (on
the first iteration `active_rows = 0 < target` so it correctly
doesn't re-roll). The `can_reuse_current` check in the writer-
materialization loop already handles "first chunk's output_idx
doesn't match current_writer" by finalizing the current output
(which is correct: it's full, close it) and opening a fresh writer
at the next index.

Regression test
`test_split_chunk_assignment_rolls_over_before_first_chunk`:
prefix_len=1, two metrics of 200 + 400 rows = 600 total,
`num_outputs = 3` → `target_per_output = 200`. Region A fills
output 0 exactly; region B needs splitting. Pre-fix the merge
produced 2 outputs of 400 + 200 (output 0 overfilled, output 2
empty); post-fix it produces 3 outputs of ~200 rows each.

502 lib tests pass (+1); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot added a commit that referenced this pull request May 20, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers

Three adversarial-review findings on the prefix/RG machinery, bundled
because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.**
The adapter walked rows in physical order and emitted one RG per
prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`)
produced a 3-RG file where two RGs shared prefix `A`, violating PA-3.
The streaming merge engine would later reject it mid-merge — but only
after a quietly-bad file had been built. Now `compute_prefix_value_slices`
tracks each slice's composite prefix-value bytes and bails with
`LegacyAdapterError::InputNotSorted` on duplicates, surfacing the
SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while
wiring F2's chunk-level verifier into the SS-3 test).** The adapter
implements SS-3 correctly (missing-from-schema → synthesized NullArray
during slice computation, file stamps `prefix_len = N`). The streaming
engine's reader did not: `find_prefix_parquet_col_indices` hard-required
every named prefix column to be physically present, so a file the
adapter produced from an SS-3 input was unreadable by the merge engine.
Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>`
and `extract_rg_composite_prefix_key` emits a constant null marker
(`encode_byte_array_prefix(&[])`) for None slots. The column contributes
no cross-RG ordering signal (constant everywhere) so region boundaries
are driven entirely by the present columns. Both halves of SS-3 now
agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort
column and others don't — uses [0x00, 0x00] for the null contribution,
which sorts BEFORE non-null per the encoded-empty-string convention.
That weakly violates SS-2 (nulls sort last). Single-file SS-3 is
correct because every RG in such a file contributes the same constant.
If cross-file SS-3 becomes a production scenario, the encoding needs
a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming
tests.** Tests asserting `num_row_groups == N` + KV stamped to N would
have passed even with an off-by-one in slice-boundary detection or
column-content scrambling. The verifier reads chunk-level statistics
directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness)
on the composite key. Wired into six tests:
- streaming engine: `test_streaming_merge_with_prefix_len_two`,
  `test_multi_rg_metric_aligned_input_produces_multi_rg_output`,
  `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`,
  `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`,
  `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now
  passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on
single-RG files — they still go through PA-1 because an unsorted
single-RG file CAN have `min != max` on a prefix column.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(merge): legacy-prefix promotion path + schema-evolution body cols

Two adversarial-review follow-ups grouped because they share the
streaming engine's input-routing and union-schema seams.

## (b) Legacy-prefix promotion

A new operation type pairs a prefix_len=0 split with prefix_len>0
peers in one merge, so legacy splits can be folded into prefix-
aligned buckets instead of aging out via retention. Adds:

- `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed
  `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality
  unchanged.
- `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion
  target; `None` is the default regular-merge form.
- `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality
  check in promotion mode. The output prefix_len still comes from the writer's KV stamp via
  `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1).
- `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as
  either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or
  `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam
  PR-7 will wire from above.

Tests:
- `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`,
  `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`.
- `test_mixed_prefix_ok_skips_input_equality_check`.
- `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output
  passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1.
- `test_executor_mismatched_sources_count_bails`.

## F6 + F13: Schema evolution for body columns

The merger now supports MC-4 across heterogeneous body-col schemas:

- F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous
  to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array
  flavour merge cleanly; before this they hit a "type conflict" at alignment time.
- F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from
  `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the
  union to be nullable when a List col is present in only some inputs; before this the writer
  rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 =
  element present) for nullable outer; non-nullable path unchanged.

Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B
with the same sort schema but different body cols (Utf8 vs
Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32
A-only, Int64 B-only, common Float64). The merge produces the
union schema; per-row rendering via `render_cell` matches across
flavour boundaries; List cells from B render as nulls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt

Same nightly-rustfmt drift as the storekey commit on #6424
(local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok`
binding and the `merge_parquet_split_metadata` call now fit
single-line under the newer width heuristics. No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge-executor): route promotion merges through execute_merge_operation

Codex P1 on PR #6423: the executor unconditionally called the
in-memory `merge_sorted_parquet_files` path, which routes through
`extract_and_validate_input_metadata` and bails on mixed
`qh.rg_partition_prefix_len` before any output is produced. So a
real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with
`target_prefix_len_override = Some(1)` — failed before reaching the
downstream `mixed_prefix_ok` plumbing in
`merge_parquet_split_metadata`. The escape hatch existed but was
unreachable for actual promotion inputs.

Fix: branch in the executor's handle on
`target_prefix_len_override.is_some()`. Promotion merges go through
the engine's streaming entry point
`quickwit_parquet_engine::merge::execute_merge_operation`, which
opens each below-target input via `LegacyInputAdapter` and each
at-target input directly. The streaming merge then sees a
homogeneous stream advertising `prefix_len = target` on every
input. Regular (non-promotion) merges keep the in-memory path.

`execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>`
parallel to `op.splits` — the engine deliberately doesn't depend
on `quickwit-storage` (would invert layering and pull cloud SDKs
into a pure parquet library). So this commit adds
`LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by
`tokio::fs::File`, one instance per downloaded split, each bound
to its scratch-directory path. The `path: &Path` argument on the
trait surface is ignored — the downloader has already resolved
each split to a concrete local file before the executor runs.

Coverage:
- Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end`
  already exercises `execute_merge_operation` with a
  `prefix_len = 0` + `prefix_len = 1` pair, verifying the output
  advertises `prefix_len = 1` and passes PA-1 + PA-3 on the
  composite key. That's now the same code path the in-tree
  executor takes.
- Module doc on the executor rewritten to spell out which path runs
  when.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track legacy promotion planner gap as GAP-011

The streaming Parquet merge stack landing in #6424#6428 ships the
full legacy-promotion *mechanism* (engine + adapter + executor
wiring) but not the planner-level *trigger*. In production today,
`MergePolicyState::record_split` buckets by
`CompactionScope::from_split` which includes
`rg_partition_prefix_len`, so legacy (prefix=0) and aligned
(prefix>0) splits are separated before `ParquetMergePolicy::operations`
runs. The policy only emits `ParquetMergeOperation::new`; a
repo-wide search finds `promote_legacy` only in tests. Legacy
splits therefore never migrate without an explicit trigger.

Tracking this as GAP-011 so we pick it up at the right time. The
gap doc walks three resolution options (merge buckets in the scope
key, dedicated promotion pass, or hybrid prefer-multi-input-promotion)
and the cost trade-offs between them, so the eventual implementation
PR has a starting point.

Raised by Codex review comment id 4311184497 on PR #6423.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track download-vs-streaming merge executor gap as GAP-012

The Parquet streaming merge engine is built around `RemoteByteSource`
and was designed to pull pages directly from object storage — two
GETs per input, overlap fetch with merge, no scratch disk. The
production actor pipeline doesn't take that path: a downloader actor
materializes every input on local disk first, and the executor wraps
the local files in a `LocalFileByteSource` to feed `execute_merge_operation`
(or just calls the in-memory `merge_sorted_parquet_files` path). The
streaming engine's central design benefit is unused.

This isn't a correctness bug — both paths give the same result. It's
a perf/architecture gap: every merge pays 2× I/O per input
(network → scratch + scratch → merger), serializes phases
(`max(input download time)` first-byte latency), and consumes scratch
disk that scales with concurrent merges.

Tracking as GAP-012 so we pick it up at the right time. The gap doc
walks four options (stream-directly with download fallback, stream-
by-default with circuit breaker, eliminate in-memory path only,
stream-directly for promotion merges only) and the trade-offs between
them — including the mid-merge retry surface, which is the main
reason download-first is the current default.

Surfaced during PR #6423 code walkthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot added a commit that referenced this pull request May 20, 2026
…gion engine fix (#6428)

* feat(merge): close F4/F5/F7/F14 from the adversarial review

Three test additions plus one engine fix surfaced by the F4 tests.

The existing MS-7 test proved the per-input page-cache bound for one
input and one region. F4 extends the coverage:

- `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈
  {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going
  from 1 input to 8 must not push the peak up.
- `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0
  multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input
  row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak
  grew ~9× when rows grew 10×.**

Tests serialize via `ms7_serial_lock` because
`PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests
would pollute each other's readings.

Parquet streams emit pages in column-major order (all of col 0,
then all of col 1, ...). The old sub-region-outer / col-inner
ordering meant that while processing sub-region 0's col K, the
stream emitted cols 0..K-1's remaining pages first to reach col K —
those skipped pages got cached under their own col_idx for later
sub-regions to consume, and the cache scaled with input row count.

Fix: new `process_split_region_col_outer` function for the
`needs_split` path. Cols iterate in the outer loop, sub-regions in
the inner. Each parquet col chunk is fully consumed from the stream
across all sub-regions before col K+1 starts. Cache for col K is
empty before col K+1's pages arrive.

Mechanics: pre-determine writer assignments for the region's
sub-regions (a top-level region's sub-regions may span multiple
output writers; consecutive sub-regions on the same writer get
coalesced into one combined Region so each writer holds one RG
concurrently — RGs on the same writer are sequential, so coalescing
keeps the parquet writer's single-active-RG constraint intact).
Single-region path stays on the existing `process_region`.

`prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3,
per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and
asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within
each output), MS-3 (num_row_groups matches footer), PA-1+PA-3
(`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len ==
KV) on every generated case. 32 cases capped to keep runtime under
a second.

Fixture: `make_prefix_len_one_input` writes one RG per
`(metric_name, rows)` entry by calling `writer.flush()` between
batches. `sorted_series` encodes
`metric_base + row_offset_within_metric`, mirroring production's
storekey property that different metric_names produce
non-overlapping `sorted_series` byte ranges.

Plus a focused unit test `test_f5_single_input_two_metrics_minimal`
that pins one specific case for fast iteration.

`test_f7_production_shape_multi_input_multi_rg_multi_output`: 5
inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1.
Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5
cross-output sorted_series monotonicity, CS-1) — the corner the
adversarial review flagged as "untested production case".

MS-5 is "across adjacent outputs, sorted_series is monotone
non-decreasing." A single metric CAN span outputs (the engine
splits at sorted_series transitions inside an overflowing region),
so the cross-output invariant is sorted_series monotonicity, not
"each metric in one output."

- `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass.
- `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`.
- `cargo doc --no-deps -p quickwit-parquet-engine` warning-free.
- `cargo fmt --all -- --check` (nightly via PATH override).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(streaming): drop stray blank line before tests section header

Newer nightly rustfmt (2026-05-17) flags the extra blank line that
crept into the test module between the F4 fixture helper and the
"Heterogeneous-output regressions" section header. Single-line
gap is what nightly fmt wants.

No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): roll over chunk-assignment before first chunk after split

Codex P1 on PR #6428: the previous "Recompute split budget after
rolling over" fix (commit 56e773f, #6424) handled the split
*decision* but not the split *assignment*. When the previous region
fills the current output exactly and the next region enters the
`needs_split` path, the chunk-assignment loop in
`process_split_region_col_outer`'s setup initializes from the stale
`current_output_idx` / `current_output_rows`. Its inner
`needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
so the first iteration cannot roll over: the first sub-region is
appended to the already-full output and only the second one
advances. Output K ends up at 2× target while subsequent outputs
are short or empty.

Fix: initialize `active_output_idx` / `active_rows` from the
`will_roll_over` case before the loop. The inner `needs_new_writer`
check then works for both the first and subsequent iterations (on
the first iteration `active_rows = 0 < target` so it correctly
doesn't re-roll). The `can_reuse_current` check in the writer-
materialization loop already handles "first chunk's output_idx
doesn't match current_writer" by finalizing the current output
(which is correct: it's full, close it) and opening a fresh writer
at the next index.

Regression test
`test_split_chunk_assignment_rolls_over_before_first_chunk`:
prefix_len=1, two metrics of 200 + 400 rows = 600 total,
`num_outputs = 3` → `target_per_output = 200`. Region A fills
output 0 exactly; region B needs splitting. Pre-fix the merge
produced 2 outputs of 400 + 200 (output 0 overfilled, output 2
empty); post-fix it produces 3 outputs of ~200 rows each.

502 lib tests pass (+1); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot added a commit that referenced this pull request May 21, 2026
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers

Three adversarial-review findings on the prefix/RG machinery, bundled
because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.**
The adapter walked rows in physical order and emitted one RG per
prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`)
produced a 3-RG file where two RGs shared prefix `A`, violating PA-3.
The streaming merge engine would later reject it mid-merge — but only
after a quietly-bad file had been built. Now `compute_prefix_value_slices`
tracks each slice's composite prefix-value bytes and bails with
`LegacyAdapterError::InputNotSorted` on duplicates, surfacing the
SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while
wiring F2's chunk-level verifier into the SS-3 test).** The adapter
implements SS-3 correctly (missing-from-schema → synthesized NullArray
during slice computation, file stamps `prefix_len = N`). The streaming
engine's reader did not: `find_prefix_parquet_col_indices` hard-required
every named prefix column to be physically present, so a file the
adapter produced from an SS-3 input was unreadable by the merge engine.
Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>`
and `extract_rg_composite_prefix_key` emits a constant null marker
(`encode_byte_array_prefix(&[])`) for None slots. The column contributes
no cross-RG ordering signal (constant everywhere) so region boundaries
are driven entirely by the present columns. Both halves of SS-3 now
agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort
column and others don't — uses [0x00, 0x00] for the null contribution,
which sorts BEFORE non-null per the encoded-empty-string convention.
That weakly violates SS-2 (nulls sort last). Single-file SS-3 is
correct because every RG in such a file contributes the same constant.
If cross-file SS-3 becomes a production scenario, the encoding needs
a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming
tests.** Tests asserting `num_row_groups == N` + KV stamped to N would
have passed even with an off-by-one in slice-boundary detection or
column-content scrambling. The verifier reads chunk-level statistics
directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness)
on the composite key. Wired into six tests:
- streaming engine: `test_streaming_merge_with_prefix_len_two`,
  `test_multi_rg_metric_aligned_input_produces_multi_rg_output`,
  `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`,
  `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`,
  `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now
  passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on
single-RG files — they still go through PA-1 because an unsorted
single-RG file CAN have `min != max` on a prefix column.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(merge): legacy-prefix promotion path + schema-evolution body cols

Two adversarial-review follow-ups grouped because they share the
streaming engine's input-routing and union-schema seams.

## (b) Legacy-prefix promotion

A new operation type pairs a prefix_len=0 split with prefix_len>0
peers in one merge, so legacy splits can be folded into prefix-
aligned buckets instead of aging out via retention. Adds:

- `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed
  `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality
  unchanged.
- `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion
  target; `None` is the default regular-merge form.
- `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality
  check in promotion mode. The output prefix_len still comes from the writer's KV stamp via
  `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1).
- `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as
  either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or
  `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam
  PR-7 will wire from above.

Tests:
- `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`,
  `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`.
- `test_mixed_prefix_ok_skips_input_equality_check`.
- `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output
  passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1.
- `test_executor_mismatched_sources_count_bails`.

## F6 + F13: Schema evolution for body columns

The merger now supports MC-4 across heterogeneous body-col schemas:

- F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous
  to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array
  flavour merge cleanly; before this they hit a "type conflict" at alignment time.
- F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from
  `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the
  union to be nullable when a List col is present in only some inputs; before this the writer
  rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 =
  element present) for nullable outer; non-nullable path unchanged.

Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B
with the same sort schema but different body cols (Utf8 vs
Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32
A-only, Int64 B-only, common Float64). The merge produces the
union schema; per-row rendering via `render_cell` matches across
flavour boundaries; List cells from B render as nulls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt

Same nightly-rustfmt drift as the storekey commit on #6424
(local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok`
binding and the `merge_parquet_split_metadata` call now fit
single-line under the newer width heuristics. No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge-executor): route promotion merges through execute_merge_operation

Codex P1 on PR #6423: the executor unconditionally called the
in-memory `merge_sorted_parquet_files` path, which routes through
`extract_and_validate_input_metadata` and bails on mixed
`qh.rg_partition_prefix_len` before any output is produced. So a
real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with
`target_prefix_len_override = Some(1)` — failed before reaching the
downstream `mixed_prefix_ok` plumbing in
`merge_parquet_split_metadata`. The escape hatch existed but was
unreachable for actual promotion inputs.

Fix: branch in the executor's handle on
`target_prefix_len_override.is_some()`. Promotion merges go through
the engine's streaming entry point
`quickwit_parquet_engine::merge::execute_merge_operation`, which
opens each below-target input via `LegacyInputAdapter` and each
at-target input directly. The streaming merge then sees a
homogeneous stream advertising `prefix_len = target` on every
input. Regular (non-promotion) merges keep the in-memory path.

`execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>`
parallel to `op.splits` — the engine deliberately doesn't depend
on `quickwit-storage` (would invert layering and pull cloud SDKs
into a pure parquet library). So this commit adds
`LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by
`tokio::fs::File`, one instance per downloaded split, each bound
to its scratch-directory path. The `path: &Path` argument on the
trait surface is ignored — the downloader has already resolved
each split to a concrete local file before the executor runs.

Coverage:
- Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end`
  already exercises `execute_merge_operation` with a
  `prefix_len = 0` + `prefix_len = 1` pair, verifying the output
  advertises `prefix_len = 1` and passes PA-1 + PA-3 on the
  composite key. That's now the same code path the in-tree
  executor takes.
- Module doc on the executor rewritten to spell out which path runs
  when.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track legacy promotion planner gap as GAP-011

The streaming Parquet merge stack landing in #6424#6428 ships the
full legacy-promotion *mechanism* (engine + adapter + executor
wiring) but not the planner-level *trigger*. In production today,
`MergePolicyState::record_split` buckets by
`CompactionScope::from_split` which includes
`rg_partition_prefix_len`, so legacy (prefix=0) and aligned
(prefix>0) splits are separated before `ParquetMergePolicy::operations`
runs. The policy only emits `ParquetMergeOperation::new`; a
repo-wide search finds `promote_legacy` only in tests. Legacy
splits therefore never migrate without an explicit trigger.

Tracking this as GAP-011 so we pick it up at the right time. The
gap doc walks three resolution options (merge buckets in the scope
key, dedicated promotion pass, or hybrid prefer-multi-input-promotion)
and the cost trade-offs between them, so the eventual implementation
PR has a starting point.

Raised by Codex review comment id 4311184497 on PR #6423.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track download-vs-streaming merge executor gap as GAP-012

The Parquet streaming merge engine is built around `RemoteByteSource`
and was designed to pull pages directly from object storage — two
GETs per input, overlap fetch with merge, no scratch disk. The
production actor pipeline doesn't take that path: a downloader actor
materializes every input on local disk first, and the executor wraps
the local files in a `LocalFileByteSource` to feed `execute_merge_operation`
(or just calls the in-memory `merge_sorted_parquet_files` path). The
streaming engine's central design benefit is unused.

This isn't a correctness bug — both paths give the same result. It's
a perf/architecture gap: every merge pays 2× I/O per input
(network → scratch + scratch → merger), serializes phases
(`max(input download time)` first-byte latency), and consumes scratch
disk that scales with concurrent merges.

Tracking as GAP-012 so we pick it up at the right time. The gap doc
walks four options (stream-directly with download fallback, stream-
by-default with circuit breaker, eliminate in-memory path only,
stream-directly for promotion merges only) and the trade-offs between
them — including the mid-merge retry surface, which is the main
reason download-first is the current default.

Surfaced during PR #6423 code walkthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot added a commit that referenced this pull request May 21, 2026
…gion engine fix (#6428)

* feat(merge): close F4/F5/F7/F14 from the adversarial review

Three test additions plus one engine fix surfaced by the F4 tests.

The existing MS-7 test proved the per-input page-cache bound for one
input and one region. F4 extends the coverage:

- `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈
  {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going
  from 1 input to 8 must not push the peak up.
- `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0
  multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input
  row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak
  grew ~9× when rows grew 10×.**

Tests serialize via `ms7_serial_lock` because
`PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests
would pollute each other's readings.

Parquet streams emit pages in column-major order (all of col 0,
then all of col 1, ...). The old sub-region-outer / col-inner
ordering meant that while processing sub-region 0's col K, the
stream emitted cols 0..K-1's remaining pages first to reach col K —
those skipped pages got cached under their own col_idx for later
sub-regions to consume, and the cache scaled with input row count.

Fix: new `process_split_region_col_outer` function for the
`needs_split` path. Cols iterate in the outer loop, sub-regions in
the inner. Each parquet col chunk is fully consumed from the stream
across all sub-regions before col K+1 starts. Cache for col K is
empty before col K+1's pages arrive.

Mechanics: pre-determine writer assignments for the region's
sub-regions (a top-level region's sub-regions may span multiple
output writers; consecutive sub-regions on the same writer get
coalesced into one combined Region so each writer holds one RG
concurrently — RGs on the same writer are sequential, so coalescing
keeps the parquet writer's single-active-RG constraint intact).
Single-region path stays on the existing `process_region`.

`prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3,
per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and
asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within
each output), MS-3 (num_row_groups matches footer), PA-1+PA-3
(`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len ==
KV) on every generated case. 32 cases capped to keep runtime under
a second.

Fixture: `make_prefix_len_one_input` writes one RG per
`(metric_name, rows)` entry by calling `writer.flush()` between
batches. `sorted_series` encodes
`metric_base + row_offset_within_metric`, mirroring production's
storekey property that different metric_names produce
non-overlapping `sorted_series` byte ranges.

Plus a focused unit test `test_f5_single_input_two_metrics_minimal`
that pins one specific case for fast iteration.

`test_f7_production_shape_multi_input_multi_rg_multi_output`: 5
inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1.
Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5
cross-output sorted_series monotonicity, CS-1) — the corner the
adversarial review flagged as "untested production case".

MS-5 is "across adjacent outputs, sorted_series is monotone
non-decreasing." A single metric CAN span outputs (the engine
splits at sorted_series transitions inside an overflowing region),
so the cross-output invariant is sorted_series monotonicity, not
"each metric in one output."

- `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass.
- `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`.
- `cargo doc --no-deps -p quickwit-parquet-engine` warning-free.
- `cargo fmt --all -- --check` (nightly via PATH override).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(streaming): drop stray blank line before tests section header

Newer nightly rustfmt (2026-05-17) flags the extra blank line that
crept into the test module between the F4 fixture helper and the
"Heterogeneous-output regressions" section header. Single-line
gap is what nightly fmt wants.

No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): roll over chunk-assignment before first chunk after split

Codex P1 on PR #6428: the previous "Recompute split budget after
rolling over" fix (commit 56e773f, #6424) handled the split
*decision* but not the split *assignment*. When the previous region
fills the current output exactly and the next region enters the
`needs_split` path, the chunk-assignment loop in
`process_split_region_col_outer`'s setup initializes from the stale
`current_output_idx` / `current_output_rows`. Its inner
`needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
so the first iteration cannot roll over: the first sub-region is
appended to the already-full output and only the second one
advances. Output K ends up at 2× target while subsequent outputs
are short or empty.

Fix: initialize `active_output_idx` / `active_rows` from the
`will_roll_over` case before the loop. The inner `needs_new_writer`
check then works for both the first and subsequent iterations (on
the first iteration `active_rows = 0 < target` so it correctly
doesn't re-roll). The `can_reuse_current` check in the writer-
materialization loop already handles "first chunk's output_idx
doesn't match current_writer" by finalizing the current output
(which is correct: it's full, close it) and opening a fresh writer
at the next index.

Regression test
`test_split_chunk_assignment_rolls_over_before_first_chunk`:
prefix_len=1, two metrics of 200 + 400 rows = 600 total,
`num_outputs = 3` → `target_per_output = 200`. Region A fills
output 0 exactly; region B needs splitting. Pre-fix the merge
produced 2 outputs of 400 + 200 (output 0 overfilled, output 2
empty); post-fix it produces 3 outputs of ~200 rows each.

502 lib tests pass (+1); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants