DataFusion DataFrames can stream results to Arrow-based Python libraries
without first materializing all batches in memory. This page focuses on the
Arrow interfaces exposed by ``DataFrame`` and when to use each one.

DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
zero-copy, lazy streaming into Arrow-based Python libraries. With the streaming
protocol, batches are produced on demand.
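
To make "produced on demand" concrete, the following is a minimal, standard-library-only sketch of pull-based batch production. It does not use DataFusion or Arrow: ``batch_source``, ``produced``, and the list-of-integers "batches" are hypothetical stand-ins chosen only to show when work happens under lazy versus eager consumption.

```python
from typing import Iterator, List

produced: List[int] = []  # records which batches the source has generated so far


def batch_source(num_batches: int, rows_per_batch: int) -> Iterator[List[int]]:
    """Hypothetical stand-in for a streaming query: builds one batch per request."""
    for i in range(num_batches):
        produced.append(i)  # side effect lets us observe when a batch is built
        start = i * rows_per_batch
        yield list(range(start, start + rows_per_batch))


# Lazy consumption: stop once a batch starting at row 3 or later is seen.
# Batches after that point are never built.
for batch in batch_source(num_batches=5, rows_per_batch=3):
    if batch[0] >= 3:
        break
lazy_produced = list(produced)

# Eager consumption: collecting everything forces every batch to be built.
produced.clear()
all_batches = list(batch_source(num_batches=5, rows_per_batch=3))
eager_produced = list(produced)

print("lazy:", lazy_produced)
print("eager:", eager_produced)
```

In this sketch the early ``break`` means only the first two batches are ever built, while ``list(...)`` builds all five. A streaming consumer and an eager collection relate to each other in the same way.
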

.. note::

    The protocol is implementation-agnostic and works with any Python library
    that understands the Arrow C streaming interface (for example, PyArrow or
    other Arrow-compatible implementations). The sections below provide a short
    PyArrow-specific example and general guidance for other implementations.

.. code-block:: python

    import pyarrow as pa

    # Create a PyArrow RecordBatchReader without materializing all batches
    reader = pa.RecordBatchReader.from_stream(df)
    for batch in reader:
        ...  # process each batch as it is produced

DataFrames are also iterable, yielding :class:`datafusion.RecordBatch` objects lazily so you can loop over results directly without importing PyArrow:

.. code-block:: python

    for batch in df:
        ...  # each batch is a ``datafusion.RecordBatch``

Each batch exposes ``to_pyarrow()``, which converts it to a
``pyarrow.RecordBatch``. By contrast, ``pa.table(df)`` collects the entire
DataFrame eagerly into a PyArrow table:

.. code-block:: python

    import pyarrow as pa

    table = pa.table(df)

Asynchronous iteration is supported as well, allowing integration with
asyncio event loops:

.. code-block:: python

    async for batch in df:
        ...  # process each batch as it is produced

For finer control over streaming execution, use :py:meth:`~datafusion.DataFrame.execute_stream` to obtain a :py:class:`datafusion.RecordBatchStream`:

.. code-block:: python

    stream = df.execute_stream()
    for batch in stream:
        ...  # process each batch as it is produced

.. tip::

    To get a PyArrow reader instead, call
    ``pa.RecordBatchReader.from_stream(df)``.

When partition boundaries are important, :py:meth:`~datafusion.DataFrame.execute_stream_partitioned` returns an iterable of :py:class:`datafusion.RecordBatchStream` objects, one per partition:

.. code-block:: python

    for stream in df.execute_stream_partitioned():
        for batch in stream:
            ...  # each stream yields RecordBatches

To process partitions concurrently, first collect the streams into a list
and then poll each one in a separate asyncio task:

.. code-block:: python

    import asyncio

    async def consume(stream):
        async for batch in stream:
            ...

    # "await" requires an enclosing coroutine (or a REPL with top-level await)
    streams = list(df.execute_stream_partitioned())
    await asyncio.gather(*(consume(s) for s in streams))

See :doc:`../io/arrow` for additional details on the Arrow interface.
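
The concurrent-consumption pattern can be tried without a DataFusion session. The sketch below is a minimal illustration that uses only ``asyncio``. ``fake_partition_stream`` is a hypothetical async generator standing in for one partition's ``RecordBatchStream``, and each "batch" is a plain list rather than an Arrow record batch. ``consume`` is extended here to return a row count so the result can be inspected.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_partition_stream(partition: int, num_batches: int) -> AsyncIterator[List[int]]:
    """Hypothetical stand-in for one partition's stream of batches."""
    for i in range(num_batches):
        await asyncio.sleep(0)  # hand control back to the event loop between batches
        yield [partition] * (i + 1)  # a "batch" containing i + 1 rows


async def consume(stream: AsyncIterator[List[int]]) -> int:
    """Drain one stream and return the total number of rows it yielded."""
    rows = 0
    async for batch in stream:
        rows += len(batch)
    return rows


async def main() -> List[int]:
    # Collect the streams first, then consume them concurrently.
    streams = [fake_partition_stream(p, num_batches=3) for p in range(4)]
    # gather() wraps each coroutine in a task and returns results in input order.
    return await asyncio.gather(*(consume(s) for s in streams))


row_counts = asyncio.run(main())
print(row_counts)
```

Because ``asyncio.gather`` preserves input order, ``row_counts[i]`` corresponds to partition ``i`` regardless of which stream finishes first.
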