Skip to content

Commit c383049

Browse files
sumedhsakdeo and claude committed
feat: add concurrent_files flag for bounded concurrent streaming
Add _bounded_concurrent_batches() with proper lock discipline: - Queue backpressure caps memory (scan.max-buffered-batches, default 16) - Semaphore limits concurrent file reads (concurrent_files param) - Cancel event with timeouts on all blocking ops (no lock over IO) - Error propagation and early termination support When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 444549f commit c383049

File tree

5 files changed

+478
-13
lines changed

5 files changed

+478
-13
lines changed

mkdocs/docs/api.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,24 @@ for buf in tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000):
369369
print(f"Buffer contains {len(buf)} rows")
370370
```
371371

372+
For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
373+
374+
```python
375+
for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
376+
print(f"Buffer contains {len(buf)} rows")
377+
```
378+
379+
The maximum number of buffered batches can be tuned via the `scan.max-buffered-batches` table property (default 16).
380+
381+
**Ordering semantics:**
382+
383+
| Configuration | File ordering | Within-file ordering |
384+
|---|---|---|
385+
| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
386+
| `streaming=True` | Interleaved across files (no grouping guarantee) | Row order within each file |
387+
388+
Within each file, batch ordering always follows row order. The `limit` parameter is enforced correctly regardless of configuration.
389+
372390
To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
373391

374392
```python
@@ -1651,6 +1669,17 @@ table.scan(
16511669
).to_arrow_batch_reader(streaming=True)
16521670
```
16531671

1672+
For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
1673+
1674+
```python
1675+
table.scan(
1676+
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
1677+
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
1678+
).to_arrow_batch_reader(streaming=True, concurrent_files=4)
1679+
```
1680+
1681+
When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Arrow Batch Reader section](#arrow-batch-reader) above for details.
1682+
16541683
### Pandas
16551684

16561685
<!-- prettier-ignore-start -->

pyiceberg/io/pyarrow.py

Lines changed: 126 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import logging
3434
import operator
3535
import os
36+
import queue
3637
import re
38+
import threading
3739
import uuid
3840
import warnings
3941
from abc import ABC, abstractmethod
@@ -1682,6 +1684,87 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
16821684
return deletes_per_file
16831685

16841686

1687+
_QUEUE_SENTINEL = object()
1688+
1689+
1690+
def _bounded_concurrent_batches(
1691+
tasks: list[FileScanTask],
1692+
batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
1693+
concurrent_files: int,
1694+
max_buffered_batches: int = 16,
1695+
) -> Iterator[pa.RecordBatch]:
1696+
"""Read batches from multiple files concurrently with bounded memory.
1697+
1698+
Workers read from files in parallel (up to concurrent_files at a time) and push
1699+
batches into a shared queue. The consumer yields batches from the queue.
1700+
A sentinel value signals completion, avoiding timeout-based polling.
1701+
1702+
Args:
1703+
tasks: The file scan tasks to process.
1704+
batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
1705+
concurrent_files: Maximum number of files to read concurrently.
1706+
max_buffered_batches: Maximum number of batches to buffer in the queue.
1707+
"""
1708+
if not tasks:
1709+
return
1710+
1711+
batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
1712+
cancel_event = threading.Event()
1713+
pending_count = len(tasks)
1714+
pending_lock = threading.Lock()
1715+
file_semaphore = threading.Semaphore(concurrent_files)
1716+
1717+
def worker(task: FileScanTask) -> None:
1718+
nonlocal pending_count
1719+
try:
1720+
# Blocking acquire — on cancellation, extra permits are released to unblock.
1721+
file_semaphore.acquire()
1722+
if cancel_event.is_set():
1723+
return
1724+
1725+
for batch in batch_fn(task):
1726+
if cancel_event.is_set():
1727+
return
1728+
batch_queue.put(batch)
1729+
except BaseException as e:
1730+
if not cancel_event.is_set():
1731+
batch_queue.put(e)
1732+
finally:
1733+
file_semaphore.release()
1734+
with pending_lock:
1735+
pending_count -= 1
1736+
if pending_count == 0:
1737+
batch_queue.put(_QUEUE_SENTINEL)
1738+
1739+
executor = ExecutorFactory.get_or_create()
1740+
futures = [executor.submit(worker, task) for task in tasks]
1741+
1742+
try:
1743+
while True:
1744+
item = batch_queue.get()
1745+
1746+
if item is _QUEUE_SENTINEL:
1747+
break
1748+
1749+
if isinstance(item, BaseException):
1750+
raise item
1751+
1752+
yield item
1753+
finally:
1754+
cancel_event.set()
1755+
# Release semaphore permits to unblock any workers waiting on acquire()
1756+
for _ in range(len(tasks)):
1757+
file_semaphore.release()
1758+
# Drain the queue to unblock any workers stuck on put()
1759+
while not batch_queue.empty():
1760+
try:
1761+
batch_queue.get_nowait()
1762+
except queue.Empty:
1763+
break
1764+
for future in futures:
1765+
future.cancel()
1766+
1767+
16851768
class ArrowScan:
16861769
_table_metadata: TableMetadata
16871770
_io: FileIO
@@ -1762,19 +1845,32 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
17621845
return result
17631846

17641847
def to_record_batches(
1765-
self, tasks: Iterable[FileScanTask], batch_size: int | None = None, streaming: bool = False
1848+
self,
1849+
tasks: Iterable[FileScanTask],
1850+
batch_size: int | None = None,
1851+
streaming: bool = False,
1852+
concurrent_files: int = 1,
17661853
) -> Iterator[pa.RecordBatch]:
17671854
"""Scan the Iceberg table and return an Iterator[pa.RecordBatch].
17681855
17691856
Returns an Iterator of pa.RecordBatch with data from the Iceberg table
17701857
by resolving the right columns that match the current table schema.
17711858
Only data that matches the provided row_filter expression is returned.
17721859
1860+
Ordering semantics:
1861+
- Default (streaming=False): Batches are grouped by file in task submission order.
1862+
- streaming=True: Batches may be interleaved across files. Within each file,
1863+
batch ordering follows row order.
1864+
17731865
Args:
17741866
tasks: FileScanTasks representing the data files and delete files to read from.
17751867
batch_size: The number of rows per batch. If None, PyArrow's default is used.
17761868
streaming: If True, yield batches as they are produced without materializing
1777-
entire files into memory. Files are still processed sequentially.
1869+
entire files into memory. Files are still processed sequentially when
1870+
concurrent_files=1.
1871+
concurrent_files: Number of files to read concurrently when streaming=True.
1872+
Must be >= 1. When > 1, batches may arrive interleaved across files.
1873+
Ignored when streaming=False.
17781874
17791875
Returns:
17801876
An Iterator of PyArrow RecordBatches.
@@ -1783,14 +1879,38 @@ def to_record_batches(
17831879
Raises:
17841880
ResolveError: When a required field cannot be found in the file
17851881
ValueError: When a field type in the file cannot be projected to the schema type
1882+
or when concurrent_files < 1
17861883
"""
1884+
if concurrent_files < 1:
1885+
raise ValueError(f"concurrent_files must be >= 1, got {concurrent_files}")
1886+
17871887
deletes_per_file = _read_all_delete_files(self._io, tasks)
17881888

17891889
if streaming:
1790-
# Streaming path: process all tasks sequentially, yielding batches as produced.
1791-
# _record_batches_from_scan_tasks_and_deletes handles the limit internally
1792-
# when called with all tasks, so no outer limit check is needed.
1793-
yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size)
1890+
# Streaming path: read files with bounded concurrency, yielding batches as produced.
1891+
# When concurrent_files=1, this is sequential. When >1, batches may interleave across files.
1892+
task_list = list(tasks)
1893+
1894+
def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
1895+
return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)
1896+
1897+
from pyiceberg.table import TableProperties
1898+
1899+
max_buffered = int(
1900+
self._table_metadata.properties.get(
1901+
TableProperties.SCAN_MAX_BUFFERED_BATCHES,
1902+
TableProperties.SCAN_MAX_BUFFERED_BATCHES_DEFAULT,
1903+
)
1904+
)
1905+
1906+
total_row_count = 0
1907+
for batch in _bounded_concurrent_batches(task_list, batch_fn, concurrent_files, max_buffered):
1908+
current_batch_size = len(batch)
1909+
if self._limit is not None and total_row_count + current_batch_size >= self._limit:
1910+
yield batch.slice(0, self._limit - total_row_count)
1911+
return
1912+
yield batch
1913+
total_row_count += current_batch_size
17941914
return
17951915

17961916
# Non-streaming path: existing behavior with executor.map + list()

pyiceberg/table/__init__.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,9 @@ class TableProperties:
247247
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
248248
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
249249

250+
SCAN_MAX_BUFFERED_BATCHES = "scan.max-buffered-batches"
251+
SCAN_MAX_BUFFERED_BATCHES_DEFAULT = 16
252+
250253

251254
class Transaction:
252255
_table: Table
@@ -2157,17 +2160,27 @@ def to_arrow(self) -> pa.Table:
21572160
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
21582161
).to_table(self.plan_files())
21592162

2160-
def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool = False) -> pa.RecordBatchReader:
2163+
def to_arrow_batch_reader(
2164+
self, batch_size: int | None = None, streaming: bool = False, concurrent_files: int = 1
2165+
) -> pa.RecordBatchReader:
21612166
"""Return an Arrow RecordBatchReader from this DataScan.
21622167
21632168
For large results, using a RecordBatchReader requires less memory than
21642169
loading an Arrow Table for the same DataScan, because a RecordBatch
21652170
is read one at a time.
21662171
2172+
Ordering semantics:
2173+
- Default (streaming=False): Batches are grouped by file in task submission order.
2174+
- streaming=True: Batches may be interleaved across files. Within each file,
2175+
batch ordering follows row order.
2176+
21672177
Args:
21682178
batch_size: The number of rows per batch. If None, PyArrow's default is used.
21692179
streaming: If True, yield batches as they are produced without materializing
2170-
entire files into memory. Files are still processed sequentially.
2180+
entire files into memory. Files are still processed sequentially when
2181+
concurrent_files=1.
2182+
concurrent_files: Number of files to read concurrently when streaming=True.
2183+
When > 1, batches may arrive interleaved across files.
21712184
21722185
Returns:
21732186
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
@@ -2180,7 +2193,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool =
21802193
target_schema = schema_to_pyarrow(self.projection())
21812194
batches = ArrowScan(
21822195
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2183-
).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming)
2196+
).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming, concurrent_files=concurrent_files)
21842197

21852198
return pa.RecordBatchReader.from_batches(
21862199
target_schema,

0 commit comments

Comments
 (0)