350 changes: 340 additions & 10 deletions crates/iceberg/src/arrow/reader.rs
@@ -32,8 +32,9 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc;
 use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
@@ -281,16 +282,25 @@ impl ArrowReader {
         tasks
             .map_ok(move |task| {
                 let file_io = file_io.clone();
+                let delete_file_loader = self.delete_file_loader.clone();
 
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                    parquet_read_options,
-                )
+                async move {
+                    let stream = Self::process_file_scan_task(
+                        task,
+                        batch_size,
+                        file_io,
+                        delete_file_loader,
+                        row_group_filtering_enabled,
+                        row_selection_enabled,
+                        parquet_read_options,
+                    )
+                    .await?;
+
+                    // Spawn stream consumption onto a separate tokio task so that
+                    // CPU-heavy Parquet decompression runs on the tokio thread pool
+                    // rather than being serialized on the polling thread.
+                    Ok(Self::spawn_record_batch_stream(stream, 1))
+                }
             })
             .map_err(|err| {
                 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
@vustef commented:
I believe this is similar to an idea I had a couple of months back; this is @liurenjie1024's feedback on it: #1684 (comment)

Author replied:
Hi @vustef - thanks for the pointer, I read #1684 and Liu's comment carefully before drafting this.

I think Liu's architecture model is right: step 2 (parallel execution of FileScanTasks) belongs to the engine. That's exactly what Comet does in #2020 - one DF SessionContext per Spark Task, ArrowReader at concurrency==1, cross-task parallelism owned by Spark.
My PR keeps that whole path intact: when concurrency_limit_data_files == 1, the #2020 fast path (and_then + try_flatten) is used verbatim - no spawn, no channel, no reordering. Callers that own their own parallelism (Comet, Spark, or any future DF executor that shards FileScanTasks across partitions) don't see a behavior change.

The change applies only on the multi-concurrency branch, which today is hit by two groups:

  1. Callers using the raw ArrowReader / table.scan().to_arrow() path directly (my use case: streaming RecordBatches into a sink via the core crate, no DataFusion). For these callers, the existing try_buffer_unordered(N) already gives I/O concurrency but inner-stream polling is single-threaded, so Parquet decompression is serial - the spawn fixes that.

  2. The iceberg-datafusion integration, which today hard-codes Partitioning::UnknownPartitioning(1) in IcebergTableScan (see the TODO: "to be replaced once we support output-partitioning"). Until that TODO lands, DF sees a single-partition scan and can't shard FileScanTasks across partitions per Liu's model - so the multi-concurrency branch is the only place CPU parallelism can actually happen today. When output-partitioning lands, engines will set concurrency==1 themselves and hit the #2020 fast path, just like Comet - at which point the spawn path is cleanly dormant rather than harmful.

Happy to split the tests into a follow-up or add a config flag to make the spawn strictly opt-in, if either would help it land.
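
A minimal, self-contained sketch of the two paths described above, using integer streams in place of Parquet RecordBatch streams. The names (`open`, `read`, `BatchStream`) and the exact combinator chain are illustrative assumptions, not the PR's verbatim code:

```rust
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};

type BatchStream = BoxStream<'static, Result<i32, String>>;

// Toy stand-in for process_file_scan_task: "open" file `n`, then stream 3 values.
async fn open(n: i32) -> Result<BatchStream, String> {
    Ok(stream::iter((n * 10..n * 10 + 3).map(Ok)).boxed())
}

async fn read(concurrency: usize) -> Result<Vec<i32>, String> {
    let tasks = stream::iter((0..3).map(Ok::<i32, String>));
    let merged: BatchStream = if concurrency == 1 {
        // #2020 fast path: sequential and ordered; no spawn, no channel.
        tasks.and_then(open).try_flatten().boxed()
    } else {
        // Multi-concurrency branch: open files concurrently, then interleave
        // the inner streams. The PR additionally wraps each inner stream in
        // spawn_record_batch_stream before flattening, so decoding runs on
        // the worker pool rather than on the single polling thread.
        tasks
            .map_ok(open)
            .try_buffer_unordered(concurrency)
            .try_flatten_unordered(concurrency)
            .boxed()
    };
    merged.try_collect().await
}
```

With concurrency == 1 this yields 0..=2, 10..=12, 20..=22 in file order; with a higher limit the same values arrive possibly interleaved, which is why the new tests sort before asserting.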

@vustef replied:

Thanks for the detailed answer. Don't get me wrong, I'd love to see this in - in fact, in our fork we rely on this parallel behaviour since we still haven't switched our engine to Renjie's suggestion.
My change predates #2020, so things may have changed since then. My impression at the time wasn't that the concern was partitioned readers not getting concurrency==1, but that we didn't want to_arrow() to behave like a parallel engine for scanning Iceberg, and to keep it a low-level API instead. If I was wrong, then something like #2020 would've been a very easy change for me back then :)
That said, it is indeed confusing, then, why it has concurrency_limit_data_files at all.

If you're allowed to proceed with the change (🤞), then I have one comment (as a separate comment).

@vustef:
Why don't we spawn for the whole thing, i.e. around process_file_scan_task too?
It's not that process_file_scan_task and its I/O are already parallel while Parquet decoding is not. Rather, process_file_scan_task is mostly I/O, and for I/O a single task is enough, since one task can execute many I/Os concurrently; it's the CPU-heavy operations that need spawned tasks to scale across threads.
Although I don't have any data out of the box, I believe it'd be beneficial to be able to scale process_file_scan_task across threads too.
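
A minimal sketch of that suggestion, assuming it reuses the same bounded-channel pattern as spawn_record_batch_stream; spawn_setup_and_stream and make_stream are hypothetical names introduced here, and tokio::spawn stands in for crate::runtime::spawn:

```rust
use std::future::Future;

use futures::channel::mpsc;
use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt};

// Spawn around the stream *setup* too, so the mostly-I/O work of opening
// the file and building the batch stream also runs on a worker thread,
// not just the CPU-heavy decoding.
fn spawn_setup_and_stream<T, E, S, F>(
    make_stream: F,
    buffer: usize,
) -> BoxStream<'static, Result<T, E>>
where
    T: Send + 'static,
    E: Send + 'static,
    S: Stream<Item = Result<T, E>> + Send + 'static,
    F: Future<Output = Result<S, E>> + Send + 'static,
{
    let (mut tx, rx) = mpsc::channel(buffer);
    tokio::spawn(async move {
        match make_stream.await {
            Ok(stream) => {
                futures::pin_mut!(stream);
                while let Some(item) = stream.next().await {
                    // Receiver dropped: the consumer moved on, so stop early.
                    if tx.send(item).await.is_err() {
                        break;
                    }
                }
            }
            // Setup itself failed (e.g. opening the file): forward the error.
            Err(e) => {
                let _ = tx.send(Err(e)).await;
            }
        }
    });
    rx.boxed()
}
```

The read path would then pass the entire process_file_scan_task(...) future as make_stream instead of awaiting it on the polling thread first; panic handling via catch_unwind, as in this PR, is elided for brevity.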

@@ -304,6 +314,78 @@ impl ArrowReader {
Ok(stream)
}

/// Spawns consumption of a RecordBatch stream onto a separate tokio task,
/// returning a new stream backed by a bounded channel.
///
/// Without this, `try_flatten_unordered` polls all inner Parquet streams
/// from a single thread; CPU-heavy decompression in each stream blocks that
/// thread, serializing work across files. By moving each stream to its own
/// spawned task, the tokio runtime distributes decompression across its
/// worker thread pool.
///
/// The bounded channel provides backpressure: the spawned task suspends
/// when the buffer is full, limiting per-file memory to approximately
/// `buffer_size + 1` record batches.
///
/// # Error handling
///
/// - Stream errors are forwarded as `Err` items and iteration stops immediately.
/// - Panics inside the Parquet stream are caught via `catch_unwind` and
/// converted to an `Err` item, so the consumer always receives an explicit
/// error rather than a silent premature end-of-stream.
fn spawn_record_batch_stream(
stream: ArrowRecordBatchStream,
buffer_size: usize,
) -> ArrowRecordBatchStream {
let (mut tx, rx) = mpsc::channel(buffer_size);

crate::runtime::spawn(async move {
futures::pin_mut!(stream);
loop {
// Wrap each poll in catch_unwind so that a panic inside the
// Parquet decoder surfaces as an explicit Err rather than
// silently closing the channel (which would look like EOF).
let poll_result = std::panic::AssertUnwindSafe(stream.next())
.catch_unwind()
.await;

let batch_result = match poll_result {
// Stream exhausted — exit cleanly
Ok(None) => break,
// Normal batch
Ok(Some(Ok(batch))) => Ok(batch),
// Error from the Parquet stream
Ok(Some(Err(e))) => Err(e),
// Panic inside stream.next() — convert to Err
Err(panic_payload) => {
let msg = panic_payload
.downcast_ref::<&str>()
.copied()
.or_else(|| panic_payload.downcast_ref::<String>().map(String::as_str))
.unwrap_or("unknown panic");
Err(Error::new(
ErrorKind::Unexpected,
format!("panic reading Parquet batch: {msg}"),
))
}
};

let is_err = batch_result.is_err();
// Ignore send errors: if the receiver was dropped the consumer
// has already moved on and we just exit.
let _ = tx.send(batch_result).await;

if is_err {
// Don't continue polling a broken stream after forwarding
// the error to the consumer.
break;
}
}
});

Box::pin(rx)
}

async fn process_file_scan_task(
task: FileScanTask,
batch_size: Option<usize>,
@@ -4712,6 +4794,254 @@ message schema {
assert_eq!(result[2], expected_2);
}

/// Helper: create N Parquet files with 10 rows each (id, file_num columns)
/// and return the (schema, file_io, tasks) needed for reader tests.
fn create_multi_file_test_data(
num_files: i32,
tmp_dir: &TempDir,
) -> (SchemaRef, FileIO, Vec<crate::Result<FileScanTask>>) {
use arrow_array::Int32Array;

let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
.into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));

let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

for file_num in 0..num_files {
let id_data = Arc::new(Int32Array::from_iter_values(
file_num * 10..(file_num + 1) * 10,
)) as ArrayRef;
let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;

let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();

let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
}

let tasks: Vec<crate::Result<FileScanTask>> = (0..num_files)
.map(|file_num| {
Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(format!(
"{table_location}/file_{file_num}.parquet"
))
.unwrap()
.len(),
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/file_{file_num}.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})
})
.collect();

(schema, file_io, tasks)
}

/// Test that multi-concurrency reads all files correctly.
/// Unlike concurrency=1, ordering is NOT guaranteed with
/// try_flatten_unordered, so we verify data presence without
/// checking order.
#[tokio::test]
async fn test_read_with_multi_concurrency() {
let tmp_dir = TempDir::new().unwrap();
let (_schema, file_io, tasks) = create_multi_file_test_data(3, &tmp_dir);

let reader = ArrowReaderBuilder::new(file_io)
.with_data_file_concurrency_limit(3)
.build();

let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

let result = reader
.read(tasks_stream)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 30, "Should have 30 total rows from 3 files");

// Collect all (id, file_num) pairs
let mut pairs: Vec<(i32, i32)> = Vec::new();
for batch in &result {
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let file_num_col = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();

for i in 0..batch.num_rows() {
pairs.push((id_col.value(i), file_num_col.value(i)));
}
}

// Sort by id for deterministic comparison (order is not guaranteed)
pairs.sort_by_key(|&(id, _)| id);

assert_eq!(pairs.len(), 30);
for (idx, &(id, file_num)) in pairs.iter().enumerate() {
assert_eq!(id, idx as i32, "ID mismatch at sorted index {idx}");
// file_num = id / 10 (file_0 has ids 0-9, file_1 has 10-19, etc.)
assert_eq!(file_num, id / 10, "file_num mismatch for id {id}");
}
}

/// Test that errors from one file propagate correctly through the
/// multi-concurrency read path.
#[tokio::test]
async fn test_read_with_multi_concurrency_error_propagation() {
let tmp_dir = TempDir::new().unwrap();
let (schema, file_io, mut tasks) = create_multi_file_test_data(2, &tmp_dir);

// Add a task pointing to a non-existent file
tasks.push(Ok(FileScanTask {
file_size_in_bytes: 1000,
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/nonexistent.parquet", tmp_dir.path().to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
}));

let reader = ArrowReaderBuilder::new(file_io)
.with_data_file_concurrency_limit(3)
.build();

let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

// The stream should produce an error at some point
let result = reader
.read(tasks_stream)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await;

assert!(
result.is_err(),
"Should propagate error from non-existent file"
);
}

/// Test that multi-concurrency path works correctly when there is
/// only one file (edge case: concurrency limit > number of files).
#[tokio::test]
async fn test_read_with_multi_concurrency_single_file() {
let tmp_dir = TempDir::new().unwrap();
let (_schema, file_io, tasks) = create_multi_file_test_data(1, &tmp_dir);

// Concurrency limit of 4 with only 1 file
let reader = ArrowReaderBuilder::new(file_io)
.with_data_file_concurrency_limit(4)
.build();

let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

let result = reader
.read(tasks_stream)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 10, "Should have 10 rows from 1 file");

let mut all_ids: Vec<i32> = Vec::new();
for batch in &result {
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
for i in 0..batch.num_rows() {
all_ids.push(id_col.value(i));
}
}

all_ids.sort();
let expected: Vec<i32> = (0..10).collect();
assert_eq!(all_ids, expected, "Should contain ids 0-9");
}

/// Test that a panic inside a spawned stream is surfaced as an explicit
/// Err rather than a silent premature end-of-stream.
///
/// We exercise `spawn_record_batch_stream` directly by wrapping a stream
/// that panics on its first poll.
#[tokio::test]
async fn test_spawn_record_batch_stream_panic_surfaces_as_error() {
use futures::stream;

use crate::scan::ArrowRecordBatchStream;

// A stream that panics immediately when polled
let panicking_stream: ArrowRecordBatchStream = Box::pin(stream::poll_fn(|_| {
panic!("simulated Parquet decode panic");
}));

let output = ArrowReader::spawn_record_batch_stream(panicking_stream, 1);

let result = output.try_collect::<Vec<RecordBatch>>().await;

assert!(
result.is_err(),
"Panic in spawned stream must surface as Err, not silent EOF"
);
let err = result.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("panic reading Parquet batch"),
"Error message should describe the panic; got: {msg}"
);
}

/// Regression for <https://github.com/apache/iceberg-rust/issues/2306>:
/// predicate on a column after nested types in a migrated file (no field IDs).
/// Schema has struct, list, and map columns before the predicate target (`id`),