@@ -39,6 +39,7 @@ use datafusion::prelude::*;
3939use datafusion_ffi:: table_provider:: FFI_TableProvider ;
4040use futures:: { StreamExt , TryStreamExt } ;
4141use pyo3:: exceptions:: PyValueError ;
42+ use pyo3:: ffi;
4243use pyo3:: prelude:: * ;
4344use pyo3:: pybacked:: PyBackedStr ;
4445use pyo3:: types:: { PyCapsule , PyList , PyTuple , PyTupleMethods } ;
@@ -58,6 +59,19 @@ use crate::{
5859 expr:: { sort_expr:: PySortExpr , PyExpr } ,
5960} ;
6061
62+ const ARROW_STREAM_NAME : & str = "arrow_stream" ;
63+
64+ unsafe extern "C" fn drop_stream ( capsule : * mut ffi:: PyObject ) {
65+ if capsule. is_null ( ) {
66+ return ;
67+ }
68+ let name = CString :: new ( ARROW_STREAM_NAME ) . unwrap ( ) ;
69+ let stream_ptr = ffi:: PyCapsule_GetPointer ( capsule, name. as_ptr ( ) ) as * mut FFI_ArrowArrayStream ;
70+ if !stream_ptr. is_null ( ) {
71+ drop ( Box :: from_raw ( stream_ptr) ) ;
72+ }
73+ }
74+
6175// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
6276// - we have not decided on the table_provider approach yet
6377// this is an interim implementation
@@ -958,20 +972,21 @@ impl PyDataFrame {
958972 !stream_ptr. is_null( ) ,
959973 "ArrowArrayStream pointer should never be null"
960974 ) ;
961- let stream_capsule_name = CString :: new ( "arrow_array_stream" ) . unwrap ( ) ;
962- unsafe {
963- PyCapsule :: new_bound_with_destructor (
964- py,
965- stream_ptr,
966- Some ( stream_capsule_name) ,
967- |ptr : * mut FFI_ArrowArrayStream , _| {
968- if !ptr. is_null ( ) {
969- unsafe { Box :: from_raw ( ptr) } ;
970- }
971- } ,
975+ let stream_name = CString :: new ( ARROW_STREAM_NAME ) . unwrap ( ) ;
976+ let capsule = unsafe {
977+ ffi:: PyCapsule_New (
978+ stream_ptr as * mut c_void ,
979+ stream_name. as_ptr ( ) ,
980+ Some ( drop_stream) ,
972981 )
982+ } ;
983+ if capsule. is_null ( ) {
984+ unsafe { drop ( Box :: from_raw ( stream_ptr) ) } ;
985+ Err ( PyErr :: fetch ( py) . into ( ) )
986+ } else {
987+ let any = unsafe { Bound :: from_owned_ptr ( py, capsule) } ;
988+ Ok ( any. downcast_into :: < PyCapsule > ( ) . unwrap ( ) )
973989 }
974- . map_err ( PyDataFusionError :: from)
975990 }
976991
977992 fn execute_stream ( & self , py : Python ) -> PyDataFusionResult < PyRecordBatchStream > {
0 commit comments