Skip to content

Commit 2474b12

Browse files
sumedhsakdeo and claude
committed
feat: add ScanOrder enum to ArrowScan.to_record_batches
Introduce ScanOrder.TASK (default) and ScanOrder.ARRIVAL to control batch ordering. TASK materializes each file before yielding; ARRIVAL yields batches as produced for lower memory usage. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 70af67f commit 2474b12

File tree

4 files changed

+239
-6
lines changed

4 files changed

+239
-6
lines changed

mkdocs/docs/api.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,15 @@ for buf in tbl.scan().to_arrow_batch_reader(batch_size=1000):
362362
print(f"Buffer contains {len(buf)} rows")
363363
```
364364

365+
By default, each file's batches are materialized in memory before being yielded (`order=ScanOrder.TASK`). For large files that may exceed available memory, use `order=ScanOrder.ARRIVAL` to yield batches as they are produced without materializing entire files:
366+
367+
```python
368+
from pyiceberg.table import ScanOrder
369+
370+
for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, batch_size=1000):
371+
print(f"Buffer contains {len(buf)} rows")
372+
```
373+
365374
To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
366375

367376
```python
@@ -1635,6 +1644,17 @@ table.scan(
16351644
).to_arrow_batch_reader(batch_size=1000)
16361645
```
16371646

1647+
Use `order=ScanOrder.ARRIVAL` to avoid materializing entire files in memory. This yields batches as they are produced by PyArrow, one file at a time:
1648+
1649+
```python
1650+
from pyiceberg.table import ScanOrder
1651+
1652+
table.scan(
1653+
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
1654+
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
1655+
).to_arrow_batch_reader(order=ScanOrder.ARRIVAL)
1656+
```
1657+
16381658
### Pandas
16391659

16401660
<!-- prettier-ignore-start -->

pyiceberg/io/pyarrow.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@
141141
visit,
142142
visit_with_partner,
143143
)
144-
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
144+
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ScanOrder, TableProperties
145145
from pyiceberg.table.locations import load_location_provider
146146
from pyiceberg.table.metadata import TableMetadata
147147
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
@@ -1761,7 +1761,12 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
17611761

17621762
return result
17631763

1764-
def to_record_batches(self, tasks: Iterable[FileScanTask], batch_size: int | None = None) -> Iterator[pa.RecordBatch]:
1764+
def to_record_batches(
1765+
self,
1766+
tasks: Iterable[FileScanTask],
1767+
batch_size: int | None = None,
1768+
order: ScanOrder = ScanOrder.TASK,
1769+
) -> Iterator[pa.RecordBatch]:
17651770
"""Scan the Iceberg table and return an Iterator[pa.RecordBatch].
17661771
17671772
Returns an Iterator of pa.RecordBatch with data from the Iceberg table
@@ -1770,17 +1775,36 @@ def to_record_batches(self, tasks: Iterable[FileScanTask], batch_size: int | Non
17701775
17711776
Args:
17721777
tasks: FileScanTasks representing the data files and delete files to read from.
1778+
batch_size: The number of rows per batch. If None, PyArrow's default is used.
1779+
order: Controls the order in which record batches are returned.
1780+
ScanOrder.TASK (default) returns batches in task order, with each task
1781+
fully materialized before proceeding to the next. Allows parallel file
1782+
reads via executor. ScanOrder.ARRIVAL yields batches as they are
1783+
produced, processing tasks sequentially without materializing entire
1784+
files into memory.
17731785
17741786
Returns:
17751787
An Iterator of PyArrow RecordBatches.
17761788
Total number of rows will be capped if specified.
17771789
17781790
Raises:
17791791
ResolveError: When a required field cannot be found in the file
1780-
ValueError: When a field type in the file cannot be projected to the schema type
1792+
ValueError: When a field type in the file cannot be projected to the schema type,
1793+
or when an invalid order value is provided.
17811794
"""
1795+
if not isinstance(order, ScanOrder):
1796+
raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder enum value (ScanOrder.TASK or ScanOrder.ARRIVAL).")
1797+
17821798
deletes_per_file = _read_all_delete_files(self._io, tasks)
17831799

1800+
if order == ScanOrder.ARRIVAL:
1801+
# Arrival order: process all tasks sequentially, yielding batches as produced.
1802+
# _record_batches_from_scan_tasks_and_deletes handles the limit internally
1803+
# when called with all tasks, so no outer limit check is needed.
1804+
yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size)
1805+
return
1806+
1807+
# Task order: existing behavior with executor.map + list()
17841808
total_row_count = 0
17851809
executor = ExecutorFactory.get_or_create()
17861810

pyiceberg/table/__init__.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from abc import ABC, abstractmethod
2424
from collections.abc import Callable, Iterable, Iterator
2525
from dataclasses import dataclass
26+
from enum import Enum
2627
from functools import cached_property
2728
from itertools import chain
2829
from types import TracebackType
@@ -154,6 +155,20 @@
154155
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
155156

156157

158+
class ScanOrder(str, Enum):
159+
"""Order in which record batches are returned from a scan.
160+
161+
Attributes:
162+
TASK: Batches are returned in task order, with each task fully materialized
163+
before proceeding to the next. Allows parallel file reads via executor.
164+
ARRIVAL: Batches are yielded as they are produced, processing tasks
165+
sequentially without materializing entire files into memory.
166+
"""
167+
168+
TASK = "task"
169+
ARRIVAL = "arrival"
170+
171+
157172
@dataclass()
158173
class UpsertResult:
159174
"""Summary the upsert operation."""
@@ -2155,7 +2170,7 @@ def to_arrow(self) -> pa.Table:
21552170
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
21562171
).to_table(self.plan_files())
21572172

2158-
def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatchReader:
2173+
def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK) -> pa.RecordBatchReader:
21592174
"""Return an Arrow RecordBatchReader from this DataScan.
21602175
21612176
For large results, using a RecordBatchReader requires less memory than
@@ -2164,6 +2179,10 @@ def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatch
21642179
21652180
Args:
21662181
batch_size: The number of rows per batch. If None, PyArrow's default is used.
2182+
order: Controls the order in which record batches are returned.
2183+
ScanOrder.TASK (default) returns batches in task order with parallel
2184+
file reads. ScanOrder.ARRIVAL yields batches as they are produced,
2185+
processing tasks sequentially.
21672186
21682187
Returns:
21692188
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
@@ -2176,7 +2195,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatch
21762195
target_schema = schema_to_pyarrow(self.projection())
21772196
batches = ArrowScan(
21782197
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2179-
).to_record_batches(self.plan_files(), batch_size=batch_size)
2198+
).to_record_batches(self.plan_files(), batch_size=batch_size, order=order)
21802199

21812200
return pa.RecordBatchReader.from_batches(
21822201
target_schema,

tests/io/test_pyarrow.py

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
8787
from pyiceberg.partitioning import PartitionField, PartitionSpec
8888
from pyiceberg.schema import Schema, make_compatible_name, visit
89-
from pyiceberg.table import FileScanTask, TableProperties
89+
from pyiceberg.table import FileScanTask, ScanOrder, TableProperties
9090
from pyiceberg.table.metadata import TableMetadataV2
9191
from pyiceberg.table.name_mapping import create_mapping_from_schema
9292
from pyiceberg.transforms import HourTransform, IdentityTransform
@@ -3106,6 +3106,176 @@ def test_task_to_record_batches_default_batch_size(tmpdir: str) -> None:
31063106
assert len(batches[0]) == num_rows
31073107

31083108

3109+
def _create_scan_and_tasks(
3110+
tmpdir: str,
3111+
num_files: int = 1,
3112+
rows_per_file: int = 100,
3113+
limit: int | None = None,
3114+
delete_rows_per_file: list[list[int]] | None = None,
3115+
) -> tuple[ArrowScan, list[FileScanTask]]:
3116+
"""Helper to create an ArrowScan and FileScanTasks for testing.
3117+
3118+
Args:
3119+
delete_rows_per_file: If provided, a list of lists of row positions to delete
3120+
per file. Length must match num_files. Each inner list contains 0-based
3121+
row positions within that file to mark as positionally deleted.
3122+
"""
3123+
table_schema = Schema(NestedField(1, "col", LongType(), required=True))
3124+
pa_schema = pa.schema([pa.field("col", pa.int64(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"})])
3125+
tasks = []
3126+
for i in range(num_files):
3127+
start = i * rows_per_file
3128+
arrow_table = pa.table({"col": pa.array(range(start, start + rows_per_file))}, schema=pa_schema)
3129+
data_file = _write_table_to_data_file(f"{tmpdir}/file_{i}.parquet", pa_schema, arrow_table)
3130+
data_file.spec_id = 0
3131+
3132+
delete_files = set()
3133+
if delete_rows_per_file and delete_rows_per_file[i]:
3134+
delete_table = pa.table(
3135+
{
3136+
"file_path": [data_file.file_path] * len(delete_rows_per_file[i]),
3137+
"pos": delete_rows_per_file[i],
3138+
}
3139+
)
3140+
delete_path = f"{tmpdir}/deletes_{i}.parquet"
3141+
pq.write_table(delete_table, delete_path)
3142+
delete_files.add(
3143+
DataFile.from_args(
3144+
content=DataFileContent.POSITION_DELETES,
3145+
file_path=delete_path,
3146+
file_format=FileFormat.PARQUET,
3147+
partition={},
3148+
record_count=len(delete_rows_per_file[i]),
3149+
file_size_in_bytes=22,
3150+
)
3151+
)
3152+
3153+
tasks.append(FileScanTask(data_file=data_file, delete_files=delete_files))
3154+
3155+
scan = ArrowScan(
3156+
table_metadata=TableMetadataV2(
3157+
location="file://a/b/",
3158+
last_column_id=1,
3159+
format_version=2,
3160+
schemas=[table_schema],
3161+
partition_specs=[PartitionSpec()],
3162+
),
3163+
io=PyArrowFileIO(),
3164+
projected_schema=table_schema,
3165+
row_filter=AlwaysTrue(),
3166+
case_sensitive=True,
3167+
limit=limit,
3168+
)
3169+
return scan, tasks
3170+
3171+
3172+
def test_task_order_produces_same_results(tmpdir: str) -> None:
3173+
"""Test that order=ScanOrder.TASK produces the same results as the default behavior."""
3174+
scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)
3175+
3176+
batches_default = list(scan.to_record_batches(tasks, order=ScanOrder.TASK))
3177+
# Re-create tasks since iterators are consumed
3178+
_, tasks2 = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)
3179+
batches_task_order = list(scan.to_record_batches(tasks2, order=ScanOrder.TASK))
3180+
3181+
total_default = sum(len(b) for b in batches_default)
3182+
total_task_order = sum(len(b) for b in batches_task_order)
3183+
assert total_default == 300
3184+
assert total_task_order == 300
3185+
3186+
3187+
def test_arrival_order_yields_all_batches(tmpdir: str) -> None:
3188+
"""Test that order=ScanOrder.ARRIVAL yields all batches correctly."""
3189+
scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)
3190+
3191+
batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL))
3192+
3193+
total_rows = sum(len(b) for b in batches)
3194+
assert total_rows == 300
3195+
# Verify all values are present
3196+
all_values = sorted([v for b in batches for v in b.column("col").to_pylist()])
3197+
assert all_values == list(range(300))
3198+
3199+
3200+
def test_arrival_order_with_limit(tmpdir: str) -> None:
3201+
"""Test that order=ScanOrder.ARRIVAL respects the row limit."""
3202+
scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100, limit=150)
3203+
3204+
batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL))
3205+
3206+
total_rows = sum(len(b) for b in batches)
3207+
assert total_rows == 150
3208+
3209+
3210+
def test_arrival_order_file_ordering_preserved(tmpdir: str) -> None:
3211+
"""Test that file ordering is preserved in arrival order mode."""
3212+
scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)
3213+
3214+
batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL))
3215+
all_values = [v for b in batches for v in b.column("col").to_pylist()]
3216+
3217+
# Values should be in file order: 0-99 from file 0, 100-199 from file 1, 200-299 from file 2
3218+
assert all_values == list(range(300))
3219+
3220+
3221+
def test_arrival_order_with_positional_deletes(tmpdir: str) -> None:
3222+
"""Test that order=ScanOrder.ARRIVAL correctly applies positional deletes."""
3223+
# 3 files, 10 rows each; delete rows 0,5 from file 0, row 3 from file 1, nothing from file 2
3224+
scan, tasks = _create_scan_and_tasks(
3225+
tmpdir,
3226+
num_files=3,
3227+
rows_per_file=10,
3228+
delete_rows_per_file=[[0, 5], [3], []],
3229+
)
3230+
3231+
batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL))
3232+
3233+
total_rows = sum(len(b) for b in batches)
3234+
assert total_rows == 27 # 30 - 3 deletes
3235+
all_values = sorted([v for b in batches for v in b.column("col").to_pylist()])
3236+
# File 0: 0-9, delete rows 0,5 → values 1,2,3,4,6,7,8,9
3237+
# File 1: 10-19, delete row 3 → values 10,11,12,14,15,16,17,18,19
3238+
# File 2: 20-29, no deletes → values 20-29
3239+
expected = [1, 2, 3, 4, 6, 7, 8, 9] + [10, 11, 12, 14, 15, 16, 17, 18, 19] + list(range(20, 30))
3240+
assert all_values == sorted(expected)
3241+
3242+
3243+
def test_arrival_order_with_positional_deletes_and_limit(tmpdir: str) -> None:
3244+
"""Test that order=ScanOrder.ARRIVAL with positional deletes respects the row limit."""
3245+
# 3 files, 10 rows each; delete row 0 from each file
3246+
scan, tasks = _create_scan_and_tasks(
3247+
tmpdir,
3248+
num_files=3,
3249+
rows_per_file=10,
3250+
limit=15,
3251+
delete_rows_per_file=[[0], [0], [0]],
3252+
)
3253+
3254+
batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL))
3255+
3256+
total_rows = sum(len(b) for b in batches)
3257+
assert total_rows == 15
3258+
3259+
3260+
def test_task_order_with_positional_deletes(tmpdir: str) -> None:
3261+
"""Test that the default task order mode correctly applies positional deletes."""
3262+
# 3 files, 10 rows each; delete rows from each file
3263+
scan, tasks = _create_scan_and_tasks(
3264+
tmpdir,
3265+
num_files=3,
3266+
rows_per_file=10,
3267+
delete_rows_per_file=[[0, 5], [3], []],
3268+
)
3269+
3270+
batches = list(scan.to_record_batches(tasks, order=ScanOrder.TASK))
3271+
3272+
total_rows = sum(len(b) for b in batches)
3273+
assert total_rows == 27 # 30 - 3 deletes
3274+
all_values = sorted([v for b in batches for v in b.column("col").to_pylist()])
3275+
expected = [1, 2, 3, 4, 6, 7, 8, 9] + [10, 11, 12, 14, 15, 16, 17, 18, 19] + list(range(20, 30))
3276+
assert all_values == sorted(expected)
3277+
3278+
31093279
def test_parse_location_defaults() -> None:
31103280
"""Test that parse_location uses defaults."""
31113281

0 commit comments

Comments
 (0)