
perf(reader): parallelize Parquet decompression across tokio tasks#2342

Closed
t3hw wants to merge 1 commit into apache:main from t3hw:pr/parallel-parquet-decompression

Conversation

@t3hw

@t3hw t3hw commented Apr 18, 2026

Which issue does this PR close?

No tracking issue filed yet. If a committer prefers, I can open one
first and update this PR to reference it.

What changes are included in this PR?

ArrowReader::read() uses try_buffer_unordered(N) to overlap async S3
I/O across N files, but try_flatten_unordered polls all returned
streams from a single thread. CPU-heavy Parquet decompression in each
inner stream therefore runs serially on the polling thread, so the
data_file_concurrency_limit only buys I/O overlap — it does not
translate to CPU parallelism across files.

This PR introduces spawn_record_batch_stream, which moves consumption
of each inner RecordBatch stream onto its own tokio task backed by a
bounded mpsc channel. The tokio runtime then distributes decompression
across its worker thread pool.

Behavior preserved on the single-concurrency fast path

When concurrency_limit == 1 the spawn is skipped. This keeps the
existing ordering guarantee, avoids spawn/channel overhead, and means
single-file reads are byte-identical to before.

Error handling is exhaustive

The spawned task handles three cases explicitly, so callers always see a
deterministic end-of-stream:

  • A stream Err is forwarded as an Err item and iteration stops.
  • A dropped receiver (consumer stopped reading) breaks the send loop.
  • A panic inside stream.next() is caught via catch_unwind and
    converted to an Err item — so a panic surfaces as an explicit
    error rather than a silent premature end-of-stream.

Measured impact

Measured on an end-to-end pipeline that combines iceberg-rust reads
with non-trivial downstream CPU work (Rayon-parallel aggregation,
DataFusion execution, Arrow-native transforms). Numbers are
pipeline-level — the iceberg-rust portion of each run is only one
component of total wall-clock.

  • Workload: Iceberg MOR table, ~71 M rows / 507 K distinct keys / 36 h
    scan window, 24-core host.
  • Pipeline wall-clock: 67.64 s → 59.15 s, a 12.6 % reduction at the
    pipeline level. The iceberg-rust read phase is only one portion of
    that total, so the isolated effect on the reader is proportionally
    larger.
  • CPU utilization: 134 % → 166 % ((user+sys)/wall, i.e. average
    cores active).
  • Involuntary context switches: ~21 % lower (2.03 M → 1.60 M),
    consistent with decompression work no longer being serialized
    behind the single polling thread.
  • Memory overhead: negligible (≈ +0.3 GB peak RSS).
  • Correctness: output identical modulo row order across all cells
    (matched md5 on sort).

Three independent signals (wall-clock down, CPU % up, context switches
down) all point in the same direction, which is what we'd expect if
decompression is genuinely spreading across cores rather than the gain
being an artifact of reduced contention.
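As a side note on the arithmetic, the two headline metrics reduce to simple formulas; a quick sketch (function names are illustrative, not from the codebase):

```rust
/// (user + sys) / wall, expressed as a percentage: the "average cores
/// active" metric quoted in the CPU-utilization bullet above.
fn cpu_utilization_pct(user_s: f64, sys_s: f64, wall_s: f64) -> f64 {
    100.0 * (user_s + sys_s) / wall_s
}

/// Relative wall-clock reduction as a percentage.
fn wall_clock_reduction_pct(before_s: f64, after_s: f64) -> f64 {
    100.0 * (before_s - after_s) / before_s
}
```

For example, `wall_clock_reduction_pct(67.64, 59.15)` evaluates to roughly 12.55, consistent with the ~12.6 % figure quoted above.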

Are these changes tested?

Yes. Four new tests in crates/iceberg/src/arrow/reader.rs:

  • test_read_with_multi_concurrency: 3 files × concurrency=3; verifies
    all rows are present (order not asserted, since
    try_flatten_unordered does not preserve it).
  • test_read_with_multi_concurrency_error_propagation: a task pointing
    at a missing file surfaces an Err through the stream.
  • test_read_with_multi_concurrency_single_file: concurrency limit
    greater than the file count (edge case).
  • test_spawn_record_batch_stream_panic_surfaces_as_error: a panic
    inside the spawned stream becomes an Err item, not a silent EOF.

The existing single-concurrency test (test_read_with_concurrency_one)
continues to exercise the ordering-preserving fast path.

All 43 cases in cargo test -p iceberg --lib arrow::reader pass, and
both cargo clippy -p iceberg --all-features --lib --tests -- -D warnings
and nightly cargo fmt --check are clean.


Notes for reviewers

  • Diff size (~350 lines). CONTRIBUTING.md discourages PRs over
    300–500 lines; most of this diff is the 4 new tests + a multi-file
    test helper. The production change itself is localized to
    ArrowReader::read() and one new 80-line spawn_record_batch_stream
    helper. I can split the tests into a follow-up if preferred.
  • Runtime boundary. The spawn uses crate::runtime::spawn to match
    the existing abstraction rather than pulling in tokio::spawn
    directly.

`ArrowReader::read()` uses `try_buffer_unordered(N)` to overlap async
S3 I/O across N files, but `try_flatten_unordered` polls all returned
`ArrowRecordBatchStream`s from a single thread. Parquet decompression
and Arrow decoding are synchronous CPU work that runs inside each
stream's `poll_next`, so they execute serially on that one thread —
only one core is used regardless of the concurrency limit.

Fix: add `spawn_record_batch_stream`, which bridges each file's stream
to a dedicated tokio task via a bounded `futures::channel::mpsc`
channel. The spawned task drains the inner Parquet stream (including
decompression) while the polling thread only receives pre-decoded
`RecordBatch`es from the channel. The tokio runtime distributes the
spawned tasks across its worker thread pool, achieving true multi-core
parallelism.

The single-concurrency fast-path (`concurrency_limit == 1`) is left
unchanged — it preserves ordering and avoids spawn overhead.

Error handling in the spawned task is exhaustive:
- Stream `Err`s are forwarded and iteration stops immediately.
- Panics inside `stream.next()` are caught via `catch_unwind` and
  converted to an `Err` item, so the consumer always receives an
  explicit error rather than a silent premature end-of-stream.
- A dropped receiver (consumer stopped reading) breaks the loop via
  the `tx.send` return value.

Four new tests cover: multi-file correctness with concurrency > 1,
error propagation from a missing file, the single-file edge case
(limit > file count), and panic-to-error conversion.
@t3hw t3hw changed the title perf(reader): replace O(N*M) equality-delete predicate tree with O(N+M) HashSet filter perf(reader): parallelize Parquet decompression across tokio tasks Apr 18, 2026
@t3hw t3hw marked this pull request as ready for review April 18, 2026 21:53
// Spawn stream consumption onto a separate tokio task so that
// CPU-heavy Parquet decompression runs on the tokio thread pool
// rather than being serialized on the polling thread.
Ok(Self::spawn_record_batch_stream(stream, 1))
I believe this is similar to an idea I had a couple of months back; here is @liurenjie1024's feedback on it: #1684 (comment)


Hi @vustef, thanks for the pointer. I read #1684 and Liu's comment carefully before drafting this.

I think Liu's architecture model is right: step 2 (parallel execution of FileScanTasks) belongs to the engine. That's exactly what Comet does in #2020 - one DF SessionContext per Spark Task, ArrowReader at
concurrency==1, cross-task parallelism owned by Spark.
My PR keeps that whole path intact: when concurrency_limit_data_files == 1, the #2020 fast path (and_then + try_flatten) is used verbatim - no spawn, no channel, no reordering. Callers that own their own parallelism (Comet, Spark, or any future DF executor that shards FileScanTasks across partitions) don't see a behavior change.

The change applies only on the multi-concurrency branch, which today is
hit by two groups:

  1. Callers using the raw ArrowReader / table.scan().to_arrow() path directly (my use case: streaming RecordBatches into a sink via the core crate, no DataFusion). For these callers, the existing try_buffer_unordered(N) already gives I/O concurrency but inner-stream polling is single-threaded, so Parquet decompression is serial - the spawn fixes that.

  2. The iceberg-datafusion integration, which today hard-codes Partitioning::UnknownPartitioning(1) in IcebergTableScan (see the TODO: "to be replaced once we support output-partitioning"). Until that TODO lands, DF sees a single-partition scan and can't shard FileScanTasks across partitions per Liu's model - so the multi-concurrency branch is the only place CPU parallelism can actually happen today. When output-partitioning lands, engines will set concurrency==1 themselves and hit the #2020 fast path, just like Comet - at which point the spawn path is cleanly dormant rather than harmful.

Happy to split the tests into a follow-up or add a config flag to make the spawn strictly opt-in, if either would help it land.


Thanks for the detailed answer. Don't get me wrong, I'd love to see this in - in fact, in our fork we rely on this parallel behaviour since we still haven't switched our engine to Renjie's suggestion.
My change was before #2020, so things may have changed since then. My impression at the time wasn't that the concern was that partitioned readers wouldn't get concurrency==1, but that we didn't want to_arrow() to behave like a parallel engine for scanning Iceberg, and instead keep it a low-level API. If I was wrong, then something like #2020 would've been a very easy change for me back then :)
That said, it is indeed confusing then why it does have concurrency_limit_data_files at all.

If you're allowed to proceed with the change (🤞 ), then I have one comment (as a separate comment).


Why don't we spawn for the whole thing, i.e. around process_file_scan_task too?
It's not that process_file_scan_task and its I/O are already parallel while Parquet decoding is not. Rather, process_file_scan_task is mostly I/O, and for I/O a single task is enough, since one task can execute many I/Os concurrently; CPU-heavy operations, by contrast, need spawned tasks so they can scale across threads.
Although I don't have any data out of the box, I believe it'd be beneficial to be able to scale process_file_scan_tasks across threads too.


@blackmwk blackmwk left a comment


Thanks @t3hw for this PR. I still don't think adding this much concurrency to ArrowReader is the right direction, and from https://github.com/apache/iceberg-rust/pull/2342/changes#r3106860089 I think you already understand why. As we already discussed in #1854, to process data in parallel you should either rely on a parallel engine like DataFusion or Comet, or we could maintain a minimal local parallel engine.

@t3hw

t3hw commented Apr 20, 2026

@blackmwk thanks for the feedback! Your comment sent me down a rabbit hole of past and future changes, and has given me a lot to think about. @vustef, same to you.

It looks like #2298 will fold parallel scanning directly into the DataFusion integration - but a big part of my use case is lazily streaming entire snapshot deltas, partitions, or whole MOR tables into a downstream sink, and parallelization helps greatly. Pulling DataFusion in just for the parallel decode feels heavy for what's essentially a low-level scan -> stream -> transform -> sink pipeline.

I'll close this PR. Out of curiosity though, would a separate PR for a thin wrapper crate focused on low-level parallelization utilities be worth opening? If so, where would be the right home for it?

Perhaps we could start working towards the goals laid out in this RFC

@blackmwk


What kind of thin wrapper are you talking about? In Rust it's quite trivial if you mean sending different file scan tasks to different async tasks, and I don't think it's worth doing here.

@t3hw t3hw closed this Apr 21, 2026
@t3hw

t3hw commented Apr 21, 2026


I was going off your "or we could maintain a minimum local parallel engine" comment. Work on #1819 doesn't seem to have started yet, so my idea was to begin moving in that direction. A low-level parallel engine eventually living in a separate crate seems like a reasonable fit, and it keeps the core iceberg crate focused on just the protocol.
