| 1 | +# GAP-012: Parquet Merge Executor Downloads Inputs Instead of Streaming Them |
| 2 | + |
| 3 | +**Status**: Open |
| 4 | +**Discovered**: 2026-05-18 |
| 5 | +**Context**: Code review of the Parquet streaming merge stack (PRs #6407–#6428) — specifically the executor wiring on #6423 — surfaced the question of why the merge actor downloads every input to local disk before merging when the streaming engine is designed around `RemoteByteSource`. |
| 6 | + |
| 7 | +## Problem |
| 8 | + |
| 9 | +The Parquet streaming merge engine in `quickwit-parquet-engine` consumes inputs through a |
| 10 | +minimal `RemoteByteSource` trait (`file_size`, `get_slice`, `get_slice_stream`). The trait was |
| 11 | +deliberately defined so the engine can pull pages column-major directly from object storage — |
| 12 | +two GETs per input (footer + body stream) and the merge progresses as bytes arrive, holding |
| 13 | +only the page-bounded engine state in memory. |
| 14 | + |
| 15 | +The actor pipeline in `quickwit-indexing` doesn't use that design. The |
| 16 | +`ParquetMergeSplitDownloader` actor pulls each input via `storage.copy_to_file(remote_path, |
| 17 | +local_path)` into a scratch directory, then hands `Vec<PathBuf>` to the |
| 18 | +`ParquetMergeExecutor`. The executor then either: |
| 19 | + |
| 20 | +- Calls the in-memory `merge_sorted_parquet_files(input_paths, ...)` (regular merges), which |
| 21 | + reads each file fully into Arrow RecordBatches before merging, OR |
| 22 | +- Wraps each local path in a `LocalFileByteSource` and calls `execute_merge_operation` (added in |
| 23 | + PR #6423 for promotion merges only). |
| 24 | + |
| 25 | +Either way, the streaming engine's central design benefit — overlapping the fetch with the |
| 26 | +merge and skipping the scratch disk entirely — is unused in production. Every merge reads each |
| 27 | +input twice: once over the network into scratch, once off scratch through the merger. |
| 28 | + |
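The "two GETs per input" access pattern described above can be sketched as follows. This is a minimal, synchronous illustration only: the `ByteSource` trait, `InMemorySource`, and `read_footer_then_body` are hypothetical stand-ins, not the engine's actual `RemoteByteSource` (whose real signatures are not reproduced here). The only fact taken from outside this document is the Parquet trailer layout: a 4-byte little-endian footer length followed by the `PAR1` magic.

```rust
use std::cell::Cell;
use std::ops::Range;

/// Hypothetical, simplified stand-in for a remote byte source (not the real trait).
trait ByteSource {
    fn file_size(&self) -> u64;
    fn get_slice(&self, range: Range<u64>) -> Vec<u8>;
}

/// In-memory source that counts range requests, standing in for object-storage GETs.
struct InMemorySource {
    bytes: Vec<u8>,
    gets: Cell<u32>,
}

impl ByteSource for InMemorySource {
    fn file_size(&self) -> u64 {
        self.bytes.len() as u64
    }

    fn get_slice(&self, range: Range<u64>) -> Vec<u8> {
        self.gets.set(self.gets.get() + 1);
        self.bytes[range.start as usize..range.end as usize].to_vec()
    }
}

/// Reads one Parquet-shaped input with two range requests and returns
/// `(footer_metadata_bytes, body_bytes)`.
fn read_footer_then_body(
    src: &dyn ByteSource,
    tail_guess: u64,
) -> Result<(Vec<u8>, Vec<u8>), String> {
    let size = src.file_size();
    // Smallest possible file: leading magic + footer length + trailing magic.
    if size < 12 {
        return Err("file too small to be Parquet".into());
    }

    // GET 1: speculative read of the file tail, hoping it covers the whole footer.
    let tail_start = size.saturating_sub(tail_guess.max(8));
    let tail = src.get_slice(tail_start..size);
    let n = tail.len();
    if !tail.ends_with(b"PAR1") {
        return Err("missing trailing PAR1 magic".into());
    }
    let footer_len =
        u32::from_le_bytes([tail[n - 8], tail[n - 7], tail[n - 6], tail[n - 5]]) as u64;
    let footer_start = size
        .checked_sub(8 + footer_len)
        .filter(|start| *start >= 4)
        .ok_or("implausible footer length")?;
    if footer_start < tail_start {
        // A real reader would issue one extra GET here; the sketch just reports it.
        return Err("tail guess did not cover the footer".into());
    }
    let footer = tail[(footer_start - tail_start) as usize..n - 8].to_vec();

    // GET 2: the body, i.e. everything between the leading magic and the footer.
    // A streaming engine would consume this incrementally instead of buffering it.
    let body = src.get_slice(4..footer_start);
    Ok((footer, body))
}
```

With a tail guess large enough to cover the footer, the request counter on `InMemorySource` ends at exactly two per input, which is the property the download-first pipeline gives up by copying each file to scratch first.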
| 29 | +## Evidence |
| 30 | + |
| 31 | +- `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_split_downloader.rs`: per-split |
| 32 | + loop calling `self.storage.copy_to_file(...)` to materialize every input on local disk before |
| 33 | + forwarding `ParquetMergeScratch` to the executor. |
| 34 | +- `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs`: receives |
| 35 | + `downloaded_parquet_files: Vec<PathBuf>` and chooses between the in-memory path or |
| 36 | + `execute_merge_operation` with `LocalFileByteSource` wrappers — never a `RemoteByteSource` |
| 37 | + that actually streams from object storage. |
| 38 | +- `quickwit-parquet-engine/src/storage/streaming_reader.rs:62-67`: the `RemoteByteSource` trait |
| 39 | + doc explicitly notes that callers in `quickwit-indexing` "provide a thin adapter that |
| 40 | + delegates to `quickwit_storage::Storage`." The adapter exists in principle but isn't wired up |
| 41 | + for the merge executor. |
| 42 | + |
| 43 | +## State of the Art |
| 44 | + |
| 45 | +- **ClickHouse `MergeTree`**: parts are accessed via the same storage abstraction whether the |
| 46 | + merge runs locally or against tiered/object storage. There's no separate "download then |
| 47 | + merge" actor pair — the merger reads parts where they live. |
| 48 | +- **Iceberg compaction**: data files are read directly from object storage by the compaction |
| 49 | + job. Local scratch is used only for the output file before commit. |
| 50 | +- **Husky**: column-major streaming merge reads directly from blob storage. Designed around the |
| 51 | + "two GETs per input" model the Quickwit streaming engine inherits. |
| 52 | + |
| 53 | +Across these systems, downloading inputs before merging is treated as a fallback for |
| 54 | +operational reasons (unreliable network, kernel page-cache effects), not the default. |
| 55 | + |
| 56 | +## Trade-offs |
| 57 | + |
| 58 | +### Why download-first is the current default |
| 59 | +- **Retry locality**: the downloader actor centralizes retry/backoff/timeout for one file at a |
| 60 | + time. A mid-fetch S3 hiccup retries the download alone; the merger sees only successful |
| 61 | + downloads. |
| 62 | +- **Pure-compute executor**: once files are on disk the executor has no network dependency. |
| 63 | + Mid-merge failures are restricted to disk I/O and compute errors. |
| 64 | +- **Predictable disk budget**: scratch usage is bounded by `Σ input_sizes` per concurrent |
| 65 | + merge. Easy to reason about; easy to cap. |
| 66 | +- **Legacy in-memory path**: `merge_sorted_parquet_files` predates the streaming engine and |
| 67 | + requires local file paths. The download-first pattern existed before there was a streaming |
| 68 | + alternative. |
| 69 | + |
| 70 | +### What download-first costs |
| 71 | +- **2× I/O per merge**: each input is transferred over the network into scratch AND read off |
| 72 | + scratch into the merger. The kernel page cache mitigates the disk-read pass to some extent but |
| 73 | + doesn't fully erase it. |
| 74 | +- **Serialized phases**: the merge can't start until *all* inputs are downloaded. First-byte |
| 75 | + latency on the merger is `max(input download time)` instead of `min(input first-byte time)`. |
| 76 | +- **Scratch disk usage**: a typical compaction merging eight 50 MB splits holds 400 MB of scratch |
| 77 | + per merge, multiplied by the concurrent merge count. On lightweight indexer pods this caps |
| 78 | + parallelism. |
| 79 | +- **Underused design**: the streaming engine's single-body-GET model and page-bounded memory were |
| 80 | + built specifically for the no-scratch-disk case. Wiring through `LocalFileByteSource` works |
| 81 | + but bypasses the property the design was built around. |
| 82 | + |
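The scratch-disk arithmetic in the list above can be worked through in a few lines. This is an illustrative sketch, not code from the pipeline: the function names, the concurrency of 4, and the 1 GB budget are all assumptions chosen for the example; only the eight 50 MB inputs come from the text.

```rust
/// Decimal megabyte, used only to keep the example numbers readable.
const MB: u64 = 1_000_000;

/// Scratch bytes held under download-first: every concurrent merge keeps
/// all of its inputs on local disk for the duration of the merge.
fn scratch_bytes(input_sizes: &[u64], concurrent_merges: u64) -> u64 {
    input_sizes.iter().sum::<u64>() * concurrent_merges
}

/// Number of such merges that fit in a scratch budget.
/// Returns `None` when the inputs are empty (no scratch needed, so no cap).
fn max_concurrent_merges(input_sizes: &[u64], scratch_budget: u64) -> Option<u64> {
    let per_merge: u64 = input_sizes.iter().sum();
    scratch_budget.checked_div(per_merge)
}
```

For eight 50 MB inputs, `scratch_bytes(&[50 * MB; 8], 1)` is 400 MB, and four concurrent merges (a hypothetical setting) hold 1.6 GB. Conversely, a pod with a hypothetical 1 GB scratch budget can run at most two such merges at once, which is how scratch usage ends up capping parallelism.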
| 83 | +### What streaming-directly would cost |
| 84 | +- **Mid-merge retry surface**: a connection failure mid-body-GET kills the merge attempt |
| 85 | + entirely. Single-body-GET is forward-only — no partial recovery. The retry surface becomes |
| 86 | +  "the merge failed after 30% of the work," not "the download failed, retry the file." |
| 87 | +- **Per-merge S3 connection count**: an N-way merge holds N concurrent body streams plus N |
| 88 | + footer connections. On dense merger nodes this multiplies. |
| 89 | +- **Tail latency**: the merge progresses at the speed of the slowest input. With downloads, |
| 90 | +  parallel fetches average out; with streaming, a slow input throttles the whole merge. |
| 91 | + |
| 92 | +## Potential Solutions |
| 93 | + |
| 94 | +### Option A: Streaming-directly when the input is reachable, download as fallback |
| 95 | + |
| 96 | +The executor receives a hint from the storage layer (or detects mid-merge failure rates) and |
| 97 | +chooses per merge. Splits stored on reliable, low-latency backends go through `RemoteByteSource` |
| 98 | +adapters that talk directly to `quickwit_storage::Storage`; on flaky or high-latency backends |
| 99 | +the downloader actor still materializes files first. |
| 100 | + |
| 101 | +Largest design lift but matches what mature compaction systems do. |
| 102 | + |
| 103 | +### Option B: Stream-directly by default, fall back to download on persistent failures |
| 104 | + |
| 105 | +Default to streaming; a circuit-breaker on per-merge failure rate routes the next attempt |
| 106 | +through download-first. Operationally simpler than Option A; tail latency is bounded by the |
| 107 | +circuit breaker's reaction time. |
| 108 | + |
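One possible shape for the Option B fallback is sketched below. Everything here is hypothetical: `MergeInputStrategy`, `StreamingCircuitBreaker`, and its methods do not exist in the codebase, and the sketch trips on consecutive streaming failures rather than a true failure rate, purely to keep the example short.

```rust
/// How the next merge attempt should obtain its inputs (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MergeInputStrategy {
    StreamDirectly,
    DownloadFirst,
}

/// Minimal circuit breaker: streams by default and routes the next attempt
/// through download-first after `failure_threshold` consecutive streaming failures.
struct StreamingCircuitBreaker {
    failure_threshold: u32,
    consecutive_failures: u32,
}

impl StreamingCircuitBreaker {
    fn new(failure_threshold: u32) -> Self {
        Self {
            // A threshold of zero would never allow streaming, so clamp to 1.
            failure_threshold: failure_threshold.max(1),
            consecutive_failures: 0,
        }
    }

    /// Strategy to use for the next merge attempt.
    fn next_strategy(&self) -> MergeInputStrategy {
        if self.consecutive_failures >= self.failure_threshold {
            MergeInputStrategy::DownloadFirst
        } else {
            MergeInputStrategy::StreamDirectly
        }
    }

    /// Record the outcome of a streamed merge attempt.
    fn record_streaming_result(&mut self, succeeded: bool) {
        if succeeded {
            self.consecutive_failures = 0;
        } else {
            self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        }
    }

    /// After a successful download-first merge, allow a single streaming probe:
    /// one more streaming failure trips the breaker again immediately.
    fn record_download_first_success(&mut self) {
        self.consecutive_failures = self.failure_threshold - 1;
    }
}
```

The threshold is the knob that bounds how many merge attempts can be lost to a flaky backend before the pipeline reverts to download-first; a production version would presumably also want a time window and per-backend state.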
| 109 | +### Option C: Keep download-first but eliminate the in-memory merge path |
| 110 | + |
| 111 | +Make every merge go through `execute_merge_operation` with `LocalFileByteSource`. This doesn't |
| 112 | +recover the streaming engine's "no scratch disk" benefit but does remove the legacy in-memory |
| 113 | +codepath, simplifying the executor to a single path. |
| 114 | + |
| 115 | +Smallest change, smallest gain. Worth doing regardless of A/B as a stepping stone. |
| 116 | + |
| 117 | +### Option D: Streaming-directly only for promotion merges |
| 118 | + |
| 119 | +Promotion already routes through `execute_merge_operation`; extend it to skip the download |
| 120 | +phase entirely for those operations and let the regular path stay as-is. Gains: |
| 121 | +legacy-adapter-backed promotion merges (the in-memory-buffering-heaviest case in the pipeline) avoid |
| 122 | +double I/O. Costs: executor logic splits into "promotion = stream" vs "regular = download." |
| 123 | + |
| 124 | +## Signal Impact |
| 125 | + |
| 126 | +All Parquet-backed signals. Metrics is the first product to ship, so the impact lands on |
| 127 | +metrics first; traces and logs (when they migrate to Parquet storage) will pay the same cost |
| 128 | +unless this is addressed by then. |
| 129 | + |
| 130 | +## Cost Considerations |
| 131 | + |
| 132 | +The streaming engine's body-column page cache is already designed for backpressure: pages stream |
| 133 | +in column-major order as bytes arrive, and the engine processes them as quickly as it can. The |
| 134 | +bottleneck for streaming-directly becomes the slowest input's transfer time rather than the |
| 135 | +total transfer time of all inputs: usually a smaller number, but with a longer tail. |
| 136 | + |
| 137 | +## Impact |
| 138 | + |
| 139 | +- **Severity**: Medium. Correctness is unaffected; the streaming engine works equivalently |
| 140 | + whether the source is local or remote. The cost is bandwidth, disk, and wall-clock latency. |
| 141 | +- **Frequency**: Every merge in production today pays the download cost. |
| 142 | +- **Affected Areas**: `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_split_downloader.rs`, |
| 143 | + `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs`, |
| 144 | + `quickwit-parquet-engine::merge::execute_merge_operation` callers. |
| 145 | + |
| 146 | +## Next Steps |
| 147 | + |
| 148 | +- [ ] Measure the current download-vs-merge phase breakdown on a representative production |
| 149 | + merge load (wall-clock + bytes-read + disk-write). |
| 150 | +- [ ] Build a `RemoteByteSource` adapter over `quickwit_storage::Storage` and prototype |
| 151 | + streaming-directly for promotion merges (Option D) to validate the engine's behavior |
| 152 | + against the existing storage backends. |
| 153 | +- [ ] Decide between options A / B based on observed mid-merge failure rates under real S3 |
| 154 | + conditions. |
| 155 | +- [ ] Even if the default stays download-first, consider Option C as a simplification — the |
| 156 | + in-memory merge path is dead weight once the streaming engine handles every case. |
| 157 | + |
| 158 | +## References |
| 159 | + |
| 160 | +- PRs #6407–#6428 (Parquet streaming merge stack). |
| 161 | +- [PR #6423 discussion](https://github.com/quickwit-oss/quickwit/pull/6423) — surfaced the |
| 162 | + question while wiring promotion through `execute_merge_operation`. |
| 163 | +- `quickwit-parquet-engine/src/storage/streaming_reader.rs` (`RemoteByteSource` trait). |
| 164 | +- `quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs::LocalFileByteSource`. |