Skip to content

Commit 52a8912

Browse files
committed
feat: add Arrow C Data Interface export to RecordBatch and update DataFrame iteration
1 parent 555ee5f commit 52a8912

3 files changed

Lines changed: 116 additions & 15 deletions

File tree

python/datafusion/dataframe.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
4444
from datafusion.expr import Expr, SortExpr, sort_or_default
4545
from datafusion.plan import ExecutionPlan, LogicalPlan
46-
from datafusion.record_batch import RecordBatchStream
46+
from datafusion.record_batch import RecordBatch, RecordBatchStream
4747

4848
if TYPE_CHECKING:
4949
import pathlib
@@ -291,7 +291,7 @@ class DataFrame:
291291
"""Two dimensional table representation of data.
292292
293293
DataFrame objects are iterable; iterating over a DataFrame yields
294-
:class:`pyarrow.RecordBatch` instances lazily.
294+
:class:`datafusion.record_batch.RecordBatch` instances lazily.
295295
296296
See :ref:`user_guide_concepts` in the online documentation for more information.
297297
"""
@@ -1121,22 +1121,14 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
11211121
# preserving the original partition order.
11221122
return self.df.__arrow_c_stream__(requested_schema)
11231123

1124-
def __iter__(self) -> Iterator[pa.RecordBatch]:
1124+
def __iter__(self) -> Iterator[RecordBatch]:
11251125
"""Yield record batches from the DataFrame without materializing results.
11261126
1127-
This implementation streams record batches via the Arrow C Stream
1128-
interface, allowing callers such as :func:`pyarrow.Table.from_batches` to
1129-
consume results lazily. The DataFrame is executed using DataFusion's
1130-
partitioned streaming APIs so ``collect`` is never invoked and batch
1131-
order across partitions is preserved.
1127+
This executes the DataFrame using DataFusion's partitioned streaming
1128+
APIs and yields :class:`datafusion.record_batch.RecordBatch` objects.
11321129
"""
1133-
from contextlib import closing
1134-
1135-
import pyarrow as pa
1136-
1137-
reader = pa.RecordBatchReader._import_from_c_capsule(self.__arrow_c_stream__())
1138-
with closing(reader):
1139-
yield from reader
1130+
for stream in self.execute_stream_partitioned():
1131+
yield from stream
11401132

11411133
def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
11421134
"""Apply a function to the current DataFrame which returns another DataFrame.

python/datafusion/record_batch.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ def to_pyarrow(self) -> pa.RecordBatch:
4646
"""Convert to :py:class:`pa.RecordBatch`."""
4747
return self.record_batch.to_pyarrow()
4848

49+
def __arrow_c_array__(
50+
self, requested_schema: object | None = None
51+
) -> tuple[object, object]:
52+
"""Arrow C Data Interface export."""
53+
return self.record_batch.__arrow_c_array__(requested_schema)
54+
4955

5056
class RecordBatchStream:
5157
"""This class represents a stream of record batches.

src/record_batch.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,58 @@
1616
// under the License.
1717

1818
use std::sync::Arc;
19+
use std::{ffi::c_void, ffi::CStr};
1920

2021
use crate::errors::PyDataFusionError;
2122
use crate::utils::wait_for_future;
23+
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
24+
use datafusion::arrow::array::{Array, StructArray};
2225
use datafusion::arrow::pyarrow::ToPyArrow;
2326
use datafusion::arrow::record_batch::RecordBatch;
2427
use datafusion::physical_plan::SendableRecordBatchStream;
2528
use futures::StreamExt;
2629
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration};
30+
use pyo3::ffi;
2731
use pyo3::prelude::*;
32+
use pyo3::types::PyCapsule;
2833
use pyo3::{pyclass, pymethods, PyObject, PyResult, Python};
2934
use tokio::sync::Mutex;
3035

