Skip to content

Commit 1c8ef75

Browse files
committed
remove all usages of kanal and oneshot
Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent ff21366 commit 1c8ef75

23 files changed

Lines changed: 110 additions & 158 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ arrow-schema = "58"
100100
arrow-select = "58"
101101
arrow-string = "58"
102102
async-fs = "2.2.0"
103+
async-channel = "2.3"
103104
async-lock = "3.4"
104105
async-stream = "0.3.6"
105106
async-trait = "0.1.89"
@@ -161,7 +162,6 @@ inventory = "0.3.20"
161162
itertools = "0.14.0"
162163
jetscii = "0.5.3"
163164
jiff = "0.2.0"
164-
kanal = "0.1.1"
165165
lending-iterator = "0.1.7"
166166
libfuzzer-sys = "0.4"
167167
libloading = "0.8"
@@ -179,7 +179,6 @@ num-traits = "0.2.19"
179179
num_enum = { version = "0.7.3", default-features = false }
180180
object_store = { version = "0.13.1", default-features = false }
181181
once_cell = "1.21"
182-
oneshot = { version = "0.2.0", features = ["async"] }
183182
opentelemetry = "0.31.0"
184183
opentelemetry-otlp = "0.31.0"
185184
opentelemetry_sdk = "0.31.0"

vortex-cuda/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ bytes = { workspace = true }
2929
cudarc = { workspace = true, features = ["f16"] }
3030
futures = { workspace = true, features = ["executor"] }
3131
itertools = { workspace = true }
32-
kanal = { workspace = true }
3332
object_store = { workspace = true, features = ["fs"] }
3433
parking_lot = { workspace = true }
3534
prost = { workspace = true }

