perf(reader): parallelize Parquet decompression across tokio tasks #2342
t3hw wants to merge 1 commit into apache:main
Conversation
`ArrowReader::read()` uses `try_buffer_unordered(N)` to overlap async S3 I/O across N files, but `try_flatten_unordered` polls all returned `ArrowRecordBatchStream`s from a single thread. Parquet decompression and Arrow decoding are synchronous CPU work that runs inside each stream's `poll_next`, so they execute serially on that one thread: only one core is used regardless of the concurrency limit.

Fix: add `spawn_record_batch_stream`, which bridges each file's stream to a dedicated tokio task via a bounded `futures::channel::mpsc` channel. The spawned task drains the inner Parquet stream (including decompression) while the polling thread only receives pre-decoded `RecordBatch`es from the channel. The tokio runtime distributes the spawned tasks across its worker thread pool, achieving true multi-core parallelism. The single-concurrency fast path (`concurrency_limit == 1`) is left unchanged; it preserves ordering and avoids spawn overhead.

Error handling in the spawned task is exhaustive:

- Stream `Err`s are forwarded and iteration stops immediately.
- Panics inside `stream.next()` are caught via `catch_unwind` and converted to an `Err` item, so the consumer always receives an explicit error rather than a silent premature end-of-stream.
- A dropped receiver (consumer stopped reading) breaks the loop via the `tx.send` return value.

Four new tests cover: multi-file correctness with concurrency > 1, error propagation from a missing file, the single-file edge case (limit > file count), and panic-to-error conversion.
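The bridge pattern described above can be sketched in plain std Rust, with a thread and a bounded `std::sync::mpsc` channel standing in for the tokio task and `futures::channel::mpsc`. The `spawn_bridge` name and `Item` alias are illustrative, not the crate's API; this is a sketch of the pattern, not the PR's implementation:

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Stand-in for Result<RecordBatch>: the items flowing out of an inner stream.
type Item = Result<Vec<u8>, String>;

/// Drain `inner` on a worker thread; the consumer only receives finished items.
fn spawn_bridge<I>(mut inner: I, capacity: usize) -> Receiver<Item>
where
    I: Iterator<Item = Item> + Send + 'static,
{
    // Bounded channel: backpressure keeps the producer from racing ahead.
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || loop {
        // Catch panics in the inner stream so they surface as explicit
        // Err items, never a silent premature end-of-stream.
        let next = catch_unwind(AssertUnwindSafe(|| inner.next()));
        let item = match next {
            Ok(Some(item)) => item,
            Ok(None) => break, // normal end-of-stream
            Err(_) => Err("inner stream panicked".to_string()),
        };
        let stop = item.is_err(); // an Err ends iteration after forwarding
        // A dropped receiver (consumer stopped reading) makes send fail,
        // which also breaks the loop.
        if tx.send(item).is_err() || stop {
            break;
        }
    });
    rx
}
```

The consumer side is just `rx.iter()`; CPU-heavy decode work happens on the worker, mirroring how the spawned tokio task keeps decompression off the polling thread.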
```rust
// Spawn stream consumption onto a separate tokio task so that
// CPU-heavy Parquet decompression runs on the tokio thread pool
// rather than being serialized on the polling thread.
Ok(Self::spawn_record_batch_stream(stream, 1))
```
I believe this is similar to an idea I had a couple of months back; this is @liurenjie1024's feedback: #1684 (comment)
Hi @vustef, thanks for the pointer. I read #1684 and Liu's comment carefully before drafting this.
I think Liu's architecture model is right: step 2 (parallel execution of FileScanTasks) belongs to the engine. That's exactly what Comet does in #2020: one DF SessionContext per Spark Task, ArrowReader at concurrency==1, cross-task parallelism owned by Spark.
My PR keeps that whole path intact: when `concurrency_limit_data_files == 1`, the #2020 fast path (`and_then` + `try_flatten`) is used verbatim: no spawn, no channel, no reordering. Callers that own their own parallelism (Comet, Spark, or any future DF executor that shards FileScanTasks across partitions) don't see a behavior change.
The change applies only on the multi-concurrency branch, which today is hit by two groups:

- Callers using the raw `ArrowReader` / `table.scan().to_arrow()` path directly (my use case: streaming RecordBatches into a sink via the core crate, no DataFusion). For these callers, the existing `try_buffer_unordered(N)` already gives I/O concurrency, but inner-stream polling is single-threaded, so Parquet decompression is serial; the spawn fixes that.
- The `iceberg-datafusion` integration, which today hard-codes `Partitioning::UnknownPartitioning(1)` in `IcebergTableScan` (see the TODO: "to be replaced once we support output-partitioning"). Until that TODO lands, DF sees a single-partition scan and can't shard FileScanTasks across partitions per Liu's model, so the multi-concurrency branch is the only place CPU parallelism can actually happen today. When output-partitioning lands, engines will set concurrency==1 themselves and hit the #2020 fast path, just like Comet, at which point the spawn path is cleanly dormant rather than harmful.
Happy to split the tests into a follow-up or add a config flag to make the spawn strictly opt-in, if either would help it land.
Thanks for the detailed answer. Don't get me wrong, I'd love to see this in - in fact, in our fork we rely on this parallel behaviour since we still haven't switched our engine to Renjie's suggestion.
My change predates #2020, so things might have changed since then. My impression at the time wasn't that the concern was partitioned readers not getting concurrency==1, but that we didn't want `to_arrow()` to behave like a parallel engine for scanning Iceberg; it should stay a low-level API. If I was wrong, then something like #2020 would've been a very easy change for me back then :)
That said, it is indeed confusing then why it does have concurrency_limit_data_files at all.
If you're allowed to proceed with the change (🤞 ), then I have one comment (as a separate comment).
```rust
// Spawn stream consumption onto a separate tokio task so that
// CPU-heavy Parquet decompression runs on the tokio thread pool
// rather than being serialized on the polling thread.
Ok(Self::spawn_record_batch_stream(stream, 1))
```
Why don't we spawn for the whole thing, i.e. around process_file_scan_task too?
It's not that `process_file_scan_task` and its IO are already parallel while Parquet decoding is not. Rather, `process_file_scan_task` is mostly IO, and for IO a single task is enough since it can execute many IOs concurrently, while CPU-heavy operations need spawned tasks so that they can scale across threads.
Although I don't have any data out of the box, I believe it'd be beneficial to be able to scale process_file_scan_tasks across threads too.
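The alternative spawn boundary being proposed can be sketched in plain std Rust (threads standing in for tokio tasks). Here the whole per-file pipeline, IO and decode, runs on the worker, so file-task processing scales across threads too. All names (`fetch`, `decode`, `read_all`) are illustrative stand-ins, not the crate's API:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Stand-in for process_file_scan_task's IO portion.
fn fetch(file: &'static str) -> Vec<u8> {
    file.as_bytes().to_vec()
}

// Stand-in for CPU-heavy Parquet decompression/decoding.
fn decode(raw: Vec<u8>) -> usize {
    raw.len()
}

/// Spawn around the whole per-file pipeline: each worker does IO *and* decode,
/// instead of spawning only around the decode stage.
fn read_all(files: Vec<&'static str>) -> Vec<usize> {
    let (tx, rx) = sync_channel(files.len());
    for file in files {
        let tx = tx.clone();
        thread::spawn(move || {
            // A dropped receiver just ends this worker; ignore the send error.
            let _ = tx.send(decode(fetch(file)));
        });
    }
    drop(tx); // close the channel once only the workers hold senders
    rx.iter().collect() // completion order is nondeterministic
}
```

As with `try_flatten_unordered`, results arrive in completion order, so a caller that needs ordering has to restore it itself.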
blackmwk left a comment
Thanks @t3hw for this PR. I still don't think it's the right direction to add this much concurrency to ArrowReader, and from https://github.com/apache/iceberg-rust/pull/2342/changes#r3106860089 I think you already understood why. We already discussed in #1854 that to process data in parallel, you should either rely on a parallel engine like DataFusion or Comet, or we could maintain a minimal local parallel engine.
@blackmwk thanks for the feedback! Your comment sent me down a rabbit hole of past and future changes, and has given me a lot to think about. @vustef, same to you. It looks like #2298 will fold parallel scanning directly into the DataFusion integration, but a big part of my use case is lazily streaming entire snapshot deltas, partitions, or whole MOR tables into a downstream sink, and parallelization helps greatly. Pulling DataFusion in just for the parallel decode feels heavy for what's essentially a low-level scan -> stream -> transform -> sink pipeline. I'll close this PR. Out of curiosity, though: would a separate PR for a thin wrapper crate focused on low-level parallelization utilities be worth opening? If so, where would be the right home for it? Perhaps we could start working towards the goals laid out in this RFC.
What kind of thin wrapper are you talking about? In Rust it's quite trivial if you mean sending different file scan tasks to different async tasks, and I don't think it's worth doing it here.
I was going off your "or we could maintain a minimum local parallel engine" comment. Work on #1819 doesn't seem to have started yet, so my idea was to begin moving in that direction. A low-level parallel engine eventually living in a separate crate seems like a reasonable fit; it keeps the core iceberg crate focused on just the protocol.
Which issue does this PR close?
No tracking issue filed yet. If a committer prefers, I can open one
first and update this PR to reference it.
What changes are included in this PR?
`ArrowReader::read()` uses `try_buffer_unordered(N)` to overlap async S3 I/O across N files, but `try_flatten_unordered` polls all returned streams from a single thread. CPU-heavy Parquet decompression in each inner stream therefore runs serially on the polling thread, so the `data_file_concurrency_limit` only buys I/O overlap; it does not translate to CPU parallelism across files.
This PR introduces `spawn_record_batch_stream`, which moves consumption of each inner `RecordBatch` stream onto its own tokio task backed by a bounded `mpsc` channel. The tokio runtime then distributes decompression across its worker thread pool.
Behavior preserved on the single-concurrency fast path

When `concurrency_limit == 1` the spawn is skipped. This keeps the existing ordering guarantee, avoids spawn/channel overhead, and means single-file reads are byte-identical to before.
Error handling is exhaustive

The spawned task handles three cases explicitly, so callers always see a deterministic end-of-stream:

- A stream `Err` is forwarded as an `Err` item and iteration stops.
- A panic inside `stream.next()` is caught via `catch_unwind` and converted to an `Err` item, so a panic surfaces as an explicit error rather than a silent premature end-of-stream.
- A dropped receiver (the consumer stopped reading) breaks the loop via the `tx.send` return value.
Measured impact

Measured on an end-to-end pipeline that combines iceberg-rust reads with non-trivial downstream CPU work (Rayon-parallel aggregation, DataFusion execution, Arrow-native transforms). Numbers are pipeline-level; the iceberg-rust portion of each run is only one component of total wall-clock.

- Fixed scan window, 24-core host.
- Wall-clock improvement at the pipeline level, of which the iceberg-rust read phase is only one portion; the isolated effect on the reader is therefore proportionally larger.
- CPU utilization up (measured as `(user+sys)/wall`, i.e. average cores active).
- Context switches down, consistent with decompression work no longer being serialized behind the single polling thread.
- Output identical (matched md5 on sort).

Three independent signals (wall-clock down, CPU utilization up, context switches down) all point in the same direction, which is what we'd expect if decompression is genuinely spreading across cores rather than being relieved of contention artificially.
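The utilization metric above is simple arithmetic; a minimal sketch, with illustrative numbers rather than figures from this PR:

```rust
// Average cores active over a run: (user + sys) CPU seconds divided by
// wall-clock seconds. A value near 1.0 means one core was busy on average;
// a value near the core count means the run was fully parallel.
fn avg_cores_active(user_s: f64, sys_s: f64, wall_s: f64) -> f64 {
    (user_s + sys_s) / wall_s
}
```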
Are these changes tested?

Yes. Four new tests in `crates/iceberg/src/arrow/reader.rs`:

- `test_read_with_multi_concurrency`: multi-file correctness with concurrency > 1 (ordering is not asserted, since `try_flatten_unordered` does not preserve it).
- `test_read_with_multi_concurrency_error_propagation`: a missing file propagates an `Err` through the stream.
- `test_read_with_multi_concurrency_single_file`: single-file edge case (concurrency limit > file count).
- `test_spawn_record_batch_stream_panic_surfaces_as_error`: a panic becomes an `Err` item, not a silent EOF.

The existing single-concurrency test (`test_read_with_concurrency_one`) continues to exercise the ordering-preserving fast path.
All `cargo test -p iceberg --lib arrow::reader` tests pass (43/43), plus `cargo clippy -p iceberg --all-features --lib --tests -- -D warnings` and nightly `cargo fmt --check` are clean.

Notes for reviewers
- `CONTRIBUTING.md` discourages PRs over 300–500 lines; most of this diff is the 4 new tests + a multi-file test helper. The production change itself is localized to `ArrowReader::read()` and one new 80-line `spawn_record_batch_stream` helper. I can split the tests into a follow-up if preferred.
- The spawn uses `crate::runtime::spawn` to match the existing abstraction rather than pulling in `tokio::spawn` directly.