36+
// Capsule names mandated by the Arrow PyCapsule interface specification.
// SAFETY: both byte strings are NUL-terminated and contain no interior NUL
// bytes, which is the only precondition of `from_bytes_with_nul_unchecked`.
// (c"..." literals are avoided here to keep the crate's MSRV below 1.77.)
#[allow(clippy::manual_c_str_literals)]
static ARROW_ARRAY_NAME: &CStr = unsafe { CStr::from_bytes_with_nul_unchecked(b"arrow_array\0") };
#[allow(clippy::manual_c_str_literals)]
static ARROW_SCHEMA_NAME: &CStr = unsafe { CStr::from_bytes_with_nul_unchecked(b"arrow_schema\0") };
40+
41+
/// PyCapsule destructor for `"arrow_array"` capsules.
///
/// # Safety
/// Must only be installed via `PyCapsule_New` on a capsule whose pointer was
/// obtained from `Box::into_raw(Box<FFI_ArrowArray>)`; it reclaims and drops
/// that box exactly once.
unsafe extern "C" fn drop_array(capsule: *mut ffi::PyObject) {
    if capsule.is_null() {
        return;
    }

    // Only reclaim the pointer if the capsule still carries our name; a
    // consumer that took ownership may have renamed/invalidated it.
    if ffi::PyCapsule_IsValid(capsule, ARROW_ARRAY_NAME.as_ptr()) == 1 {
        let array_ptr =
            ffi::PyCapsule_GetPointer(capsule, ARROW_ARRAY_NAME.as_ptr()) as *mut FFI_ArrowArray;
        if !array_ptr.is_null() {
            // Re-box the pointer so the FFI struct's Drop impl runs and the
            // Arrow release callback is invoked.
            drop(Box::from_raw(array_ptr));
        }
    }
    // The capsule API may have set a Python error above; a capsule destructor
    // must never leave an error pending.
    ffi::PyErr_Clear();
}
55+
56+
/// PyCapsule destructor for `"arrow_schema"` capsules.
///
/// # Safety
/// Must only be installed via `PyCapsule_New` on a capsule whose pointer was
/// obtained from `Box::into_raw(Box<FFI_ArrowSchema>)`; it reclaims and drops
/// that box exactly once.
unsafe extern "C" fn drop_schema(capsule: *mut ffi::PyObject) {
    if capsule.is_null() {
        return;
    }

    // Only reclaim the pointer if the capsule still carries our name; a
    // consumer that took ownership may have renamed/invalidated it.
    if ffi::PyCapsule_IsValid(capsule, ARROW_SCHEMA_NAME.as_ptr()) == 1 {
        let schema_ptr =
            ffi::PyCapsule_GetPointer(capsule, ARROW_SCHEMA_NAME.as_ptr()) as *mut FFI_ArrowSchema;
        if !schema_ptr.is_null() {
            // Re-box the pointer so the FFI schema's Drop impl runs and its
            // release callback is invoked.
            drop(Box::from_raw(schema_ptr));
        }
    }
    // The capsule API may have set a Python error above; a capsule destructor
    // must never leave an error pending.
    ffi::PyErr_Clear();
}
70+
3171
#[pyclass(name = "RecordBatch", module = "datafusion", subclass)]
3272
pub struct PyRecordBatch {
3373
batch: RecordBatch,
@@ -38,6 +78,69 @@ impl PyRecordBatch {
3878
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
3979
self.batch.to_pyarrow(py)
4080
}
81+
82+
#[pyo3(signature = (requested_schema=None))]
83+
fn __arrow_c_array__<'py>(
84+
&self,
85+
py: Python<'py>,
86+
requested_schema: Option<Bound<'py, PyCapsule>>,
87+
) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
88+
// For now ignore requested_schema; future work could apply projection
89+
if let Some(schema_capsule) = requested_schema {
90+
crate::utils::validate_pycapsule(&schema_capsule, "arrow_schema")?;
91+
}
92+
93+
let struct_array = StructArray::from(self.batch.clone());
94+
let data = struct_array.to_data();
95+
let array = FFI_ArrowArray::new(&data);
96+
let schema =
97+
FFI_ArrowSchema::try_from(data.data_type()).map_err(PyDataFusionError::from)?;
98+
99+
let array_ptr = Box::into_raw(Box::new(array));
100+
let schema_ptr = Box::into_raw(Box::new(schema));
101+
102+
unsafe {
103+
let schema_capsule = ffi::PyCapsule_New(
104+
schema_ptr as *mut c_void,
105+
ARROW_SCHEMA_NAME.as_ptr(),
106+
Some(drop_schema),
107+
);
108+
if schema_capsule.is_null() {
109+
drop(Box::from_raw(schema_ptr));
110+
drop(Box::from_raw(array_ptr));
111+
return Err(PyErr::fetch(py));
112+
}
113+
114+
let array_capsule = ffi::PyCapsule_New(
115+
array_ptr as *mut c_void,
116+
ARROW_ARRAY_NAME.as_ptr(),
117+
Some(drop_array),
118+
);
119+
if array_capsule.is_null() {
120+
drop(Box::from_raw(array_ptr));
121+
if ffi::PyCapsule_IsValid(schema_capsule, ARROW_SCHEMA_NAME.as_ptr()) == 1 {
122+
let schema_ptr =
123+
ffi::PyCapsule_GetPointer(schema_capsule, ARROW_SCHEMA_NAME.as_ptr())
124+
as *mut FFI_ArrowSchema;
125+
if !schema_ptr.is_null() {
126+
drop(Box::from_raw(schema_ptr));
127+
}
128+
}
129+
ffi::PyErr_Clear();
130+
ffi::Py_DECREF(schema_capsule);
131+
return Err(PyErr::fetch(py));
132+
}
133+
134+
let schema_capsule = Bound::from_owned_ptr(py, schema_capsule)
135+
.downcast_into::<PyCapsule>()
136+
.unwrap();
137+
let array_capsule = Bound::from_owned_ptr(py, array_capsule)
138+
.downcast_into::<PyCapsule>()
139+
.unwrap();
140+
141+
Ok((schema_capsule, array_capsule))
142+
}
143+
}
41144
}
42145

43146
impl From<RecordBatch> for PyRecordBatch {

0 commit comments

Comments
 (0)