Commit d61f46f

feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads (#44)
> Backport of apache/iceberg-python#3046

## Summary

Addresses #3036 — `ArrowScan.to_record_batches()` uses `executor.map` + `list()`, which eagerly materializes all record batches per file into memory, causing OOM on large tables.

This PR adds a new `order` parameter to `to_arrow_batch_reader()` with two implementations:

- `TaskOrder` (default) — preserves existing behavior: batches grouped by file in task submission order, each file fully materialized before proceeding to the next.
- `ArrivalOrder` — yields batches as they are produced across files, without materializing entire files into memory. Accepts three sub-parameters:
  - `concurrent_streams: int` — number of files to read concurrently (default: 8). A per-scan `ThreadPoolExecutor(max_workers=concurrent_streams)` bounds concurrency.
  - `batch_size: int | None` — number of rows per batch passed to PyArrow's `ds.Scanner` (default: PyArrow's built-in 131,072).
  - `max_buffered_batches: int` — size of the bounded queue between producers and consumer (default: 16), providing backpressure to cap memory usage.

## Problem

The current implementation materializes all batches from each file via `list()` inside `executor.map`, which runs up to `min(32, cpu_count + 4)` files in parallel. For large files this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.

## Solution

### Before: OOM on large tables

```python
batches = table.scan().to_arrow_batch_reader()
```

### After: bounded memory, tunable parallelism

```python
from pyiceberg.table import ArrivalOrder

batches = table.scan().to_arrow_batch_reader(
    order=ArrivalOrder(concurrent_streams=4, batch_size=10000),
)
```

Default behavior is unchanged — `TaskOrder` preserves the existing `executor.map` + `list()` path for backwards compatibility.

## Architecture

When `order=ArrivalOrder(...)`, batches flow through `_bounded_concurrent_batches`:

1. All file tasks are submitted to a per-scan `ThreadPoolExecutor(max_workers=concurrent_streams)`.
2. Workers push batches into a bounded `Queue(maxsize=max_buffered_batches)` — when full, workers block (backpressure).
3. The consumer yields batches from the queue via blocking `queue.get()`.
4. A sentinel value signals completion — no timeout-based polling.
5. On early termination (the consumer stops), a cancel event is set and the queue is drained until the sentinel to unblock all stuck workers.
6. The executor context manager handles deterministic shutdown.

Refactored `to_record_batches` into helpers: `_prepare_tasks_and_deletes`, `_iter_batches_arrival`, `_iter_batches_materialized`, `_apply_limit`.

## Ordering semantics

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| `TaskOrder()` (default) | Batches grouped by file, in task submission order | Row order |
| `ArrivalOrder(concurrent_streams=1)` | Grouped by file, sequential | Row order |
| `ArrivalOrder(concurrent_streams>1)` | **Interleaved** (no grouping guarantee) | Row order within each file |

## Benchmark results

32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):

| Config | Throughput (rows/s) | TTFR (ms) | Peak Arrow memory |
|---|---|---|---|
| default (TaskOrder) | 190,250,192 | 73.4 | 642.2 MB |
| ArrivalOrder(cs=1) | 59,317,085 | 27.7 | 10.3 MB |
| ArrivalOrder(cs=2) | 105,414,909 | 28.8 | 42.0 MB |
| ArrivalOrder(cs=4) | 175,840,782 | 28.4 | 105.5 MB |
| ArrivalOrder(cs=8) | 211,922,538 | 32.3 | 271.7 MB |
| ArrivalOrder(cs=16) | 209,011,424 | 45.0 | 473.3 MB |

*TTFR = Time to First Record, cs = concurrent_streams*

## Are these changes tested?

Yes. 25 new unit tests across two test files, plus a micro-benchmark.

## Are there any user-facing changes?

Yes. New `order` parameter on `DataScan.to_arrow_batch_reader()`:

- `order: ScanOrder | None` — controls batch ordering. Pass `TaskOrder()` (default) or `ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M)`.

New public classes `TaskOrder` and `ArrivalOrder` (subclasses of `ScanOrder`) are exported from `pyiceberg.table`. All parameters are optional with backwards-compatible defaults; existing code is unaffected.

Documentation updated in `mkdocs/docs/api.md` with usage examples, ordering semantics, and a configuration guidance table.
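The six-step flow above can be sketched with stdlib primitives alone. This is a simplified model with made-up task and batch values, and it omits the exception forwarding that the real `_bounded_concurrent_batches` performs by pushing worker exceptions through the queue:

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

_SENTINEL = object()  # completion marker: avoids timeout-based polling

def bounded_concurrent_batches(tasks, batch_fn, concurrent_streams=8, max_buffered_batches=16):
    """Yield batches from all tasks as they arrive, with bounded buffering."""
    if not tasks:
        return
    buf = queue.Queue(maxsize=max_buffered_batches)  # backpressure: put() blocks when full
    cancel = threading.Event()
    remaining = len(tasks)
    lock = threading.Lock()

    def worker(task):
        nonlocal remaining
        try:
            for batch in batch_fn(task):
                if cancel.is_set():
                    return
                buf.put(batch)
        finally:
            with lock:
                remaining -= 1
                last = remaining == 0
            if last:
                buf.put(_SENTINEL)  # the last worker to finish signals completion

    with ThreadPoolExecutor(max_workers=concurrent_streams) as pool:
        for task in tasks:
            pool.submit(worker, task)
        saw_sentinel = False
        try:
            while True:
                item = buf.get()
                if item is _SENTINEL:
                    saw_sentinel = True
                    break
                yield item
        finally:
            cancel.set()
            while not saw_sentinel:  # drain to unblock producers stuck on put()
                saw_sentinel = buf.get() is _SENTINEL

# Three "files", each producing three batches; arrival order may interleave.
out = list(bounded_concurrent_batches(
    tasks=["a", "b", "c"],
    batch_fn=lambda name: (f"{name}{i}" for i in range(3)),
    concurrent_streams=2,
    max_buffered_batches=2,
))
print(sorted(out))  # → ['a0', 'a1', 'a2', 'b0', 'b1', 'b2', 'c0', 'c1', 'c2']
```

On early termination (e.g. the consumer `break`s out of the loop), the generator's `finally` sets the cancel event and drains until the sentinel, mirroring step 5.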
1 parent 714a804 commit d61f46f

File tree

6 files changed: +1074 −40 lines


mkdocs/docs/api.md

Lines changed: 80 additions & 0 deletions

````diff
@@ -355,6 +355,62 @@ for buf in tbl.scan().to_arrow_batch_reader():
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+By default, each file's batches are materialized in memory before being yielded (`TaskOrder()`). For large files that may exceed available memory, use `ArrivalOrder()` to yield batches as they are produced, without materializing entire files:
+
+```python
+from pyiceberg.table import ArrivalOrder
+
+for buf in tbl.scan().to_arrow_batch_reader(order=ArrivalOrder()):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+For maximum throughput, tune `concurrent_streams` to read multiple files in parallel with arrival order. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
+
+```python
+from pyiceberg.table import ArrivalOrder
+
+for buf in tbl.scan().to_arrow_batch_reader(order=ArrivalOrder(concurrent_streams=4)):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+**Ordering semantics:**
+
+| Configuration | File ordering | Within-file ordering |
+|---|---|---|
+| `TaskOrder()` (default) | Batches grouped by file, in task submission order | Row order |
+| `ArrivalOrder(concurrent_streams=1)` | Sequential, one file at a time | Row order |
+| `ArrivalOrder(concurrent_streams>1)` | Interleaved across files (no grouping guarantee) | Row order within each file |
+
+The `limit` parameter is enforced correctly regardless of configuration.
+
+**Which configuration should I use?**
+
+| Use case | Recommended config |
+|---|---|
+| Small tables, simple queries | Default — no extra args needed |
+| Large tables, maximum throughput with bounded memory | `order=ArrivalOrder(concurrent_streams=N)` — tune N to balance throughput vs memory |
+| Fine-grained memory control | `order=ArrivalOrder(concurrent_streams=N, batch_size=M, max_buffered_batches=K)` — tune all parameters |
+
+**Memory usage and performance characteristics:**
+
+- **TaskOrder (default)**: Full file materialization. Each file is loaded entirely into memory before its batches are yielded, so memory usage depends on file sizes.
+- **ArrivalOrder**: Streaming with controlled memory usage, bounded by the batch buffering mechanism.
+
+**Memory formula for ArrivalOrder:**
+
+```text
+Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches × (average row size in bytes)
+```
+
+Where:
+
+- `concurrent_streams`: Number of files read in parallel (default: 8)
+- `batch_size`: Number of rows per batch (default: 131,072; can be set via the ArrivalOrder constructor)
+- `max_buffered_batches`: Internal buffering parameter (default: 16; can be tuned for advanced use cases)
+- Average row size depends on your schema and data; multiply the product above by it to estimate bytes.
+
+**Note:** `ArrivalOrder()` yields batches in arrival order (interleaved across files when `concurrent_streams > 1`). For deterministic file ordering, use the default `TaskOrder()` mode. The `batch_size` parameter in `ArrivalOrder` controls streaming memory usage, while `TaskOrder` uses full file materialization regardless of batch size.
+
 To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
 
 ```python
````
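As a quick sanity check, the memory formula in the added documentation can be evaluated directly. A minimal sketch; the 32-byte average row size is an assumed example value, not a measurement:

```python
def estimate_peak_bytes(concurrent_streams=8, batch_size=131_072,
                        max_buffered_batches=16, avg_row_bytes=32):
    """Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches × avg row size."""
    return concurrent_streams * batch_size * max_buffered_batches * avg_row_bytes

# All defaults: 8 × 131,072 × 16 × 32 bytes ≈ 512 MiB before Arrow overhead
print(estimate_peak_bytes() / 2**20)  # → 512.0
# A tighter configuration:
print(estimate_peak_bytes(concurrent_streams=4, batch_size=32_768,
                          max_buffered_batches=8) / 2**20)  # → 32.0
```

This mirrors the worked example in the `to_record_batches` docstring, where the same tightened configuration comes out to roughly 32 MB.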
````diff
@@ -1619,6 +1675,30 @@ table.scan(
 ).to_arrow_batch_reader()
 ```
 
+To avoid materializing entire files in memory, use `ArrivalOrder`, which yields batches as they are produced by PyArrow:
+
+```python
+from pyiceberg.table import ArrivalOrder
+
+table.scan(
+    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
+    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
+).to_arrow_batch_reader(order=ArrivalOrder())
+```
+
+For concurrent file reads with arrival order, use `concurrent_streams`. Note that batch ordering across files is not guaranteed:
+
+```python
+from pyiceberg.table import ArrivalOrder
+
+table.scan(
+    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
+    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
+).to_arrow_batch_reader(order=ArrivalOrder(concurrent_streams=16))
+```
+
+When using `concurrent_streams > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.
+
 ### Pandas
 
 <!-- prettier-ignore-start -->
````

pyiceberg/io/pyarrow.py

Lines changed: 177 additions & 30 deletions

```diff
@@ -33,11 +33,14 @@
 import logging
 import operator
 import os
+import queue
 import re
+import threading
 import uuid
 import warnings
 from abc import ABC, abstractmethod
-from collections.abc import Callable, Iterable, Iterator
+from collections.abc import Callable, Generator, Iterable, Iterator
+from concurrent.futures import ThreadPoolExecutor
 from copy import copy
 from dataclasses import dataclass
 from enum import Enum
@@ -141,7 +144,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
+from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ArrivalOrder, ScanOrder, TableProperties, TaskOrder
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
@@ -1581,6 +1584,7 @@ def _task_to_record_batches(
     partition_spec: PartitionSpec | None = None,
     format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
     downcast_ns_timestamp_to_us: bool | None = None,
+    batch_size: int | None = None,
 ) -> Iterator[pa.RecordBatch]:
     arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with io.new_input(task.file.file_path).open() as fin:
@@ -1612,14 +1616,18 @@
 
         file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
 
-        fragment_scanner = ds.Scanner.from_fragment(
-            fragment=fragment,
-            schema=physical_schema,
+        scanner_kwargs: dict[str, Any] = {
+            "fragment": fragment,
+            "schema": physical_schema,
             # This will push down the query to Arrow.
             # But in case there are positional deletes, we have to apply them first
-            filter=pyarrow_filter if not positional_deletes else None,
-            columns=[col.name for col in file_project_schema.columns],
-        )
+            "filter": pyarrow_filter if not positional_deletes else None,
+            "columns": [col.name for col in file_project_schema.columns],
+        }
+        if batch_size is not None:
+            scanner_kwargs["batch_size"] = batch_size
+
+        fragment_scanner = ds.Scanner.from_fragment(**scanner_kwargs)
 
         next_index = 0
         batches = fragment_scanner.to_batches()
```
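The kwargs-dict construction in the hunk above exists so that PyArrow's own `batch_size` default applies whenever the caller passes `None`. The pattern can be illustrated with a stand-in function (hypothetical names and values, not the pyiceberg or PyArrow API):

```python
LIBRARY_DEFAULT = 131_072  # stand-in for ds.Scanner's built-in batch_size default

def make_scanner(fragment, schema, filter=None, columns=None, batch_size=LIBRARY_DEFAULT):
    """Stand-in for ds.Scanner.from_fragment: batch_size has a library default."""
    return {"fragment": fragment, "schema": schema, "batch_size": batch_size}

def build_scanner(batch_size=None):
    # Passing batch_size=None explicitly would clobber the library default,
    # so the key is only added when the caller actually set a value.
    kwargs = {"fragment": "frag", "schema": "schema"}
    if batch_size is not None:
        kwargs["batch_size"] = batch_size
    return make_scanner(**kwargs)

print(build_scanner()["batch_size"])        # → 131072 (library default preserved)
print(build_scanner(10_000)["batch_size"])  # → 10000
```

The same reasoning explains why `ArrivalOrder.batch_size` defaults to `None` rather than repeating PyArrow's numeric default in pyiceberg.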
```diff
@@ -1677,6 +1685,86 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]:
     return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_streams: int,
+    max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_streams) to naturally
+    bound concurrency. Workers push batches into a bounded queue which provides
+    backpressure when the consumer is slower than the producers.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_streams: Maximum number of concurrent read streams.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel = threading.Event()
+    remaining = len(tasks)
+    remaining_lock = threading.Lock()
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal remaining
+        try:
+            for batch in batch_fn(task):
+                if cancel.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel.is_set():
+                batch_queue.put(e)
+        finally:
+            with remaining_lock:
+                remaining -= 1
+                is_last = remaining == 0
+            if is_last:
+                batch_queue.put(_QUEUE_SENTINEL)
+
+    with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
+        for task in tasks:
+            executor.submit(worker, task)
+
+        saw_sentinel = False
+        try:
+            while True:
+                item = batch_queue.get()
+
+                if item is _QUEUE_SENTINEL:
+                    saw_sentinel = True
+                    break
+
+                if isinstance(item, BaseException):
+                    raise item
+
+                yield item
+        finally:
+            cancel.set()
+            if not saw_sentinel:
+                # Drain the queue to unblock workers stuck on put().
+                # Each get() wakes one waiting producer; that producer checks
+                # cancel and returns, eventually allowing the last worker to
+                # put the sentinel. We stop only when we see the sentinel,
+                # which guarantees all workers have finished.
+                while True:
+                    item = batch_queue.get()
+                    if item is _QUEUE_SENTINEL:
+                        break
+
+
+_DEFAULT_SCAN_ORDER: ScanOrder = TaskOrder()
+
+
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
```
```diff
@@ -1756,54 +1844,112 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
 
         return result
 
-    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+    def to_record_batches(
+        self,
+        tasks: Iterable[FileScanTask],
+        order: ScanOrder = _DEFAULT_SCAN_ORDER,
+    ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
 
         Returns an Iterator of pa.RecordBatch with data from the Iceberg table
         by resolving the right columns that match the current table schema.
         Only data that matches the provided row_filter expression is returned.
+
+        Ordering semantics:
+            - TaskOrder() (default): Yields batches one file at a time in task submission order.
+            - ArrivalOrder(): Batches may be interleaved across files as they arrive.
+              Within each file, batch ordering follows row order.
+
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
+            order: Controls the order in which record batches are returned.
+                TaskOrder() (default) yields batches one file at a time in task order.
+                ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M)
+                yields batches as they are produced without materializing entire files
+                into memory. Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches
+                × (average row size in bytes). batch_size is the number of rows per batch.
+                For example (if average row size ≈ 32 bytes):
+                - ArrivalOrder(concurrent_streams=4, batch_size=32768, max_buffered_batches=8)
+                - Peak memory ≈ 4 × 32768 rows × 8 × 32 bytes ≈ ~32 MB (plus Arrow overhead)
 
         Returns:
             An Iterator of PyArrow RecordBatches.
             Total number of rows will be capped if specified.
 
         Raises:
             ResolveError: When a required field cannot be found in the file
-            ValueError: When a field type in the file cannot be projected to the schema type
+            ValueError: When a field type in the file cannot be projected to the schema type,
+                or when concurrent_streams < 1.
         """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
+        task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
+
+        if isinstance(order, ArrivalOrder):
+            if order.concurrent_streams < 1:
+                raise ValueError(f"concurrent_streams must be >= 1, got {order.concurrent_streams}")
+            return self._apply_limit(
+                self._iter_batches_arrival(
+                    task_list, deletes_per_file, order.batch_size, order.concurrent_streams, order.max_buffered_batches
+                )
+            )
 
-        total_row_count = 0
+        return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file))
+
+    def _prepare_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask]
+    ) -> tuple[list[FileScanTask], dict[str, list[ChunkedArray]]]:
+        """Resolve delete files and return tasks as a list."""
+        task_list = list(tasks)
+        deletes_per_file = _read_all_delete_files(self._io, task_list)
+        return task_list, deletes_per_file
+
+    def _iter_batches_arrival(
+        self,
+        task_list: list[FileScanTask],
+        deletes_per_file: dict[str, list[ChunkedArray]],
+        batch_size: int | None,
+        concurrent_streams: int,
+        max_buffered_batches: int = 16,
+    ) -> Iterator[pa.RecordBatch]:
+        """Yield batches using bounded concurrent streaming in arrival order."""
+
+        def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
+            return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)
+
+        yield from _bounded_concurrent_batches(task_list, batch_fn, concurrent_streams, max_buffered_batches)
+
+    def _iter_batches_materialized(
+        self,
+        task_list: list[FileScanTask],
+        deletes_per_file: dict[str, list[ChunkedArray]],
+    ) -> Iterator[pa.RecordBatch]:
+        """Yield batches using executor.map with full file materialization."""
         executor = ExecutorFactory.get_or_create()
 
         def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
-            # Materialize the iterator here to ensure execution happens within the executor.
-            # Otherwise, the iterator would be lazily consumed later (in the main thread),
-            # defeating the purpose of using executor.map.
             return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
 
-        limit_reached = False
-        for batches in executor.map(batches_for_task, tasks):
-            for batch in batches:
-                current_batch_size = len(batch)
-                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
-                    yield batch.slice(0, self._limit - total_row_count)
+        for batches in executor.map(batches_for_task, task_list):
+            yield from batches
 
-                    limit_reached = True
-                    break
-                else:
-                    yield batch
-                    total_row_count += current_batch_size
+    def _apply_limit(self, batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
+        """Apply row limit across batches."""
+        if self._limit is None:
+            yield from batches
+            return
 
-        if limit_reached:
-            # This break will also cancel all running tasks in the executor
-            break
+        total_row_count = 0
+        for batch in batches:
+            remaining = self._limit - total_row_count
+            if remaining <= 0:
+                return
+            if len(batch) > remaining:
+                yield batch.slice(0, remaining)
+                return
+            yield batch
+            total_row_count += len(batch)
 
     def _record_batches_from_scan_tasks_and_deletes(
-        self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]]
+        self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]], batch_size: int | None = None
     ) -> Iterator[pa.RecordBatch]:
         total_row_count = 0
         for task in tasks:
```
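The `_apply_limit` slicing semantics can be exercised with plain lists standing in for record batches (a sketch; list slicing plays the role of `RecordBatch.slice`, and `limit` is a parameter rather than `self._limit`):

```python
def apply_limit(batches, limit):
    """Yield batches until `limit` rows total, slicing the final batch if needed."""
    if limit is None:
        yield from batches
        return
    total = 0
    for batch in batches:
        remaining = limit - total
        if remaining <= 0:
            return
        if len(batch) > remaining:
            yield batch[:remaining]  # stands in for batch.slice(0, remaining)
            return
        yield batch
        total += len(batch)

batches = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(list(apply_limit(batches, 5)))     # → [[1, 2, 3], [4, 5]]
print(list(apply_limit(batches, None)))  # → all three batches unchanged
```

Because the helper sits downstream of both `_iter_batches_arrival` and `_iter_batches_materialized`, the limit is enforced identically regardless of the `order` configuration.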
```diff
@@ -1822,6 +1968,7 @@ def _record_batches_from_scan_tasks_and_deletes(
                 self._table_metadata.specs().get(task.file.spec_id),
                 self._table_metadata.format_version,
                 self._downcast_ns_timestamp_to_us,
+                batch_size,
             )
             for batch in batches:
                 if self._limit is not None:
```