perf(reader): parallelize Parquet decompression across tokio tasks #2342
```diff
@@ -32,8 +32,9 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc;
 use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
```
```diff
@@ -281,16 +282,25 @@ impl ArrowReader {
         tasks
             .map_ok(move |task| {
                 let file_io = file_io.clone();
+                let delete_file_loader = self.delete_file_loader.clone();
 
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                    parquet_read_options,
-                )
+                async move {
+                    let stream = Self::process_file_scan_task(
+                        task,
+                        batch_size,
+                        file_io,
+                        delete_file_loader,
+                        row_group_filtering_enabled,
+                        row_selection_enabled,
+                        parquet_read_options,
+                    )
+                    .await?;
+
+                    // Spawn stream consumption onto a separate tokio task so that
+                    // CPU-heavy Parquet decompression runs on the tokio thread pool
+                    // rather than being serialized on the polling thread.
+                    Ok(Self::spawn_record_batch_stream(stream, 1))
+                }
             })
             .map_err(|err| {
                 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
```

Review comment on this hunk:

> Why don't we spawn for the whole thing, i.e. around …
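The comment in the hunk above explains the motivation: when one thread polls every file's stream, per-file decompression runs serially. The effect of handing each file to its own worker can be sketched without tokio. This is a std-only analogue (the hypothetical `decompress_file` stands in for the Parquet decoder; the real code uses spawned async tasks, not threads): merging results through a shared channel loses ordering, but not content.

```rust
use std::sync::mpsc::channel;
use std::thread;

// CPU-heavy stand-in for decompressing one file into its row ids.
fn decompress_file(file_num: i32) -> Vec<i32> {
    (file_num * 10..(file_num + 1) * 10).collect()
}

fn main() {
    let (tx, rx) = channel();
    for file_num in 0..3 {
        let tx = tx.clone();
        // One worker per file, mirroring one spawned tokio task per stream.
        thread::spawn(move || {
            for row in decompress_file(file_num) {
                tx.send(row).unwrap();
            }
        });
    }
    drop(tx); // close the channel once all workers finish

    let mut ids: Vec<i32> = rx.iter().collect();
    ids.sort(); // arrival order is nondeterministic; contents are not
    assert_eq!(ids, (0..30).collect::<Vec<i32>>());
}
```

This is also why the PR's tests sort collected rows before comparing: with unordered flattening, only the set of rows is guaranteed, not their order.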
```diff
@@ -304,6 +314,78 @@ impl ArrowReader {
         Ok(stream)
     }
 
+    /// Spawns consumption of a RecordBatch stream onto a separate tokio task,
+    /// returning a new stream backed by a bounded channel.
+    ///
+    /// Without this, `try_flatten_unordered` polls all inner Parquet streams
+    /// from a single thread; CPU-heavy decompression in each stream blocks that
+    /// thread, serializing work across files. By moving each stream to its own
+    /// spawned task, the tokio runtime distributes decompression across its
+    /// worker thread pool.
+    ///
+    /// The bounded channel provides backpressure: the spawned task suspends
+    /// when the buffer is full, limiting per-file memory to approximately
+    /// `buffer_size + 1` record batches.
+    ///
+    /// # Error handling
+    ///
+    /// - Stream errors are forwarded as `Err` items and iteration stops immediately.
+    /// - Panics inside the Parquet stream are caught via `catch_unwind` and
+    ///   converted to an `Err` item, so the consumer always receives an explicit
+    ///   error rather than a silent premature end-of-stream.
+    fn spawn_record_batch_stream(
+        stream: ArrowRecordBatchStream,
+        buffer_size: usize,
+    ) -> ArrowRecordBatchStream {
+        let (mut tx, rx) = mpsc::channel(buffer_size);
+
+        crate::runtime::spawn(async move {
+            futures::pin_mut!(stream);
+            loop {
+                // Wrap each poll in catch_unwind so that a panic inside the
+                // Parquet decoder surfaces as an explicit Err rather than
+                // silently closing the channel (which would look like EOF).
+                let poll_result = std::panic::AssertUnwindSafe(stream.next())
+                    .catch_unwind()
+                    .await;
+
+                let batch_result = match poll_result {
+                    // Stream exhausted — exit cleanly
+                    Ok(None) => break,
+                    // Normal batch
+                    Ok(Some(Ok(batch))) => Ok(batch),
+                    // Error from the Parquet stream
+                    Ok(Some(Err(e))) => Err(e),
+                    // Panic inside stream.next() — convert to Err
+                    Err(panic_payload) => {
+                        let msg = panic_payload
+                            .downcast_ref::<&str>()
+                            .copied()
+                            .or_else(|| panic_payload.downcast_ref::<String>().map(String::as_str))
+                            .unwrap_or("unknown panic");
+                        Err(Error::new(
+                            ErrorKind::Unexpected,
+                            format!("panic reading Parquet batch: {msg}"),
+                        ))
+                    }
+                };
+
+                let is_err = batch_result.is_err();
+                // Ignore send errors: if the receiver was dropped the consumer
+                // has already moved on and we just exit.
+                let _ = tx.send(batch_result).await;
+
+                if is_err {
+                    // Don't continue polling a broken stream after forwarding
+                    // the error to the consumer.
+                    break;
+                }
+            }
+        });
+
+        Box::pin(rx)
+    }
+
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
```
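The spawned-task pattern above can be sketched without tokio or futures. Below is a minimal std-only analogue of `spawn_record_batch_stream` (assumptions: `sync_channel` models the bounded `futures::channel::mpsc` channel, a thread models `crate::runtime::spawn`, the hypothetical `decode` stands in for polling the Parquet stream, and `Result<i32, String>` stands in for `Result<RecordBatch, Error>`). It shows the three behaviors the doc comment promises: bounded-channel backpressure, panic-to-`Err` conversion via `catch_unwind`, and stopping after the first error.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Stand-in for Result<RecordBatch, Error>.
type Item = Result<i32, String>;

// Hypothetical decoder: Ok(-1) simulates a panic inside the Parquet decoder.
fn decode(batch: Item) -> Item {
    match batch {
        Ok(-1) => panic!("simulated decode panic"),
        other => other,
    }
}

fn spawn_stream(batches: Vec<Item>, buffer_size: usize) -> Receiver<Item> {
    // Bounded channel: the producer blocks when `buffer_size` items are in
    // flight, capping memory at roughly buffer_size + 1 batches.
    let (tx, rx) = sync_channel(buffer_size);
    thread::spawn(move || {
        for batch in batches {
            // catch_unwind turns a decoder panic into an explicit Err item
            // instead of silently ending the stream (which would look like EOF).
            let item = match panic::catch_unwind(AssertUnwindSafe(|| decode(batch))) {
                Ok(item) => item,
                Err(payload) => {
                    let msg = payload
                        .downcast_ref::<&str>()
                        .copied()
                        .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
                        .unwrap_or("unknown panic")
                        .to_string();
                    Err(format!("panic reading batch: {msg}"))
                }
            };
            let is_err = item.is_err();
            // Ignore send errors: a dropped receiver means the consumer moved on.
            if tx.send(item).is_err() || is_err {
                break; // don't keep polling a broken stream after forwarding the error
            }
        }
    });
    rx
}

fn main() {
    let rx = spawn_stream(vec![Ok(1), Ok(2), Ok(-1), Ok(3)], 1);
    let items: Vec<Item> = rx.iter().collect();
    assert_eq!(items[0], Ok(1));
    assert_eq!(items[1], Ok(2));
    assert!(items[2].as_ref().unwrap_err().contains("simulated decode panic"));
    // The producer stopped after forwarding the error: Ok(3) was never sent.
    assert_eq!(items.len(), 3);
}
```

The `&str`/`String` double downcast mirrors the real code: `panic!("literal")` carries a `&'static str` payload, while `panic!("{x}")` carries a `String`, so both must be tried before falling back to "unknown panic".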
```diff
@@ -4712,6 +4794,254 @@ message schema {
         assert_eq!(result[2], expected_2);
     }
 
+    /// Helper: create N Parquet files with 10 rows each (id, file_num columns)
+    /// and return the (schema, file_io, tasks) needed for reader tests.
+    fn create_multi_file_test_data(
+        num_files: i32,
+        tmp_dir: &TempDir,
+    ) -> (SchemaRef, FileIO, Vec<crate::Result<FileScanTask>>) {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::new_with_fs();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        for file_num in 0..num_files {
+            let id_data = Arc::new(Int32Array::from_iter_values(
+                file_num * 10..(file_num + 1) * 10,
+            )) as ArrayRef;
+            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
+
+            let to_write =
+                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
+
+            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
+            let mut writer =
+                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
+            writer.write(&to_write).expect("Writing batch");
+            writer.close().unwrap();
+        }
+
+        let tasks: Vec<crate::Result<FileScanTask>> = (0..num_files)
+            .map(|file_num| {
+                Ok(FileScanTask {
+                    file_size_in_bytes: std::fs::metadata(format!(
+                        "{table_location}/file_{file_num}.parquet"
+                    ))
+                    .unwrap()
+                    .len(),
+                    start: 0,
+                    length: 0,
+                    record_count: None,
+                    data_file_path: format!("{table_location}/file_{file_num}.parquet"),
+                    data_file_format: DataFileFormat::Parquet,
+                    schema: schema.clone(),
+                    project_field_ids: vec![1, 2],
+                    predicate: None,
+                    deletes: vec![],
+                    partition: None,
+                    partition_spec: None,
+                    name_mapping: None,
+                    case_sensitive: false,
+                })
+            })
+            .collect();
+
+        (schema, file_io, tasks)
+    }
+
+    /// Test that multi-concurrency reads all files correctly.
+    /// Unlike concurrency=1, ordering is NOT guaranteed with
+    /// try_flatten_unordered, so we verify data presence without
+    /// checking order.
+    #[tokio::test]
+    async fn test_read_with_multi_concurrency() {
+        let tmp_dir = TempDir::new().unwrap();
+        let (_schema, file_io, tasks) = create_multi_file_test_data(3, &tmp_dir);
+
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_data_file_concurrency_limit(3)
+            .build();
+
+        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks_stream)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 30, "Should have 30 total rows from 3 files");
+
+        // Collect all (id, file_num) pairs
+        let mut pairs: Vec<(i32, i32)> = Vec::new();
+        for batch in &result {
+            let id_col = batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let file_num_col = batch
+                .column(1)
+                .as_primitive::<arrow_array::types::Int32Type>();
+
+            for i in 0..batch.num_rows() {
+                pairs.push((id_col.value(i), file_num_col.value(i)));
+            }
+        }
+
+        // Sort by id for deterministic comparison (order is not guaranteed)
+        pairs.sort_by_key(|&(id, _)| id);
+
+        assert_eq!(pairs.len(), 30);
+        for (idx, &(id, file_num)) in pairs.iter().enumerate() {
+            assert_eq!(id, idx as i32, "ID mismatch at sorted index {idx}");
+            // file_num = id / 10 (file_0 has ids 0-9, file_1 has 10-19, etc.)
+            assert_eq!(file_num, id / 10, "file_num mismatch for id {id}");
+        }
+    }
+
+    /// Test that errors from one file propagate correctly through the
+    /// multi-concurrency read path.
+    #[tokio::test]
+    async fn test_read_with_multi_concurrency_error_propagation() {
+        let tmp_dir = TempDir::new().unwrap();
+        let (schema, file_io, mut tasks) = create_multi_file_test_data(2, &tmp_dir);
+
+        // Add a task pointing to a non-existent file
+        tasks.push(Ok(FileScanTask {
+            file_size_in_bytes: 1000,
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: format!("{}/nonexistent.parquet", tmp_dir.path().to_str().unwrap()),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1, 2],
+            predicate: None,
+            deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        }));
+
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_data_file_concurrency_limit(3)
+            .build();
+
+        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
+
+        // The stream should produce an error at some point
+        let result = reader
+            .read(tasks_stream)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await;
+
+        assert!(
+            result.is_err(),
+            "Should propagate error from non-existent file"
+        );
+    }
+
+    /// Test that the multi-concurrency path works correctly when there is
+    /// only one file (edge case: concurrency limit > number of files).
+    #[tokio::test]
+    async fn test_read_with_multi_concurrency_single_file() {
+        let tmp_dir = TempDir::new().unwrap();
+        let (_schema, file_io, tasks) = create_multi_file_test_data(1, &tmp_dir);
+
+        // Concurrency limit of 4 with only 1 file
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_data_file_concurrency_limit(4)
+            .build();
+
+        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks_stream)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 10, "Should have 10 rows from 1 file");
+
+        let mut all_ids: Vec<i32> = Vec::new();
+        for batch in &result {
+            let id_col = batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            for i in 0..batch.num_rows() {
+                all_ids.push(id_col.value(i));
+            }
+        }
+
+        all_ids.sort();
+        let expected: Vec<i32> = (0..10).collect();
+        assert_eq!(all_ids, expected, "Should contain ids 0-9");
+    }
+
+    /// Test that a panic inside a spawned stream is surfaced as an explicit
+    /// Err rather than a silent premature end-of-stream.
+    ///
+    /// We exercise `spawn_record_batch_stream` directly by wrapping a stream
+    /// that panics on its first poll.
+    #[tokio::test]
+    async fn test_spawn_record_batch_stream_panic_surfaces_as_error() {
+        use futures::stream;
+
+        use crate::scan::ArrowRecordBatchStream;
+
+        // A stream that panics immediately when polled
+        let panicking_stream: ArrowRecordBatchStream = Box::pin(stream::poll_fn(|_| {
+            panic!("simulated Parquet decode panic");
+        }));
+
+        let output = ArrowReader::spawn_record_batch_stream(panicking_stream, 1);
+
+        let result = output.try_collect::<Vec<RecordBatch>>().await;
+
+        assert!(
+            result.is_err(),
+            "Panic in spawned stream must surface as Err, not silent EOF"
+        );
+        let err = result.unwrap_err();
+        let msg = err.to_string();
+        assert!(
+            msg.contains("panic reading Parquet batch"),
+            "Error message should describe the panic; got: {msg}"
+        );
+    }
+
     /// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
     /// predicate on a column after nested types in a migrated file (no field IDs).
     /// Schema has struct, list, and map columns before the predicate target (`id`),
```
I believe this is similar to the idea that I had a couple of months back, this is @liurenjie1024's feedback: #1684 (comment)
Hi @vustef, thanks for the pointer. I read #1684 and Liu's comment carefully before drafting this.

I think Liu's architecture model is right: step 2 (parallel execution of FileScanTasks) belongs to the engine. That's exactly what Comet does in #2020: one DF SessionContext per Spark Task, ArrowReader at concurrency==1, cross-task parallelism owned by Spark.

My PR keeps that whole path intact: when `concurrency_limit_data_files == 1`, the #2020 fast path (`and_then` + `try_flatten`) is used verbatim, with no spawn, no channel, no reordering. Callers that own their own parallelism (Comet, Spark, or any future DF executor that shards FileScanTasks across partitions) don't see a behavior change.

The change applies only on the multi-concurrency branch, which today is hit by two groups:

1. Callers using the raw `ArrowReader` / `table.scan().to_arrow()` path directly (my use case: streaming RecordBatches into a sink via the core crate, no DataFusion). For these callers, the existing `try_buffer_unordered(N)` already gives I/O concurrency, but inner-stream polling is single-threaded, so Parquet decompression is serial; the spawn fixes that.
2. The `iceberg-datafusion` integration, which today hard-codes `Partitioning::UnknownPartitioning(1)` in `IcebergTableScan` (see the TODO: "to be replaced once we support output-partitioning"). Until that TODO lands, DF sees a single-partition scan and can't shard FileScanTasks across partitions per Liu's model, so the multi-concurrency branch is the only place CPU parallelism can actually happen today. When output-partitioning lands, engines will set concurrency==1 themselves and hit the #2020 fast path ("perf(reader): Fast path ArrowReader::read when concurrency is 1 to avoid waker churn and add determinism to FileScanTask processing"), just like Comet, at which point the spawn path is cleanly dormant rather than harmful.

Happy to split the tests into a follow-up or add a config flag to make the spawn strictly opt-in, if either would help it land.
Thanks for the detailed answer. Don't get me wrong, I'd love to see this in; in fact, in our fork we rely on this parallel behaviour, since we still haven't switched our engine to Renjie's suggestion.

My change was before #2020, so it might be that things have changed since then. My impression back then wasn't that the concern was that partitioned readers wouldn't get concurrency==1, but that we didn't want `to_arrow()` to behave like a parallel engine for scanning Iceberg, and wanted to keep it as a low-level API instead. If I was wrong, then something like #2020 would've been a very easy change for me back then :)

That said, it is indeed confusing then why it has `concurrency_limit_data_files` at all.

If you're allowed to proceed with the change (🤞), then I have one comment (as a separate comment).