From 1c8ef75f669751b2a43530f6f373a0c32f68367d Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 8 Apr 2026 15:55:33 -0400 Subject: [PATCH] remove all usages of kanal and oneshot Signed-off-by: Andrew Duffy --- Cargo.lock | 27 ++--------------- Cargo.toml | 3 +- vortex-cuda/Cargo.toml | 1 - vortex-cuda/src/stream.rs | 16 +++++----- vortex-duckdb/Cargo.toml | 1 - vortex-duckdb/src/datasource.rs | 9 +++--- vortex-file/Cargo.toml | 2 -- vortex-file/src/read/driver.rs | 9 +++--- vortex-file/src/read/request.rs | 7 +++-- vortex-file/src/segments/source.rs | 15 +++++----- vortex-file/src/segments/writer.rs | 12 +++++--- vortex-file/src/writer.rs | 23 +++++++------- vortex-io/Cargo.toml | 3 +- vortex-io/public-api.lock | 10 ------- vortex-io/src/kanal_ext.rs | 22 -------------- vortex-io/src/lib.rs | 1 - vortex-io/src/runtime/current.rs | 5 ++-- vortex-io/src/runtime/handle.rs | 15 +++++----- vortex-io/src/runtime/single.rs | 38 +++++++++++++----------- vortex-io/src/runtime/tests.rs | 2 +- vortex-layout/Cargo.toml | 2 -- vortex-layout/src/layouts/dict/writer.rs | 31 +++++++++---------- vortex-layout/src/layouts/table.rs | 14 +++++---- 23 files changed, 110 insertions(+), 158 deletions(-) delete mode 100644 vortex-io/src/kanal_ext.rs diff --git a/Cargo.lock b/Cargo.lock index 9778e8403bc..1861c81f82f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5118,16 +5118,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "kanal" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574" -dependencies = [ - "futures-core", - "lock_api", -] - [[package]] name = "kasuari" version = "0.4.12" @@ -6509,12 +6499,6 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "269bca4c2591a28585d6bf10d9ed0332b7d76900a1b02bec41bdc3a2cdcda107" -[[package]] -name = "oneshot" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe21416a02c693fb9f980befcb230ecc70b0b3d1cc4abf88b9675c4c1457f0c" - [[package]] name = "oorandom" version = "11.1.5" @@ -9146,7 +9130,7 @@ dependencies = [ "measure_time", "memmap2", "once_cell", - "oneshot 0.1.13", + "oneshot", "rayon", "regex", "rust-stemmers", @@ -10366,7 +10350,6 @@ dependencies = [ "fastlanes", "futures", "itertools 0.14.0", - "kanal", "object_store 0.13.1", "parking_lot", "prost 0.14.3", @@ -10482,7 +10465,6 @@ dependencies = [ "futures", "itertools 0.14.0", "jiff", - "kanal", "num-traits", "object_store 0.13.1", "parking_lot", @@ -10565,10 +10547,8 @@ dependencies = [ "futures", "getrandom 0.4.2", "itertools 0.14.0", - "kanal", "moka", "object_store 0.13.1", - "oneshot 0.2.1", "parking_lot", "pin-project-lite", "tokio", @@ -10657,6 +10637,7 @@ name = "vortex-io" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "async-fs", "async-stream", "async-trait", @@ -10667,9 +10648,7 @@ dependencies = [ "glob", "handle", "itertools 0.14.0", - "kanal", "object_store 0.13.1", - "oneshot 0.2.1", "parking_lot", "pin-project-lite", "rstest", @@ -10735,10 +10714,8 @@ dependencies = [ "flatbuffers", "futures", "itertools 0.14.0", - "kanal", "moka", "once_cell", - "oneshot 0.2.1", "parking_lot", "paste", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 6bb0413546a..27a41807f4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ arrow-schema = "58" arrow-select = "58" arrow-string = "58" async-fs = "2.2.0" +async-channel = "2.3" async-lock = "3.4" async-stream = "0.3.6" async-trait = "0.1.89" @@ -161,7 +162,6 @@ inventory = "0.3.20" itertools = "0.14.0" jetscii = "0.5.3" jiff = "0.2.0" -kanal = "0.1.1" lending-iterator = "0.1.7" libfuzzer-sys = "0.4" libloading = "0.8" @@ -179,7 +179,6 @@ num-traits = "0.2.19" num_enum = { version = "0.7.3", default-features = false } object_store = { version = "0.13.1", default-features = false } once_cell = "1.21" -oneshot = { version = "0.2.0", features = ["async"] } opentelemetry = "0.31.0" opentelemetry-otlp = "0.31.0" opentelemetry_sdk = "0.31.0" diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index 9d21976f92b..c4402145749 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -29,7 +29,6 @@ bytes = { workspace = true } cudarc = { workspace = true, features = ["f16"] } futures = { workspace = true, features = ["executor"] } itertools = { workspace = true } -kanal = { workspace = true } object_store = { workspace = true, features = ["fs"] } parking_lot = { workspace = true } prost = { workspace = true } diff --git a/vortex-cuda/src/stream.rs b/vortex-cuda/src/stream.rs index 1b7c99c54a9..4dc2e77507c 100644 --- a/vortex-cuda/src/stream.rs +++ b/vortex-cuda/src/stream.rs @@ -11,8 +11,8 @@ use cudarc::driver::CudaSlice; use cudarc::driver::CudaStream; use cudarc::driver::DeviceRepr; use cudarc::driver::result::stream; +use futures::channel::oneshot; use futures::future::BoxFuture; -use kanal::Sender; use tracing::warn; use vortex::array::buffer::BufferHandle; use vortex::error::VortexResult; @@ -132,9 +132,9 @@ impl VortexCudaStream { pub(crate) async fn await_stream_callback(stream: &CudaStream) -> VortexResult<()> { let rx = register_stream_callback(stream)?; - rx.recv() - .await - .map_err(|e| vortex_err!("CUDA stream callback channel closed unexpectedly: {}", e)) + rx.await.map_err(|oneshot::Canceled| { + vortex_err!("CUDA stream callback channel closed unexpectedly: channel canceled") + }) } /// Registers a host function callback on the stream. @@ -147,8 +147,8 @@ pub(crate) async fn await_stream_callback(stream: &CudaStream) -> VortexResult<( /// # Errors /// /// Returns an error if registering the host callback function fails. -fn register_stream_callback(stream: &CudaStream) -> VortexResult> { - let (tx, rx) = kanal::bounded::<()>(1); +fn register_stream_callback(stream: &CudaStream) -> VortexResult> { + let (tx, rx) = oneshot::channel::<()>(); let tx_ptr = Box::into_raw(Box::new(tx)); @@ -161,7 +161,7 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult) }; + let tx = unsafe { Box::from_raw(user_data as *mut oneshot::Sender<()>) }; // Blocking send as we're in a callback invoked by the CUDA driver. // NOTE: send can fail if the CudaEvent is dropped by the caller, in which case the receiver @@ -189,5 +189,5 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult TableFunction for T { // We create an async bounded channel so that all thread-local workers can pull the next // available array chunk regardless of which partition it came from. - let (tx, rx) = kanal::bounded_async(num_workers * 2); + let (tx, rx) = mpsc::channel(num_workers * 2); // We drive one partition per worker thread. Each partition is driven as a spawned task // that pushes array chunks into the shared channel as they are produced. This spawning @@ -254,7 +255,7 @@ impl TableFunction for T { // We create a new conversion cache scoped to the partition, since there's no point // caching anything across partitions. let cache = Arc::new(ConversionCache::default()); - let tx = tx.clone(); + let mut tx = tx.clone(); RUNTIME.handle().spawn(async move { let mut stream = match partition.and_then(|p| p.execute()) { @@ -282,7 +283,7 @@ impl TableFunction for T { // Spawn a task to drive the partition stream and push array chunks into the channel. RUNTIME.handle().spawn(stream.collect::<()>()).detach(); - let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx.into_stream()); + let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx); Ok(DataSourceGlobal { iterator, diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index e812ab228c6..8fc761ac3fe 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -23,10 +23,8 @@ flatbuffers = { workspace = true } futures = { workspace = true, features = ["std", "async-await"] } getrandom_v03 = { workspace = true } # Needed to pickup the "wasm_js" feature for wasm targets from the workspace configuration itertools = { workspace = true } -kanal = { workspace = true } moka = { workspace = true, features = ["sync"] } object_store = { workspace = true, optional = true } -oneshot.workspace = true parking_lot = { workspace = true } pin-project-lite = { workspace = true } tokio = { workspace = true, features = ["rt"], optional = true } diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index d8fd638adc8..2b63f8ee7de 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -136,7 +136,7 @@ impl State { tracing::debug!(?event, "Received ReadEvent"); match event { ReadEvent::Request(req) => { - if req.callback.is_closed() { + if req.callback.is_canceled() { tracing::debug!(?req, "ReadRequest dropped before registration"); return; } @@ -145,7 +145,7 @@ impl State { } ReadEvent::Polled(req_id) => { if let Some(req) = self.requests.remove(&req_id) { - if req.callback.is_closed() { + if req.callback.is_canceled() { self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped before poll"); } else { @@ -192,7 +192,7 @@ impl State { fn next_uncoalesced(&mut self) -> Option { while let Some((req_id, req)) = self.polled_requests.pop_first() { self.requests_by_offset.remove(&(req.offset, req_id)); - if req.callback.is_closed() { + if req.callback.is_canceled() { tracing::debug!("Dropping canceled request"); continue; } @@ -250,7 +250,7 @@ impl State { .vortex_expect("Missing request in requests_by_offset"); // Skip any cancelled requests - if req.callback.is_closed() { + if req.callback.is_canceled() { if ids_to_remove.insert(req_id) { keys_to_remove.push((req_offset, req_id)); } @@ -322,6 +322,7 @@ impl State { #[cfg(test)] mod tests { use futures::StreamExt; + use futures::channel::oneshot; use futures::stream; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; diff --git a/vortex-file/src/read/request.rs b/vortex-file/src/read/request.rs index 7caaa08d3d8..d9f8afffa4e 100644 --- a/vortex-file/src/read/request.rs +++ b/vortex-file/src/read/request.rs @@ -7,6 +7,7 @@ use std::fmt::Formatter; use std::ops::Range; use std::sync::Arc; +use futures::channel::oneshot; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_error::VortexError; @@ -100,15 +101,15 @@ impl Debug for ReadRequest { .field("offset", &self.offset) .field("length", &self.length) .field("alignment", &self.alignment) - .field("is_closed", &self.callback.is_closed()) + .field("is_canceled", &self.callback.is_canceled()) .finish() } } impl ReadRequest { pub(crate) fn resolve(self, result: VortexResult) { - if let Err(e) = self.callback.send(result) { - tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id); + if let Err(_unsent) = self.callback.send(result) { + tracing::debug!("ReadRequest {} dropped before resolving", self.id); } } } diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 8f83150c4bb..dcee426d54e 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -11,6 +11,7 @@ use std::task::Poll; use futures::FutureExt; use futures::StreamExt; use futures::channel::mpsc; +use futures::channel::oneshot; use futures::future; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; @@ -167,7 +168,7 @@ impl SegmentSource for FileSegmentSource { let fut = ReadFuture { id, - recv: recv.into_future(), + recv, polled: false, finished: false, events: self.events.clone(), @@ -184,7 +185,7 @@ impl SegmentSource for FileSegmentSource { /// If dropped, the read request will be canceled where possible. struct ReadFuture { id: usize, - recv: oneshot::AsyncReceiver>, + recv: oneshot::Receiver>, polled: bool, finished: bool, events: mpsc::UnboundedSender, @@ -200,11 +201,11 @@ impl Future for ReadFuture { // note: we are skipping polled and dropped events for this if the future // is ready on the first poll, that means this request was completed // before it was polled, as part of a coalesced request. - Poll::Ready( - result.unwrap_or_else(|e| { - Err(vortex_err!("ReadRequest dropped by runtime: {e}")) - }), - ) + Poll::Ready(result.unwrap_or_else(|oneshot::Canceled| { + Err(vortex_err!( + "ReadRequest dropped by runtime: channel canceled" + )) + })) } Poll::Pending if !self.polled => { self.polled = true; diff --git a/vortex-file/src/segments/writer.rs b/vortex-file/src/segments/writer.rs index e163c2cc868..0fec942f1f0 100644 --- a/vortex-file/src/segments/writer.rs +++ b/vortex-file/src/segments/writer.rs @@ -6,6 +6,8 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use async_trait::async_trait; +use futures::SinkExt; +use futures::channel::mpsc; use parking_lot::Mutex; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; @@ -18,13 +20,13 @@ use vortex_layout::sequence::SequenceId; use crate::footer::SegmentSpec; pub struct BufferedSegmentSink { - buffers: kanal::AsyncSender, + buffers: mpsc::Sender, byte_offset: AtomicU64, segment_specs: Mutex>, } impl BufferedSegmentSink { - pub fn new(send: kanal::AsyncSender, byte_offset: u64) -> Self { + pub fn new(send: mpsc::Sender, byte_offset: u64) -> Self { Self { buffers: send, byte_offset: AtomicU64::new(byte_offset), @@ -89,11 +91,13 @@ impl SegmentSink for BufferedSegmentSink { } }; + // Clone the sender since send takes &mut self + let mut sender = self.buffers.clone(); if let Some(padding) = padding_buffer { - let _ = self.buffers.send(padding).await; + let _ = sender.send(padding).await; } for buffer in buffers { - let _ = self.buffers.send(buffer).await; + let _ = sender.send(buffer).await; } Ok(segment_id) diff --git a/vortex-file/src/writer.rs b/vortex-file/src/writer.rs index 66b6226c6fa..1e9751c7f66 100644 --- a/vortex-file/src/writer.rs +++ b/vortex-file/src/writer.rs @@ -7,8 +7,10 @@ use std::sync::Arc; use std::sync::atomic::AtomicU64; use futures::FutureExt; +use futures::SinkExt; use futures::StreamExt; use futures::TryStreamExt; +use futures::channel::mpsc; use futures::future::Fuse; use futures::future::LocalBoxFuture; use futures::future::ready; @@ -35,7 +37,6 @@ use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_io::IoBuf; use vortex_io::VortexWrite; -use vortex_io::kanal_ext::KanalExt; use vortex_io::runtime::BlockingRuntime; use vortex_io::session::RuntimeSessionExt; use vortex_layout::LayoutStrategy; @@ -172,7 +173,7 @@ impl VortexWriteOptions { let mut position = MAGIC_BYTES.len() as u64; // Create a channel to send buffers from the segment sink to the output stream. - let (send, recv) = kanal::bounded_async(1); + let (send, recv) = mpsc::channel(1); let segments = Arc::new(BufferedSegmentSink::new(send, position)); @@ -194,9 +195,8 @@ impl VortexWriteOptions { }); // Flush buffers as they arrive - let recv_stream = recv.into_stream(); - pin_mut!(recv_stream); - while let Some(buffer) = recv_stream.next().await { + pin_mut!(recv); + while let Some(buffer) = recv.next().await { if buffer.is_empty() { continue; } @@ -249,10 +249,9 @@ impl VortexWriteOptions { /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file. pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> { // Create a channel for sending arrays to the layout task. - let (arrays_send, arrays_recv) = kanal::bounded_async(1); + let (arrays_send, arrays_recv) = mpsc::channel(1); - let arrays = - ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream())); + let arrays = ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv)); let write = CountingVortexWrite::new(write); let bytes_written = write.counter(); @@ -271,7 +270,7 @@ impl VortexWriteOptions { /// An async API for writing Vortex files. pub struct Writer<'w> { // The input channel for sending arrays to the writer. - arrays: Option>>, + arrays: Option>>, // The writer task that ultimately produces the footer. future: Fuse>>, // The bytes written so far. @@ -283,7 +282,7 @@ pub struct Writer<'w> { impl Writer<'_> { /// Push a new chunk into the writer. pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> { - let arrays = self.arrays.clone().vortex_expect("missing arrays sender"); + let mut arrays = self.arrays.clone().vortex_expect("missing arrays sender"); let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse(); pin_mut!(send_fut); @@ -315,12 +314,12 @@ impl Writer<'_> { /// A task is spawned to consume the stream and push it into the writer, with the current /// thread being used to write buffers to the output. pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> { - let arrays = self.arrays.clone().vortex_expect("missing arrays sender"); + let mut arrays = self.arrays.clone().vortex_expect("missing arrays sender"); let stream_fut = async move { while let Some(chunk) = stream.next().await { arrays.send(chunk).await?; } - Ok::<_, kanal::SendError>(()) + Ok::<_, mpsc::SendError>(()) } .fuse(); pin_mut!(stream_fut); diff --git a/vortex-io/Cargo.toml b/vortex-io/Cargo.toml index a73baec63c6..85c512a067e 100644 --- a/vortex-io/Cargo.toml +++ b/vortex-io/Cargo.toml @@ -25,9 +25,7 @@ futures = { workspace = true, features = ["std", "executor"] } getrandom_v03 = { workspace = true } # Needed to pickup the "wasm_js" feature for wasm targets from the workspace configuration glob = { workspace = true } handle = "1.0.2" -kanal = { workspace = true } object_store = { workspace = true, optional = true, features = ["fs"] } -oneshot = { workspace = true } parking_lot = { workspace = true } pin-project-lite = { workspace = true } tokio = { workspace = true, features = [ @@ -47,6 +45,7 @@ custom-labels = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] # Smol is our default impl, so we don't want it to be optional, but it cannot be part of wasm +async-channel = { workspace = true } smol = { workspace = true } # target_os = "unknown" matches wasm32-unknown-unknown (browser), excluding WASI targets diff --git a/vortex-io/public-api.lock b/vortex-io/public-api.lock index 2037e272e3b..08e18e351f8 100644 --- a/vortex-io/public-api.lock +++ b/vortex-io/public-api.lock @@ -114,16 +114,6 @@ pub fn vortex_io::compat::Compat::open_read<'life0, 'life1, 'async_trait>(&'l pub type vortex_io::filesystem::FileSystemRef = alloc::sync::Arc -pub mod vortex_io::kanal_ext - -pub trait vortex_io::kanal_ext::KanalExt - -pub fn vortex_io::kanal_ext::KanalExt::into_stream(self) -> impl futures_core::stream::Stream - -impl vortex_io::kanal_ext::KanalExt for kanal::AsyncReceiver - -pub fn kanal::AsyncReceiver::into_stream(self) -> impl futures_core::stream::Stream - pub mod vortex_io::object_store pub struct vortex_io::object_store::ObjectStoreFileSystem diff --git a/vortex-io/src/kanal_ext.rs b/vortex-io/src/kanal_ext.rs deleted file mode 100644 index b03ca0fa282..00000000000 --- a/vortex-io/src/kanal_ext.rs +++ /dev/null @@ -1,22 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use async_stream::stream; -use futures::Stream; -use kanal::AsyncReceiver; - -pub trait KanalExt { - fn into_stream(self) -> impl Stream; -} - -impl KanalExt for AsyncReceiver { - fn into_stream(self) -> impl Stream { - stream! { - // The Err case indicates the sender / channel has been closed so we terminate - // the stream. - while let Ok(next) = self.recv().await { - yield next - } - } - } -} diff --git a/vortex-io/src/lib.rs b/vortex-io/src/lib.rs index f30f64ff307..2796970802f 100644 --- a/vortex-io/src/lib.rs +++ b/vortex-io/src/lib.rs @@ -18,7 +18,6 @@ pub use write::*; pub mod compat; pub mod filesystem; mod io_buf; -pub mod kanal_ext; mod limit; #[cfg(feature = "object_store")] pub mod object_store; diff --git a/vortex-io/src/runtime/current.rs b/vortex-io/src/runtime/current.rs index c49d9e010a5..bea5980c365 100644 --- a/vortex-io/src/runtime/current.rs +++ b/vortex-io/src/runtime/current.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use async_channel::Receiver; use futures::Stream; use futures::StreamExt; use futures::stream::BoxStream; @@ -66,7 +67,7 @@ impl CurrentThreadRuntime { // We create an MPMC result channel and spawn a task to drive the stream and send results. // This allows multiple worker threads to drive the execution while all waiting for results // on the channel. - let (result_tx, result_rx) = kanal::bounded_async(1); + let (result_tx, result_rx) = async_channel::bounded(1); self.executor .spawn(async move { futures::pin_mut!(stream); @@ -131,7 +132,7 @@ impl Iterator for CurrentThreadIterator<'_, T> { /// An iterator that drives a stream from multiple threads. pub struct ThreadSafeIterator { executor: Arc>, - results: kanal::AsyncReceiver, + results: Receiver, } // Manual clone implementation since `T` does not need to be `Clone`. diff --git a/vortex-io/src/runtime/handle.rs b/vortex-io/src/runtime/handle.rs index f04b897b29e..0c07af5323b 100644 --- a/vortex-io/src/runtime/handle.rs +++ b/vortex-io/src/runtime/handle.rs @@ -9,6 +9,7 @@ use std::task::Poll; use std::task::ready; use futures::FutureExt; +use futures::channel::oneshot; use tracing::Instrument; use vortex_error::vortex_panic; @@ -76,7 +77,7 @@ impl Handle { .boxed(), ); Task { - recv: recv.into_future(), + recv, abort_handle: Some(abort_handle), } } @@ -110,13 +111,13 @@ impl Handle { let abort_handle = self.runtime().spawn_cpu(Box::new(move || { let _guard = span.enter(); // Optimistically avoid the work if the result won't be used. - if !send.is_closed() { + if !send.is_canceled() { // Task::detach allows the receiver to be dropped, so we ignore send errors. drop(send.send(f())); } })); Task { - recv: recv.into_future(), + recv, abort_handle: Some(abort_handle), } } @@ -132,13 +133,13 @@ impl Handle { let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || { let _guard = span.enter(); // Optimistically avoid the work if the result won't be used. - if !send.is_closed() { + if !send.is_canceled() { // Task::detach allows the receiver to be dropped, so we ignore send errors. drop(send.send(f())); } })); Task { - recv: recv.into_future(), + recv, abort_handle: Some(abort_handle), } } @@ -150,7 +151,7 @@ impl Handle { /// continue running in the background, call [`Task::detach`]. #[must_use = "When a Task is dropped without being awaited, it is cancelled"] pub struct Task { - recv: oneshot::AsyncReceiver, + recv: oneshot::Receiver, abort_handle: Option, } @@ -169,7 +170,7 @@ impl Future for Task { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(self.recv.poll_unpin(cx)) { Ok(result) => Poll::Ready(result), - Err(_recv_err) => { + Err(oneshot::Canceled) => { // If the other end of the channel was dropped, it means the runtime dropped // the future without ever completing it. If the caller aborted this task by // dropping it, then they wouldn't be able to poll it anymore. diff --git a/vortex-io/src/runtime/single.rs b/vortex-io/src/runtime/single.rs index 2a113e3715e..e26419f7d18 100644 --- a/vortex-io/src/runtime/single.rs +++ b/vortex-io/src/runtime/single.rs @@ -4,9 +4,11 @@ use std::rc::Rc; use std::rc::Weak as RcWeak; use std::sync::Arc; +use std::sync::mpsc; use futures::Stream; use futures::StreamExt; +use futures::channel::oneshot; use futures::future::BoxFuture; use futures::stream::LocalBoxStream; use parking_lot::Mutex; @@ -38,16 +40,16 @@ impl Default for SingleThreadRuntime { } struct Sender { - scheduling: kanal::Sender>, - cpu: kanal::Sender>, - blocking: kanal::Sender>, + scheduling: mpsc::Sender>, + cpu: mpsc::Sender>, + blocking: mpsc::Sender>, } impl Sender { fn new(local: &Rc>) -> Self { - let (scheduling_send, scheduling_recv) = kanal::unbounded::(); - let (cpu_send, cpu_recv) = kanal::unbounded::(); - let (blocking_send, blocking_recv) = kanal::unbounded::(); + let (scheduling_send, scheduling_recv) = mpsc::channel::(); + let (cpu_send, cpu_recv) = mpsc::channel::(); + let (blocking_send, blocking_recv) = mpsc::channel::(); // We pass weak references to the local execution into the async tasks such that the task's // reference doesn't keep the execution alive after the runtime is dropped. @@ -57,7 +59,7 @@ impl Sender { let weak_local2 = RcWeak::clone(&weak_local); local .spawn(async move { - while let Ok(spawn) = scheduling_recv.as_async().recv().await { + while let Ok(spawn) = scheduling_recv.recv() { if let Some(local) = weak_local2.upgrade() { // Ignore send errors since it means the caller immediately detached. drop( @@ -74,7 +76,7 @@ impl Sender { let weak_local2 = RcWeak::clone(&weak_local); local .spawn(async move { - while let Ok(spawn) = cpu_recv.as_async().recv().await { + while let Ok(spawn) = cpu_recv.recv() { if let Some(local) = weak_local2.upgrade() { let work = spawn.sync; // Ignore send errors since it means the caller immediately detached. @@ -90,7 +92,7 @@ impl Sender { let weak_local2 = RcWeak::clone(&weak_local); local .spawn(async move { - while let Ok(spawn) = blocking_recv.as_async().recv().await { + while let Ok(spawn) = blocking_recv.recv() { if let Some(local) = weak_local2.upgrade() { let work = spawn.sync; // Ignore send errors since it means the caller immediately detached. @@ -121,10 +123,10 @@ impl Executor for Sender { future, task_callback: send, }) { - vortex_panic!("Executor missing: {}", e); + vortex_panic!("Executor missing: {:?}", e); } Box::new(LazyAbortHandle { - task: Mutex::new(recv), + task: Mutex::new(Some(recv)), }) } @@ -134,10 +136,10 @@ impl Executor for Sender { sync: cpu, task_callback: send, }) { - vortex_panic!("Executor missing: {}", e); + vortex_panic!("Executor missing: {:?}", e); } Box::new(LazyAbortHandle { - task: Mutex::new(recv), + task: Mutex::new(Some(recv)), }) } @@ -147,10 +149,10 @@ impl Executor for Sender { sync: work, task_callback: send, }) { - vortex_panic!("Executor missing: {}", e); + vortex_panic!("Executor missing: {:?}", e); } Box::new(LazyAbortHandle { - task: Mutex::new(recv), + task: Mutex::new(Some(recv)), }) } } @@ -230,13 +232,15 @@ struct SpawnSync<'rt> { } struct LazyAbortHandle { - task: Mutex>, + task: Mutex>>, } impl AbortHandle for LazyAbortHandle { fn abort(self: Box) { // Aborting a smol::Task is done by dropping it. - if let Ok(task) = self.task.lock().try_recv() { + if let Some(mut recv) = self.task.lock().take() + && let Ok(Some(task)) = recv.try_recv() + { task.abort() } } diff --git a/vortex-io/src/runtime/tests.rs b/vortex-io/src/runtime/tests.rs index 8b342f4242d..06ed0934abf 100644 --- a/vortex-io/src/runtime/tests.rs +++ b/vortex-io/src/runtime/tests.rs @@ -316,7 +316,7 @@ async fn test_task_detach() { let handle = TokioRuntime::current(); let counter = Arc::new(AtomicUsize::new(0)); let c = Arc::clone(&counter); - let (tx, rx) = oneshot::channel::<()>(); + let (tx, rx) = futures::channel::oneshot::channel::<()>(); let task = handle.spawn(async move { tokio::time::sleep(std::time::Duration::from_millis(10)).await; diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 8bbccf09b91..540b2f398c7 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -26,10 +26,8 @@ bit-vec = { workspace = true } flatbuffers = { workspace = true } futures = { workspace = true, features = ["alloc", "async-await", "executor"] } itertools = { workspace = true } -kanal = { workspace = true } moka = { workspace = true, features = ["future"] } once_cell = { workspace = true, features = ["parking_lot"] } -oneshot = { workspace = true } parking_lot = { workspace = true } paste = { workspace = true } pin-project-lite = { workspace = true } diff --git a/vortex-layout/src/layouts/dict/writer.rs b/vortex-layout/src/layouts/dict/writer.rs index fd51739d646..1335fdde4b5 100644 --- a/vortex-layout/src/layouts/dict/writer.rs +++ b/vortex-layout/src/layouts/dict/writer.rs @@ -10,9 +10,12 @@ use async_stream::stream; use async_stream::try_stream; use async_trait::async_trait; use futures::FutureExt; +use futures::SinkExt; use futures::Stream; use futures::StreamExt; use futures::TryStreamExt; +use futures::channel::mpsc; +use futures::channel::oneshot; use futures::future::BoxFuture; use futures::pin_mut; use futures::stream::BoxStream; @@ -32,7 +35,6 @@ use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; -use vortex_io::kanal_ext::KanalExt; use vortex_io::runtime::Handle; use crate::IntoLayout; @@ -377,9 +379,9 @@ type SequencedChunk = VortexResult<(SequenceId, ArrayRef)>; struct DictionaryTransformer { input: DictionaryStream, - active_codes_tx: Option>, + active_codes_tx: Option>, active_values_tx: Option>, - pending_send: Option>>, + pending_send: Option>>, } impl DictionaryTransformer { @@ -427,7 +429,7 @@ impl Stream for DictionaryTransformer { }))) => { if self.active_codes_tx.is_none() { // Start a new group - let (codes_tx, codes_rx) = kanal::bounded_async::(1); + let (codes_tx, codes_rx) = mpsc::channel::(1); let (values_tx, values_rx) = oneshot::channel(); self.active_codes_tx = Some(codes_tx.clone()); @@ -437,22 +439,21 @@ impl Stream for DictionaryTransformer { let codes_dtype = DType::Primitive(codes_ptype, Nullability::NonNullable); // Send first codes. - self.pending_send = - Some(Box::pin( - async move { codes_tx.send(Ok((seq_id, codes))).await }, - )); + let mut codes_tx_clone = codes_tx; + self.pending_send = Some(Box::pin(async move { + codes_tx_clone.send(Ok((seq_id, codes))).await + })); // Create output streams. - let codes_stream = SequentialStreamAdapter::new( - codes_dtype, - codes_rx.into_stream().boxed(), - ) - .sendable(); + let codes_stream = + SequentialStreamAdapter::new(codes_dtype, codes_rx.boxed()).sendable(); let values_future = async move { values_rx .await - .map_err(|e| vortex_err!("values sender dropped: {}", e)) + .map_err(|oneshot::Canceled| { + vortex_err!("values sender dropped: channel canceled") + }) .flatten() } .boxed(); @@ -462,7 +463,7 @@ impl Stream for DictionaryTransformer { // Continue streaming codes to existing group if let Some(tx) = &self.active_codes_tx { - let tx = tx.clone(); + let mut tx = tx.clone(); self.pending_send = Some(Box::pin(async move { tx.send(Ok((seq_id, codes))).await })); } diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index 1a0aacc5f0a..19a57e53b11 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -8,8 +8,10 @@ use std::sync::Arc; use async_trait::async_trait; +use futures::SinkExt; use futures::StreamExt; use futures::TryStreamExt; +use futures::channel::mpsc; use futures::future::try_join_all; use futures::pin_mut; use itertools::Itertools; @@ -26,7 +28,6 @@ use vortex_array::dtype::Nullability; use vortex_error::VortexError; use vortex_error::VortexResult; use vortex_error::vortex_bail; -use vortex_io::kanal_ext::KanalExt; use vortex_io::runtime::Handle; use vortex_utils::aliases::DefaultHashBuilder; use vortex_utils::aliases::hash_map::HashMap; @@ -257,23 +258,25 @@ impl LayoutStrategy for TableStrategy { } let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) = - (0..stream_count).map(|_| kanal::bounded_async(1)).unzip(); + (0..stream_count).map(|_| mpsc::channel(1)).unzip(); // Spawn a task to fan out column chunks to their respective transposed streams handle .spawn(async move { + let mut column_streams_tx = column_streams_tx; pin_mut!(columns_vec_stream); while let Some(result) = columns_vec_stream.next().await { match result { Ok(columns) => { - for (tx, column) in column_streams_tx.iter().zip_eq(columns.into_iter()) + for (tx, column) in + column_streams_tx.iter_mut().zip_eq(columns.into_iter()) { let _ = tx.send(Ok(column)).await; } } Err(e) => { let e: Arc = Arc::new(e); - for tx in column_streams_tx.iter() { + for tx in column_streams_tx.iter_mut() { let _ = tx.send(Err(VortexError::from(Arc::clone(&e)))).await; } break; @@ -307,8 +310,7 @@ impl LayoutStrategy for TableStrategy { .enumerate() .map(move |(index, ((dtype, recv), name))| { let column_stream = - SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed()) - .sendable(); + SequentialStreamAdapter::new(dtype.clone(), recv.boxed()).sendable(); let child_eof = eof.split_off(); let field = Field::Name(name.clone()); handle.spawn_nested(|h| {