@@ -39,7 +39,6 @@ use datafusion::prelude::*;
 use datafusion_ffi::table_provider::FFI_TableProvider;
 use futures::{StreamExt, TryStreamExt};
 use pyo3::exceptions::PyValueError;
-use pyo3::ffi;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
@@ -59,19 +58,6 @@ use crate::{
     expr::{sort_expr::PySortExpr, PyExpr},
 };
 
-const ARROW_STREAM_NAME: &str = "arrow_stream";
-
-unsafe extern "C" fn drop_stream(capsule: *mut ffi::PyObject) {
-    if capsule.is_null() {
-        return;
-    }
-    let name = CString::new(ARROW_STREAM_NAME).unwrap();
-    let stream_ptr = ffi::PyCapsule_GetPointer(capsule, name.as_ptr()) as *mut FFI_ArrowArrayStream;
-    if !stream_ptr.is_null() {
-        drop(Box::from_raw(stream_ptr));
-    }
-}
-
 // https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
 // - we have not decided on the table_provider approach yet
 // this is an interim implementation
@@ -972,21 +958,20 @@ impl PyDataFrame {
             !stream_ptr.is_null(),
             "ArrowArrayStream pointer should never be null"
         );
-        let stream_name = CString::new(ARROW_STREAM_NAME).unwrap();
-        let capsule = unsafe {
-            ffi::PyCapsule_New(
-                stream_ptr as *mut c_void,
-                stream_name.as_ptr(),
-                Some(drop_stream),
+        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
+        unsafe {
+            PyCapsule::new_bound_with_destructor(
+                py,
+                stream_ptr,
+                Some(stream_capsule_name),
+                |ptr: *mut FFI_ArrowArrayStream, _| {
+                    if !ptr.is_null() {
+                        unsafe { Box::from_raw(ptr) };
+                    }
+                },
             )
-        };
-        if capsule.is_null() {
-            unsafe { drop(Box::from_raw(stream_ptr)) };
-            Err(PyErr::fetch(py).into())
-        } else {
-            let any = unsafe { Bound::from_owned_ptr(py, capsule) };
-            Ok(any.downcast_into::<PyCapsule>().unwrap())
         }
+        .map_err(PyDataFusionError::from)
     }
 
     fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
0 commit comments