Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 2 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ arrow-schema = "58"
arrow-select = "58"
arrow-string = "58"
async-fs = "2.2.0"
async-channel = "2.3"
async-lock = "3.4"
async-stream = "0.3.6"
async-trait = "0.1.89"
Expand Down Expand Up @@ -161,7 +162,6 @@ inventory = "0.3.20"
itertools = "0.14.0"
jetscii = "0.5.3"
jiff = "0.2.0"
kanal = "0.1.1"
lending-iterator = "0.1.7"
libfuzzer-sys = "0.4"
libloading = "0.8"
Expand All @@ -179,7 +179,6 @@ num-traits = "0.2.19"
num_enum = { version = "0.7.3", default-features = false }
object_store = { version = "0.13.1", default-features = false }
once_cell = "1.21"
oneshot = { version = "0.2.0", features = ["async"] }
opentelemetry = "0.31.0"
opentelemetry-otlp = "0.31.0"
opentelemetry_sdk = "0.31.0"
Expand Down
1 change: 0 additions & 1 deletion vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ bytes = { workspace = true }
cudarc = { workspace = true, features = ["f16"] }
futures = { workspace = true, features = ["executor"] }
itertools = { workspace = true }
kanal = { workspace = true }
object_store = { workspace = true, features = ["fs"] }
parking_lot = { workspace = true }
prost = { workspace = true }
Expand Down
16 changes: 8 additions & 8 deletions vortex-cuda/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use cudarc::driver::CudaSlice;
use cudarc::driver::CudaStream;
use cudarc::driver::DeviceRepr;
use cudarc::driver::result::stream;
use futures::channel::oneshot;
use futures::future::BoxFuture;
use kanal::Sender;
use tracing::warn;
use vortex::array::buffer::BufferHandle;
use vortex::error::VortexResult;
Expand Down Expand Up @@ -132,9 +132,9 @@ impl VortexCudaStream {
pub(crate) async fn await_stream_callback(stream: &CudaStream) -> VortexResult<()> {
let rx = register_stream_callback(stream)?;

rx.recv()
.await
.map_err(|e| vortex_err!("CUDA stream callback channel closed unexpectedly: {}", e))
rx.await.map_err(|oneshot::Canceled| {
vortex_err!("CUDA stream callback channel closed unexpectedly: channel canceled")
})
}

/// Registers a host function callback on the stream.
Expand All @@ -147,8 +147,8 @@ pub(crate) async fn await_stream_callback(stream: &CudaStream) -> VortexResult<(
/// # Errors
///
/// Returns an error if registering the host callback function fails.
fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncReceiver<()>> {
let (tx, rx) = kanal::bounded::<()>(1);
fn register_stream_callback(stream: &CudaStream) -> VortexResult<oneshot::Receiver<()>> {
let (tx, rx) = oneshot::channel::<()>();

let tx_ptr = Box::into_raw(Box::new(tx));

Expand All @@ -161,7 +161,7 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncRec
unsafe extern "C" fn callback(user_data: *mut std::ffi::c_void) {
// SAFETY: The memory of `tx` is manually managed and has not been freed
// before. We have unique ownership and can therefore free it.
let tx = unsafe { Box::from_raw(user_data as *mut Sender<()>) };
let tx = unsafe { Box::from_raw(user_data as *mut oneshot::Sender<()>) };

// Synchronous send (no `.await` is possible here) as we're in a callback invoked by the CUDA driver.
// NOTE: send can fail if the CudaEvent is dropped by the caller, in which case the receiver
Expand Down Expand Up @@ -189,5 +189,5 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncRec
})?;
}

Ok(rx.to_async())
Ok(rx)
}
1 change: 0 additions & 1 deletion vortex-duckdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ bitvec = { workspace = true }
custom-labels = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
kanal = { workspace = true }
num-traits = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parking_lot = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use custom_labels::CURRENT_LABELSET;
use futures::SinkExt;
use futures::StreamExt;
use futures::channel::mpsc;
use itertools::Itertools;
use num_traits::AsPrimitive;
use tracing::debug;
Expand All @@ -37,7 +39,6 @@ use vortex::expr::col;
use vortex::expr::root;
use vortex::expr::select;
use vortex::expr::stats::Precision;
use vortex::io::kanal_ext::KanalExt;
use vortex::io::runtime::BlockingRuntime;
use vortex::io::runtime::current::ThreadSafeIterator;
use vortex::metrics::tracing::get_global_labels;
Expand Down Expand Up @@ -242,7 +243,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {

// We create an async bounded channel so that all thread-local workers can pull the next
// available array chunk regardless of which partition it came from.
let (tx, rx) = kanal::bounded_async(num_workers * 2);
let (tx, rx) = mpsc::channel(num_workers * 2);

// We drive one partition per worker thread. Each partition is driven as a spawned task
// that pushes array chunks into the shared channel as they are produced. This spawning
Expand All @@ -254,7 +255,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
// We create a new conversion cache scoped to the partition, since there's no point
// caching anything across partitions.
let cache = Arc::new(ConversionCache::default());
let tx = tx.clone();
let mut tx = tx.clone();

RUNTIME.handle().spawn(async move {
let mut stream = match partition.and_then(|p| p.execute()) {
Expand Down Expand Up @@ -282,7 +283,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
// Spawn a task to drive the partition stream and push array chunks into the channel.
RUNTIME.handle().spawn(stream.collect::<()>()).detach();

let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx.into_stream());
let iterator = RUNTIME.block_on_stream_thread_safe(|_handle| rx);

Ok(DataSourceGlobal {
iterator,
Expand Down
2 changes: 0 additions & 2 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ flatbuffers = { workspace = true }
futures = { workspace = true, features = ["std", "async-await"] }
getrandom_v03 = { workspace = true } # Needed to pickup the "wasm_js" feature for wasm targets from the workspace configuration
itertools = { workspace = true }
kanal = { workspace = true }
moka = { workspace = true, features = ["sync"] }
object_store = { workspace = true, optional = true }
oneshot.workspace = true
parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
tokio = { workspace = true, features = ["rt"], optional = true }
Expand Down
9 changes: 5 additions & 4 deletions vortex-file/src/read/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl State {
tracing::debug!(?event, "Received ReadEvent");
match event {
ReadEvent::Request(req) => {
if req.callback.is_closed() {
if req.callback.is_canceled() {
tracing::debug!(?req, "ReadRequest dropped before registration");
return;
}
Expand All @@ -145,7 +145,7 @@ impl State {
}
ReadEvent::Polled(req_id) => {
if let Some(req) = self.requests.remove(&req_id) {
if req.callback.is_closed() {
if req.callback.is_canceled() {
self.requests_by_offset.remove(&(req.offset, req_id));
tracing::debug!(?req, "ReadRequest dropped before poll");
} else {
Expand Down Expand Up @@ -192,7 +192,7 @@ impl State {
fn next_uncoalesced(&mut self) -> Option<ReadRequest> {
while let Some((req_id, req)) = self.polled_requests.pop_first() {
self.requests_by_offset.remove(&(req.offset, req_id));
if req.callback.is_closed() {
if req.callback.is_canceled() {
tracing::debug!("Dropping canceled request");
continue;
}
Expand Down Expand Up @@ -250,7 +250,7 @@ impl State {
.vortex_expect("Missing request in requests_by_offset");

// Skip any cancelled requests
if req.callback.is_closed() {
if req.callback.is_canceled() {
if ids_to_remove.insert(req_id) {
keys_to_remove.push((req_offset, req_id));
}
Expand Down Expand Up @@ -322,6 +322,7 @@ impl State {
#[cfg(test)]
mod tests {
use futures::StreamExt;
use futures::channel::oneshot;
use futures::stream;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
Expand Down
7 changes: 4 additions & 3 deletions vortex-file/src/read/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::fmt::Formatter;
use std::ops::Range;
use std::sync::Arc;

use futures::channel::oneshot;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_error::VortexError;
Expand Down Expand Up @@ -100,15 +101,15 @@ impl Debug for ReadRequest {
.field("offset", &self.offset)
.field("length", &self.length)
.field("alignment", &self.alignment)
.field("is_closed", &self.callback.is_closed())
.field("is_canceled", &self.callback.is_canceled())
.finish()
}
}

impl ReadRequest {
pub(crate) fn resolve(self, result: VortexResult<BufferHandle>) {
if let Err(e) = self.callback.send(result) {
tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
if let Err(_unsent) = self.callback.send(result) {
tracing::debug!("ReadRequest {} dropped before resolving", self.id);
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions vortex-file/src/segments/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::task::Poll;
use futures::FutureExt;
use futures::StreamExt;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::future;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
Expand Down Expand Up @@ -167,7 +168,7 @@ impl SegmentSource for FileSegmentSource {

let fut = ReadFuture {
id,
recv: recv.into_future(),
recv,
polled: false,
finished: false,
events: self.events.clone(),
Expand All @@ -184,7 +185,7 @@ impl SegmentSource for FileSegmentSource {
/// If dropped, the read request will be canceled where possible.
struct ReadFuture {
id: usize,
recv: oneshot::AsyncReceiver<VortexResult<BufferHandle>>,
recv: oneshot::Receiver<VortexResult<BufferHandle>>,
polled: bool,
finished: bool,
events: mpsc::UnboundedSender<ReadEvent>,
Expand All @@ -200,11 +201,11 @@ impl Future for ReadFuture {
// Note: we skip the polled and dropped events for this future if it is
// ready on the first poll. That means this request was completed
// before it was polled, as part of a coalesced request.
Poll::Ready(
result.unwrap_or_else(|e| {
Err(vortex_err!("ReadRequest dropped by runtime: {e}"))
}),
)
Poll::Ready(result.unwrap_or_else(|oneshot::Canceled| {
Err(vortex_err!(
"ReadRequest dropped by runtime: channel canceled"
))
}))
}
Poll::Pending if !self.polled => {
self.polled = true;
Expand Down
12 changes: 8 additions & 4 deletions vortex-file/src/segments/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use async_trait::async_trait;
use futures::SinkExt;
use futures::channel::mpsc;
use parking_lot::Mutex;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
Expand All @@ -18,13 +20,13 @@ use vortex_layout::sequence::SequenceId;
use crate::footer::SegmentSpec;

pub struct BufferedSegmentSink {
buffers: kanal::AsyncSender<ByteBuffer>,
buffers: mpsc::Sender<ByteBuffer>,
byte_offset: AtomicU64,
segment_specs: Mutex<Vec<SegmentSpec>>,
}

impl BufferedSegmentSink {
pub fn new(send: kanal::AsyncSender<ByteBuffer>, byte_offset: u64) -> Self {
pub fn new(send: mpsc::Sender<ByteBuffer>, byte_offset: u64) -> Self {
Self {
buffers: send,
byte_offset: AtomicU64::new(byte_offset),
Expand Down Expand Up @@ -89,11 +91,13 @@ impl SegmentSink for BufferedSegmentSink {
}
};

// Clone the sender since send takes &mut self
let mut sender = self.buffers.clone();
if let Some(padding) = padding_buffer {
let _ = self.buffers.send(padding).await;
let _ = sender.send(padding).await;
}
for buffer in buffers {
let _ = self.buffers.send(buffer).await;
let _ = sender.send(buffer).await;
}

Ok(segment_id)
Expand Down
Loading
Loading