Skip to content

Commit 9e2991c

Browse files
authored
Add Executor::spawn_io (#7894)
## Summary Allow `Executor` implementations to split their async IO and CPU work across different runtimes. --------- Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 2ee2033 commit 9e2991c

7 files changed

Lines changed: 197 additions & 65 deletions

File tree

vortex-duckdb/src/filesystem.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,12 @@ fn object_store_fs(base_url: &Url) -> VortexResult<FileSystemRef> {
9494
);
9595
};
9696

97-
Ok(Arc::new(Compat::new(ObjectStoreFileSystem::new(
97+
let object_store = Arc::new(Compat::new(object_store)) as Arc<dyn ObjectStore>;
98+
99+
Ok(Arc::new(ObjectStoreFileSystem::new(
98100
object_store,
99101
RUNTIME.handle(),
100-
))))
102+
)))
101103
}
102104

103105
struct DuckDbFileSystem {

vortex-io/public-api.lock

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,8 @@ pub fn vortex_io::runtime::Handle::spawn_blocking<F, R>(&self, F) -> vortex_io::
380380

381381
pub fn vortex_io::runtime::Handle::spawn_cpu<F, R>(&self, F) -> vortex_io::runtime::Task<R> where F: core::ops::function::FnOnce() -> R + core::marker::Send + 'static, R: core::marker::Send + 'static
382382

383+
pub fn vortex_io::runtime::Handle::spawn_io<Fut, R>(&self, Fut) -> vortex_io::runtime::Task<R> where Fut: core::future::future::Future<Output = R> + core::marker::Send + 'static, R: core::marker::Send + 'static
384+
383385
pub fn vortex_io::runtime::Handle::spawn_nested<F, Fut, R>(&self, F) -> vortex_io::runtime::Task<R> where F: core::ops::function::FnOnce(vortex_io::runtime::Handle) -> Fut, Fut: core::future::future::Future<Output = R> + core::marker::Send + 'static, R: core::marker::Send + 'static
384386

385387
impl core::clone::Clone for vortex_io::runtime::Handle
@@ -458,6 +460,8 @@ pub fn vortex_io::runtime::Executor::spawn_blocking_io(&self, alloc::boxed::Box<
458460

459461
pub fn vortex_io::runtime::Executor::spawn_cpu(&self, alloc::boxed::Box<(dyn core::ops::function::FnOnce() + core::marker::Send + 'static)>) -> vortex_io::runtime::AbortHandleRef
460462

463+
pub fn vortex_io::runtime::Executor::spawn_io(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef
464+
461465
impl vortex_io::runtime::Executor for async_executor::Executor<'static>
462466

463467
pub fn async_executor::Executor<'static>::spawn(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef
@@ -466,6 +470,8 @@ pub fn async_executor::Executor<'static>::spawn_blocking_io(&self, alloc::boxed:
466470

467471
pub fn async_executor::Executor<'static>::spawn_cpu(&self, alloc::boxed::Box<(dyn core::ops::function::FnOnce() + core::marker::Send + 'static)>) -> vortex_io::runtime::AbortHandleRef
468472

473+
pub fn async_executor::Executor<'static>::spawn_io(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef
474+
469475
impl vortex_io::runtime::Executor for tokio::runtime::handle::Handle
470476

471477
pub fn tokio::runtime::handle::Handle::spawn(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef
@@ -474,6 +480,8 @@ pub fn tokio::runtime::handle::Handle::spawn_blocking_io(&self, alloc::boxed::Bo
474480

475481
pub fn tokio::runtime::handle::Handle::spawn_cpu(&self, alloc::boxed::Box<(dyn core::ops::function::FnOnce() + core::marker::Send + 'static)>) -> vortex_io::runtime::AbortHandleRef
476482

483+
pub fn tokio::runtime::handle::Handle::spawn_io(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef
484+
477485
pub type vortex_io::runtime::AbortHandleRef = alloc::boxed::Box<dyn vortex_io::runtime::AbortHandle>
478486

479487
pub mod vortex_io::session

vortex-io/src/compat/obj_store.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use futures::stream::BoxStream;
1111
use object_store::CopyOptions;
1212
use object_store::GetOptions;
1313
use object_store::GetResult;
14+
use object_store::GetResultPayload;
1415
use object_store::ListResult;
1516
use object_store::MultipartUpload;
1617
use object_store::ObjectMeta;
@@ -56,7 +57,15 @@ impl<T: ObjectStore> ObjectStore for Compat<T> {
5657
}
5758

5859
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
59-
Compat::new(self.inner().get_opts(location, options)).await
60+
let mut result = Compat::new(self.inner().get_opts(location, options)).await?;
61+
result.payload = match result.payload {
62+
GetResultPayload::Stream(stream) => {
63+
GetResultPayload::Stream(Compat::new(stream).boxed())
64+
}
65+
#[cfg(not(target_arch = "wasm32"))]
66+
GetResultPayload::File(file, path) => GetResultPayload::File(file, path),
67+
};
68+
Ok(result)
6069
}
6170

6271
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {

vortex-io/src/object_store/read_at.rs

Lines changed: 139 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -117,64 +117,145 @@ impl VortexReadAt for ObjectStoreReadAt {
117117
let allocator = Arc::clone(&self.allocator);
118118
let range = offset..(offset + length as u64);
119119

120-
async move {
121-
let mut buffer = allocator.allocate(length, alignment)?;
122-
123-
let response = store
124-
.get_opts(
125-
&path,
126-
GetOptions {
127-
range: Some(GetRange::Bounded(range.clone())),
128-
..Default::default()
129-
},
130-
)
131-
.await?;
132-
133-
let buffer = match response.payload {
134-
#[cfg(not(target_arch = "wasm32"))]
135-
GetResultPayload::File(file, _) => {
136-
handle
137-
.spawn_blocking(move || {
138-
read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
139-
Ok::<_, io::Error>(buffer)
140-
})
141-
.await
142-
.map_err(io::Error::other)?
143-
}
144-
#[cfg(target_arch = "wasm32")]
145-
GetResultPayload::File(..) => {
146-
unreachable!("File payload not supported on wasm32")
147-
}
148-
GetResultPayload::Stream(mut byte_stream) => {
149-
let mut written = 0usize;
150-
while let Some(bytes) = byte_stream.next().await {
151-
let bytes = bytes?;
152-
let end = written + bytes.len();
153-
vortex_ensure!(
154-
end <= length,
155-
"Object store stream returned too many bytes: {} > expected {} (range: {:?})",
156-
end,
157-
length,
158-
range
159-
);
160-
buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
161-
written = end;
162-
}
163-
164-
vortex_ensure!(
165-
written == length,
166-
"Object store stream returned {} bytes but expected {} bytes (range: {:?})",
167-
written,
168-
length,
169-
range
170-
);
171-
172-
buffer
173-
}
174-
};
175-
176-
Ok(BufferHandle::new_host(buffer.freeze()))
177-
}
120+
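// Clone the handle: the `'static` future passed to `spawn_io` needs its own owned copy, since `handle` itself is borrowed by the `spawn_io` call.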
121+
let io_handle = handle.clone();
122+
123+
handle
124+
.spawn_io(async move {
125+
let mut buffer = allocator.allocate(length, alignment)?;
126+
127+
let response = store
128+
.get_opts(
129+
&path,
130+
GetOptions {
131+
range: Some(GetRange::Bounded(range.clone())),
132+
..Default::default()
133+
},
134+
)
135+
.await?;
136+
137+
let buffer = match response.payload {
138+
#[cfg(not(target_arch = "wasm32"))]
139+
GetResultPayload::File(file, _) => {
140+
io_handle
141+
.spawn_blocking(move || {
142+
read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
143+
Ok::<_, io::Error>(buffer)
144+
})
145+
.await
146+
.map_err(io::Error::other)?
147+
}
148+
#[cfg(target_arch = "wasm32")]
149+
GetResultPayload::File(..) => {
150+
unreachable!("File payload not supported on wasm32")
151+
}
152+
GetResultPayload::Stream(mut byte_stream) => {
153+
let mut written = 0usize;
154+
while let Some(bytes) = byte_stream.next().await {
155+
let bytes = bytes?;
156+
let end = written + bytes.len();
157+
vortex_ensure!(
158+
end <= length,
159+
"Object store stream returned too many bytes: {} > expected {} (range: {:?})",
160+
end,
161+
length,
162+
range
163+
);
164+
buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
165+
written = end;
166+
}
167+
168+
vortex_ensure!(
169+
written == length,
170+
"Object store stream returned {} bytes but expected {} bytes (range: {:?})",
171+
written,
172+
length,
173+
range
174+
);
175+
176+
buffer
177+
}
178+
};
179+
180+
Ok(BufferHandle::new_host(buffer.freeze()))
181+
})
178182
.boxed()
179183
}
180184
}
185+
186+
#[cfg(test)]
187+
mod tests {
188+
189+
use std::sync::atomic::AtomicUsize;
190+
use std::sync::atomic::Ordering;
191+
192+
use object_store::PutPayload;
193+
use object_store::memory::InMemory;
194+
195+
use super::*;
196+
use crate::runtime::AbortHandle;
197+
use crate::runtime::AbortHandleRef;
198+
use crate::runtime::Executor;
199+
200+
const TEST_DATA: &[u8] = b"object store test data";
201+
202+
#[derive(Default)]
203+
struct CountingExecutor {
204+
spawn_count: AtomicUsize,
205+
spawn_io_count: AtomicUsize,
206+
}
207+
208+
impl Executor for CountingExecutor {
209+
fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
210+
self.spawn_count.fetch_add(1, Ordering::SeqCst);
211+
TokioAbortHandle::new_handle(tokio::spawn(fut).abort_handle())
212+
}
213+
214+
fn spawn_io(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
215+
self.spawn_io_count.fetch_add(1, Ordering::SeqCst);
216+
TokioAbortHandle::new_handle(tokio::spawn(fut).abort_handle())
217+
}
218+
219+
fn spawn_cpu(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
220+
TokioAbortHandle::new_handle(tokio::spawn(async move { task() }).abort_handle())
221+
}
222+
223+
fn spawn_blocking_io(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
224+
TokioAbortHandle::new_handle(tokio::task::spawn_blocking(task).abort_handle())
225+
}
226+
}
227+
228+
struct TokioAbortHandle(tokio::task::AbortHandle);
229+
230+
impl TokioAbortHandle {
231+
fn new_handle(handle: tokio::task::AbortHandle) -> AbortHandleRef {
232+
Box::new(Self(handle))
233+
}
234+
}
235+
236+
impl AbortHandle for TokioAbortHandle {
237+
fn abort(self: Box<Self>) {
238+
self.0.abort();
239+
}
240+
}
241+
242+
#[tokio::test]
243+
async fn read_at_uses_spawn_io() -> anyhow::Result<()> {
244+
let executor = Arc::new(CountingExecutor::default());
245+
let runtime = Arc::clone(&executor) as Arc<dyn Executor>;
246+
let handle = Handle::new(Arc::downgrade(&runtime));
247+
248+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
249+
let path = ObjectPath::from("test.bin");
250+
store.put(&path, PutPayload::from_static(TEST_DATA)).await?;
251+
252+
let reader = ObjectStoreReadAt::new(store, path, handle);
253+
let buffer = reader.read_at(7, 5, Alignment::new(1)).await?;
254+
255+
assert_eq!(buffer.to_host().await.as_slice(), b"store");
256+
assert_eq!(executor.spawn_io_count.load(Ordering::SeqCst), 1);
257+
assert_eq!(executor.spawn_count.load(Ordering::SeqCst), 0);
258+
259+
Ok(())
260+
}
261+
}

vortex-io/src/runtime/handle.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,32 @@ impl Handle {
9191
self.spawn(f(Handle::new(Weak::clone(&self.runtime))))
9292
}
9393

94+
/// Spawn a new I/O future onto the runtime.
95+
///
96+
/// See [`Executor::spawn_io`] for more details about how this future is expected to run.
97+
///
98+
/// See [`Task`] for details on cancelling or detaching the spawned task.
99+
pub fn spawn_io<Fut, R>(&self, f: Fut) -> Task<R>
100+
where
101+
Fut: Future<Output = R> + Send + 'static,
102+
R: Send + 'static,
103+
{
104+
let (send, recv) = oneshot::channel();
105+
let span = tracing::Span::current();
106+
let abort_handle = self.runtime().spawn_io(
107+
async move {
108+
// Task::detach allows the receiver to be dropped, so we ignore send errors.
109+
drop(send.send(f.await));
110+
}
111+
.instrument(span)
112+
.boxed(),
113+
);
114+
Task {
115+
recv: recv.into_future(),
116+
abort_handle: Some(abort_handle),
117+
}
118+
}
119+
94120
/// Spawn a CPU-bound task for execution on the runtime.
95121
///
96122
/// Note that many runtimes will interleave this work on the same async runtime. See the

vortex-io/src/runtime/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ pub trait Executor: Send + Sync {
4545
/// The returned `AbortHandle` may be used to optimistically cancel the future.
4646
fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef;
4747

48+
/// Spawns a future doing IO to be executed on the runtime.
49+
/// This allows `Executor` implementations to split work between multiple async runtimes.
50+
/// By default, it just calls `Executor::spawn`.
51+
fn spawn_io(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
52+
self.spawn(fut)
53+
}
54+
4855
/// Spawns a CPU-bound task for execution on the runtime.
4956
///
5057
/// The returned `AbortHandle` may be used to optimistically cancel the task if it has not

vortex-jni/src/object_store.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ pub(crate) fn object_store_fs(
3333
handle: Handle,
3434
) -> VortexResult<FileSystemRef> {
3535
let object_store = make_object_store(url, properties)?;
36-
Ok(Arc::new(Compat::new(ObjectStoreFileSystem::new(
37-
object_store,
38-
handle,
39-
))))
36+
let object_store = Arc::new(Compat::new(object_store)) as Arc<dyn ObjectStore>;
37+
38+
Ok(Arc::new(ObjectStoreFileSystem::new(object_store, handle)))
4039
}
4140

4241
#[expect(clippy::cognitive_complexity)]

0 commit comments

Comments
 (0)