
Commit a4cb212

sumedhsakdeo and claude committed
feat: add concurrent_files flag for bounded concurrent streaming
Add _bounded_concurrent_batches() with proper lock discipline:

- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 444549f commit a4cb212
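The commit message above names three coordination pieces: a bounded queue for backpressure, a semaphore capping concurrent file reads, and completion signalling. A minimal stdlib sketch of that pattern (the function and parameter names here are illustrative, not the commit's API; error handling and cancellation are omitted):

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

SENTINEL = object()

def bounded_concurrent(items, produce, max_workers=2, max_buffered=4):
    """Yield every result of produce(item), with at most max_workers
    producers running and at most max_buffered results buffered."""
    if not items:
        return
    out = queue.Queue(maxsize=max_buffered)  # backpressure: put() blocks when full
    sem = threading.Semaphore(max_workers)   # caps concurrent producers
    lock = threading.Lock()
    pending = len(items)

    def worker(item):
        nonlocal pending
        with sem:  # at most max_workers produce at once
            for result in produce(item):
                out.put(result)
        with lock:
            pending -= 1
            if pending == 0:
                out.put(SENTINEL)  # the last worker to finish signals completion

    with ThreadPoolExecutor(max_workers=len(items)) as pool:
        for item in items:
            pool.submit(worker, item)
        while (got := out.get()) is not SENTINEL:
            yield got
```

The sentinel lets the consumer terminate without polling or timeouts, which is the same shape the diff below uses in `_bounded_concurrent_batches`.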

File tree

5 files changed: +471 -5 lines changed

mkdocs/docs/api.md

Lines changed: 30 additions & 0 deletions
@@ -369,6 +369,25 @@ for buf in tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000):
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+The maximum number of buffered batches can be tuned via the `scan.max-buffered-batches` table property (default 16).
+
+**Ordering semantics:**
+
+| Configuration | File ordering | Within-file ordering |
+|---|---|---|
+| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
+| `streaming=True` | Batches grouped by file, sequential | Row order |
+| `streaming=True, concurrent_files>1` | Interleaved across files (no grouping guarantee) | Row order within each file |
+
+In all modes, within-file batch ordering follows row order. The `limit` parameter is enforced regardless of configuration.
 
 To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
 
 ```python
@@ -1651,6 +1670,17 @@ table.scan(
 ).to_arrow_batch_reader(streaming=True)
 ```
 
+For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
+
+```python
+table.scan(
+    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
+    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
+).to_arrow_batch_reader(streaming=True, concurrent_files=4)
+```
+
+When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Arrow Batch Reader section](#arrow-batch-reader) above for details.
+
 ### Pandas
 
 <!-- prettier-ignore-start -->
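The ordering guarantee documented above (cross-file interleaving allowed, within-file row order preserved) can be demonstrated with a toy model where each "file" is just a list of batch labels. The helper and its names are illustrative, not pyiceberg API:

```python
import queue
import threading

def stream_interleaved(files, max_buffered=4):
    """Drain several 'files' (lists of batches) through one bounded queue.
    Cross-file arrival order is unspecified; within-file order is preserved
    because each worker pushes its own file's batches sequentially."""
    out = queue.Queue(maxsize=max_buffered)
    done = object()
    remaining = len(files)
    lock = threading.Lock()

    def worker(file_id, batches):
        nonlocal remaining
        for seq, batch in enumerate(batches):
            out.put((file_id, seq, batch))
        with lock:
            remaining -= 1
            if remaining == 0:
                out.put(done)  # last worker signals completion

    threads = [threading.Thread(target=worker, args=(i, f)) for i, f in enumerate(files)]
    for t in threads:
        t.start()
    while (got := out.get()) is not done:
        yield got
    for t in threads:
        t.join()

files = [[f"f{i}b{j}" for j in range(5)] for i in range(3)]
events = list(stream_interleaved(files))
# Within each file, batches arrive in row order even though files interleave:
for i in range(3):
    seqs = [seq for fid, seq, _ in events if fid == i]
    assert seqs == list(range(5))
```

Swapping the lists for real per-file batch iterators gives the shape of `_bounded_concurrent_batches` in the diff below, minus cancellation and error propagation.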

pyiceberg/io/pyarrow.py

Lines changed: 129 additions & 2 deletions
@@ -33,7 +33,9 @@
 import logging
 import operator
 import os
+import queue
 import re
+import threading
 import uuid
 import warnings
 from abc import ABC, abstractmethod
@@ -1682,6 +1684,90 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
 return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_files: int,
+    max_buffered_batches: int = 16,
+) -> Iterator[pa.RecordBatch]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+    batches into a shared queue. The consumer yields batches from the queue.
+    A sentinel value signals completion, avoiding timeout-based polling.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_files: Maximum number of files to read concurrently.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel_event = threading.Event()
+    pending_count = len(tasks)
+    pending_lock = threading.Lock()
+    file_semaphore = threading.Semaphore(concurrent_files)
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal pending_count
+        acquired = False
+        try:
+            # Blocking acquire — on cancellation, extra permits are released to unblock.
+            file_semaphore.acquire()
+            if cancel_event.is_set():
+                return
+            acquired = True
+
+            for batch in batch_fn(task):
+                if cancel_event.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel_event.is_set():
+                batch_queue.put(e)
+        finally:
+            if acquired:
+                file_semaphore.release()
+            with pending_lock:
+                pending_count -= 1
+                if pending_count == 0:
+                    batch_queue.put(_QUEUE_SENTINEL)
+
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(worker, task) for task in tasks]
+
+    try:
+        while True:
+            item = batch_queue.get()
+
+            if item is _QUEUE_SENTINEL:
+                break
+
+            if isinstance(item, BaseException):
+                raise item
+
+            yield item
+    finally:
+        cancel_event.set()
+        # Release semaphore permits to unblock any workers waiting on acquire()
+        for _ in range(len(tasks)):
+            file_semaphore.release()
+        # Drain the queue to unblock any workers stuck on put()
+        while not batch_queue.empty():
+            try:
+                batch_queue.get_nowait()
+            except queue.Empty:
+                break
+        for future in futures:
+            future.cancel()
+
+
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
@@ -1762,19 +1848,33 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         return result
 
     def to_record_batches(
-        self, tasks: Iterable[FileScanTask], batch_size: int | None = None, streaming: bool = False
+        self,
+        tasks: Iterable[FileScanTask],
+        batch_size: int | None = None,
+        streaming: bool = False,
+        concurrent_files: int = 1,
     ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
 
         Returns an Iterator of pa.RecordBatch with data from the Iceberg table
         by resolving the right columns that match the current table schema.
         Only data that matches the provided row_filter expression is returned.
 
+        Ordering semantics:
+        - Default (streaming=False): Batches are grouped by file in task submission order.
+        - streaming=True, concurrent_files=1: Batches are grouped by file, processed sequentially.
+        - streaming=True, concurrent_files>1: Batches may be interleaved across files.
+        In all modes, within-file batch ordering follows row order.
+
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             streaming: If True, yield batches as they are produced without materializing
-                entire files into memory. Files are still processed sequentially.
+                entire files into memory. Files are still processed sequentially when
+                concurrent_files=1.
+            concurrent_files: Number of files to read concurrently when streaming=True.
+                When > 1, batches may arrive interleaved across files. Ignored when
+                streaming=False.
 
         Returns:
             An Iterator of PyArrow RecordBatches.
@@ -1786,6 +1886,33 @@ def to_record_batches(
         """
         deletes_per_file = _read_all_delete_files(self._io, tasks)
 
+        if streaming and concurrent_files > 1:
+            # Concurrent streaming path: read multiple files in parallel with bounded queue.
+            # Ordering is NOT guaranteed across files — batches arrive as produced.
+            task_list = list(tasks)
+
+            def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
+                return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)
+
+            from pyiceberg.table import TableProperties
+
+            max_buffered = int(
+                self._table_metadata.properties.get(
+                    TableProperties.SCAN_MAX_BUFFERED_BATCHES,
+                    TableProperties.SCAN_MAX_BUFFERED_BATCHES_DEFAULT,
+                )
+            )
+
+            total_row_count = 0
+            for batch in _bounded_concurrent_batches(task_list, batch_fn, concurrent_files, max_buffered):
+                current_batch_size = len(batch)
+                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
+                    yield batch.slice(0, self._limit - total_row_count)
+                    return
+                yield batch
+                total_row_count += current_batch_size
+            return
+
         if streaming:
             # Streaming path: process all tasks sequentially, yielding batches as produced.
             # _record_batches_from_scan_tasks_and_deletes handles the limit internally
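The concurrent path above enforces `limit` by slicing the batch that crosses it and stopping. That arithmetic can be isolated in a small pure helper (a hypothetical stand-alone version using plain lists in place of `pa.RecordBatch`):

```python
def enforce_limit(batches, limit):
    """Yield batches until `limit` rows have been produced, slicing the
    batch that crosses the limit so the total is exactly `limit`."""
    total = 0
    for batch in batches:
        n = len(batch)
        if limit is not None and total + n >= limit:
            yield batch[: limit - total]  # partial final batch
            return
        yield batch
        total += n

out = list(enforce_limit([[1, 2, 3], [4, 5, 6], [7, 8, 9]], limit=5))
# out == [[1, 2, 3], [4, 5]]: exactly 5 rows
```

Because the `>=` comparison also fires when the batch lands exactly on the limit, the slice then covers the whole batch and iteration still stops early.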

pyiceberg/table/__init__.py

Lines changed: 17 additions & 3 deletions
@@ -247,6 +247,9 @@ class TableProperties:
     MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
     MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
 
+    SCAN_MAX_BUFFERED_BATCHES = "scan.max-buffered-batches"
+    SCAN_MAX_BUFFERED_BATCHES_DEFAULT = 16
+
 
 class Transaction:
     _table: Table
@@ -2157,17 +2160,28 @@ def to_arrow(self) -> pa.Table:
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
         ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool = False) -> pa.RecordBatchReader:
+    def to_arrow_batch_reader(
+        self, batch_size: int | None = None, streaming: bool = False, concurrent_files: int = 1
+    ) -> pa.RecordBatchReader:
         """Return an Arrow RecordBatchReader from this DataScan.
 
         For large results, using a RecordBatchReader requires less memory than
         loading an Arrow Table for the same DataScan, because a RecordBatch
         is read one at a time.
 
+        Ordering semantics:
+        - Default (streaming=False): Batches are grouped by file in task submission order.
+        - streaming=True, concurrent_files=1: Batches are grouped by file, processed sequentially.
+        - streaming=True, concurrent_files>1: Batches may be interleaved across files.
+        In all modes, within-file batch ordering follows row order.
+
         Args:
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             streaming: If True, yield batches as they are produced without materializing
-                entire files into memory. Files are still processed sequentially.
+                entire files into memory. Files are still processed sequentially when
+                concurrent_files=1.
+            concurrent_files: Number of files to read concurrently when streaming=True.
+                When > 1, batches may arrive interleaved across files.
 
         Returns:
             pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
@@ -2180,7 +2194,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool =
         target_schema = schema_to_pyarrow(self.projection())
         batches = ArrowScan(
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming)
+        ).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming, concurrent_files=concurrent_files)
 
         return pa.RecordBatchReader.from_batches(
             target_schema,
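The new `scan.max-buffered-batches` property is resolved from the table's string-keyed properties dict and normalized through `int()`, falling back to the default. A minimal model of that lookup (the constants mirror the diff above; the helper function itself is illustrative):

```python
SCAN_MAX_BUFFERED_BATCHES = "scan.max-buffered-batches"
SCAN_MAX_BUFFERED_BATCHES_DEFAULT = 16

def max_buffered_batches(properties: dict) -> int:
    """Resolve the buffer cap: table property values are strings, so the
    configured value (or the int default) is normalized through int()."""
    return int(properties.get(SCAN_MAX_BUFFERED_BATCHES, SCAN_MAX_BUFFERED_BATCHES_DEFAULT))

assert max_buffered_batches({}) == 16
assert max_buffered_batches({"scan.max-buffered-batches": "32"}) == 32
```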
