Skip to content

Commit bdbf6c4

Browse files
authored
Spawn arrow conversion in jni bindings (#8595)
We want the worker pool to pick up the Arrow conversion instead of running it on the consuming thread.
1 parent a9a3c27 commit bdbf6c4

1 file changed

Lines changed: 23 additions & 15 deletions

File tree

vortex-jni/src/scan.rs

Lines changed: 23 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,6 @@ use jni::objects::JClass;
2828
use jni::objects::JLongArray;
2929
use jni::sys::jboolean;
3030
use jni::sys::jlong;
31-
use vortex::array::ArrayRef;
32-
use vortex::array::ExecutionCtx;
3331
use vortex::array::VortexSessionExecute;
3432
use vortex::array::arrow::ArrowSessionExt;
3533
use vortex::array::stream::SendableArrayStream;
@@ -47,6 +45,7 @@ use vortex::scan::PartitionStream;
4745
use vortex::scan::ScanRequest;
4846
use vortex::scan::selection::Selection;
4947

48+
use crate::POOL;
5049
use crate::RUNTIME;
5150
use crate::data_source::NativeDataSource;
5251
use crate::dtype::strip_views;
@@ -351,23 +350,32 @@ pub extern "system" fn Java_dev_vortex_jni_NativePartition_scanArrow(
351350
_ => unreachable!("Vortex DType always exports as a struct"),
352351
};
353352
let schema = Arc::new(arrow_schema::Schema::new(fields));
354-
let target = Field::new_struct("", schema.fields().clone(), false);
353+
let target = Arc::new(Field::new_struct("", schema.fields().clone(), false));
355354

356355
let session = unsafe { session_ref(session_ptr) };
357356

358357
let iter = RUNTIME
359-
.block_on_stream_thread_safe(|_handle| array_stream)
360-
.map(
361-
move |chunk: VortexResult<ArrayRef>| -> VortexResult<RecordBatch> {
362-
let chunk: ArrayRef = chunk?;
363-
let mut ctx: ExecutionCtx = session.create_execution_ctx();
364-
let arrow = session
365-
.arrow()
366-
.execute_arrow(chunk, Some(&target), &mut ctx)?;
367-
Ok(RecordBatch::from(arrow.as_struct().clone()))
368-
},
369-
)
370-
.map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
358+
.block_on_stream_thread_safe(|handle| {
359+
array_stream
360+
.map(move |chunk| {
361+
let session = session.clone();
362+
let target = Arc::clone(&target);
363+
handle.spawn(async move {
364+
let chunk = chunk?;
365+
let mut ctx = session.create_execution_ctx();
366+
let arrow = session.arrow().execute_arrow(
367+
chunk,
368+
Some(target.as_ref()),
369+
&mut ctx,
370+
)?;
371+
Ok(RecordBatch::from(arrow.as_struct().clone()))
372+
})
373+
})
374+
.buffered(POOL.worker_count().max(1))
375+
})
376+
.map(|result: VortexResult<RecordBatch>| {
377+
result.map_err(|e| ArrowError::ExternalError(Box::new(e)))
378+
});
371379

372380
let reader = RecordBatchIteratorAdapter::new(iter, schema);
373381
let arrow_stream = FFI_ArrowArrayStream::new(Box::new(reader));

0 commit comments

Comments
 (0)