Skip to content

Commit b54edc4

Browse files
committed
fix: update DataFrame iteration to yield DataFusion RecordBatch objects
1 parent c81cb8f commit b54edc4

3 files changed

Lines changed: 17 additions & 51 deletions

File tree

docs/source/user-guide/dataframe/index.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,12 +168,14 @@ out-of-memory errors.
168168
for batch in reader:
169169
... # process each batch as it is produced
170170
171-
DataFrames are also iterable, yielding :class:`pyarrow.RecordBatch` objects
172-
lazily so you can loop over results directly:
171+
DataFrames are also iterable, yielding :class:`datafusion.RecordBatch` objects
172+
that implement the Arrow C data interface. These batches can be consumed by
173+
libraries like PyArrow without copying:
173174

174175
.. code-block:: python
175176
176177
for batch in df:
178+
pa_batch = batch.to_pyarrow() # optional conversion
177179
... # process each batch as it is produced
178180
179181
See :doc:`../io/arrow` for additional details on the Arrow interface.

python/datafusion/dataframe.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import pyarrow as pa
5555

5656
from datafusion._internal import expr as expr_internal
57+
from datafusion.record_batch import RecordBatch
5758

5859
from enum import Enum
5960

@@ -1121,22 +1122,16 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
11211122
# preserving the original partition order.
11221123
return self.df.__arrow_c_stream__(requested_schema)
11231124

1124-
def __iter__(self) -> Iterator[pa.RecordBatch]:
1125-
"""Yield record batches from the DataFrame without materializing results.
1125+
def __iter__(self) -> Iterator[RecordBatch]:
1126+
"""Yield DataFusion record batches without materializing results.
11261127
1127-
This implementation streams record batches via the Arrow C Stream
1128-
interface, allowing callers such as :func:`pyarrow.Table.from_batches` to
1129-
consume results lazily. The DataFrame is executed using DataFusion's
1130-
partitioned streaming APIs so ``collect`` is never invoked and batch
1131-
order across partitions is preserved.
1128+
Batches are produced lazily using DataFusion's partitioned streaming
1129+
APIs so ``collect`` is never invoked. Each returned batch exposes the
1130+
Arrow C data interface and can be consumed by downstream libraries that
1131+
support ``__arrow_c_array__``.
11321132
"""
1133-
from contextlib import closing
1134-
1135-
import pyarrow as pa
1136-
1137-
reader = pa.RecordBatchReader._import_from_c_capsule(self.__arrow_c_stream__())
1138-
with closing(reader):
1139-
yield from reader
1133+
for stream in self.execute_stream_partitioned():
1134+
yield from stream
11401135

11411136
def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
11421137
"""Apply a function to the current DataFrame which returns another DataFrame.

python/tests/test_dataframe_iter_stream.py

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,10 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
import pyarrow as pa
18+
import datafusion
1919

2020

21-
def test_iter_releases_reader(monkeypatch, ctx):
22-
batches = [
23-
pa.RecordBatch.from_pydict({"a": [1]}),
24-
pa.RecordBatch.from_pydict({"a": [2]}),
25-
]
26-
27-
class DummyReader:
28-
def __init__(self, batches):
29-
self._iter = iter(batches)
30-
self.closed = False
31-
32-
def __iter__(self):
33-
return self
34-
35-
def __next__(self):
36-
return next(self._iter)
37-
38-
def close(self):
39-
self.closed = True
40-
41-
dummy_reader = DummyReader(batches)
42-
43-
class FakeRecordBatchReader:
44-
@staticmethod
45-
def _import_from_c_capsule(*_args, **_kwargs):
46-
return dummy_reader
47-
48-
monkeypatch.setattr(pa, "RecordBatchReader", FakeRecordBatchReader)
49-
21+
def test_iter_returns_record_batch(ctx):
5022
df = ctx.from_pydict({"a": [1, 2]})
51-
52-
for _ in df:
53-
break
54-
55-
assert dummy_reader.closed
23+
batch = next(iter(df))
24+
assert isinstance(batch, datafusion.RecordBatch)

0 commit comments

Comments
 (0)