@@ -253,16 +253,19 @@ impl DeviceArrayExt for ArrayRef {
253253 }
254254}
255255
256+ // POSIX EIO for Arrow stream producer/export failures.
256257const ARROW_STREAM_EIO : c_int = 5 ;
258+ // POSIX EINVAL for invalid Arrow stream callback arguments or released streams.
257259const ARROW_STREAM_EINVAL : c_int = 22 ;
258260
259261#[ derive( Clone , Debug , PartialEq ) ]
260- enum DeviceExportSchema {
262+ enum ArrowDeviceStreamSchema {
261263 Schema ( Schema ) ,
262264 Field ( Field ) ,
263265}
264266
265- impl DeviceExportSchema {
267+ impl ArrowDeviceStreamSchema {
268+ /// Interpret an Arrow C schema as the stream schema shape for `dtype`.
266269 fn from_ffi ( schema : & FFI_ArrowSchema , dtype : & DType ) -> VortexResult < Self > {
267270 if matches ! ( dtype, DType :: Struct ( ..) ) {
268271 Ok ( Self :: Schema ( Schema :: try_from ( schema) ?) )
@@ -271,6 +274,7 @@ impl DeviceExportSchema {
271274 }
272275 }
273276
277+ /// Build the Arrow stream schema for an empty stream with the given dtype.
274278 fn from_dtype ( dtype : & DType , ctx : & mut CudaExecutionCtx ) -> VortexResult < Self > {
275279 let dtype = arrow_device_export_dtype ( dtype) ;
276280 if let DType :: Struct ( struct_dtype, _) = & dtype {
@@ -282,6 +286,7 @@ impl DeviceExportSchema {
282286 }
283287 }
284288
289+ /// Export this stream schema as an owned Arrow C schema.
285290 fn to_ffi ( & self ) -> VortexResult < FFI_ArrowSchema > {
286291 match self {
287292 Self :: Schema ( schema) => Ok ( FFI_ArrowSchema :: try_from ( schema) ?) ,
@@ -292,48 +297,41 @@ impl DeviceExportSchema {
292297
293298type ArrayStreamIterator = Box < dyn Iterator < Item = VortexResult < ArrayRef > > > ;
294299
295- struct FuturesArrayStreamIterator {
296- stream : SendableArrayStream ,
297- }
298-
299- impl Iterator for FuturesArrayStreamIterator {
300- type Item = VortexResult < ArrayRef > ;
301-
302- fn next ( & mut self ) -> Option < Self :: Item > {
303- futures:: executor:: block_on ( self . stream . next ( ) )
304- }
305- }
306-
307300struct DeviceArrayStreamPrivateData {
308301 array_iter : ArrayStreamIterator ,
309302 ctx : CudaExecutionCtx ,
310303 dtype : DType ,
311- schema : Option < DeviceExportSchema > ,
304+ schema : Option < ArrowDeviceStreamSchema > ,
312305 pending_array : Option < ArrowDeviceArray > ,
313306 device_id : i64 ,
314307 last_error : Option < CString > ,
315308}
316309
317310impl DeviceArrayStreamPrivateData {
311+ /// Clear the last stream error before a new callback invocation.
318312 fn clear_error ( & mut self ) {
319313 self . last_error = None ;
320314 }
321315
316+ /// Store the last stream error and return the Arrow callback error code.
322317 fn set_error ( & mut self , error : impl ToString ) -> c_int {
323- let message = error. to_string ( ) . replace ( '\0' , "\\ 0" ) ;
324- self . last_error = CString :: new ( message) . ok ( ) ;
318+ self . last_error = CString :: new ( error. to_string ( ) ) . ok ( ) ;
325319 ARROW_STREAM_EIO
326320 }
327321
328- fn ensure_schema ( & mut self ) -> VortexResult < & DeviceExportSchema > {
322+ /// Initialize and return the stream schema, exporting the first batch if needed.
323+ fn ensure_schema ( & mut self ) -> VortexResult < & ArrowDeviceStreamSchema > {
329324 if self . schema . is_none ( ) {
330325 match self . array_iter . next ( ) {
331326 Some ( array) => {
332327 let array = self . export_batch ( array?) ?;
333328 self . pending_array = Some ( array) ;
334329 }
335330 None => {
336- self . schema = Some ( DeviceExportSchema :: from_dtype ( & self . dtype , & mut self . ctx ) ?) ;
331+ self . schema = Some ( ArrowDeviceStreamSchema :: from_dtype (
332+ & self . dtype ,
333+ & mut self . ctx ,
334+ ) ?) ;
337335 }
338336 }
339337 }
@@ -343,21 +341,26 @@ impl DeviceArrayStreamPrivateData {
343341 . ok_or_else ( || vortex_err ! ( "ArrowDeviceArrayStream schema was not initialized" ) )
344342 }
345343
344+ /// Export and return the next device batch, or `None` at end of stream.
346345 fn next_array ( & mut self ) -> VortexResult < Option < ArrowDeviceArray > > {
347346 if let Some ( array) = self . pending_array . take ( ) {
348347 return Ok ( Some ( array) ) ;
349348 }
350349
351350 let Some ( array) = self . array_iter . next ( ) else {
352351 if self . schema . is_none ( ) {
353- self . schema = Some ( DeviceExportSchema :: from_dtype ( & self . dtype , & mut self . ctx ) ?) ;
352+ self . schema = Some ( ArrowDeviceStreamSchema :: from_dtype (
353+ & self . dtype ,
354+ & mut self . ctx ,
355+ ) ?) ;
354356 }
355357 return Ok ( None ) ;
356358 } ;
357359
358360 self . export_batch ( array?) . map ( Some )
359361 }
360362
363+ /// Export one Vortex array batch and validate it against the stream schema and device.
361364 fn export_batch ( & mut self , array : ArrayRef ) -> VortexResult < ArrowDeviceArray > {
362365 vortex_ensure ! (
363366 array. dtype( ) == & self . dtype,
@@ -372,7 +375,7 @@ impl DeviceArrayStreamPrivateData {
372375 mut schema,
373376 mut array,
374377 } = exported;
375- let batch_schema = DeviceExportSchema :: from_ffi ( & schema, & self . dtype ) ;
378+ let batch_schema = ArrowDeviceStreamSchema :: from_ffi ( & schema, & self . dtype ) ;
376379 release_schema ( & mut schema) ;
377380 let batch_schema = match batch_schema {
378381 Ok ( batch_schema) => batch_schema,
@@ -418,6 +421,7 @@ impl DeviceArrayStreamPrivateData {
418421}
419422
420423impl Drop for DeviceArrayStreamPrivateData {
424+ /// Release any pending batch exported while initializing the stream schema.
421425 fn drop ( & mut self ) {
422426 if let Some ( mut array) = self . pending_array . take ( ) {
423427 release_device_array ( & mut array) ;
@@ -440,13 +444,15 @@ pub trait DeviceArrayStreamExt {
440444}
441445
442446impl DeviceArrayStreamExt for SendableArrayStream {
447+ /// Export this stream by adapting it to a blocking iterator.
443448 fn export_device_array_stream (
444449 self ,
445450 session : & VortexSession ,
446451 ) -> VortexResult < ArrowDeviceArrayStream > {
447452 let dtype = self . dtype ( ) . clone ( ) ;
453+ let mut stream = self ;
448454 export_device_array_stream_from_iter (
449- FuturesArrayStreamIterator { stream : self } ,
455+ std :: iter :: from_fn ( move || futures :: executor :: block_on ( stream . next ( ) ) ) ,
450456 dtype,
451457 session,
452458 )
@@ -503,6 +509,7 @@ pub fn export_device_array_stream_from_iter_with_ctx(
503509 }
504510}
505511
512+ /// Return the private stream state for a live Arrow device stream.
506513unsafe fn device_stream_private_data < ' a > (
507514 stream : * mut ArrowDeviceArrayStream ,
508515) -> Option < & ' a mut DeviceArrayStreamPrivateData > {
@@ -515,6 +522,7 @@ unsafe fn device_stream_private_data<'a>(
515522 }
516523}
517524
525+ /// Create the Arrow end-of-stream marker for the stream's CUDA device.
518526fn released_device_array ( device_id : i64 ) -> ArrowDeviceArray {
519527 ArrowDeviceArray {
520528 array : ArrowArray :: empty ( ) ,
@@ -525,18 +533,21 @@ fn released_device_array(device_id: i64) -> ArrowDeviceArray {
525533 }
526534}
527535
536+ /// Release an Arrow C schema if it is live.
528537fn release_schema ( schema : & mut FFI_ArrowSchema ) {
529538 if let Some ( release) = schema. release {
530539 unsafe { release ( schema) } ;
531540 }
532541}
533542
543+ /// Release an Arrow device array if it is live.
534544fn release_device_array ( array : & mut ArrowDeviceArray ) {
535545 if let Some ( release) = array. array . release {
536546 unsafe { release ( & raw mut array. array ) } ;
537547 }
538548}
539549
550+ /// Implement `ArrowDeviceArrayStream.get_schema` for Vortex CUDA streams.
540551unsafe extern "C" fn device_stream_get_schema (
541552 stream : * mut ArrowDeviceArrayStream ,
542553 out : * mut ArrowSchema ,
@@ -560,6 +571,7 @@ unsafe extern "C" fn device_stream_get_schema(
560571 }
561572}
562573
574+ /// Implement `ArrowDeviceArrayStream.get_next` for Vortex CUDA streams.
563575unsafe extern "C" fn device_stream_get_next (
564576 stream : * mut ArrowDeviceArrayStream ,
565577 out : * mut ArrowDeviceArray ,
@@ -586,6 +598,7 @@ unsafe extern "C" fn device_stream_get_next(
586598 }
587599}
588600
601+ /// Implement `ArrowDeviceArrayStream.get_last_error` for Vortex CUDA streams.
589602unsafe extern "C" fn device_stream_get_last_error (
590603 stream : * mut ArrowDeviceArrayStream ,
591604) -> * const c_char {
@@ -599,6 +612,7 @@ unsafe extern "C" fn device_stream_get_last_error(
599612 . map_or ( ptr:: null ( ) , |error| error. as_ptr ( ) )
600613}
601614
615+ /// Implement `ArrowDeviceArrayStream.release` for Vortex CUDA streams.
602616unsafe extern "C" fn device_stream_release ( stream : * mut ArrowDeviceArrayStream ) {
603617 let Some ( stream_ref) = ( unsafe { stream. as_mut ( ) } ) else {
604618 return ;
@@ -875,18 +889,21 @@ mod tests {
875889 use crate :: arrow:: ArrowSchema ;
876890 use crate :: arrow:: DeviceArrayStreamExt ;
877891
892+ /// Release an Arrow C schema in stream tests if it is live.
878893 unsafe fn release_schema ( schema : & mut FFI_ArrowSchema ) {
879894 if let Some ( release) = schema. release {
880895 unsafe { release ( schema) } ;
881896 }
882897 }
883898
899+ /// Release an Arrow device array in stream tests if it is live.
884900 unsafe fn release_device_array ( array : & mut ArrowDeviceArray ) {
885901 if let Some ( release) = array. array . release {
886902 unsafe { release ( & raw mut array. array ) } ;
887903 }
888904 }
889905
906+ /// Create a zeroed placeholder Arrow device array for callback outputs.
890907 fn empty_device_array ( ) -> ArrowDeviceArray {
891908 ArrowDeviceArray {
892909 array : ArrowArray :: empty ( ) ,
@@ -897,6 +914,7 @@ mod tests {
897914 }
898915 }
899916
917+ /// Verify schema, batch, EOS, and idempotent release stream behavior.
900918 #[ cuda_test]
901919 fn test_export_device_array_stream_schema_next_eos_release ( ) -> VortexResult < ( ) > {
902920 let session = VortexSession :: default ( ) . with_some ( CudaSession :: try_default ( ) ?) ;
0 commit comments