vortex-cuda/src/stream.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use cudarc::driver::CudaSlice;
1111
use cudarc::driver::CudaStream;
1212
use cudarc::driver::DeviceRepr;
1313
use cudarc::driver::result::stream;
14+
use futures::channel::oneshot;
1415
use futures::future::BoxFuture;
15-
use kanal::Sender;
1616
use tracing::warn;
1717
use vortex::array::buffer::BufferHandle;
1818
use vortex::error::VortexResult;
@@ -132,9 +132,9 @@ impl VortexCudaStream {
132132
pub(crate) async fn await_stream_callback(stream: &CudaStream) -> VortexResult<()> {
133133
let rx = register_stream_callback(stream)?;
134134

135-
rx.recv()
136-
.await
137-
.map_err(|e| vortex_err!("CUDA stream callback channel closed unexpectedly: {}", e))
135+
rx.await.map_err(|oneshot::Canceled| {
136+
vortex_err!("CUDA stream callback channel closed unexpectedly: channel canceled")
137+
})
138138
}
139139

140140
/// Registers a host function callback on the stream.
@@ -147,8 +147,8 @@ pub(crate) async fn await_stream_callback(stream: &CudaStream) -> VortexResult<(
147147
/// # Errors
148148
///
149149
/// Returns an error if registering the host callback function fails.
150-
fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncReceiver<()>> {
151-
let (tx, rx) = kanal::bounded::<()>(1);
150+
fn register_stream_callback(stream: &CudaStream) -> VortexResult<oneshot::Receiver<()>> {
151+
let (tx, rx) = oneshot::channel::<()>();
152152

153153
let tx_ptr = Box::into_raw(Box::new(tx));
154154

@@ -161,7 +161,7 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncRec
161161
unsafe extern "C" fn callback(user_data: *mut std::ffi::c_void) {
162162
// SAFETY: The memory of `tx` is manually managed has not been freed
163163
// before. We have unique ownership and can therefore free it.
164-
let tx = unsafe { Box::from_raw(user_data as *mut Sender<()>) };
164+
let tx = unsafe { Box::from_raw(user_data as *mut oneshot::Sender<()>) };
165165

166166
// Blocking send as we're in a callback invoked by the CUDA driver.
167167
// NOTE: send can fail if the CudaEvent is dropped by the caller, in which case the receiver
@@ -189,5 +189,5 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncRec
189189
})?;
190190
}
191191

192-
Ok(rx.to_async())
192+
Ok(rx)
193193
}

vortex-duckdb/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ bitvec = { workspace = true }
3030
custom-labels = { workspace = true }
3131
futures = { workspace = true }
3232
itertools = { workspace = true }
33-
kanal = { workspace = true }
3433
num-traits = { workspace = true }
3534
object_store = { workspace = true, features = ["aws"] }
3635
parking_lot = { workspace = true }

vortex-duckdb/src/datasource.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use std::sync::atomic::AtomicU64;
1414
use std::sync::atomic::Ordering;
1515

1616
use custom_labels::CURRENT_LABELSET;
17+
use futures::SinkExt;
1718
use futures::StreamExt;
19+
use futures::channel::mpsc;
1820
use itertools::Itertools;
1921
use num_traits::AsPrimitive;
2022
use tracing::debug;
@@ -37,7 +39,6 @@ use vortex::expr::col;
3739
use vortex::expr::root;
3840
use vortex::expr::select;
3941
use vortex::expr::stats::Precision;
40-
use vortex::io::kanal_ext::KanalExt;
4142
use vortex::io::runtime::BlockingRuntime;
4243
use vortex::io::runtime::current::ThreadSafeIterator;
4344
use vortex::metrics::tracing::get_global_labels;
@@ -242,7 +243,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
242243

243244
// We create an async bounded channel so that all thread-local workers can pull the next
244245
// available array chunk regardless of which partition it came from.
245-
let (tx, rx) = kanal::bounded_async(num_workers * 2);
246+
let (tx, rx) = mpsc::channel(num_workers * 2);
246247

247248
// We drive one partition per worker thread. Each partition is driven as a spawned task
248249
// that pushes array chunks into the shared channel as they are produced. This spawning
@@ -254,7 +255,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
254255
// We create a new conversion cache scoped to the partition, since there's no point
255256
// caching anything across partitions.
256257
let cache = Arc::new(ConversionCache::default());
257-
let tx = tx.clone();
258+
let mut tx = tx.clone();
258259

259260
RUNTIME.handle().spawn(async move {
260261
let mut stream = match partition.and_then(|p| p.execute()) {
@@ -282,7 +283,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
282283
// Spawn a task to drive the partition stream and push array chunks into the channel.
283284
RUNTIME.handle().spawn(stream.collect::<()>()).detach();
284285

285-
let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx.into_stream());
286+
let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx);
286287

287288
Ok(DataSourceGlobal {
288289
iterator,

vortex-file/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ flatbuffers = { workspace = true }
2323
futures = { workspace = true, features = ["std", "async-await"] }
2424
getrandom_v03 = { workspace = true } # Needed to pickup the "wasm_js" feature for wasm targets from the workspace configuration
2525
itertools = { workspace = true }
26-
kanal = { workspace = true }
2726
moka = { workspace = true, features = ["sync"] }
2827
object_store = { workspace = true, optional = true }
29-
oneshot.workspace = true
3028
parking_lot = { workspace = true }
3129
pin-project-lite = { workspace = true }
3230
tokio = { workspace = true, features = ["rt"], optional = true }

vortex-file/src/read/driver.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl State {
136136
tracing::debug!(?event, "Received ReadEvent");
137137
match event {
138138
ReadEvent::Request(req) => {
139-
if req.callback.is_closed() {
139+
if req.callback.is_canceled() {
140140
tracing::debug!(?req, "ReadRequest dropped before registration");
141141
return;
142142
}
@@ -145,7 +145,7 @@ impl State {
145145
}
146146
ReadEvent::Polled(req_id) => {
147147
if let Some(req) = self.requests.remove(&req_id) {
148-
if req.callback.is_closed() {
148+
if req.callback.is_canceled() {
149149
self.requests_by_offset.remove(&(req.offset, req_id));
150150
tracing::debug!(?req, "ReadRequest dropped before poll");
151151
} else {
@@ -192,7 +192,7 @@ impl State {
192192
fn next_uncoalesced(&mut self) -> Option<ReadRequest> {
193193
while let Some((req_id, req)) = self.polled_requests.pop_first() {
194194
self.requests_by_offset.remove(&(req.offset, req_id));
195-
if req.callback.is_closed() {
195+
if req.callback.is_canceled() {
196196
tracing::debug!("Dropping canceled request");
197197
continue;
198198
}
@@ -250,7 +250,7 @@ impl State {
250250
.vortex_expect("Missing request in requests_by_offset");
251251

252252
// Skip any cancelled requests
253-
if req.callback.is_closed() {
253+
if req.callback.is_canceled() {
254254
if ids_to_remove.insert(req_id) {
255255
keys_to_remove.push((req_offset, req_id));
256256
}
@@ -322,6 +322,7 @@ impl State {
322322
#[cfg(test)]
323323
mod tests {
324324
use futures::StreamExt;
325+
use futures::channel::oneshot;
325326
use futures::stream;
326327
use vortex_array::buffer::BufferHandle;
327328
use vortex_buffer::Alignment;

vortex-file/src/read/request.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::fmt::Formatter;
77
use std::ops::Range;
88
use std::sync::Arc;
99

10+
use futures::channel::oneshot;
1011
use vortex_array::buffer::BufferHandle;
1112
use vortex_buffer::Alignment;
1213
use vortex_error::VortexError;
@@ -100,15 +101,15 @@ impl Debug for ReadRequest {
100101
.field("offset", &self.offset)
101102
.field("length", &self.length)
102103
.field("alignment", &self.alignment)
103-
.field("is_closed", &self.callback.is_closed())
104+
.field("is_canceled", &self.callback.is_canceled())
104105
.finish()
105106
}
106107
}
107108

108109
impl ReadRequest {
109110
pub(crate) fn resolve(self, result: VortexResult<BufferHandle>) {
110-
if let Err(e) = self.callback.send(result) {
111-
tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
111+
if let Err(_unsent) = self.callback.send(result) {
112+
tracing::debug!("ReadRequest {} dropped before resolving", self.id);
112113
}
113114
}
114115
}

vortex-file/src/segments/source.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::task::Poll;
1111
use futures::FutureExt;
1212
use futures::StreamExt;
1313
use futures::channel::mpsc;
14+
use futures::channel::oneshot;
1415
use futures::future;
1516
use vortex_array::buffer::BufferHandle;
1617
use vortex_buffer::Alignment;
@@ -167,7 +168,7 @@ impl SegmentSource for FileSegmentSource {
167168

168169
let fut = ReadFuture {
169170
id,
170-
recv: recv.into_future(),
171+
recv,
171172
polled: false,
172173
finished: false,
173174
events: self.events.clone(),
@@ -184,7 +185,7 @@ impl SegmentSource for FileSegmentSource {
184185
/// If dropped, the read request will be canceled where possible.
185186
struct ReadFuture {
186187
id: usize,
187-
recv: oneshot::AsyncReceiver<VortexResult<BufferHandle>>,
188+
recv: oneshot::Receiver<VortexResult<BufferHandle>>,
188189
polled: bool,
189190
finished: bool,
190191
events: mpsc::UnboundedSender<ReadEvent>,
@@ -200,11 +201,11 @@ impl Future for ReadFuture {
200201
// note: we are skipping polled and dropped events for this if the future
201202
// is ready on the first poll, that means this request was completed
202203
// before it was polled, as part of a coalesced request.
203-
Poll::Ready(
204-
result.unwrap_or_else(|e| {
205-
Err(vortex_err!("ReadRequest dropped by runtime: {e}"))
206-
}),
207-
)
204+
Poll::Ready(result.unwrap_or_else(|oneshot::Canceled| {
205+
Err(vortex_err!(
206+
"ReadRequest dropped by runtime: channel canceled"
207+
))
208+
}))
208209
}
209210
Poll::Pending if !self.polled => {
210211
self.polled = true;

0 commit comments

Comments
 (0)