Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions vortex-duckdb/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ fn object_store_fs(base_url: &Url) -> VortexResult<FileSystemRef> {
);
};

Ok(Arc::new(Compat::new(ObjectStoreFileSystem::new(
let object_store = Arc::new(Compat::new(object_store)) as Arc<dyn ObjectStore>;

Ok(Arc::new(ObjectStoreFileSystem::new(
object_store,
RUNTIME.handle(),
))))
)))
}

struct DuckDbFileSystem {
Expand Down
8 changes: 8 additions & 0 deletions vortex-io/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ pub fn vortex_io::runtime::Handle::spawn_blocking<F, R>(&self, F) -> vortex_io::

pub fn vortex_io::runtime::Handle::spawn_cpu<F, R>(&self, F) -> vortex_io::runtime::Task<R> where F: core::ops::function::FnOnce() -> R + core::marker::Send + 'static, R: core::marker::Send + 'static

pub fn vortex_io::runtime::Handle::spawn_io<Fut, R>(&self, Fut) -> vortex_io::runtime::Task<R> where Fut: core::future::future::Future<Output = R> + core::marker::Send + 'static, R: core::marker::Send + 'static

pub fn vortex_io::runtime::Handle::spawn_nested<F, Fut, R>(&self, F) -> vortex_io::runtime::Task<R> where F: core::ops::function::FnOnce(vortex_io::runtime::Handle) -> Fut, Fut: core::future::future::Future<Output = R> + core::marker::Send + 'static, R: core::marker::Send + 'static

impl core::clone::Clone for vortex_io::runtime::Handle
Expand Down Expand Up @@ -458,6 +460,8 @@ pub fn vortex_io::runtime::Executor::spawn_blocking_io(&self, alloc::boxed::Box<

pub fn vortex_io::runtime::Executor::spawn_cpu(&self, alloc::boxed::Box<(dyn core::ops::function::FnOnce() + core::marker::Send + 'static)>) -> vortex_io::runtime::AbortHandleRef

pub fn vortex_io::runtime::Executor::spawn_io(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef

impl vortex_io::runtime::Executor for async_executor::Executor<'static>

pub fn async_executor::Executor<'static>::spawn(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef
Expand All @@ -466,6 +470,8 @@ pub fn async_executor::Executor<'static>::spawn_blocking_io(&self, alloc::boxed:

pub fn async_executor::Executor<'static>::spawn_cpu(&self, alloc::boxed::Box<(dyn core::ops::function::FnOnce() + core::marker::Send + 'static)>) -> vortex_io::runtime::AbortHandleRef

pub fn async_executor::Executor<'static>::spawn_io(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef

impl vortex_io::runtime::Executor for tokio::runtime::handle::Handle

pub fn tokio::runtime::handle::Handle::spawn(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef
Expand All @@ -474,6 +480,8 @@ pub fn tokio::runtime::handle::Handle::spawn_blocking_io(&self, alloc::boxed::Bo

pub fn tokio::runtime::handle::Handle::spawn_cpu(&self, alloc::boxed::Box<(dyn core::ops::function::FnOnce() + core::marker::Send + 'static)>) -> vortex_io::runtime::AbortHandleRef

pub fn tokio::runtime::handle::Handle::spawn_io(&self, futures_core::future::BoxFuture<'static, ()>) -> vortex_io::runtime::AbortHandleRef

pub type vortex_io::runtime::AbortHandleRef = alloc::boxed::Box<dyn vortex_io::runtime::AbortHandle>

pub mod vortex_io::session
Expand Down
11 changes: 10 additions & 1 deletion vortex-io/src/compat/obj_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::stream::BoxStream;
use object_store::CopyOptions;
use object_store::GetOptions;
use object_store::GetResult;
use object_store::GetResultPayload;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
Expand Down Expand Up @@ -56,7 +57,15 @@ impl<T: ObjectStore> ObjectStore for Compat<T> {
}

/// Fetches `location` from the wrapped store, awaiting the inner future through `Compat`.
///
/// A streaming payload is itself re-wrapped in `Compat` and boxed; a file payload is
/// returned unchanged.
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Compat::new(self.inner().get_opts(location, options)).await
let mut result = Compat::new(self.inner().get_opts(location, options)).await?;
// Rebuild the payload so that a stream is also polled through `Compat`.
result.payload = match result.payload {
GetResultPayload::Stream(stream) => {
GetResultPayload::Stream(Compat::new(stream).boxed())
}
// This arm is only compiled for non-wasm32 targets.
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(file, path) => GetResultPayload::File(file, path),
};
Ok(result)
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
Expand Down
197 changes: 139 additions & 58 deletions vortex-io/src/object_store/read_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,64 +117,145 @@ impl VortexReadAt for ObjectStoreReadAt {
let allocator = Arc::clone(&self.allocator);
let range = offset..(offset + length as u64);

async move {
let mut buffer = allocator.allocate(length, alignment)?;

let response = store
.get_opts(
&path,
GetOptions {
range: Some(GetRange::Bounded(range.clone())),
..Default::default()
},
)
.await?;

let buffer = match response.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(file, _) => {
handle
.spawn_blocking(move || {
read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
Ok::<_, io::Error>(buffer)
})
.await
.map_err(io::Error::other)?
}
#[cfg(target_arch = "wasm32")]
GetResultPayload::File(..) => {
unreachable!("File payload not supported on wasm32")
}
GetResultPayload::Stream(mut byte_stream) => {
let mut written = 0usize;
while let Some(bytes) = byte_stream.next().await {
let bytes = bytes?;
let end = written + bytes.len();
vortex_ensure!(
end <= length,
"Object store stream returned too many bytes: {} > expected {} (range: {:?})",
end,
length,
range
);
buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
written = end;
}

vortex_ensure!(
written == length,
"Object store stream returned {} bytes but expected {} bytes (range: {:?})",
written,
length,
range
);

buffer
}
};

Ok(BufferHandle::new_host(buffer.freeze()))
}
// The spawned future needs its own owned handle (for `spawn_blocking`), separate from the one `spawn_io` is called on.
let io_handle = handle.clone();

handle
.spawn_io(async move {
Comment thread
AdamGS marked this conversation as resolved.
let mut buffer = allocator.allocate(length, alignment)?;

let response = store
.get_opts(
&path,
GetOptions {
range: Some(GetRange::Bounded(range.clone())),
..Default::default()
},
)
.await?;

let buffer = match response.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(file, _) => {
io_handle
.spawn_blocking(move || {
read_exact_at(&file, buffer.as_mut_slice(), range.start)?;
Ok::<_, io::Error>(buffer)
})
.await
.map_err(io::Error::other)?
}
#[cfg(target_arch = "wasm32")]
GetResultPayload::File(..) => {
unreachable!("File payload not supported on wasm32")
}
GetResultPayload::Stream(mut byte_stream) => {
let mut written = 0usize;
while let Some(bytes) = byte_stream.next().await {
let bytes = bytes?;
let end = written + bytes.len();
vortex_ensure!(
end <= length,
"Object store stream returned too many bytes: {} > expected {} (range: {:?})",
end,
length,
range
);
buffer.as_mut_slice()[written..end].copy_from_slice(&bytes);
written = end;
}

vortex_ensure!(
written == length,
"Object store stream returned {} bytes but expected {} bytes (range: {:?})",
written,
length,
range
);

buffer
}
};

Ok(BufferHandle::new_host(buffer.freeze()))
})
.boxed()
}
}

#[cfg(test)]
mod tests {

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use object_store::PutPayload;
use object_store::memory::InMemory;

use super::*;
use crate::runtime::AbortHandle;
use crate::runtime::AbortHandleRef;
use crate::runtime::Executor;

const TEST_DATA: &[u8] = b"object store test data";

/// Test executor that records how many times `spawn` and `spawn_io` are invoked.
#[derive(Default)]
struct CountingExecutor {
/// Incremented on every `Executor::spawn` call.
spawn_count: AtomicUsize,
/// Incremented on every `Executor::spawn_io` call.
spawn_io_count: AtomicUsize,
}

impl Executor for CountingExecutor {
    /// Records the call in `spawn_count`, then runs the future as a Tokio task.
    fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
        self.spawn_count.fetch_add(1, Ordering::SeqCst);
        let join = tokio::spawn(fut);
        TokioAbortHandle::new_handle(join.abort_handle())
    }

    /// Records the call in `spawn_io_count`, then runs the future as a Tokio task.
    fn spawn_io(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
        self.spawn_io_count.fetch_add(1, Ordering::SeqCst);
        let join = tokio::spawn(fut);
        TokioAbortHandle::new_handle(join.abort_handle())
    }

    /// Runs the closure inside an async Tokio task; not counted.
    fn spawn_cpu(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
        let wrapped = async move { task() };
        let join = tokio::spawn(wrapped);
        TokioAbortHandle::new_handle(join.abort_handle())
    }

    /// Runs the closure via `tokio::task::spawn_blocking`; not counted.
    fn spawn_blocking_io(&self, task: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
        let join = tokio::task::spawn_blocking(task);
        TokioAbortHandle::new_handle(join.abort_handle())
    }
}

/// Adapter exposing a Tokio task abort handle through the crate's `AbortHandle` trait.
struct TokioAbortHandle(tokio::task::AbortHandle);

impl TokioAbortHandle {
    /// Boxes the Tokio handle into an `AbortHandleRef`.
    fn new_handle(handle: tokio::task::AbortHandle) -> AbortHandleRef {
        let wrapped = TokioAbortHandle(handle);
        Box::new(wrapped)
    }
}

impl AbortHandle for TokioAbortHandle {
    /// Forwards the abort request to the wrapped Tokio handle.
    fn abort(self: Box<Self>) {
        let TokioAbortHandle(inner) = *self;
        inner.abort();
    }
}

/// A read through `ObjectStoreReadAt` must be scheduled with `spawn_io`, never plain `spawn`.
#[tokio::test]
async fn read_at_uses_spawn_io() -> anyhow::Result<()> {
    // Keep the concretely-typed Arc so the counters can be inspected after the read.
    let counting = Arc::new(CountingExecutor::default());
    let as_executor = Arc::clone(&counting) as Arc<dyn Executor>;
    let handle = Handle::new(Arc::downgrade(&as_executor));

    // Seed an in-memory object store with the fixture bytes.
    let in_memory = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
    let object_path = ObjectPath::from("test.bin");
    in_memory
        .put(&object_path, PutPayload::from_static(TEST_DATA))
        .await?;

    // Offset 7, length 5 of the fixture is the word "store".
    let reader = ObjectStoreReadAt::new(in_memory, object_path, handle);
    let read = reader.read_at(7, 5, Alignment::new(1)).await?;
    let host = read.to_host().await;
    assert_eq!(host.as_slice(), b"store");

    // Exactly one I/O spawn and no general-purpose spawn.
    assert_eq!(counting.spawn_io_count.load(Ordering::SeqCst), 1);
    assert_eq!(counting.spawn_count.load(Ordering::SeqCst), 0);

    Ok(())
}
}
26 changes: 26 additions & 0 deletions vortex-io/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,32 @@ impl Handle {
self.spawn(f(Handle::new(Weak::clone(&self.runtime))))
}

/// Spawn a new I/O future onto the runtime.
///
/// The future is handed to [`Executor::spawn_io`]; see that method for details about how
/// it is expected to run.
///
/// See [`Task`] for details on cancelling or detaching the spawned task.
pub fn spawn_io<Fut, R>(&self, f: Fut) -> Task<R>
where
    Fut: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    // Capture the caller's span so the spawned work is instrumented with it.
    let parent_span = tracing::Span::current();
    let runtime = self.runtime();

    let job = async move {
        // Task::detach allows the receiver to be dropped, so we ignore send errors.
        drop(tx.send(f.await));
    }
    .instrument(parent_span)
    .boxed();

    let abort_handle = runtime.spawn_io(job);
    Task {
        recv: rx.into_future(),
        abort_handle: Some(abort_handle),
    }
}

/// Spawn a CPU-bound task for execution on the runtime.
///
/// Note that many runtimes will interleave this work on the same async runtime. See the
Expand Down
7 changes: 7 additions & 0 deletions vortex-io/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ pub trait Executor: Send + Sync {
/// The returned `AbortHandle` may be used to optimistically cancel the future.
fn spawn(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef;

/// Spawns a future that performs I/O to be executed on the runtime.
///
/// This hook allows an `Executor` implementation to split work between multiple async
/// runtimes. The default implementation simply forwards to [`Executor::spawn`].
fn spawn_io(&self, fut: BoxFuture<'static, ()>) -> AbortHandleRef {
self.spawn(fut)
}

/// Spawns a CPU-bound task for execution on the runtime.
///
/// The returned `AbortHandle` may be used to optimistically cancel the task if it has not
Expand Down
7 changes: 3 additions & 4 deletions vortex-jni/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ pub(crate) fn object_store_fs(
handle: Handle,
) -> VortexResult<FileSystemRef> {
let object_store = make_object_store(url, properties)?;
Ok(Arc::new(Compat::new(ObjectStoreFileSystem::new(
object_store,
handle,
))))
let object_store = Arc::new(Compat::new(object_store)) as Arc<dyn ObjectStore>;

Ok(Arc::new(ObjectStoreFileSystem::new(object_store, handle)))
}

#[expect(clippy::cognitive_complexity)]
Expand Down
Loading