1717
1818use std:: collections:: HashMap ;
1919use std:: ffi:: { CStr , CString } ;
20+ use std:: os:: raw:: c_void;
2021use std:: sync:: Arc ;
2122
2223use arrow:: array:: { new_null_array, RecordBatch , RecordBatchReader } ;
@@ -39,6 +40,7 @@ use datafusion::prelude::*;
3940use datafusion_ffi:: table_provider:: FFI_TableProvider ;
4041use futures:: { StreamExt , TryStreamExt } ;
4142use pyo3:: exceptions:: PyValueError ;
43+ use pyo3:: ffi;
4244use pyo3:: prelude:: * ;
4345use pyo3:: pybacked:: PyBackedStr ;
4446use pyo3:: types:: { PyCapsule , PyList , PyTuple , PyTupleMethods } ;
@@ -62,6 +64,25 @@ use crate::{
6264static ARROW_STREAM_NAME : & CStr =
6365 unsafe { CStr :: from_bytes_with_nul_unchecked ( b"arrow_array_stream\0 " ) } ;
6466
67+ /// Capsule destructor for [`FFI_ArrowArrayStream`].
68+ ///
69+ /// PyArrow currently takes ownership of the stream and nulls out the capsule's
70+ /// pointer when importing. This destructor defensively checks for a null pointer
71+ /// to guard against double free until PyArrow is removed.
72+ unsafe extern "C" fn drop_arrow_array_stream_capsule ( capsule : * mut ffi:: PyObject ) {
73+ // Safety: `capsule` is expected to be a valid `PyCapsule` containing a
74+ // `FFI_ArrowArrayStream` created by this module.
75+ let ptr = unsafe { ffi:: PyCapsule_GetPointer ( capsule, ARROW_STREAM_NAME . as_ptr ( ) ) } ;
76+ if ptr. is_null ( ) {
77+ return ;
78+ }
79+ // Reconstruct and drop the stream then null out the capsule pointer.
80+ unsafe {
81+ drop ( Box :: from_raw ( ptr. cast :: < FFI_ArrowArrayStream > ( ) ) ) ;
82+ ffi:: PyCapsule_SetPointer ( capsule, std:: ptr:: null_mut ( ) ) ;
83+ }
84+ }
85+
6586// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
6687// - we have not decided on the table_provider approach yet
6788// this is an interim implementation
@@ -971,9 +992,18 @@ impl PyDataFrame {
971992 // PyArrow imports the capsule it assumes ownership of the stream and
972993 // nulls out the capsule's internal pointer so the destructor does not
973994 // free it twice.
974- // ARROW_STREAM_NAME is a `&CStr`; PyCapsule::new expects an
975- // `Option<CString>` for the name, so convert it here.
976- PyCapsule :: new ( py, stream, Some ( ARROW_STREAM_NAME . into ( ) ) ) . map_err ( Into :: into)
995+ unsafe {
996+ let capsule = ffi:: PyCapsule_New (
997+ Box :: into_raw ( Box :: new ( stream) ) as * mut c_void ,
998+ ARROW_STREAM_NAME . as_ptr ( ) ,
999+ Some ( drop_arrow_array_stream_capsule) ,
1000+ ) ;
1001+ if capsule. is_null ( ) {
1002+ Err ( PyErr :: fetch ( py) . into ( ) )
1003+ } else {
1004+ Ok ( Bound :: from_owned_ptr ( py, capsule) . downcast_into_unchecked ( ) )
1005+ }
1006+ }
9771007 }
9781008
9791009 fn execute_stream ( & self , py : Python ) -> PyDataFusionResult < PyRecordBatchStream > {
0 commit comments