Skip to content

Commit 044dfff

Browse files
committed
fix oneshot race in tsan with ffi
1 parent 5ce08ed commit 044dfff

1 file changed

Lines changed: 8 additions & 6 deletions

File tree

vortex-io/src/runtime/handle.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::task::Context;
88
use std::task::Poll;
99
use std::task::ready;
1010

11+
use futures::channel::oneshot::channel;
12+
use futures::channel::oneshot::Receiver;
1113
use futures::FutureExt;
1214
use tracing::Instrument;
1315
use vortex_error::vortex_panic;
@@ -65,7 +67,7 @@ impl Handle {
6567
Fut: Future<Output = R> + Send + 'static,
6668
R: Send + 'static,
6769
{
68-
let (send, recv) = oneshot::channel();
70+
let (send, recv) = channel();
6971
let span = tracing::Span::current();
7072
let abort_handle = self.runtime().spawn(
7173
async move {
@@ -105,12 +107,12 @@ impl Handle {
105107
F: FnOnce() -> R + Send + 'static,
106108
R: Send + 'static,
107109
{
108-
let (send, recv) = oneshot::channel();
110+
let (send, recv) = channel();
109111
let span = tracing::Span::current();
110112
let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
111113
let _guard = span.enter();
112114
// Optimistically avoid the work if the result won't be used.
113-
if !send.is_closed() {
115+
if !send.is_canceled() {
114116
// Task::detach allows the receiver to be dropped, so we ignore send errors.
115117
drop(send.send(f()));
116118
}
@@ -127,12 +129,12 @@ impl Handle {
127129
F: FnOnce() -> R + Send + 'static,
128130
R: Send + 'static,
129131
{
130-
let (send, recv) = oneshot::channel();
132+
let (send, recv) = channel();
131133
let span = tracing::Span::current();
132134
let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
133135
let _guard = span.enter();
134136
// Optimistically avoid the work if the result won't be used.
135-
if !send.is_closed() {
137+
if !send.is_canceled() {
136138
// Task::detach allows the receiver to be dropped, so we ignore send errors.
137139
drop(send.send(f()));
138140
}
@@ -150,7 +152,7 @@ impl Handle {
150152
/// continue running in the background, call [`Task::detach`].
151153
#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
152154
pub struct Task<T> {
153-
recv: oneshot::AsyncReceiver<T>,
155+
recv: Receiver<T>,
154156
abort_handle: Option<AbortHandleRef>,
155157
}
156158

0 commit comments

Comments
 (0)