Skip to content

Commit f0ff1e6

Browse files
dengsh12scovich
andauthored
fix: deadlock for TokioMultiThreadExecutor (delta-io#1606)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta-kernel-rs/pull/1606/files) to review incremental changes. - [**stack/dead-wait**](delta-io#1606) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1606/files)] - [stack/rename-checkpoint](delta-io#1608) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1608/files/d0750b490f0f817bf0fd185dd17c34314eb77f77..1bdeda0c00304a685baa8a655779ace8d34e74fc)] - [stack/perform-checkpoint](delta-io#1600) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1600/files/1bdeda0c00304a685baa8a655779ace8d34e74fc..847883e1503277d42da7039c975098ae8e587666)] - [stack/ffi-checkpoint](delta-io#1611) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1611/files/847883e1503277d42da7039c975098ae8e587666..17b307b2f9b34f9e5a6e2b2a33d0ea008b1634e1)] --------- ## What changes are proposed in this pull request? Fix delta-io#1605 <!-- **Uncomment** this section if there are any changes affecting public APIs. Else, **delete** this section. ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested? Added unit test --------- Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
1 parent 24dff15 commit f0ff1e6

1 file changed

Lines changed: 45 additions & 3 deletions

File tree

kernel/src/engine/default/executor.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ pub mod tokio {
169169
}
170170

171171
impl TaskExecutor for TokioMultiThreadExecutor {
172+
// `block_on` uses `block_in_place`; If concurrent `block_on` calls exceed Tokio's `max_blocking_threads`, this can deadlock
173+
// See:
174+
// https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.max_blocking_threads
172175
fn block_on<T>(&self, task: T) -> T::Output
173176
where
174177
T: Future + Send + 'static,
@@ -191,9 +194,14 @@ pub mod tokio {
191194
// We throw away the handle, but it should continue on.
192195
self.handle.spawn(fut);
193196

194-
receiver
195-
.recv()
196-
.expect("TokioMultiThreadExecutor has crashed")
197+
// Use block_in_place to tell Tokio we're about to block - this allows
198+
// the runtime to move tasks off this worker's local queue so they can
199+
// be stolen by other workers.
200+
tokio::task::block_in_place(|| {
201+
receiver
202+
.recv()
203+
.expect("TokioMultiThreadExecutor has crashed")
204+
})
197205
}
198206

199207
fn spawn<F>(&self, task: F)
@@ -246,5 +254,39 @@ pub mod tokio {
246254
let executor = TokioMultiThreadExecutor::new(tokio::runtime::Handle::current());
247255
test_executor(executor).await;
248256
}
257+
258+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
259+
async fn test_nested_block_on_does_not_deadlock() {
260+
use std::sync::Arc;
261+
use std::time::Duration;
262+
263+
let executor = Arc::new(TokioMultiThreadExecutor::new(
264+
tokio::runtime::Handle::current(),
265+
));
266+
let executor_clone = executor.clone();
267+
268+
let (tx, rx) = channel::<i32>();
269+
270+
let handle = std::thread::spawn(move || {
271+
// Outer block_on
272+
let result = executor.block_on(async move {
273+
// Inner block_on
274+
let inner_result = executor_clone.block_on(async {
275+
tokio::time::sleep(Duration::from_millis(1)).await;
276+
42
277+
});
278+
inner_result + 1
279+
});
280+
tx.send(result).ok();
281+
});
282+
283+
// Wait with timeout - if this times out, we have a deadlock
284+
let timeout = Duration::from_secs(5);
285+
let result = rx
286+
.recv_timeout(timeout)
287+
.expect("Timeout - likely deadlock in TokioMultiThreadExecutor::block_on");
288+
assert_eq!(result, 43);
289+
handle.join().expect("thread panicked");
290+
}
249291
}
250292
}

0 commit comments

Comments
 (0)