Skip to content

Commit a748b68

Browse files
committed
schedule work
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent d4a7142 commit a748b68

4 files changed

Lines changed: 27 additions & 27 deletions

File tree

vortex-cxx/src/read.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,11 @@
44
use std::sync::Arc;
55

66
use anyhow::Result;
7-
use arrow_array::RecordBatch;
87
use arrow_array::RecordBatchReader;
9-
use arrow_array::cast::AsArray;
108
use arrow_array::ffi::FFI_ArrowSchema;
119
use arrow_array::ffi_stream::FFI_ArrowArrayStream;
12-
use arrow_schema::ArrowError;
13-
use arrow_schema::DataType;
1410
use arrow_schema::Schema;
1511
use arrow_schema::SchemaRef;
16-
use futures::StreamExt;
17-
use futures::stream::TryStreamExt;
18-
use vortex::array::arrow::IntoArrowArray;
1912
use vortex::buffer::Buffer;
2013
use vortex::file::OpenOptionsSessionExt;
2114
use vortex::io::runtime::BlockingRuntime;
@@ -158,19 +151,9 @@ pub(crate) fn scan_builder_into_threadsafe_cloneable_reader(
158151
Arc::new(arrow_schema)
159152
}
160153
};
161-
let data_type = DataType::Struct(schema.fields().clone());
162-
163154
let stream = builder
164155
.inner
165-
.into_stream()?
166-
.map(move |result| {
167-
result.and_then(|chunk| {
168-
chunk
169-
.into_arrow(&data_type)
170-
.map(|struct_array| RecordBatch::from(struct_array.as_struct()))
171-
})
172-
})
173-
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
156+
.into_record_batch_stream(Arc::clone(&schema))?;
174157

175158
let iter = RUNTIME.block_on_stream_thread_safe(|_h| stream);
176159
let rbr = RecordBatchIteratorAdapter::new(iter, schema);

vortex-datafusion/src/persistent/opener.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use vortex::array::arrow::ArrowArrayExecutor;
3737
use vortex::error::VortexError;
3838
use vortex::file::OpenOptionsSessionExt;
3939
use vortex::io::InstrumentedReadAt;
40+
use vortex::io::session::RuntimeSessionExt;
4041
use vortex::layout::LayoutReader;
4142
use vortex::layout::scan::scan_builder::ScanBuilder;
4243
use vortex::metrics::Label;
@@ -362,6 +363,7 @@ impl FileOpener for VortexOpener {
362363
}
363364

364365
let stream_schema = Arc::new(stream_schema);
366+
let handle = session.handle();
365367

366368
let stream = scan_builder
367369
.with_metrics_registry(metrics_registry)
@@ -370,10 +372,15 @@ impl FileOpener for VortexOpener {
370372
.with_ordered(has_output_ordering)
371373
.into_stream()
372374
.map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))?
373-
.map(move |chunk| {
374-
let mut ctx = session.create_execution_ctx();
375-
chunk.and_then(|chunk| {
376-
chunk.execute_record_batch(stream_schema.as_ref(), &mut ctx)
375+
.then(move |chunk| {
376+
let session = session.clone();
377+
let stream_schema = Arc::clone(&stream_schema);
378+
let handle = handle.clone();
379+
handle.spawn_blocking(move || {
380+
let mut ctx = session.create_execution_ctx();
381+
chunk.and_then(|chunk| {
382+
chunk.execute_record_batch(stream_schema.as_ref(), &mut ctx)
383+
})
377384
})
378385
})
379386
.map_ok(move |rb| {

vortex-datafusion/src/v2/source.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -429,15 +429,15 @@ impl DataSource for VortexDataSource {
429429
let handle = session.handle();
430430
let stream = scan_streams
431431
.try_flatten_unordered(Some(num_partitions.get() * 2))
432-
.map(move |result| {
432+
.then(move |result| {
433433
let session = session.clone();
434434
let schema = Arc::clone(&projected_schema);
435-
handle.spawn_cpu(move || {
435+
let handle = handle.clone();
436+
handle.spawn_blocking(move || {
436437
let mut ctx = session.create_execution_ctx();
437438
result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
438439
})
439440
})
440-
.buffered(num_partitions.get())
441441
.map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
442442

443443
// Apply leftover projection (expressions that couldn't be pushed into Vortex).

vortex-layout/src/scan/arrow.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use vortex_array::VortexSessionExecute;
1818
use vortex_array::arrow::ArrowArrayExecutor;
1919
use vortex_error::VortexResult;
2020
use vortex_io::runtime::BlockingRuntime;
21+
use vortex_io::session::RuntimeSessionExt;
2122

2223
use crate::scan::scan_builder::ScanBuilder;
2324

@@ -52,13 +53,22 @@ impl ScanBuilder {
5253
) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
5354
let data_type = DataType::Struct(schema.fields().clone());
5455
let session = self.session().clone();
56+
let handle = session.handle();
57+
let concurrency = std::thread::available_parallelism()
58+
.map(|n| n.get())
59+
.unwrap_or(1);
5560

5661
let stream = self
5762
.into_stream()?
5863
.map(move |chunk| {
59-
let mut ctx = session.create_execution_ctx();
60-
chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx))
64+
let session = session.clone();
65+
let data_type = data_type.clone();
66+
handle.spawn_blocking(move || {
67+
let mut ctx = session.create_execution_ctx();
68+
chunk.and_then(|chunk| to_record_batch(chunk, &data_type, &mut ctx))
69+
})
6170
})
71+
.buffered(concurrency)
6272
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
6373

6474
Ok(stream)

0 commit comments

Comments
 (0)