Commit 13feb8d

sumedhsakdeo and claude committed
feat: add concurrent_files flag for bounded concurrent streaming

Add _bounded_concurrent_batches() with proper lock discipline:

- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 444549f commit 13feb8d
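
In practice the new flag is opt-in on the existing batch-reader API. A minimal sketch of the call shape, assuming `tbl` is an already-loaded pyiceberg `Table` and `process` is a hypothetical consumer:

```python
# streaming=True plus concurrent_files > 1 selects the bounded concurrent path
# added by this commit; cross-file arrival order is not guaranteed.
for batch in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4):
    process(batch)
```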

File tree

5 files changed: +468 -14 lines changed


mkdocs/docs/api.md

Lines changed: 27 additions & 0 deletions

````diff
@@ -369,6 +369,22 @@ for buf in tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000):
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+**Ordering semantics:**
+
+| Configuration | File ordering | Within-file ordering |
+|---|---|---|
+| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
+| `streaming=True` | Interleaved across files (no grouping guarantee) | Row order within each file |
+
+Within each file, batch ordering always follows row order. The `limit` parameter is enforced correctly regardless of configuration.
+
 To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
 
 ```python
@@ -1651,6 +1667,17 @@ table.scan(
 ).to_arrow_batch_reader(streaming=True)
 ```
 
+For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
+
+```python
+table.scan(
+    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
+    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
+).to_arrow_batch_reader(streaming=True, concurrent_files=4)
+```
+
+When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.
+
 ### Pandas
 
 <!-- prettier-ignore-start -->
````
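
Because batches arrive in nondeterministic cross-file order under `concurrent_files > 1`, a consumer that needs a global ordering has to restore it after reading. A sketch reusing the taxi-trip column from the docs above; note that `read_all()` materializes the whole stream, trading away the memory savings of streaming:

```python
reader = tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4)

# Materialize, then sort to recover a deterministic row order.
table = reader.read_all().sort_by("tpep_pickup_datetime")
```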

pyiceberg/io/pyarrow.py

Lines changed: 118 additions & 7 deletions

```diff
@@ -33,11 +33,13 @@
 import logging
 import operator
 import os
+import queue
 import re
+import threading
 import uuid
 import warnings
 from abc import ABC, abstractmethod
-from collections.abc import Callable, Iterable, Iterator
+from collections.abc import Callable, Generator, Iterable, Iterator
 from copy import copy
 from dataclasses import dataclass
 from enum import Enum
```
```diff
@@ -1682,6 +1684,87 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
     return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_files: int,
+    max_buffered_batches: int = 16,
+) -> Generator[pa.RecordBatch, None, None]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+    batches into a shared queue. The consumer yields batches from the queue.
+    A sentinel value signals completion, avoiding timeout-based polling.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_files: Maximum number of files to read concurrently.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel_event = threading.Event()
+    pending_count = len(tasks)
+    pending_lock = threading.Lock()
+    file_semaphore = threading.Semaphore(concurrent_files)
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal pending_count
+        try:
+            # Blocking acquire — on cancellation, extra permits are released to unblock.
+            file_semaphore.acquire()
+            if cancel_event.is_set():
+                return
+
+            for batch in batch_fn(task):
+                if cancel_event.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel_event.is_set():
+                batch_queue.put(e)
+        finally:
+            file_semaphore.release()
+            with pending_lock:
+                pending_count -= 1
+                if pending_count == 0:
+                    batch_queue.put(_QUEUE_SENTINEL)
+
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(worker, task) for task in tasks]
+
+    try:
+        while True:
+            item = batch_queue.get()
+
+            if item is _QUEUE_SENTINEL:
+                break
+
+            if isinstance(item, BaseException):
+                raise item
+
+            yield item
+    finally:
+        cancel_event.set()
+        # Release semaphore permits to unblock any workers waiting on acquire()
+        for _ in range(len(tasks)):
+            file_semaphore.release()
+        # Drain the queue to unblock any workers stuck on put()
+        while not batch_queue.empty():
+            try:
+                batch_queue.get_nowait()
+            except queue.Empty:
+                break
+        for future in futures:
+            future.cancel()
+
+
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
```
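
The lock discipline in `_bounded_concurrent_batches` is easier to see stripped of the Iceberg types. Below is a self-contained analogue of the same pattern (bounded queue for backpressure, a semaphore capping in-flight work, a sentinel signaling completion), with squared integers standing in for RecordBatches and a plain `ThreadPoolExecutor` standing in for pyiceberg's shared `ExecutorFactory`. Everything in it is illustrative, not part of the commit:

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator

_SENTINEL = object()

def bounded_concurrent(items: list[int], concurrency: int = 2, max_buffered: int = 4) -> Iterator[int]:
    """Yield worker results with bounded buffering (illustrative analogue)."""
    out: queue.Queue = queue.Queue(maxsize=max_buffered)  # backpressure: producers block when full
    sem = threading.Semaphore(concurrency)  # redundant with max_workers here, but mirrors the
    pending = len(items)                    # commit, where a shared executor is capped by it
    lock = threading.Lock()

    def worker(item: int) -> None:
        nonlocal pending
        try:
            sem.acquire()
            out.put(item * item)  # simulated "batch"
        finally:
            sem.release()
            with lock:
                pending -= 1
                if pending == 0:  # the last worker to finish posts the sentinel
                    out.put(_SENTINEL)

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        for item in items:
            pool.submit(worker, item)
        while (result := out.get()) is not _SENTINEL:  # sentinel ends the loop, no polling timeouts
            yield result

print(sorted(bounded_concurrent([1, 2, 3, 4])))  # [1, 4, 9, 16]; arrival order may vary
```

The commit's version adds what this sketch omits: a cancel event so that early termination (limit hit, generator closed, errors) unblocks workers, queue draining to free producers stuck on `put()`, and surplus semaphore releases so nothing stays parked on `acquire()`.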
```diff
@@ -1762,19 +1845,32 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         return result
 
     def to_record_batches(
-        self, tasks: Iterable[FileScanTask], batch_size: int | None = None, streaming: bool = False
+        self,
+        tasks: Iterable[FileScanTask],
+        batch_size: int | None = None,
+        streaming: bool = False,
+        concurrent_files: int = 1,
     ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
 
         Returns an Iterator of pa.RecordBatch with data from the Iceberg table
         by resolving the right columns that match the current table schema.
         Only data that matches the provided row_filter expression is returned.
 
+        Ordering semantics:
+        - Default (streaming=False): Batches are grouped by file in task submission order.
+        - streaming=True: Batches may be interleaved across files. Within each file,
+          batch ordering follows row order.
+
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             streaming: If True, yield batches as they are produced without materializing
-                entire files into memory. Files are still processed sequentially.
+                entire files into memory. Files are still processed sequentially when
+                concurrent_files=1.
+            concurrent_files: Number of files to read concurrently when streaming=True.
+                Must be >= 1. When > 1, batches may arrive interleaved across files.
+                Ignored when streaming=False.
 
         Returns:
             An Iterator of PyArrow RecordBatches.
```
```diff
@@ -1783,14 +1879,29 @@ def to_record_batches(
         Raises:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
+                or when concurrent_files < 1
         """
+        if concurrent_files < 1:
+            raise ValueError(f"concurrent_files must be >= 1, got {concurrent_files}")
+
         deletes_per_file = _read_all_delete_files(self._io, tasks)
 
         if streaming:
-            # Streaming path: process all tasks sequentially, yielding batches as produced.
-            # _record_batches_from_scan_tasks_and_deletes handles the limit internally
-            # when called with all tasks, so no outer limit check is needed.
-            yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size)
+            # Streaming path: read files with bounded concurrency, yielding batches as produced.
+            # When concurrent_files=1, this is sequential. When >1, batches may interleave across files.
+            task_list = list(tasks)
+
+            def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
+                return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)
+
+            total_row_count = 0
+            for batch in _bounded_concurrent_batches(task_list, batch_fn, concurrent_files):
+                current_batch_size = len(batch)
+                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
+                    yield batch.slice(0, self._limit - total_row_count)
+                    return
+                yield batch
+                total_row_count += current_batch_size
             return
 
         # Non-streaming path: existing behavior with executor.map + list()
```
pyiceberg/table/__init__.py

Lines changed: 13 additions & 3 deletions

```diff
@@ -2157,17 +2157,27 @@ def to_arrow(self) -> pa.Table:
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
         ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool = False) -> pa.RecordBatchReader:
+    def to_arrow_batch_reader(
+        self, batch_size: int | None = None, streaming: bool = False, concurrent_files: int = 1
+    ) -> pa.RecordBatchReader:
         """Return an Arrow RecordBatchReader from this DataScan.
 
         For large results, using a RecordBatchReader requires less memory than
         loading an Arrow Table for the same DataScan, because a RecordBatch
         is read one at a time.
 
+        Ordering semantics:
+        - Default (streaming=False): Batches are grouped by file in task submission order.
+        - streaming=True: Batches may be interleaved across files. Within each file,
+          batch ordering follows row order.
+
         Args:
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             streaming: If True, yield batches as they are produced without materializing
-                entire files into memory. Files are still processed sequentially.
+                entire files into memory. Files are still processed sequentially when
+                concurrent_files=1.
+            concurrent_files: Number of files to read concurrently when streaming=True.
+                When > 1, batches may arrive interleaved across files.
 
         Returns:
             pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
```
```diff
@@ -2180,7 +2190,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool =
         target_schema = schema_to_pyarrow(self.projection())
         batches = ArrowScan(
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming)
+        ).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming, concurrent_files=concurrent_files)
 
         return pa.RecordBatchReader.from_batches(
             target_schema,
```
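
One workload this flag targets is fanning a large scan straight into a local sink while keeping memory bounded by the scan's internal batch buffer. A sketch, assuming `tbl` is a loaded pyiceberg `Table` and `out.parquet` is a hypothetical destination; row order in the output reflects arrival order, not table order:

```python
import pyarrow.parquet as pq

reader = tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4)

# write_batch streams each RecordBatch to disk as it arrives, so peak memory
# stays near the buffered-batch cap rather than the full table size.
with pq.ParquetWriter("out.parquet", reader.schema) as writer:
    for batch in reader:
        writer.write_batch(batch)
```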
