Skip to content

Commit d1cccd5

Browse files
Fix Windows preemptive crash: two-level preemption to avoid RefCell contention
The SuspendThread-based preemption can fire at any instruction, including during println!() which holds the test harness's OUTPUT_CAPTURE RefCell. When another coroutine on the same thread tries println!, it causes a double-panic abort (STATUS_STACK_BUFFER_OVERRUN / STATUS_HEAP_CORRUPTION). Fix: Implement two-level preemption: - Level 1: First SuspendThread sets a thread-local flag and returns without switching coroutines. preempt_asm restores all registers and returns to the original code. - Level 2: If the flag is still set at the next SuspendThread (~1ms later), the coroutine is truly CPU-bound — force immediate suspend. - Cooperative yield: check_preempt() called from Windows impl_facade! after each hooked syscall, yielding at a safe point where no thread-local borrows are held. This ensures coroutines with syscalls (like Sleep in the preemptive example) yield cooperatively at safe points, while pure CPU-bound loops (like loop{}) still get force-preempted after one extra monitor cycle. Agent-Logs-Url: https://github.com/acl-dev/open-coroutine/sessions/32977241-0cd1-450d-9693-d17d06f87274 Co-authored-by: loongs-zhang <38336731+loongs-zhang@users.noreply.github.com>
1 parent 60cdc8f commit d1cccd5

2 files changed

Lines changed: 66 additions & 24 deletions

File tree

core/src/monitor.rs

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -159,17 +159,8 @@ impl Monitor {
159159
let monitor = Self::get_instance();
160160
Self::init_current(monitor);
161161
let notify_queue = unsafe { &*monitor.notify_queue.get() };
162-
// On Windows, SuspendThread/SetThreadContext is NOT idempotent (unlike
163-
// Unix SIGURG). Calling preempt_thread a second time while the first
164-
// preemption is still pending corrupts the thread's stack layout.
165-
// Therefore, on Windows we collect successfully-preempted nodes and
166-
// remove them from the queue after the iteration.
167-
#[cfg(windows)]
168-
let mut preempted = Vec::new();
169162
while MonitorState::Running == monitor.state.get() || !notify_queue.is_empty() {
170163
//只遍历,不删除,如果抢占调度失败,会在1ms后不断重试,相当于主动检测
171-
#[cfg(windows)]
172-
preempted.clear();
173164
for node in notify_queue {
174165
if now() < node.timestamp {
175166
continue;
@@ -185,9 +176,12 @@ impl Monitor {
185176
);
186177
}
187178
} else if #[cfg(windows)] {
188-
if Self::preempt_thread(node.thread_id) {
189-
preempted.push(*node);
190-
} else {
179+
// Two-level preemption: the first preempt_thread call
180+
// sets a cooperative flag; the second call (next iteration,
181+
// ~1ms later) forces immediate suspension. We do NOT remove
182+
// the node here — MonitorListener::on_state_changed removes
183+
// it when the coroutine actually transitions to Suspend.
184+
if !Self::preempt_thread(node.thread_id) {
191185
error!(
192186
"Attempt to preempt scheduling for thread:{} failed !",
193187
node.thread_id
@@ -196,13 +190,6 @@ impl Monitor {
196190
}
197191
}
198192
}
199-
#[cfg(windows)]
200-
if !preempted.is_empty() {
201-
let queue = unsafe { &mut *monitor.notify_queue.get() };
202-
for node in &preempted {
203-
_ = queue.remove(node);
204-
}
205-
}
206193
//monitor线程不执行协程计算任务,每次循环至少wait 1ms
207194
monitor.blocker.clone().block(Duration::from_millis(1));
208195
}
@@ -514,11 +501,53 @@ std::arch::global_asm!(
514501
"ret",
515502
);
516503

504+
// Thread-local flag for two-level preemption on Windows.
505+
// Level 1: SuspendThread fires, do_preempt sets this flag and returns
506+
// without switching coroutines. The hooked syscall (Sleep, etc.)
507+
// calls check_preempt() which yields cooperatively at a safe point.
508+
// Level 2: If the flag is still set on the next SuspendThread (1ms later),
509+
// the coroutine is truly CPU-bound with no syscalls — do_preempt
510+
// forces an immediate context switch.
511+
#[cfg(windows)]
512+
std::thread_local! {
513+
static PREEMPT_PENDING: Cell<bool> = const { Cell::new(false) };
514+
}
515+
516+
/// Check if a preemption request is pending and yield cooperatively.
517+
/// Called from the Windows syscall hooks (impl_facade!) after each
518+
/// hooked syscall returns, providing a safe yield point where no
519+
/// thread-local borrows (e.g. test harness stdout capture) are held.
520+
#[cfg(windows)]
521+
pub(crate) fn check_preempt() {
522+
PREEMPT_PENDING.with(|flag| {
523+
if flag.get() {
524+
flag.set(false);
525+
if let Some(suspender) = SchedulableSuspender::current() {
526+
suspender.suspend();
527+
}
528+
}
529+
});
530+
}
531+
517532
#[cfg(windows)]
518533
extern "C" fn do_preempt() {
519-
if let Some(suspender) = SchedulableSuspender::current() {
520-
suspender.suspend();
521-
}
534+
PREEMPT_PENDING.with(|flag| {
535+
if flag.get() {
536+
// Flag was already set from a previous SuspendThread attempt but the
537+
// coroutine never made a hooked syscall — it is truly CPU-bound.
538+
// Force immediate suspension.
539+
flag.set(false);
540+
if let Some(suspender) = SchedulableSuspender::current() {
541+
suspender.suspend();
542+
}
543+
} else {
544+
// First attempt: set the flag and return without suspending.
545+
// preempt_asm will restore all registers and return to the original
546+
// code. If the coroutine reaches a hooked syscall, check_preempt()
547+
// will yield cooperatively at a safe point.
548+
flag.set(true);
549+
}
550+
});
522551
}
523552

524553
#[repr(C)]
@@ -638,10 +667,18 @@ mod tests {
638667
let thread_id = tid.load(Ordering::Acquire);
639668
assert_ne!(thread_id, 0, "Thread should have reported its ID");
640669

641-
// Directly call preempt_thread to preempt the running coroutine
670+
// Directly call preempt_thread to preempt the running coroutine.
671+
// Two-level preemption: the first call sets a cooperative flag (the
672+
// coroutine continues running), the second call forces suspension.
673+
assert!(
674+
super::Monitor::preempt_thread(thread_id),
675+
"preempt_thread should succeed (set cooperative flag)"
676+
);
677+
// Allow the first preempt_asm to complete before the second call
678+
std::thread::sleep(Duration::from_millis(1));
642679
assert!(
643680
super::Monitor::preempt_thread(thread_id),
644-
"preempt_thread should succeed"
681+
"preempt_thread should succeed (force suspend)"
645682
);
646683

647684
// Wait for thread to complete

core/src/syscall/windows/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ macro_rules! impl_facade {
9595
}
9696
}
9797
$crate::info!("exit syscall {} {:?} {}", syscall, r, saved_errno);
98+
// Cooperative preemption: if the monitor thread requested
99+
// preemption via SuspendThread, yield here at a safe point
100+
// where no thread-local borrows are held.
101+
#[cfg(feature = "preemptive")]
102+
$crate::monitor::check_preempt();
98103
// Restore errno so callers see the correct error.
99104
if let Some(e) = saved_errno.raw_os_error() {
100105
$crate::syscall::set_errno(

0 commit comments

Comments
 (0)