Commit 7cfb2b1

sumedhsakdeo and claude committed

feat: add concurrent_files flag for bounded concurrent streaming

Add _bounded_concurrent_batches() with proper lock discipline:
- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

1 parent b72b7ba · commit 7cfb2b1
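The design named in the commit message (bounded queue for backpressure, semaphore for read concurrency, sentinel for completion) can be sketched in isolation. This is a minimal, self-contained sketch of the pattern, not the PyIceberg implementation; `bounded_concurrent`, `read_fn`, and the parameter names are illustrative, and it omits the cancel-event and error-propagation machinery the commit adds:

```python
import queue
import threading

_SENTINEL = object()

def bounded_concurrent(sources, read_fn, max_workers=2, max_buffered=4):
    """Yield items from many sources concurrently with bounded buffering.

    Workers hold a semaphore while reading (caps concurrency); the bounded
    queue applies backpressure; the last worker to finish enqueues a sentinel.
    """
    if not sources:
        return
    q = queue.Queue(maxsize=max_buffered)
    sem = threading.Semaphore(max_workers)
    lock = threading.Lock()
    pending = len(sources)

    def worker(src):
        nonlocal pending
        try:
            with sem:  # at most max_workers sources are read at once
                for item in read_fn(src):
                    q.put(item)  # blocks when the buffer is full (backpressure)
        finally:
            with lock:
                pending -= 1
                if pending == 0:
                    q.put(_SENTINEL)  # last worker signals completion

    threads = [threading.Thread(target=worker, args=(s,)) for s in sources]
    for t in threads:
        t.start()
    while True:
        item = q.get()
        if item is _SENTINEL:
            break
        yield item
    for t in threads:
        t.join()

items = list(bounded_concurrent([range(0, 3), range(10, 13)], iter))
assert sorted(items) == [0, 1, 2, 10, 11, 12]  # every item arrives exactly once
```

Each source's items keep their relative order because a single worker reads a source sequentially, but items from different sources may interleave — the same guarantee the commit documents for files.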

File tree

4 files changed: +430 −6 lines changed

mkdocs/docs/api.md

Lines changed: 30 additions & 0 deletions
@@ -369,6 +369,25 @@ for buf in tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000):
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
+
+```python
+for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
+    print(f"Buffer contains {len(buf)} rows")
+```
+
+The maximum number of buffered batches can be tuned via the `scan.max-buffered-batches` table property (default 16).
+
+**Ordering semantics:**
+
+| Configuration | File ordering | Within-file ordering |
+|---|---|---|
+| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
+| `streaming=True` | Batches grouped by file, sequential | Row order |
+| `streaming=True, concurrent_files>1` | Interleaved across files (no grouping guarantee) | Row order within each file |
+
+In all modes, within-file batch ordering follows row order. The `limit` parameter is enforced correctly regardless of configuration.
+
 To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
 
 ```python

@@ -1651,6 +1670,17 @@ table.scan(
 ).to_arrow_batch_reader(streaming=True)
 ```
 
+For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
+
+```python
+table.scan(
+    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
+    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
+).to_arrow_batch_reader(streaming=True, concurrent_files=4)
+```
+
+When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Arrow Batch Reader section](#arrow-batch-reader) above for details.
+
 ### Pandas
 
 <!-- prettier-ignore-start -->
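The documented guarantee — files may interleave, but each file's batches stay in row order — can be checked mechanically. A minimal sketch, assuming batches are tagged with a file id and a per-file sequence number (both names are illustrative, not part of the PyIceberg API):

```python
from collections import defaultdict

def within_file_order_ok(batches):
    """Check that each file's batches appear in increasing sequence order,
    even when batches from different files are interleaved."""
    last = defaultdict(lambda: -1)
    for file_id, seq in batches:
        if seq <= last[file_id]:
            return False
        last[file_id] = seq
    return True

# A stream that concurrent_files > 1 may legally produce: files interleave,
# but each file's batches stay in row order.
stream = [("a", 0), ("b", 0), ("a", 1), ("b", 1), ("b", 2), ("a", 2)]
assert within_file_order_ok(stream)
# Reordering within one file would violate the documented guarantee:
assert not within_file_order_ok([("a", 1), ("a", 0)])
```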

pyiceberg/io/pyarrow.py

Lines changed: 128 additions & 3 deletions
@@ -33,7 +33,9 @@
 import logging
 import operator
 import os
+import queue
 import re
+import threading
 import uuid
 import warnings
 from abc import ABC, abstractmethod

@@ -1682,6 +1684,89 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
     return deletes_per_file
 
 
+_QUEUE_SENTINEL = object()
+
+
+def _bounded_concurrent_batches(
+    tasks: list[FileScanTask],
+    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
+    concurrent_files: int,
+    max_buffered_batches: int = 16,
+) -> Iterator[pa.RecordBatch]:
+    """Read batches from multiple files concurrently with bounded memory.
+
+    Workers read from files in parallel (up to concurrent_files at a time) and push
+    batches into a shared queue. The consumer yields batches from the queue.
+    A sentinel value signals completion, avoiding timeout-based polling.
+
+    Args:
+        tasks: The file scan tasks to process.
+        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
+        concurrent_files: Maximum number of files to read concurrently.
+        max_buffered_batches: Maximum number of batches to buffer in the queue.
+    """
+    if not tasks:
+        return
+
+    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
+    cancel_event = threading.Event()
+    pending_count = len(tasks)
+    pending_lock = threading.Lock()
+    file_semaphore = threading.Semaphore(concurrent_files)
+
+    def worker(task: FileScanTask) -> None:
+        nonlocal pending_count
+        acquired = False
+        try:
+            # Acquire semaphore — blocks until a slot is available or cancelled
+            while not cancel_event.is_set():
+                if file_semaphore.acquire(timeout=0.5):
+                    acquired = True
+                    break
+            if cancel_event.is_set():
+                return
+
+            for batch in batch_fn(task):
+                if cancel_event.is_set():
+                    return
+                batch_queue.put(batch)
+        except BaseException as e:
+            if not cancel_event.is_set():
+                batch_queue.put(e)
+        finally:
+            if acquired:
+                file_semaphore.release()
+            with pending_lock:
+                pending_count -= 1
+                if pending_count == 0:
+                    batch_queue.put(_QUEUE_SENTINEL)
+
+    executor = ExecutorFactory.get_or_create()
+    futures = [executor.submit(worker, task) for task in tasks]
+
+    try:
+        while True:
+            item = batch_queue.get()
+
+            if item is _QUEUE_SENTINEL:
+                break
+
+            if isinstance(item, BaseException):
+                raise item
+
+            yield item
+    finally:
+        cancel_event.set()
+        # Drain the queue to unblock any workers stuck on put()
+        while not batch_queue.empty():
+            try:
+                batch_queue.get_nowait()
+            except queue.Empty:
+                break
+        for future in futures:
+            future.cancel()
+
+
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO

@@ -1762,19 +1847,33 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         return result
 
     def to_record_batches(
-        self, tasks: Iterable[FileScanTask], batch_size: int | None = None, streaming: bool = False
+        self,
+        tasks: Iterable[FileScanTask],
+        batch_size: int | None = None,
+        streaming: bool = False,
+        concurrent_files: int = 1,
     ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
 
         Returns an Iterator of pa.RecordBatch with data from the Iceberg table
         by resolving the right columns that match the current table schema.
         Only data that matches the provided row_filter expression is returned.
 
+        Ordering semantics:
+        - Default (streaming=False): Batches are grouped by file in task submission order.
+        - streaming=True, concurrent_files=1: Batches are grouped by file, processed sequentially.
+        - streaming=True, concurrent_files>1: Batches may be interleaved across files.
+        In all modes, within-file batch ordering follows row order.
+
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             streaming: If True, yield batches as they are produced without materializing
-                entire files into memory. Files are still processed sequentially.
+                entire files into memory. Files are still processed sequentially when
+                concurrent_files=1.
+            concurrent_files: Number of files to read concurrently when streaming=True.
+                When > 1, batches may arrive interleaved across files. Ignored when
+                streaming=False.
 
         Returns:
             An Iterator of PyArrow RecordBatches.

@@ -1786,7 +1885,33 @@ def to_record_batches(
         """
         deletes_per_file = _read_all_delete_files(self._io, tasks)
 
-        if streaming:
+        if streaming and concurrent_files > 1:
+            # Concurrent streaming path: read multiple files in parallel with bounded queue.
+            # Ordering is NOT guaranteed across files — batches arrive as produced.
+            task_list = list(tasks)
+
+            def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
+                return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)
+
+            from pyiceberg.table import TableProperties
+
+            max_buffered = int(
+                self._table_metadata.properties.get(
+                    TableProperties.SCAN_MAX_BUFFERED_BATCHES,
+                    TableProperties.SCAN_MAX_BUFFERED_BATCHES_DEFAULT,
+                )
+            )
+
+            total_row_count = 0
+            for batch in _bounded_concurrent_batches(task_list, batch_fn, concurrent_files, max_buffered):
+                current_batch_size = len(batch)
+                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
+                    yield batch.slice(0, self._limit - total_row_count)
+                    return
+                else:
+                    yield batch
+                total_row_count += current_batch_size
+        elif streaming:
             # Streaming path: process all tasks sequentially, yielding batches as produced.
             # _record_batches_from_scan_tasks_and_deletes handles the limit internally
             # when called with all tasks, so no outer limit check is needed.
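The limit handling in the concurrent path slices the final batch so that exactly `limit` rows come out, then returns without consuming later batches. The same arithmetic can be exercised with plain lists standing in for RecordBatches, where `batch[:n]` plays the role of `batch.slice(0, n)`; `apply_limit` is an illustrative name, not part of the diff:

```python
def apply_limit(batches, limit):
    """Yield batches, slicing the final one so that exactly `limit` rows
    are produced; later batches are never consumed (early termination)."""
    total = 0
    for batch in batches:
        n = len(batch)
        if limit is not None and total + n >= limit:
            # Corresponds to batch.slice(0, limit - total) on a RecordBatch
            yield batch[: limit - total]
            return
        yield batch
        total += n

out = list(apply_limit([[1, 2, 3], [4, 5, 6], [7, 8, 9]], limit=5))
assert out == [[1, 2, 3], [4, 5]]  # 5 rows total; the third batch is untouched
```

Because this check runs in the consumer, it holds even when batches arrive interleaved from concurrent workers: the count is over rows yielded, not over any particular file.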
pyiceberg/table/__init__.py

Lines changed: 17 additions & 3 deletions
@@ -247,6 +247,9 @@ class TableProperties:
     MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
     MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
 
+    SCAN_MAX_BUFFERED_BATCHES = "scan.max-buffered-batches"
+    SCAN_MAX_BUFFERED_BATCHES_DEFAULT = 16
+
 
 class Transaction:
     _table: Table

@@ -2157,17 +2160,28 @@ def to_arrow(self) -> pa.Table:
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
         ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool = False) -> pa.RecordBatchReader:
+    def to_arrow_batch_reader(
+        self, batch_size: int | None = None, streaming: bool = False, concurrent_files: int = 1
+    ) -> pa.RecordBatchReader:
         """Return an Arrow RecordBatchReader from this DataScan.
 
         For large results, using a RecordBatchReader requires less memory than
         loading an Arrow Table for the same DataScan, because a RecordBatch
         is read one at a time.
 
+        Ordering semantics:
+        - Default (streaming=False): Batches are grouped by file in task submission order.
+        - streaming=True, concurrent_files=1: Batches are grouped by file, processed sequentially.
+        - streaming=True, concurrent_files>1: Batches may be interleaved across files.
+        In all modes, within-file batch ordering follows row order.
+
         Args:
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             streaming: If True, yield batches as they are produced without materializing
-                entire files into memory. Files are still processed sequentially.
+                entire files into memory. Files are still processed sequentially when
+                concurrent_files=1.
+            concurrent_files: Number of files to read concurrently when streaming=True.
+                When > 1, batches may arrive interleaved across files.
 
         Returns:
             pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan

@@ -2180,7 +2194,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None, streaming: bool =
         target_schema = schema_to_pyarrow(self.projection())
         batches = ArrowScan(
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming)
+        ).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming, concurrent_files=concurrent_files)
 
         return pa.RecordBatchReader.from_batches(
             target_schema,
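The reader of `scan.max-buffered-batches` wraps the property lookup in `int(...)` because Iceberg table properties are stored as strings, while the hard-coded default is already an integer. A minimal sketch of that lookup-with-default pattern; `get_int_property` is an illustrative helper, not part of the codebase:

```python
def get_int_property(properties, key, default):
    """Resolve an integer-valued table property with a typed default.
    Table property values arrive as strings, hence the int() coercion."""
    return int(properties.get(key, default))

props = {"scan.max-buffered-batches": "32"}  # set on the table as a string
assert get_int_property(props, "scan.max-buffered-batches", 16) == 32
assert get_int_property({}, "scan.max-buffered-batches", 16) == 16  # falls back
```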
