Skip to content

Commit b3d4971

Browse files
g-talbotclaude
andauthored
feat(merge): legacy promotion path + body-col schema evolution (#6423)
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge): legacy-prefix promotion path + schema-evolution body cols Two adversarial-review follow-ups grouped because they share the streaming engine's input-routing and union-schema seams. ## (b) Legacy-prefix promotion A new operation type pairs a prefix_len=0 split with prefix_len>0 peers in one merge, so legacy splits can be folded into prefix- aligned buckets instead of aging out via retention. Adds: - `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality unchanged. - `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion target; `None` is the default regular-merge form. - `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality check in promotion mode. The output prefix_len still comes from the writer's KV stamp via `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1). - `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam PR-7 will wire from above. Tests: - `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`, `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`. - `test_mixed_prefix_ok_skips_input_equality_check`. - `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1. - `test_executor_mismatched_sources_count_bails`. ## F6 + F13: Schema evolution for body columns The merger now supports MC-4 across heterogeneous body-col schemas: - F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array flavour merge cleanly; before this they hit a "type conflict" at alignment time. - F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the union to be nullable when a List col is present in only some inputs; before this the writer rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 = element present) for nullable outer; non-nullable path unchanged. Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B with the same sort schema but different body cols (Utf8 vs Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32 A-only, Int64 B-only, common Float64). The merge produces the union schema; per-row rendering via `render_cell` matches across flavour boundaries; List cells from B render as nulls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(merge-executor): route promotion merges through execute_merge_operation Codex P1 on PR #6423: the executor unconditionally called the in-memory `merge_sorted_parquet_files` path, which routes through `extract_and_validate_input_metadata` and bails on mixed `qh.rg_partition_prefix_len` before any output is produced. So a real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with `target_prefix_len_override = Some(1)` — failed before reaching the downstream `mixed_prefix_ok` plumbing in `merge_parquet_split_metadata`. The escape hatch existed but was unreachable for actual promotion inputs. Fix: branch in the executor's handle on `target_prefix_len_override.is_some()`. Promotion merges go through the engine's streaming entry point `quickwit_parquet_engine::merge::execute_merge_operation`, which opens each below-target input via `LegacyInputAdapter` and each at-target input directly. The streaming merge then sees a homogeneous stream advertising `prefix_len = target` on every input. Regular (non-promotion) merges keep the in-memory path. `execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>` parallel to `op.splits` — the engine deliberately doesn't depend on `quickwit-storage` (would invert layering and pull cloud SDKs into a pure parquet library). So this commit adds `LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by `tokio::fs::File`, one instance per downloaded split, each bound to its scratch-directory path. The `path: &Path` argument on the trait surface is ignored — the downloader has already resolved each split to a concrete local file before the executor runs. Coverage: - Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end` already exercises `execute_merge_operation` with a `prefix_len = 0` + `prefix_len = 1` pair, verifying the output advertises `prefix_len = 1` and passes PA-1 + PA-3 on the composite key. That's now the same code path the in-tree executor takes. - Module doc on the executor rewritten to spell out which path runs when. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track legacy promotion planner gap as GAP-011 The streaming Parquet merge stack landing in #6424#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track download-vs-streaming merge executor gap as GAP-012 The Parquet streaming merge engine is built around `RemoteByteSource` and was designed to pull pages directly from object storage — two GETs per input, overlap fetch with merge, no scratch disk. The production actor pipeline doesn't take that path: a downloader actor materializes every input on local disk first, and the executor wraps the local files in a `LocalFileByteSource` to feed `execute_merge_operation` (or just calls the in-memory `merge_sorted_parquet_files` path). The streaming engine's central design benefit is unused. This isn't a correctness bug — both paths give the same result. It's a perf/architecture gap: every merge pays 2× I/O per input (network → scratch + scratch → merger), serializes phases (`max(input download time)` first-byte latency), and consumes scratch disk that scales with concurrent merges. Tracking as GAP-012 so we pick it up at the right time. The gap doc walks four options (stream-directly with download fallback, stream- by-default with circuit breaker, eliminate in-memory path only, stream-directly for promotion merges only) and the trade-offs between them — including the mid-merge retry surface, which is the main reason download-first is the current default. Surfaced during PR #6423 code walkthrough. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 71e5c6a commit b3d4971

10 files changed

Lines changed: 1344 additions & 128 deletions

File tree

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# GAP-011: No Planner-Level Legacy Promotion
2+
3+
**Status**: Open
4+
**Discovered**: 2026-05-18
5+
**Context**: Codex review on PR #6423 (`feat(merge): legacy promotion path + body-col schema evolution`) flagged that the promotion path is wired end-to-end at the library + executor layer but has no production trigger at the planner / policy level.
6+
7+
## Problem
8+
9+
The streaming Parquet merge stack now contains a complete *legacy promotion* pipeline:
10+
11+
- `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)` constructs an operation with
12+
`target_prefix_len_override = Some(target)`.
13+
- `merge::execute_merge_operation` routes each input through `LegacyInputAdapter` when its
14+
declared `rg_partition_prefix_len < target` and through `StreamingParquetReader` otherwise. The
15+
streaming engine then sees a homogeneous stream advertising `prefix_len = target` on every
16+
input.
17+
- `ParquetMergeExecutor` (in `quickwit-indexing`) detects `target_prefix_len_override.is_some()`
18+
and routes those merges through `execute_merge_operation` (with `LocalFileByteSource`) instead
19+
of the in-memory `merge_sorted_parquet_files` path.
20+
- `merge_parquet_split_metadata` accepts a `mixed_prefix_ok: bool` flag so the post-merge
21+
aggregator skips the input-side equality check.
22+
23+
What's missing: **nothing in the planner ever creates a `promote_legacy` operation in
24+
production**. `MergePolicyState::record_split` buckets each split by
25+
`CompactionScope::from_split`, and that scope key includes `rg_partition_prefix_len`. Legacy
26+
splits (`prefix_len = 0`) and aligned splits (`prefix_len > 0`) therefore land in *different*
27+
buckets before `ParquetMergePolicy::operations` ever runs. The production policy then iterates
28+
each bucket independently and emits `ParquetMergeOperation::new` (regular merge). A repo-wide
29+
search finds `promote_legacy` only in tests.
30+
31+
In a mixed deployment (legacy + aligned splits coexisting), legacy splits therefore stay in
32+
their `prefix_len = 0` bucket forever — never gaining the prefix alignment that downstream
33+
locality compaction depends on. The promotion plumbing is reachable only from tests.
34+
35+
## Evidence
36+
37+
- `quickwit-parquet-engine/src/merge/policy/mod.rs`: `ParquetMergePolicy::operations` calls
38+
`ParquetMergeOperation::new(...)` only. `promote_legacy` is constructed only by tests in the
39+
same file.
40+
- `MergePolicyState::record_split` keys its `BTreeMap` by `CompactionScope::from_split`. The
41+
scope derivation includes `rg_partition_prefix_len`, so a legacy split and a prefix-aligned
42+
split with otherwise identical sort fields / window / merge level are never compared by the
43+
policy.
44+
- The executor branch added in PR #6423 (`scratch.merge_operation.target_prefix_len_override
45+
.is_some()`) routes promotion through `execute_merge_operation`. Library coverage at
46+
`test_promote_legacy_executor_end_to_end` exercises a `prefix_len = 0` + `prefix_len = 1` pair
47+
successfully. But that operation is only ever constructed inside the test.
48+
49+
## State of the Art
50+
51+
- **Iceberg**: Compaction policies inspect file-level metadata (partitioning, sort order) and
52+
can rewrite files to align with the latest table partitioning even when individual files
53+
pre-date the change. The compaction service treats schema-evolution-style rewrites as
54+
first-class operations.
55+
- **Husky**: Background re-organization passes that promote files into newer storage layouts.
56+
Tracked separately from the size-tiered compaction policy so cost trade-offs can be tuned.
57+
58+
In both cases, the design separates the *trigger* (decision to promote) from the *mechanism*
59+
(how the promotion is performed). Quickwit currently has the mechanism but not the trigger.
60+
61+
## Potential Solutions
62+
63+
### Option A: Merge legacy + aligned buckets in `CompactionScope::from_split`
64+
65+
Drop `rg_partition_prefix_len` from the scope key (or normalize it to a target value before
66+
bucketing). The policy then sees legacy and aligned splits as candidates for the same
67+
compaction operation and `ParquetMergePolicy::operations` decides whether to emit a regular
68+
merge or a `promote_legacy` operation based on whether the bucket contains mixed prefix
69+
lengths.
70+
71+
Simplest change, but requires the policy to detect mixed-prefix buckets and choose between
72+
`new` and `promote_legacy` per operation.
73+
74+
### Option B: Dedicated promotion pass
75+
76+
Run a separate pass before the regular compaction policy that scans for legacy splits and emits
77+
`promote_legacy` operations for them. The regular policy then sees only aligned splits.
78+
79+
Cleaner separation of concerns, but means legacy splits are migrated *before* any opportunity
80+
to coalesce them with aligned neighbors in a single multi-input merge — possibly more work
81+
overall.
82+
83+
### Option C: Hybrid — bucket together, prefer single-pass promotion
84+
85+
Keep scope bucketing as in option A. Inside the policy, when a bucket contains mixed prefix
86+
lengths AND has enough splits to merit a multi-input merge, emit `promote_legacy`. When only
87+
legacy splits exist (no aligned neighbor), emit `promote_legacy` with the same target — single-
88+
input promotion is still valuable because it converts the file to the new format for future
89+
locality compaction.
90+
91+
Most flexible; gives the policy the freedom to amortize promotion cost when there are aligned
92+
neighbors AND to still promote isolated legacy splits in the background.
93+
94+
## Signal Impact
95+
96+
Primarily affects **metrics** in the near term: the legacy split format pre-dates the
97+
prefix-aligned RG layout, and only metrics has both formats in flight today. Traces and logs
98+
on the Parquet path will eventually reach the same state if a layout change ever happens; the
99+
same planner machinery would cover them.
100+
101+
## Cost Considerations
102+
103+
Promotion is strictly more expensive than a regular merge: the legacy adapter buffers the full
104+
input file in memory and re-encodes it as a single-RG stream before the merge engine sees it.
105+
For 50 MB metrics splits this is acceptable; for larger inputs the in-memory buffer is the
106+
gating cost.
107+
108+
The planner should account for this when scheduling — promotion is best amortized into a
109+
multi-input merge rather than performed as a standalone file rewrite. Option C's "prefer
110+
multi-input promotion, fall back to single-input" structure captures this.
111+
112+
## Impact
113+
114+
- **Severity**: Medium. Legacy splits accumulate cost (every query against them pays the
115+
prefix-less scan cost) but correctness is preserved — the locality compaction stack still
116+
works on aligned splits.
117+
- **Frequency**: Persistent. Legacy splits never migrate without an explicit trigger.
118+
- **Affected Areas**: `quickwit-parquet-engine/src/merge/policy/`, `quickwit-parquet-engine/src/merge/mod.rs` (`MergePolicyState::record_split` + `CompactionScope`).
119+
120+
## Next Steps
121+
122+
- [ ] Decide between options A / B / C based on operational priorities and benchmark data.
123+
- [ ] Design the policy-level "should promote?" heuristic: how many legacy splits before
124+
triggering, whether to wait for aligned neighbors, how to deprioritize promotion vs
125+
regular compaction.
126+
- [ ] Add metrics for `legacy_splits_pending_promotion` and `promotion_operations_emitted` so we
127+
can observe the policy in production.
128+
- [ ] Wire whichever option is chosen, with an integration test that exercises the full path
129+
(legacy split → planner → executor → published prefix-aligned split).
130+
131+
## References
132+
133+
- PR #6423 (legacy promotion path + body-col schema evolution).
134+
- Codex review comment id `4311184497` (raised the gap).
135+
- `test_promote_legacy_executor_end_to_end` in `quickwit-parquet-engine::merge::streaming`
136+
library-level coverage of the mechanism.
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
# GAP-012: Parquet Merge Executor Downloads Inputs Instead of Streaming Them
2+
3+
**Status**: Open
4+
**Discovered**: 2026-05-18
5+
**Context**: Code review of the Parquet streaming merge stack (PRs #6407#6428) — specifically the executor wiring on #6423 — surfaced the question of why the merge actor downloads every input to local disk before merging when the streaming engine is designed around `RemoteByteSource`.
6+
7+
## Problem
8+
9+
The Parquet streaming merge engine in `quickwit-parquet-engine` consumes inputs through a
10+
minimal `RemoteByteSource` trait (`file_size`, `get_slice`, `get_slice_stream`). The trait was
11+
deliberately defined so the engine can pull pages column-major directly from object storage —
12+
two GETs per input (footer + body stream) and the merge progresses as bytes arrive, holding
13+
only the page-bounded engine state in memory.
14+
15+
The actor pipeline in `quickwit-indexing` doesn't use that design. The
16+
`ParquetMergeSplitDownloader` actor pulls each input via `storage.copy_to_file(remote_path,
17+
local_path)` into a scratch directory, then hands `Vec<PathBuf>` to the
18+
`ParquetMergeExecutor`. The executor then either:
19+
20+
- Calls the in-memory `merge_sorted_parquet_files(input_paths, ...)` (regular merges), which
21+
reads each file fully into Arrow RecordBatches before merging, OR
22+
- Wraps each local path in a `LocalFileByteSource` and calls `execute_merge_operation` (added in
23+
PR #6423 for promotion merges only).
24+
25+
Either way, the streaming engine's central design benefit — overlapping the fetch with the
26+
merge and skipping the scratch disk entirely — is unused in production. Every merge reads each
27+
input twice: once over the network into scratch, once off scratch through the merger.
28+
29+
## Evidence
30+
31+
- `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_split_downloader.rs`: per-split
32+
loop calling `self.storage.copy_to_file(...)` to materialize every input on local disk before
33+
forwarding `ParquetMergeScratch` to the executor.
34+
- `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs`: receives
35+
`downloaded_parquet_files: Vec<PathBuf>` and chooses between the in-memory path or
36+
`execute_merge_operation` with `LocalFileByteSource` wrappers — never a `RemoteByteSource`
37+
that actually streams from object storage.
38+
- `quickwit-parquet-engine/src/storage/streaming_reader.rs:62-67`: the `RemoteByteSource` trait
39+
doc explicitly notes that callers in `quickwit-indexing` "provide a thin adapter that
40+
delegates to `quickwit_storage::Storage`." The adapter exists in principle but isn't wired up
41+
for the merge executor.
42+
43+
## State of the Art
44+
45+
- **ClickHouse `MergeTree`**: parts are accessed via the same storage abstraction whether the
46+
merge runs locally or against tiered/object storage. There's no separate "download then
47+
merge" actor pair — the merger reads parts where they live.
48+
- **Iceberg compaction**: data files are read directly from object storage by the compaction
49+
job. Local scratch is used only for the output file before commit.
50+
- **Husky**: column-major streaming merge reads directly from blob storage. Designed around the
51+
"two GETs per input" model the Quickwit streaming engine inherits.
52+
53+
Across these systems, downloading inputs before merging is treated as a fallback for
54+
operational reasons (unreliable network, kernel page-cache effects), not the default.
55+
56+
## Trade-offs
57+
58+
### Why download-first is the current default
59+
- **Retry locality**: the downloader actor centralizes retry/backoff/timeout for one file at a
60+
time. A mid-fetch S3 hiccup retries the download alone; the merger sees only successful
61+
downloads.
62+
- **Pure-compute executor**: once files are on disk the executor has no network dependency.
63+
Mid-merge failures are restricted to disk I/O and compute errors.
64+
- **Predictable disk budget**: scratch usage is bounded by `Σ input_sizes` per concurrent
65+
merge. Easy to reason about; easy to cap.
66+
- **Legacy in-memory path**: `merge_sorted_parquet_files` predates the streaming engine and
67+
requires local file paths. The download-first pattern existed before there was a streaming
68+
alternative.
69+
70+
### What download-first costs
71+
- **2× I/O per merge**: each input is transferred over the network into scratch AND read off
72+
scratch into the merger. The kernel page cache mitigates the disk-read pass to some extent but
73+
doesn't fully erase it.
74+
- **Serialized phases**: the merge can't start until *all* inputs are downloaded. First-byte
75+
latency on the merger is `max(input download time)` instead of `min(input first-byte time)`.
76+
- **Scratch disk usage**: a typical compaction merging 8× 50 MB splits holds 400 MB of scratch
77+
per merge, multiplied by the concurrent merge count. On lightweight indexer pods this caps
78+
parallelism.
79+
- **Underused design**: the streaming engine's single-body-GET model + page-bounded memory was
80+
built specifically for the no-scratch-disk case. Wiring through `LocalFileByteSource` works
81+
but bypasses the property the design was built around.
82+
83+
### What streaming-directly would cost
84+
- **Mid-merge retry surface**: a connection failure mid-body-GET kills the merge attempt
85+
entirely. Single-body-GET is forward-only — no partial recovery. The retry surface becomes
86+
"the merge failed after 30 % of work," not "the download failed, retry the file."
87+
- **Per-merge S3 connection count**: an N-way merge holds N concurrent body streams plus N
88+
footer connections. On dense merger nodes this multiplies.
89+
- **Tail latency**: the merge progresses at the speed of the slowest input. With downloads,
90+
parallel fetches average out; with streaming a slow input throttles the whole merge.
91+
92+
## Potential Solutions
93+
94+
### Option A: Streaming-directly when the input is reachable, download as fallback
95+
96+
The executor receives a hint from the storage layer (or detects mid-merge failure rates) and
97+
chooses per merge. Splits stored on reliable, low-latency backends go through `RemoteByteSource`
98+
adapters that talk directly to `quickwit_storage::Storage`; on flaky or high-latency backends
99+
the downloader actor still materializes files first.
100+
101+
Largest design lift but matches what mature compaction systems do.
102+
103+
### Option B: Stream-directly by default, fall back to download on persistent failures
104+
105+
Default to streaming; a circuit-breaker on per-merge failure rate routes the next attempt
106+
through download-first. Operationally simpler than Option A; tail latency is bounded by the
107+
circuit's reaction time.
108+
109+
### Option C: Keep download-first but eliminate the in-memory merge path
110+
111+
Make every merge go through `execute_merge_operation` with `LocalFileByteSource`. This doesn't
112+
recover the streaming engine's "no scratch disk" benefit but does remove the legacy in-memory
113+
codepath, simplifying the executor to a single path.
114+
115+
Smallest change, smallest gain. Worth doing regardless of A/B as a stepping stone.
116+
117+
### Option D: Streaming-directly only for promotion merges
118+
119+
Promotion already routes through `execute_merge_operation`; extend it to skip the download
120+
phase entirely for those operations and let the regular path stay as-is. Gains: legacy-adapter-
121+
backed promotion merges (the in-memory-buffering-heaviest case in the pipeline) avoid double
122+
I/O. Costs: split executor logic into "promotion = stream" vs "regular = download."
123+
124+
## Signal Impact
125+
126+
All Parquet-backed signals. Metrics is the first product to ship, so the impact lands on
127+
metrics first; traces and logs (when they migrate to Parquet storage) will pay the same cost
128+
unless this is addressed by then.
129+
130+
## Cost Considerations
131+
132+
The streaming engine's body-col page cache is already designed for backpressure: pages stream
133+
in column-major order as bytes arrive, and the engine processes them as quickly as it can. The
134+
bottleneck for streaming-directly becomes the slowest input's transfer rate rather than the
135+
total input size — usually a smaller number, but a longer tail.
136+
137+
## Impact
138+
139+
- **Severity**: Medium. Correctness is unaffected; the streaming engine works equivalently
140+
whether the source is local or remote. The cost is bandwidth, disk, and wall-clock latency.
141+
- **Frequency**: Every merge in production today pays the download cost.
142+
- **Affected Areas**: `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_split_downloader.rs`,
143+
`quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs`,
144+
`quickwit-parquet-engine::merge::execute_merge_operation` callers.
145+
146+
## Next Steps
147+
148+
- [ ] Measure the current download-vs-merge phase breakdown on a representative production
149+
merge load (wall-clock + bytes-read + disk-write).
150+
- [ ] Build a `RemoteByteSource` adapter over `quickwit_storage::Storage` and prototype
151+
streaming-directly for promotion merges (Option D) to validate the engine's behavior
152+
against the existing storage backends.
153+
- [ ] Decide between options A / B based on observed mid-merge failure rates under real S3
154+
conditions.
155+
- [ ] Even if the default stays download-first, consider Option C as a simplification — the
156+
in-memory merge path is dead weight once the streaming engine handles every case.
157+
158+
## References
159+
160+
- PR #6407#6428 (Parquet streaming merge stack).
161+
- [PR #6423 discussion](https://github.com/quickwit-oss/quickwit/pull/6423) — surfaced the
162+
question while wiring promotion through `execute_merge_operation`.
163+
- `quickwit-parquet-engine/src/storage/streaming_reader.rs` (`RemoteByteSource` trait).
164+
- `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs::LocalFileByteSource`.

docs/internals/adr/gaps/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,5 @@ Gap files use sequential numbering: `001-short-description.md`
115115
| [008](./008-no-high-query-rate-optimization.md) | No High Query Rate Optimization | Open | High |
116116
| [009](./009-no-leading-edge-prioritization.md) | No Leading Edge Prioritization | Open | High |
117117
| [010](./010-no-data-caching-or-query-affinity.md) | No Multi-Level Data Caching or Query Affinity Optimization | Open | High |
118+
| [011](./011-no-legacy-promotion-planner.md) | No Planner-Level Legacy Promotion | Open | Medium |
119+
| [012](./012-merge-downloads-instead-of-streaming.md) | Parquet Merge Executor Downloads Inputs Instead of Streaming Them | Open | Medium |

0 commit comments

Comments
 (0)