Commit 7c415d4

sumedhsakdeo and claude authored and committed
refactor: replace streaming param with order=ScanOrder in concurrent tests and docs
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4186713 commit 7c415d4

2 files changed: +14 −10 lines


mkdocs/docs/api.md (10 additions, 6 deletions)

````diff
@@ -371,19 +371,21 @@ for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, batch_size=
     print(f"Buffer contains {len(buf)} rows")
 ```
 
-For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
+For maximum throughput, use `concurrent_files` to read multiple files in parallel with arrival order. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
 
 ```python
-for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
+from pyiceberg.table import ScanOrder
+
+for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, concurrent_files=4, batch_size=1000):
     print(f"Buffer contains {len(buf)} rows")
 ```
 
 **Ordering semantics:**
 
 | Configuration | File ordering | Within-file ordering |
 |---|---|---|
-| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
-| `streaming=True` | Interleaved across files (no grouping guarantee) | Row order within each file |
+| `ScanOrder.TASK` (default) | Batches grouped by file, in task submission order | Row order |
+| `ScanOrder.ARRIVAL` | Interleaved across files (no grouping guarantee) | Row order within each file |
 
 Within each file, batch ordering always follows row order. The `limit` parameter is enforced correctly regardless of configuration.
 
@@ -1671,13 +1673,15 @@ table.scan(
 ).to_arrow_batch_reader(order=ScanOrder.ARRIVAL)
 ```
 
-For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
+For concurrent file reads with arrival order, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
 
 ```python
+from pyiceberg.table import ScanOrder
+
 table.scan(
     row_filter=GreaterThanOrEqual("trip_distance", 10.0),
     selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
-).to_arrow_batch_reader(streaming=True, concurrent_files=4)
+).to_arrow_batch_reader(order=ScanOrder.ARRIVAL, concurrent_files=4)
 ```
 
 When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.
````

tests/io/test_pyarrow.py (4 additions, 4 deletions)

````diff
@@ -3277,7 +3277,7 @@ def test_task_order_with_positional_deletes(tmpdir: str) -> None:
 
 
 def test_concurrent_files_with_positional_deletes(tmpdir: str) -> None:
-    """Test that streaming=True with concurrent_files correctly applies positional deletes."""
+    """Test that order=ScanOrder.ARRIVAL with concurrent_files correctly applies positional deletes."""
     # 4 files, 10 rows each; delete different rows per file
     scan, tasks = _create_scan_and_tasks(
         tmpdir,
@@ -3286,7 +3286,7 @@ def test_concurrent_files_with_positional_deletes(tmpdir: str) -> None:
         delete_rows_per_file=[[0, 9], [4, 5], [0, 1, 2], []],
     )
 
-    batches = list(scan.to_record_batches(tasks, streaming=True, concurrent_files=2))
+    batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL, concurrent_files=2))
 
     total_rows = sum(len(b) for b in batches)
     assert total_rows == 33  # 40 - 7 deletes
@@ -3310,7 +3310,7 @@ def test_concurrent_files_with_positional_deletes_and_limit(tmpdir: str) -> None
         delete_rows_per_file=[[0], [0], [0], [0]],
     )
 
-    batches = list(scan.to_record_batches(tasks, streaming=True, concurrent_files=2))
+    batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL, concurrent_files=2))
 
     total_rows = sum(len(b) for b in batches)
     assert total_rows == 20
@@ -3321,7 +3321,7 @@ def test_concurrent_files_invalid_value(tmpdir: str) -> None:
     scan, tasks = _create_scan_and_tasks(tmpdir, num_files=1, rows_per_file=10)
 
     with pytest.raises(ValueError, match="concurrent_files must be >= 1"):
-        list(scan.to_record_batches(tasks, streaming=True, concurrent_files=0))
+        list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL, concurrent_files=0))
 
 
 def test_parse_location_defaults() -> None:
````
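The last hunk exercises the `concurrent_files must be >= 1` guard. A minimal stand-in for that check, assuming it runs at the entry point before any work is scheduled (the function name `check_concurrent_files` is hypothetical, not pyiceberg's actual internal name):

```python
# Hypothetical sketch of the validation the test above exercises; the real
# check lives somewhere inside pyiceberg's to_record_batches path.
def check_concurrent_files(concurrent_files: int) -> int:
    if concurrent_files < 1:
        raise ValueError("concurrent_files must be >= 1")
    return concurrent_files


assert check_concurrent_files(4) == 4
try:
    check_concurrent_files(0)
    raise AssertionError("expected ValueError")
except ValueError as exc:
    # pytest.raises(..., match=...) in the test asserts on this same message.
    assert "concurrent_files must be >= 1" in str(exc)
```

Raising eagerly, before any reader threads start, is what lets the test assert on a plain `ValueError` rather than an error surfaced from a worker.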
