Commit f2e6335

feat: add to_stream method for lazy DataFrame processing and update iteration behavior

1 parent df4c042

3 files changed: 51 additions & 43 deletions

docs/source/user-guide/dataframe/index.rst

Lines changed: 12 additions & 2 deletions
@@ -168,8 +168,18 @@ out-of-memory errors.
     for batch in reader:
         ... # process each batch as it is produced
 
-DataFrames are also iterable, yielding :class:`pyarrow.RecordBatch` objects
-lazily so you can loop over results directly:
+DataFrames expose :py:meth:`~datafusion.DataFrame.to_stream`, which returns a
+``RecordBatchStream`` for lazily processing results without materializing them
+all at once:
+
+.. code-block:: python
+
+    stream = df.to_stream()
+    for batch in stream:
+        ... # process each batch as it is produced
+
+DataFrames themselves are also iterable and delegate to ``to_stream()`` under
+the hood:
 
 .. code-block:: python
 
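Put together, the documented pattern looks like the following; a minimal
runnable sketch, assuming a SessionContext holding a small in-memory table
(the data and the print calls are illustrative only):

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})

    # Explicit form: to_stream() returns a RecordBatchStream whose
    # batches are produced on demand rather than materialized up front.
    for batch in df.to_stream():
        print(batch.to_pyarrow().to_pydict())

    # Implicit form: iterating the DataFrame delegates to to_stream()
    # and yields pyarrow.RecordBatch objects directly.
    for batch in df:
        print(batch.to_pydict())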

python/datafusion/dataframe.py

Lines changed: 29 additions & 7 deletions
@@ -25,6 +25,7 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    AsyncIterator,
     Iterable,
     Iterator,
     Literal,
@@ -291,7 +292,9 @@ class DataFrame:
     """Two dimensional table representation of data.
 
     DataFrame objects are iterable; iterating over a DataFrame yields
-    :class:`datafusion.record_batch.RecordBatch` instances lazily.
+    :class:`pyarrow.RecordBatch` instances lazily. Use
+    :py:meth:`to_stream` to obtain a :class:`~datafusion.record_batch.RecordBatchStream`
+    for explicit iteration over the results.
 
     See :ref:`user_guide_concepts` in the online documentation for more information.
     """
@@ -1022,6 +1025,14 @@ def to_arrow_table(self) -> pa.Table:
         """
         return self.df.to_arrow_table()
 
+    def to_stream(self) -> RecordBatchStream:
+        """Execute this :py:class:`DataFrame` and return a record batch stream.
+
+        This is a convenience wrapper around :py:meth:`execute_stream` and can be
+        used to iterate over results without materializing them.
+        """
+        return self.execute_stream()
+
     def execute_stream(self) -> RecordBatchStream:
         """Executes this DataFrame and returns a stream over a single partition.
 
@@ -1121,14 +1132,25 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
         # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)
 
-    def __iter__(self) -> Iterator[RecordBatch]:
-        """Yield record batches from the DataFrame without materializing results.
+    def __iter__(self) -> Iterator[pa.RecordBatch]:
+        """Yield record batches from this DataFrame lazily.
 
-        This executes the DataFrame using DataFusion's partitioned streaming
-        APIs and yields :class:`datafusion.record_batch.RecordBatch` objects.
+        This delegates to :py:meth:`to_stream` and converts each batch to a
+        :class:`pyarrow.RecordBatch` without eagerly materializing the entire
+        result set.
         """
-        for stream in self.execute_stream_partitioned():
-            yield from stream
+        for batch in self.to_stream():
+            yield batch.to_pyarrow()
+
+    def __aiter__(self) -> AsyncIterator[pa.RecordBatch]:
+        """Asynchronously yield record batches from this DataFrame lazily."""
+        stream = self.to_stream()
+
+        async def iterator() -> AsyncIterator[pa.RecordBatch]:
+            async for batch in stream:
+                yield batch.to_pyarrow()
+
+        return iterator()
 
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.

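Because __aiter__ wraps the stream in an async generator, a DataFrame can
also be consumed with async for. A sketch of that usage, assuming the same
kind of in-memory setup as above:

    import asyncio

    from datafusion import SessionContext


    async def main() -> None:
        ctx = SessionContext()
        df = ctx.from_pydict({"a": [1, 2, 3]})

        # Each batch is awaited from the underlying RecordBatchStream
        # and converted to a pyarrow.RecordBatch, mirroring the sync path.
        async for batch in df:
            print(batch.to_pydict())


    asyncio.run(main())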
python/tests/test_dataframe_iter_stream.py

Lines changed: 10 additions & 34 deletions
@@ -15,41 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pyarrow as pa
-
-
-def test_iter_releases_reader(monkeypatch, ctx):
-    batches = [
-        pa.RecordBatch.from_pydict({"a": [1]}),
-        pa.RecordBatch.from_pydict({"a": [2]}),
-    ]
-
-    class DummyReader:
-        def __init__(self, batches):
-            self._iter = iter(batches)
-            self.closed = False
-
-        def __iter__(self):
-            return self
-
-        def __next__(self):
-            return next(self._iter)
-
-        def close(self):
-            self.closed = True
-
-    dummy_reader = DummyReader(batches)
-
-    class FakeRecordBatchReader:
-        @staticmethod
-        def _import_from_c_capsule(*_args, **_kwargs):
-            return dummy_reader
-
-    monkeypatch.setattr(pa, "RecordBatchReader", FakeRecordBatchReader)
 
+def test_to_stream(ctx):
     df = ctx.from_pydict({"a": [1, 2]})
+    stream = df.to_stream()
+    batches = [rb.to_pyarrow() for rb in stream]
+    assert len(batches) == 1
+    assert batches[0].to_pydict() == {"a": [1, 2]}
 
-    for _ in df:
-        break
 
-    assert dummy_reader.closed
+def test_dataframe_iter(ctx):
+    df = ctx.from_pydict({"a": [1, 2]})
+    batches = list(df)
+    assert len(batches) == 1
+    assert batches[0].to_pydict() == {"a": [1, 2]}
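Both tests rely on a ctx fixture defined elsewhere in the test suite rather
than in this commit; a minimal sketch of such a fixture, assuming the stock
datafusion SessionContext:

    # conftest.py (hypothetical sketch; the real fixture lives elsewhere)
    import pytest

    from datafusion import SessionContext


    @pytest.fixture
    def ctx() -> SessionContext:
        # A fresh context per test keeps any registered tables isolated.
        return SessionContext()

Each test expects exactly one batch because from_pydict registers the
dictionary as a single in-memory record batch, so the stream yields it in
one piece.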
