Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,24 @@ for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, batch_size=
print(f"Buffer contains {len(buf)} rows")
```

For maximum throughput, use `concurrent_files` to read multiple files in parallel with arrival order. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:

```python
from pyiceberg.table import ScanOrder

for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, concurrent_files=4, batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

**Ordering semantics:**

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| `ScanOrder.TASK` (default) | Batches grouped by file, in task submission order | Row order |
| `ScanOrder.ARRIVAL` | Interleaved across files (no grouping guarantee) | Row order within each file |

Within each file, batch ordering always follows row order. The `limit` parameter is enforced correctly regardless of configuration.

To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:

```python
Expand Down Expand Up @@ -1655,6 +1673,19 @@ table.scan(
).to_arrow_batch_reader(order=ScanOrder.ARRIVAL)
```

For concurrent file reads with arrival order, use `concurrent_files`. Note that batch ordering across files is not guaranteed:

```python
from pyiceberg.table import ScanOrder

table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(order=ScanOrder.ARRIVAL, concurrent_files=4)
```

When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.

### Pandas

<!-- prettier-ignore-start -->
Expand Down
167 changes: 138 additions & 29 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import logging
import operator
import os
import queue
import re
import threading
import uuid
import warnings
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Iterator
from collections.abc import Callable, Generator, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from dataclasses import dataclass
from enum import Enum
Expand Down Expand Up @@ -1682,6 +1685,76 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
return deletes_per_file


_QUEUE_SENTINEL = object()


def _bounded_concurrent_batches(
tasks: list[FileScanTask],
batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
concurrent_files: int,
max_buffered_batches: int = 16,
) -> Generator[pa.RecordBatch, None, None]:
"""Read batches from multiple files concurrently with bounded memory.

Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_files) to naturally
bound concurrency. Workers push batches into a bounded queue which provides
backpressure when the consumer is slower than the producers.

Args:
tasks: The file scan tasks to process.
batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
concurrent_files: Maximum number of files to read concurrently.
max_buffered_batches: Maximum number of batches to buffer in the queue.
"""
if not tasks:
return

batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
cancel = threading.Event()
remaining = len(tasks)
remaining_lock = threading.Lock()

def worker(task: FileScanTask) -> None:
nonlocal remaining
try:
for batch in batch_fn(task):
if cancel.is_set():
return
batch_queue.put(batch)
except BaseException as e:
if not cancel.is_set():
batch_queue.put(e)
finally:
with remaining_lock:
remaining -= 1
if remaining == 0:
batch_queue.put(_QUEUE_SENTINEL)

with ThreadPoolExecutor(max_workers=concurrent_files) as executor:
for task in tasks:
executor.submit(worker, task)

try:
while True:
item = batch_queue.get()

if item is _QUEUE_SENTINEL:
break

if isinstance(item, BaseException):
raise item

yield item
finally:
cancel.set()
# Drain the queue to unblock any workers stuck on put()
while not batch_queue.empty():
try:
batch_queue.get_nowait()
except queue.Empty:
break


class ArrowScan:
_table_metadata: TableMetadata
_io: FileIO
Expand Down Expand Up @@ -1766,22 +1839,30 @@ def to_record_batches(
tasks: Iterable[FileScanTask],
batch_size: int | None = None,
order: ScanOrder = ScanOrder.TASK,
concurrent_files: int = 1,
) -> Iterator[pa.RecordBatch]:
"""Scan the Iceberg table and return an Iterator[pa.RecordBatch].

Returns an Iterator of pa.RecordBatch with data from the Iceberg table
by resolving the right columns that match the current table schema.
Only data that matches the provided row_filter expression is returned.

Ordering semantics:
- ScanOrder.TASK (default): Batches are grouped by file in task submission order.
- ScanOrder.ARRIVAL: Batches may be interleaved across files. Within each file,
batch ordering follows row order.

Args:
tasks: FileScanTasks representing the data files and delete files to read from.
batch_size: The number of rows per batch. If None, PyArrow's default is used.
order: Controls the order in which record batches are returned.
ScanOrder.TASK (default) returns batches in task order, with each task
fully materialized before proceeding to the next. Allows parallel file
reads via executor. ScanOrder.ARRIVAL yields batches as they are
produced, processing tasks sequentially without materializing entire
files into memory.
produced without materializing entire files into memory.
concurrent_files: Number of files to read concurrently when order=ScanOrder.ARRIVAL.
Must be >= 1. When > 1, batches may arrive interleaved across files.
Ignored when order=ScanOrder.TASK.

Returns:
An Iterator of PyArrow RecordBatches.
Expand All @@ -1790,46 +1871,74 @@ def to_record_batches(
Raises:
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type,
or when an invalid order value is provided.
or when an invalid order value is provided, or when concurrent_files < 1.
"""
if not isinstance(order, ScanOrder):
raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder enum value (ScanOrder.TASK or ScanOrder.ARRIVAL).")

