Skip to content

Commit 32b7443

Browse files
authored
fix(base): avoid self-deadlock during runtime drop (#20103)
fix(runtime): avoid self-deadlock during runtime drop
1 parent d77ae19 commit 32b7443

2 files changed

Lines changed: 56 additions & 3 deletions

File tree

src/common/base/src/runtime/runtime.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ impl Runtime {
8282

8383
if cfg!(debug_assertions) {
8484
let instant = Instant::now();
85-
// We wait up to 3 seconds to complete the runtime shutdown.
85+
// In debug builds, bound the Tokio runtime shutdown wait to 3 seconds.
86+
// The dropper joining this wait-to-drop thread is still an unbounded wait.
8687
runtime.shutdown_timeout(Duration::from_secs(3));
8788
instant.elapsed() >= Duration::from_secs(3)
8889
} else {
@@ -95,6 +96,7 @@ impl Runtime {
9596
task_marker,
9697
_dropper: Dropper {
9798
name,
99+
runtime_id,
98100
close: Some(watchdog_tx),
99101
join_handler: Some(join_handler),
100102
},
@@ -296,6 +298,7 @@ impl Runtime {
296298
/// Dropping the dropper will cause runtime to shutdown.
297299
pub struct Dropper {
298300
name: Option<String>,
301+
runtime_id: String,
299302
close: Option<std::sync::mpsc::Sender<WatchdogEvent>>,
300303
join_handler: Option<ThreadJoinHandle<bool>>,
301304
}
@@ -307,11 +310,20 @@ impl Drop for Dropper {
307310
if let Some(close_sender) = self.close.take()
308311
&& close_sender.send(WatchdogEvent::Stop).is_ok()
309312
{
313+
if self.is_dropping_from_own_runtime() {
314+
// The wait-to-drop thread owns the Tokio runtime and will shut it down
315+
// after observing the stop signal. Joining it from one of the same
316+
// runtime's workers would deadlock: shutdown waits for this worker to
317+
// exit, while this worker waits for shutdown to finish.
318+
drop(self.join_handler.take());
319+
return;
320+
}
321+
310322
match self.join_handler.take().unwrap().join() {
311323
Err(e) => warn!("Runtime dropper panic, {:?}", e),
312324
Ok(true) => {
313-
// When the runtime shutdown is blocked for more than 3 seconds,
314-
// we will print the backtrace in the warn log, which will help us debug.
325+
// If the debug shutdown timeout is fully consumed, log the
326+
// drop-site backtrace to help diagnose blocked runtime tasks.
315327
warn!(
316328
"Runtime dropper is blocked 3 seconds, runtime name: {:?}, drop backtrace: {:?}",
317329
self.name,
@@ -325,6 +337,15 @@ impl Drop for Dropper {
325337
}
326338
}
327339

340+
impl Dropper {
341+
fn is_dropping_from_own_runtime(&self) -> bool {
342+
match Handle::try_current() {
343+
Ok(handle) => handle.id().to_string() == self.runtime_id,
344+
Err(_) => false,
345+
}
346+
}
347+
}
348+
328349
/// Run multiple futures parallel
329350
/// using a semaphore to limit the parallelism number, and a specified thread pool to run the futures.
330351
/// It waits for all futures to complete and returns their results.

src/common/base/tests/it/runtime.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::sync::Arc;
1616
use std::sync::LazyLock;
1717
use std::sync::Mutex;
18+
use std::sync::mpsc;
1819
use std::time::Duration;
1920
use std::time::Instant;
2021

@@ -86,6 +87,37 @@ async fn test_shutdown_long_run_runtime() -> anyhow::Result<()> {
8687
Ok(())
8788
}
8889

90+
struct RuntimeOwner {
91+
_runtime: Arc<Runtime>,
92+
}
93+
94+
#[test]
95+
fn test_runtime_can_be_dropped_from_own_worker() -> anyhow::Result<()> {
96+
let runtime = Arc::new(Runtime::with_worker_threads(
97+
1,
98+
Some("drop-runtime-from-own-worker".to_string()),
99+
)?);
100+
let owner = Arc::new(RuntimeOwner {
101+
_runtime: runtime.clone(),
102+
});
103+
let owner_for_task = owner.clone();
104+
let (release_tx, release_rx) = mpsc::channel();
105+
let (done_tx, done_rx) = mpsc::channel();
106+
107+
runtime.spawn(async move {
108+
release_rx.recv().unwrap();
109+
drop(owner_for_task);
110+
let _ = done_tx.send(());
111+
});
112+
113+
drop(owner);
114+
drop(runtime);
115+
release_tx.send(())?;
116+
117+
done_rx.recv_timeout(Duration::from_secs(2))?;
118+
Ok(())
119+
}
120+
89121
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
90122
async fn test_runtime_task_dump_contains_runtime_id() -> anyhow::Result<()> {
91123
let runtime = Runtime::with_worker_threads(1, Some("task-marker-runtime".to_string()))?;

0 commit comments

Comments
 (0)