Commit 48b332a

Authored by sumedhsakdeo and claude, and committed

feat: add concurrent_files flag for bounded concurrent streaming

Add _bounded_concurrent_batches() with proper lock discipline:

- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

1 parent 2474b12 commit 48b332a

File tree

5 files changed: +463 −16 lines

mkdocs/docs/api.md

Lines changed: 27 additions & 0 deletions
````diff
@@ -371,6 +371,22 @@ for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, batch_size=
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+**Ordering semantics:**
+
+| Configuration | File ordering | Within-file ordering |
+|---|---|---|
+| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
+| `streaming=True` | Interleaved across files (no grouping guarantee) | Row order within each file |
+
+Within each file, batch ordering always follows row order. The `limit` parameter is enforced correctly regardless of configuration.
+
 To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
 
 ```python
````
````diff
@@ -1655,6 +1671,17 @@ table.scan(
 ).to_arrow_batch_reader(order=ScanOrder.ARRIVAL)
 ```
 
+For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
+
+```python
+table.scan(
+    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
+    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
+).to_arrow_batch_reader(streaming=True, concurrent_files=4)
+```
+
+When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.
+
 ### Pandas
 
 <!-- prettier-ignore-start -->
````
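The ordering guarantee documented above (batches may interleave across files but stay in row order within each file) can be checked with a small stdlib-only simulation. `stream_interleaved` is an illustrative toy, not part of the PyIceberg API:

```python
import queue
import threading

def stream_interleaved(files: dict[str, list[int]]) -> list[tuple[str, int]]:
    """Simulate concurrent per-file readers feeding a single consumer queue."""
    q: queue.Queue = queue.Queue()

    def reader(name: str, batches: list[int]) -> None:
        for b in batches:
            q.put((name, b))   # one file's batches are enqueued in row order
        q.put((name, None))    # per-file end marker

    for name, batches in files.items():
        threading.Thread(target=reader, args=(name, batches), daemon=True).start()

    seen, remaining = [], len(files)
    while remaining:
        name, b = q.get()
        if b is None:
            remaining -= 1
        else:
            seen.append((name, b))
    return seen

seen = stream_interleaved({"f1": [0, 1, 2], "f2": [0, 1, 2]})
# Arrival order may interleave f1 and f2, but each file's batches stay in row order:
for f in ("f1", "f2"):
    assert [b for name, b in seen if name == f] == [0, 1, 2]
```

Because each reader thread enqueues its own batches sequentially, the FIFO queue preserves per-file order even when the global arrival order interleaves.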

pyiceberg/io/pyarrow.py

Lines changed: 113 additions & 8 deletions
```diff
@@ -33,11 +33,13 @@
 import logging
 import operator
 import os
+import queue
 import re
+import threading
 import uuid
 import warnings
 from abc import ABC, abstractmethod
-from collections.abc import Callable, Iterable, Iterator
+from collections.abc import Callable, Generator, Iterable, Iterator
 from copy import copy
 from dataclasses import dataclass
 from enum import Enum
```
```diff
@@ -1682,6 +1684,87 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
     return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_files: int,
+    max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+    batches into a shared queue. The consumer yields batches from the queue.
+    A sentinel value signals completion, avoiding timeout-based polling.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_files: Maximum number of files to read concurrently.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel_event = threading.Event()
+    pending_count = len(tasks)
+    pending_lock = threading.Lock()
+    file_semaphore = threading.Semaphore(concurrent_files)
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal pending_count
+        try:
+            # Blocking acquire — on cancellation, extra permits are released to unblock.
+            file_semaphore.acquire()
+            if cancel_event.is_set():
+                return
+
+            for batch in batch_fn(task):
+                if cancel_event.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel_event.is_set():
+                batch_queue.put(e)
+        finally:
+            file_semaphore.release()
+            with pending_lock:
+                pending_count -= 1
+                if pending_count == 0:
+                    batch_queue.put(_QUEUE_SENTINEL)
+
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(worker, task) for task in tasks]
+
+    try:
+        while True:
+            item = batch_queue.get()
+
+            if item is _QUEUE_SENTINEL:
+                break
+
+            if isinstance(item, BaseException):
+                raise item
+
+            yield item
+    finally:
+        cancel_event.set()
+        # Release semaphore permits to unblock any workers waiting on acquire()
+        for _ in range(len(tasks)):
+            file_semaphore.release()
+        # Drain the queue to unblock any workers stuck on put()
+        while not batch_queue.empty():
+            try:
+                batch_queue.get_nowait()
+            except queue.Empty:
+                break
+        for future in futures:
+            future.cancel()
+
+
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
```
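The queue-plus-semaphore discipline in this hunk can be reduced to a stdlib-only sketch: a bounded queue provides backpressure, a semaphore caps how many sources are read at once, and the last worker to finish posts a sentinel that ends consumption. `toy_bounded_batches` and `read_fn` are hypothetical names for illustration, not PyIceberg code:

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

_SENTINEL = object()

def toy_bounded_batches(sources, read_fn, concurrent_files=2, max_buffered=4):
    """Yield items from many sources with bounded concurrency and bounded buffering."""
    out: queue.Queue = queue.Queue(maxsize=max_buffered)      # backpressure: caps buffered items
    sem = threading.Semaphore(concurrent_files)               # caps sources read at once
    pending = len(sources)
    lock = threading.Lock()

    def worker(src):
        nonlocal pending
        try:
            with sem:                       # at most concurrent_files workers inside
                for item in read_fn(src):
                    out.put(item)           # blocks when the buffer is full
        finally:
            with lock:
                pending -= 1
                if pending == 0:            # last finisher posts the sentinel
                    out.put(_SENTINEL)

    with ThreadPoolExecutor(max_workers=len(sources)) as pool:
        for src in sources:
            pool.submit(worker, src)
        while (item := out.get()) is not _SENTINEL:
            yield item

# Two "files", three "batches" each; arrival order may interleave, so sort for display.
got = sorted(toy_bounded_batches(["a", "b"], lambda s: (f"{s}{i}" for i in range(3))))
print(got)  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
```

The sentinel posted by the last finisher is what lets the consumer block on `get()` without a timeout; the real implementation adds cancellation and error forwarding on top of this skeleton.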
```diff
@@ -1766,22 +1849,30 @@ def to_record_batches(
         tasks: Iterable[FileScanTask],
         batch_size: int | None = None,
         order: ScanOrder = ScanOrder.TASK,
+        concurrent_files: int = 1,
     ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
 
         Returns an Iterator of pa.RecordBatch with data from the Iceberg table
         by resolving the right columns that match the current table schema.
         Only data that matches the provided row_filter expression is returned.
 
+        Ordering semantics:
+            - ScanOrder.TASK (default): Batches are grouped by file in task submission order.
+            - ScanOrder.ARRIVAL: Batches may be interleaved across files. Within each file,
+              batch ordering follows row order.
+
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             order: Controls the order in which record batches are returned.
                 ScanOrder.TASK (default) returns batches in task order, with each task
                 fully materialized before proceeding to the next. Allows parallel file
                 reads via executor. ScanOrder.ARRIVAL yields batches as they are
-                produced, processing tasks sequentially without materializing entire
-                files into memory.
+                produced without materializing entire files into memory.
+            concurrent_files: Number of files to read concurrently when order=ScanOrder.ARRIVAL.
+                Must be >= 1. When > 1, batches may arrive interleaved across files.
+                Ignored when order=ScanOrder.TASK.
 
         Returns:
             An Iterator of PyArrow RecordBatches.
```
```diff
@@ -1790,18 +1881,32 @@ def to_record_batches(
         Raises:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type,
-                or when an invalid order value is provided.
+                or when an invalid order value is provided, or when concurrent_files < 1.
         """
         if not isinstance(order, ScanOrder):
             raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder enum value (ScanOrder.TASK or ScanOrder.ARRIVAL).")
 
+        if concurrent_files < 1:
+            raise ValueError(f"concurrent_files must be >= 1, got {concurrent_files}")
+
         deletes_per_file = _read_all_delete_files(self._io, tasks)
 
         if order == ScanOrder.ARRIVAL:
-            # Arrival order: process all tasks sequentially, yielding batches as produced.
-            # _record_batches_from_scan_tasks_and_deletes handles the limit internally
-            # when called with all tasks, so no outer limit check is needed.
-            yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size)
+            # Arrival order: read files with bounded concurrency, yielding batches as produced.
+            # When concurrent_files=1, this is sequential. When >1, batches may interleave across files.
+            task_list = list(tasks)
+
+            def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
+                return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)
+
+            total_row_count = 0
+            for batch in _bounded_concurrent_batches(task_list, batch_fn, concurrent_files):
+                current_batch_size = len(batch)
+                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
+                    yield batch.slice(0, self._limit - total_row_count)
+                    return
+                yield batch
+                total_row_count += current_batch_size
             return
 
         # Task order: existing behavior with executor.map + list()
```
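The limit handling in this hunk emits whole batches until the next batch would cross `self._limit`, then slices the final batch and stops. The same arithmetic can be shown with plain lists standing in for RecordBatches (illustrative only; a list slice stands in for `RecordBatch.slice`):

```python
def take_limit(batches, limit):
    """Yield batches until `limit` rows have been emitted, slicing the last one."""
    total = 0
    for batch in batches:
        if limit is not None and total + len(batch) >= limit:
            yield batch[: limit - total]   # slice the crossing batch to exactly fit
            return
        yield batch
        total += len(batch)

out = list(take_limit([[1, 2, 3], [4, 5, 6], [7, 8, 9]], limit=5))
print(out)       # [[1, 2, 3], [4, 5]]
assert sum(len(b) for b in out) == 5
```

Because the check uses `>=`, a batch that lands exactly on the limit is sliced to its full length and iteration still terminates, so the third batch is never read.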

pyiceberg/table/__init__.py

Lines changed: 13 additions & 4 deletions
```diff
@@ -2170,19 +2170,28 @@ def to_arrow(self) -> pa.Table:
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
         ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK) -> pa.RecordBatchReader:
+    def to_arrow_batch_reader(
+        self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK, concurrent_files: int = 1
+    ) -> pa.RecordBatchReader:
         """Return an Arrow RecordBatchReader from this DataScan.
 
         For large results, using a RecordBatchReader requires less memory than
         loading an Arrow Table for the same DataScan, because a RecordBatch
         is read one at a time.
 
+        Ordering semantics:
+            - ScanOrder.TASK (default): Batches are grouped by file in task submission order.
+            - ScanOrder.ARRIVAL: Batches may be interleaved across files. Within each file,
+              batch ordering follows row order.
+
         Args:
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             order: Controls the order in which record batches are returned.
                 ScanOrder.TASK (default) returns batches in task order with parallel
-                file reads. ScanOrder.ARRIVAL yields batches as they are produced,
-                processing tasks sequentially.
+                file reads. ScanOrder.ARRIVAL yields batches as they are produced
+                without materializing entire files into memory.
+            concurrent_files: Number of files to read concurrently when order=ScanOrder.ARRIVAL.
+                When > 1, batches may arrive interleaved across files.
 
         Returns:
             pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
```
```diff
@@ -2195,7 +2204,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder
         target_schema = schema_to_pyarrow(self.projection())
         batches = ArrowScan(
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files(), batch_size=batch_size, order=order)
+        ).to_record_batches(self.plan_files(), batch_size=batch_size, order=order, concurrent_files=concurrent_files)
 
         return pa.RecordBatchReader.from_batches(
             target_schema,
```
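Early termination flows through the generator's `finally` block: when the consumer stops reading, the cancel event is set and the queue is drained so any blocked producer can wake up and observe cancellation. A stdlib-only sketch of that shutdown path (hypothetical names, not the PyIceberg implementation):

```python
import queue
import threading

def cancellable_stream(n_items: int, cancel: threading.Event):
    """Yield items from a producer that checks a cancel event between puts."""
    q: queue.Queue = queue.Queue(maxsize=2)

    def produce() -> None:
        for i in range(n_items):
            if cancel.is_set():      # observed once the drain below frees a blocked put()
                return
            q.put(i)

    t = threading.Thread(target=produce, daemon=True)
    t.start()
    try:
        while True:
            yield q.get(timeout=1)
    finally:
        cancel.set()                 # consumer stopped: signal the producer
        while not q.empty():         # drain so a blocked put() can proceed
            try:
                q.get_nowait()
            except queue.Empty:
                break
        t.join(timeout=1)

cancel = threading.Event()
gen = cancellable_stream(1000, cancel)
first_three = [next(gen) for _ in range(3)]
gen.close()   # raises GeneratorExit at the yield, running the finally block
assert first_three == [0, 1, 2] and cancel.is_set()
```

Note the order of operations in the cleanup: set the event first, then unblock producers; doing it the other way around would let a freshly unblocked producer miss the cancellation and keep producing.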