deletes_per_file = _read_all_delete_files(self._io, tasks)
if concurrent_files < 1:
raise ValueError(f"concurrent_files must be >= 1, got {concurrent_files}")

task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)

if order == ScanOrder.ARRIVAL:
# Arrival order: process all tasks sequentially, yielding batches as produced.
# _record_batches_from_scan_tasks_and_deletes handles the limit internally
# when called with all tasks, so no outer limit check is needed.
yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size)
return
return self._apply_limit(self._iter_batches_arrival(task_list, deletes_per_file, batch_size, concurrent_files))

# Task order: existing behavior with executor.map + list()
total_row_count = 0
return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file, batch_size))

def _prepare_tasks_and_deletes(
self, tasks: Iterable[FileScanTask]
) -> tuple[list[FileScanTask], dict[str, list[ChunkedArray]]]:
"""Resolve delete files and return tasks as a list."""
task_list = list(tasks)
deletes_per_file = _read_all_delete_files(self._io, task_list)
return task_list, deletes_per_file

def _iter_batches_arrival(
self,
task_list: list[FileScanTask],
deletes_per_file: dict[str, list[ChunkedArray]],
batch_size: int | None,
concurrent_files: int,
) -> Iterator[pa.RecordBatch]:
"""Yield batches using bounded concurrent streaming in arrival order."""

def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)

yield from _bounded_concurrent_batches(task_list, batch_fn, concurrent_files)

def _iter_batches_materialized(
self,
task_list: list[FileScanTask],
deletes_per_file: dict[str, list[ChunkedArray]],
batch_size: int | None,
) -> Iterator[pa.RecordBatch]:
"""Yield batches using executor.map with full file materialization."""
executor = ExecutorFactory.get_or_create()

def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
# Materialize the iterator here to ensure execution happens within the executor.
# Otherwise, the iterator would be lazily consumed later (in the main thread),
# defeating the purpose of using executor.map.
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size))

limit_reached = False
for batches in executor.map(batches_for_task, tasks):
for batch in batches:
current_batch_size = len(batch)
if self._limit is not None and total_row_count + current_batch_size >= self._limit:
yield batch.slice(0, self._limit - total_row_count)
for batches in executor.map(batches_for_task, task_list):
yield from batches

limit_reached = True
break
else:
yield batch
total_row_count += current_batch_size
def _apply_limit(self, batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
"""Apply row limit across batches."""
if self._limit is None:
yield from batches
return

if limit_reached:
# This break will also cancel all running tasks in the executor
break
total_row_count = 0
for batch in batches:
remaining = self._limit - total_row_count
if remaining <= 0:
return
if len(batch) > remaining:
yield batch.slice(0, remaining)
return
yield batch
total_row_count += len(batch)

def _record_batches_from_scan_tasks_and_deletes(
self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]], batch_size: int | None = None
Expand Down
17 changes: 13 additions & 4 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2170,19 +2170,28 @@ def to_arrow(self) -> pa.Table:
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())

def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK) -> pa.RecordBatchReader:
def to_arrow_batch_reader(
self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK, concurrent_files: int = 1
) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because a RecordBatch
is read one at a time.

Ordering semantics:
- ScanOrder.TASK (default): Batches are grouped by file in task submission order.
- ScanOrder.ARRIVAL: Batches may be interleaved across files. Within each file,
batch ordering follows row order.

Args:
batch_size: The number of rows per batch. If None, PyArrow's default is used.
order: Controls the order in which record batches are returned.
ScanOrder.TASK (default) returns batches in task order with parallel
file reads. ScanOrder.ARRIVAL yields batches as they are produced,
processing tasks sequentially.
file reads. ScanOrder.ARRIVAL yields batches as they are produced
without materializing entire files into memory.
concurrent_files: Number of files to read concurrently when order=ScanOrder.ARRIVAL.
When > 1, batches may arrive interleaved across files.

Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
Expand All @@ -2195,7 +2204,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder
target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches(self.plan_files(), batch_size=batch_size, order=order)
).to_record_batches(self.plan_files(), batch_size=batch_size, order=order, concurrent_files=concurrent_files)

return pa.RecordBatchReader.from_batches(
target_schema,
Expand Down
Loading