diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index b0f09a0cf00e..48b09f9a9761 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -1,6 +1,7 @@
 mod cell_data;
 mod counter_map;
 mod operation;
+mod snapshot_coordinator;
 mod storage;
 pub mod storage_schema;
 
@@ -13,7 +14,7 @@ use std::{
     pin::Pin,
     sync::{
         Arc, LazyLock,
-        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
+        atomic::{AtomicBool, AtomicU64, Ordering},
     },
     time::SystemTime,
 };
@@ -21,7 +22,7 @@ use std::{
 use anyhow::{Context, Result, bail};
 use auto_hash_map::{AutoMap, AutoSet};
 use indexmap::IndexSet;
-use parking_lot::{Condvar, Mutex};
+use parking_lot::Mutex;
 use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
 use smallvec::{SmallVec, smallvec};
 use tokio::time::{Duration, Instant};
@@ -63,6 +64,7 @@ use crate::{
             connect_children, get_aggregation_number, get_uppers, is_root_node,
             make_task_dirty_internal, prepare_new_children,
         },
+        snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
         storage::Storage,
         storage_schema::{TaskStorage, TaskStorageAccessors},
     },
@@ -75,7 +77,6 @@ use crate::{
     utils::{
         dash_map_drop_contents::drop_contents,
         dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
-        ptr_eq_arc::PtrEqArc,
         shard_amount::compute_shard_amount,
     },
 };
@@ -83,9 +84,7 @@ use crate::{
 /// Threshold for parallelizing making dependent tasks dirty.
 /// If the number of dependent tasks exceeds this threshold,
 /// the operation will be parallelized.
-const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;
-
-const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
+const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
 
 /// Configurable idle timeout for snapshot persistence.
 /// Defaults to 2 seconds if not set or if the value is invalid.
@@ -97,20 +96,6 @@ static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
         .unwrap_or(Duration::from_secs(2))
 });
 
-struct SnapshotRequest {
-    snapshot_requested: bool,
-    suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
-}
-
-impl SnapshotRequest {
-    fn new() -> Self {
-        Self {
-            snapshot_requested: false,
-            suspended_operations: FxHashSet::default(),
-        }
-    }
-}
-
 pub enum StorageMode {
     /// Queries the storage for cache entries that don't exist locally.
     ReadOnly,
@@ -178,21 +163,14 @@ struct TurboTasksBackendInner<B: BackingStorage> {
 
     storage: Storage,
 
-    /// Number of executing operations + Highest bit is set when snapshot is
-    /// requested. When that bit is set, operations should pause until the
-    /// snapshot is completed. When the bit is set and in progress counter
-    /// reaches zero, `operations_completed_when_snapshot_requested` is
-    /// triggered.
-    in_progress_operations: AtomicUsize,
-
-    snapshot_request: Mutex<SnapshotRequest>,
-    /// Condition Variable that is triggered when `in_progress_operations`
-    /// reaches zero while snapshot is requested. All operations are either
-    /// completed or suspended.
-    operations_suspended: Condvar,
-    /// Condition Variable that is triggered when a snapshot is completed and
-    /// operations can continue.
-    snapshot_completed: Condvar,
+    /// Coordinates the operation/snapshot interleaving protocol. See
+    /// [`SnapshotCoordinator`] for details.
+    snapshot_coord: SnapshotCoordinator<AnyOperation>,
+    /// Serializes calls to `snapshot_and_persist`. The coordinator's
+    /// `begin_snapshot` asserts that snapshots don't overlap; this mutex
+    /// enforces that contract for our two callers (the background loop and
+    /// `stop_and_wait`).
+    snapshot_in_progress: Mutex<()>,
 
     /// The timestamp of the last started snapshot since [`Self::start_time`].
     last_snapshot: AtomicU64,
@@ -247,10 +225,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             ),
             task_cache: FxDashMap::default(),
             storage: Storage::new(shard_amount, small_preallocation),
-            in_progress_operations: AtomicUsize::new(0),
-            snapshot_request: Mutex::new(SnapshotRequest::new()),
-            operations_suspended: Condvar::new(),
-            snapshot_completed: Condvar::new(),
+            snapshot_coord: SnapshotCoordinator::new(),
+            snapshot_in_progress: Mutex::new(()),
             last_snapshot: AtomicU64::new(0),
             stopping: AtomicBool::new(false),
             stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
@@ -273,65 +249,20 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
     }
 
     fn suspending_requested(&self) -> bool {
-        self.should_persist()
-            && (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
+        self.should_persist() && self.snapshot_coord.snapshot_pending()
     }
 
     fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
-        #[cold]
-        fn operation_suspend_point_cold<B: BackingStorage>(
-            this: &TurboTasksBackendInner<B>,
-            suspend: impl FnOnce() -> AnyOperation,
-        ) {
-            let operation = Arc::new(suspend());
-            let mut snapshot_request = this.snapshot_request.lock();
-            if snapshot_request.snapshot_requested {
-                snapshot_request
-                    .suspended_operations
-                    .insert(operation.clone().into());
-                let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
-                assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
-                if value == SNAPSHOT_REQUESTED_BIT {
-                    this.operations_suspended.notify_all();
-                }
-                this.snapshot_completed
-                    .wait_while(&mut snapshot_request, |snapshot_request| {
-                        snapshot_request.snapshot_requested
-                    });
-                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
-                snapshot_request
-                    .suspended_operations
-                    .remove(&operation.into());
-            }
-        }
-
-        if self.suspending_requested() {
-            operation_suspend_point_cold(self, suspend);
+        if self.should_persist() {
+            self.snapshot_coord.suspend_point(suspend);
         }
     }
 
-    pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
+    pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
         if !self.should_persist() {
-            return OperationGuard { backend: None };
-        }
-        let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
-        if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
-            let mut snapshot_request = self.snapshot_request.lock();
-            if snapshot_request.snapshot_requested {
-                let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
-                if value == SNAPSHOT_REQUESTED_BIT {
-                    self.operations_suspended.notify_all();
-                }
-                self.snapshot_completed
-                    .wait_while(&mut snapshot_request, |snapshot_request| {
-                        snapshot_request.snapshot_requested
-                    });
-                self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
-            }
-        }
-        OperationGuard {
-            backend: Some(self),
+            return OperationGuard::noop();
         }
+        self.snapshot_coord.begin_operation()
     }
 
     fn should_persist(&self) -> bool {
@@ -427,23 +358,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         }
     }
 
-pub(crate) struct OperationGuard<'a, B: BackingStorage> {
-    backend: Option<&'a TurboTasksBackendInner<B>>,
-}
-
-impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
-    fn drop(&mut self) {
-        if let Some(backend) = self.backend {
-            let fetch_sub = backend
-                .in_progress_operations
-                .fetch_sub(1, Ordering::AcqRel);
-            if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
-                backend.operations_suspended.notify_all();
-            }
-        }
-    }
-}
-
 /// Intermediate result of step 1 of task execution completion.
 struct TaskExecutionCompletePrepareResult {
     pub new_children: FxHashSet<TaskId>,
@@ -992,6 +906,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         let snapshot_span =
             tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
                 .entered();
+        // Serialize snapshots. The internal protocol (snapshot mode, snapshot
+        // request bit, suspended_operations) assumes only one snapshot runs at
+        // a time. Held for the entire snapshot lifecycle.
+        let _snapshot_in_progress = self.snapshot_in_progress.lock();
         let start = Instant::now();
         // SystemTime for wall-clock timestamps in trace events (milliseconds
         // since epoch). Instant is monotonic but has no defined epoch, so it
         // cannot be mapped to a wall-clock time by itself.
         let wall_start = SystemTime::now();
 
         debug_assert!(self.should_persist());
 
-        let suspended_operations;
-        {
+        let mut snapshot_phase = {
             let _span = tracing::info_span!("blocking").entered();
-            let mut snapshot_request = self.snapshot_request.lock();
-            snapshot_request.snapshot_requested = true;
-            let active_operations = self
-                .in_progress_operations
-                .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
-            if active_operations != 0 {
-                self.operations_suspended
-                    .wait_while(&mut snapshot_request, |_| {
-                        self.in_progress_operations.load(Ordering::Relaxed)
-                            != SNAPSHOT_REQUESTED_BIT
-                    });
-            }
-            suspended_operations = snapshot_request
-                .suspended_operations
-                .iter()
-                .map(|op| op.arc().clone())
-                .collect::<Vec<_>>();
-        }
+            self.snapshot_coord.begin_snapshot()
+        };
 
         // Enter snapshot mode, which atomically reads and resets the modified count.
         // Checking after start_snapshot ensures no concurrent increments can race.
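+        // At this point every in-flight operation has drained or suspended,
+        // so nothing can modify storage again until the phase guard is
+        // dropped below.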
         let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
-        let mut snapshot_request = self.snapshot_request.lock();
-        snapshot_request.snapshot_requested = false;
-        self.in_progress_operations
-            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
-        self.snapshot_completed.notify_all();
+
+        let suspended_operations = snapshot_phase.take_suspended_operations();
+
         let snapshot_time = Instant::now();
-        drop(snapshot_request);
+        drop(snapshot_phase);
 
         if !has_modifications {
             // No tasks modified since the last snapshot — drop the guard (which
@@ -2448,7 +2347,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             span.record("result", "marked dirty");
         }
 
-        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
+        if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
             let chunk_size = good_chunk_size(output_dependent_tasks.len());
             let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
             let _ = scope_and_block(chunks.len(), |scope| {
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
index 4882ac615dab..76af14bf9e3e 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -26,8 +26,8 @@ use turbo_tasks::{
 use self::aggregation_update::ComputeDirtyAndCleanUpdate;
 use crate::{
     backend::{
-        EventDescription, OperationGuard, TaskDataCategory, TurboTasksBackend,
-        TurboTasksBackendInner,
+        EventDescription, TaskDataCategory, TurboTasksBackend, TurboTasksBackendInner,
+        snapshot_coordinator::OperationGuard,
         storage::{SpecificTaskDataCategory, StorageWriteGuard},
         storage_schema::{TaskStorage, TaskStorageAccessors},
     },
@@ -168,7 +168,7 @@ impl TaskLockCounter {
 pub struct ExecuteContextImpl<'e, B: BackingStorage> {
     backend: &'e TurboTasksBackendInner<B>,
     turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
-    _operation_guard: Option<OperationGuard<'e, B>>,
+    _operation_guard: Option<OperationGuard<'e, AnyOperation>>,
     task_lock_counter: TaskLockCounter,
 }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/snapshot_coordinator.rs b/turbopack/crates/turbo-tasks-backend/src/backend/snapshot_coordinator.rs
new file mode 100644
index 000000000000..7fe6bbb46953
--- /dev/null
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/snapshot_coordinator.rs
@@ -0,0 +1,598 @@
+//! Coordinator that gates concurrent operations against snapshotting.
+//!
+//! Backend operations and snapshot work share a single [`SnapshotCoordinator`]
+//! that enforces the protocol:
+//!
+//! - When no snapshot is in flight, [`begin_operation`](SnapshotCoordinator::begin_operation) is a
+//!   single uncontended atomic increment.
+//! - When a snapshot is requested, new operations block until the snapshot finishes, and operations
+//!   already in flight either complete or call
+//!   [`suspend_point`](SnapshotCoordinator::suspend_point) to suspend.
+//! - The snapshotter waits for every in-flight operation to drain or suspend, takes its snapshot,
+//!   then wakes everyone.
+
+use std::sync::{
+    Arc,
+    atomic::{AtomicUsize, Ordering},
+};
+
+use parking_lot::{Condvar, Mutex};
+use rustc_hash::FxHashSet;
+
+use crate::utils::ptr_eq_arc::PtrEqArc;
+
+/// High bit: set while a snapshot is requested or in flight.
+/// Low bits: count of operations currently executing (not suspended).
+const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
+
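+// Minimal usage sketch of the protocol (illustrative only; the backend
+// instantiates the coordinator with `AnyOperation`, not `u32`):
+//
+//     let coord = SnapshotCoordinator::<u32>::new();
+//
+//     // Operation side (the hot path is a single fetch_add):
+//     let guard = coord.begin_operation();
+//     coord.suspend_point(|| 42); // parks only while a snapshot is pending
+//     drop(guard); // wakes a snapshotter waiting for the count to drain
+//
+//     // Snapshot side (callers must serialize begin_snapshot themselves):
+//     let mut phase = coord.begin_snapshot(); // waits for ops to drain/suspend
+//     let ops = phase.take_suspended_operations(); // persist these for replay
+//     drop(phase); // clears the bit and wakes parked operations
+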
+/// State protected by the mutex.
+struct State<O> {
+    /// `true` between `begin_snapshot` and `SnapshotPhase::drop`.
+    snapshot_requested: bool,
+    /// Operations that called [`SnapshotCoordinator::suspend_point`] and have
+    /// not yet resumed. Returned to the snapshotter via
+    /// [`SnapshotPhase::suspended_operations`] so it can persist them in the
+    /// uncompleted-operations log.
+    suspended_operations: FxHashSet<PtrEqArc<O>>,
+}
+
+/// Coordinates operation/snapshot interleaving.
+///
+/// Generic over the operation type the caller wants to suspend. The
+/// coordinator only requires `O: Send + Sync + 'static`; it never inspects
+/// the value, just stores it via [`PtrEqArc`].
+pub struct SnapshotCoordinator<O> {
+    /// Combined count + bit. See [`SNAPSHOT_REQUESTED_BIT`].
+    in_progress_operations: AtomicUsize,
+    state: Mutex<State<O>>,
+    /// Notified by the last operation to drain (the counter drops to exactly
+    /// `SNAPSHOT_REQUESTED_BIT`, i.e. zero live operations with the bit
+    /// still set). Awaited by [`begin_snapshot`](Self::begin_snapshot).
+    operations_drained: Condvar,
+    /// Notified by [`SnapshotPhase::drop`]. Awaited by operations that hit a
+    /// suspend point or arrive while a snapshot is in flight.
+    snapshot_completed: Condvar,
+}
+
+impl<O> Default for SnapshotCoordinator<O> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<O> SnapshotCoordinator<O> {
+    pub fn new() -> Self {
+        Self {
+            in_progress_operations: AtomicUsize::new(0),
+            state: Mutex::new(State {
+                snapshot_requested: false,
+                suspended_operations: FxHashSet::default(),
+            }),
+            operations_drained: Condvar::new(),
+            snapshot_completed: Condvar::new(),
+        }
+    }
+
+    /// Cheap check used by hot paths. Returns `true` while a snapshot is in
+    /// flight (or being requested). May return `false` racily if a snapshot
+    /// is just about to start; the actual coordination happens in
+    /// [`suspend_point`](Self::suspend_point) and
+    /// [`begin_operation`](Self::begin_operation).
+    pub fn snapshot_pending(&self) -> bool {
+        // Acquire so that observing the bit synchronizes with anything the
+        // snapshotter wrote before setting it.
+        (self.in_progress_operations.load(Ordering::Acquire) & SNAPSHOT_REQUESTED_BIT) != 0
+    }
+
+    /// Begin an operation. Returns a guard that decrements on drop.
+    ///
+    /// If a snapshot is in flight, blocks until the snapshot finishes before
+    /// returning the guard.
+    pub fn begin_operation(&self) -> OperationGuard<'_, O> {
+        // Fast path: no snapshot in flight, single atomic increment.
+        let prev = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
+        if (prev & SNAPSHOT_REQUESTED_BIT) == 0 {
+            return OperationGuard { coord: Some(self) };
+        }
+        #[cold]
+        fn wait_for_snapshot_to_complete<O>(this: &SnapshotCoordinator<O>) {
+            // We arrive here holding our +1 (the fetch_add in begin_operation).
+            // Two cases:
+            // - Snapshot is still in flight: back out our +1, wait for it to finish, then re-add.
+            //   The drop balances the re-add.
+            // - Snapshot already finished between our fetch_add and acquiring this mutex: leave
+            //   our +1 in place; the drop balances it directly. No extra atomics needed.
+            let mut state = this.state.lock();
+            if state.snapshot_requested {
+                let prev = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel);
+                if prev - 1 == SNAPSHOT_REQUESTED_BIT {
+                    this.operations_drained.notify_all();
+                }
+                this.snapshot_completed
+                    .wait_while(&mut state, |s| s.snapshot_requested);
+                // Re-add now that the snapshot is done. The bit is cleared
+                // because we just observed `snapshot_requested == false` under
+                // the mutex.
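+                // A new snapshot cannot start in this window either:
+                // `begin_snapshot` sets the bit only after taking the state
+                // mutex, which we still hold here.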
+                this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
+            }
+        }
+        // Slow path: a snapshot is in flight (or was just requested). Back out
+        // the increment, wait for the snapshot to complete, then re-increment.
+        wait_for_snapshot_to_complete(self);
+        OperationGuard { coord: Some(self) }
+    }
+
+    /// Suspend the current operation if a snapshot is requested. Otherwise a
+    /// no-op. The closure is called only when actually suspending — it must
+    /// produce a handle to this operation so the snapshotter can persist it
+    /// for replay on the next startup.
+    pub fn suspend_point(&self, suspend: impl FnOnce() -> O) {
+        if !self.snapshot_pending() {
+            return;
+        }
+        #[cold]
+        fn suspend_point_cold<O>(this: &SnapshotCoordinator<O>, suspend: impl FnOnce() -> O) {
+            let op = Arc::new(suspend());
+            let mut state = this.state.lock();
+            if !state.snapshot_requested {
+                // Race: snapshot finished between the `snapshot_pending` check
+                // and acquiring the mutex. Nothing to do.
+                return;
+            }
+            state
+                .suspended_operations
+                .insert(PtrEqArc::from(op.clone()));
+            // Decrement the count so the snapshotter can drain.
+            let prev = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel);
+            // Protocol violation if either invariant fails. Keep as a regular
+            // `assert!` so production builds also catch it: the alternative is
+            // a corrupted counter that hangs the next snapshot indefinitely.
+            assert!(
+                (prev & SNAPSHOT_REQUESTED_BIT) != 0 && (prev & !SNAPSHOT_REQUESTED_BIT) > 0,
+                "suspend_point called without a live operation: prev={prev:#x}"
+            );
+            if prev - 1 == SNAPSHOT_REQUESTED_BIT {
+                this.operations_drained.notify_all();
+            }
+            // Wait for the snapshot to finish.
+            this.snapshot_completed
+                .wait_while(&mut state, |s| s.snapshot_requested);
+            // Resume: re-increment and remove ourselves from the suspended set.
+            this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
+            state.suspended_operations.remove(&PtrEqArc::from(op));
+        }
+        suspend_point_cold(self, suspend);
+    }
+
+    /// Begin a snapshot. Sets the snapshot bit, blocks until all in-flight
+    /// operations have drained or suspended, and returns a [`SnapshotPhase`]
+    /// guard that releases the bit on drop.
+    ///
+    /// Concurrent callers panic via the assertion below. Production callers
+    /// must serialize themselves (see the `snapshot_in_progress` lock in
+    /// `mod.rs`); the coordinator does not own that mutex because some
+    /// callers want to interleave additional work between phases.
+    pub fn begin_snapshot(&self) -> SnapshotPhase<'_, O> {
+        let mut state = self.state.lock();
+        // Protocol violation: callers must serialize snapshots themselves.
+        // Promoted from a debug_assert: silently ignoring this leads directly
+        // to a stuck counter and a hung process.
+        assert!(
+            !state.snapshot_requested,
+            "begin_snapshot called while another snapshot was already in flight"
+        );
+        state.snapshot_requested = true;
+        // AcqRel so the writes leading up to setting the bit are visible to
+        // the operation hot path's Acquire load in `snapshot_pending`.
+        let active = self
+            .in_progress_operations
+            .fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::AcqRel);
+        assert!(
+            (active & SNAPSHOT_REQUESTED_BIT) == 0,
+            "snapshot bit was already set when begin_snapshot ran: {active:#x}"
+        );
+        if (active & !SNAPSHOT_REQUESTED_BIT) != 0 {
+            // Some operations are in flight. Wait for them to drain or
+            // suspend. The predicate is Acquire-loaded so we synchronize
+            // with the AcqRel decrement that woke us.
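+            // The `|_|` predicate deliberately ignores the mutex-guarded
+            // state: the drain condition lives in the atomic counter, so it
+            // is re-read from the atomic on every wakeup.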
+            self.operations_drained.wait_while(&mut state, |_| {
+                self.in_progress_operations.load(Ordering::Acquire) != SNAPSHOT_REQUESTED_BIT
+            });
+        }
+        // Copy out the suspended operations while we still hold the mutex so
+        // the set cannot change underneath us.
+        let suspended_operations: Vec<Arc<O>> = state
+            .suspended_operations
+            .iter()
+            .map(|op| op.arc().clone())
+            .collect();
+        // Release the mutex now — the snapshotter does the heavy work
+        // without holding it. Operations attempting to start during this
+        // window observe the bit set and either suspend or wait on
+        // `snapshot_completed`.
+        drop(state);
+        SnapshotPhase {
+            coord: self,
+            suspended_operations,
+        }
+    }
+}
+
+/// Guard returned by [`SnapshotCoordinator::begin_operation`]. Decrements the
+/// in-progress count on drop and notifies the snapshotter if it is waiting.
+pub struct OperationGuard<'a, O> {
+    coord: Option<&'a SnapshotCoordinator<O>>,
+}
+
+impl<O> OperationGuard<'_, O> {
+    /// A guard that does nothing on drop. Useful for backends that don't
+    /// participate in the snapshot protocol (e.g. when persistence is
+    /// disabled).
+    pub fn noop() -> Self {
+        Self { coord: None }
+    }
+}
+
+impl<O> Drop for OperationGuard<'_, O> {
+    fn drop(&mut self) {
+        let Some(coord) = self.coord else {
+            return;
+        };
+        let prev = coord.in_progress_operations.fetch_sub(1, Ordering::AcqRel);
+        // Underflow means a guard was dropped without a matching increment;
+        // promoted from a debug_assert because the alternative is silently
+        // wrapping to usize::MAX and breaking every subsequent snapshot.
+        assert!(
+            (prev & !SNAPSHOT_REQUESTED_BIT) > 0,
+            "OperationGuard::drop underflow: in_progress_operations was {prev:#x}"
+        );
+        if prev - 1 == SNAPSHOT_REQUESTED_BIT {
+            #[cold]
+            fn notify_drained<O>(coord: &SnapshotCoordinator<O>) {
+                // Take the state mutex around `notify_all`. This is defensive against
+                // `parking_lot::Condvar::notify_all`'s fast path: it does a `Relaxed` load
+                // on the condvar's internal `state` and short-circuits if it observes
+                // null. A waiter publishes that `state` under parking_lot's bucket lock
+                // (not under the user mutex), so a notifier that has never synchronized
+                // with the user mutex can racily observe stale null and drop the notify.
+                //
+                // It is generally a best practice to notify only while holding the lock
+                // the waiters use, and this is exactly why.
+                let _g = coord.state.lock();
+                coord.operations_drained.notify_all();
+            }
+            notify_drained(coord);
+        }
+    }
+}
+
+/// Guard returned by [`SnapshotCoordinator::begin_snapshot`]. Holds the
+/// snapshot bit; on drop, releases it and wakes any operations parked on
+/// `snapshot_completed`.
+pub struct SnapshotPhase<'a, O> {
+    coord: &'a SnapshotCoordinator<O>,
+    suspended_operations: Vec<Arc<O>>,
+}
+
+impl<O> SnapshotPhase<'_, O> {
+    /// Operations that were suspended at the moment the snapshot started.
+    /// The snapshotter must persist these so they can be replayed on the
+    /// next startup.
+    #[cfg(test)]
+    pub fn suspended_operations(&self) -> &[Arc<O>] {
+        &self.suspended_operations
+    }
+
+    /// Take ownership of the suspended-operations list.
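+    /// Subsequent calls return an empty list, since the operations are
+    /// `std::mem::take`n out of the phase.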
+    pub fn take_suspended_operations(&mut self) -> Vec<Arc<O>> {
+        std::mem::take(&mut self.suspended_operations)
+    }
+}
+
+impl<O> Drop for SnapshotPhase<'_, O> {
+    fn drop(&mut self) {
+        let mut state = self.coord.state.lock();
+        state.snapshot_requested = false;
+        let prev = self
+            .coord
+            .in_progress_operations
+            .fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::AcqRel);
+        assert!(
+            (prev & SNAPSHOT_REQUESTED_BIT) != 0,
+            "SnapshotPhase::drop: snapshot bit was already cleared (prev={prev:#x})"
+        );
+        // Notify everyone waiting for the snapshot to finish while holding the
+        // mutex (correctness against parking_lot's notify_all fast path).
+        self.coord.snapshot_completed.notify_all();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        sync::{
+            Arc,
+            atomic::{AtomicBool, AtomicUsize},
+            mpsc::{self, RecvTimeoutError},
+        },
+        thread,
+        time::Duration,
+    };
+
+    use super::*;
+
+    /// Trivial operation type for tests — just a u32 tag.
+    type Op = u32;
+
+    /// Spin until `snapshot_pending()` returns true, yielding between checks
+    /// so we don't starve the snapshotter thread on single-core CI. Replaces
+    /// fixed `thread::sleep` waits — those introduced both flakiness (too
+    /// short) and slowness (too long).
+    fn wait_for_snapshot_pending(coord: &SnapshotCoordinator<Op>) {
+        while !coord.snapshot_pending() {
+            thread::yield_now();
+        }
+    }
+
+    #[test]
+    fn no_snapshot_pending_initially() {
+        let coord = SnapshotCoordinator::<Op>::new();
+        assert!(!coord.snapshot_pending());
+    }
+
+    #[test]
+    fn begin_operation_fast_path() {
+        let coord = SnapshotCoordinator::<Op>::new();
+        let g = coord.begin_operation();
+        assert_eq!(coord.in_progress_operations.load(Ordering::Acquire), 1);
+        drop(g);
+        assert_eq!(coord.in_progress_operations.load(Ordering::Acquire), 0);
+    }
+
+    #[test]
+    fn snapshot_with_no_ops_proceeds_immediately() {
+        let coord = SnapshotCoordinator::<Op>::new();
+        let phase = coord.begin_snapshot();
+        assert!(coord.snapshot_pending());
+        assert!(phase.suspended_operations().is_empty());
+        drop(phase);
+        assert!(!coord.snapshot_pending());
+    }
+
+    #[test]
+    fn snapshot_waits_for_ops_to_drain() {
+        let coord = Arc::new(SnapshotCoordinator::<Op>::new());
+
+        let g = coord.begin_operation();
+        let started_snapshot = Arc::new(AtomicUsize::new(0));
+
+        let coord2 = coord.clone();
+        let snap_thread = thread::spawn({
+            let started_snapshot = started_snapshot.clone();
+            move || {
+                let _phase = coord2.begin_snapshot();
+                started_snapshot.store(1, Ordering::Release);
+            }
+        });
+
+        // Wait for the snapshotter to set the bit. It can't make progress
+        // past begin_snapshot while we hold `g`, so started_snapshot must
+        // still be 0.
+        wait_for_snapshot_pending(&coord);
+        assert_eq!(started_snapshot.load(Ordering::Acquire), 0);
+
+        // Drop the operation — snapshotter should now proceed.
+        drop(g);
+        snap_thread.join().unwrap();
+        assert_eq!(started_snapshot.load(Ordering::Acquire), 1);
+    }
+
+    #[test]
+    fn new_operation_blocks_during_snapshot() {
+        let coord = Arc::new(SnapshotCoordinator::<Op>::new());
+        let phase = coord.begin_snapshot();
+        let started_op = Arc::new(AtomicUsize::new(0));
+        let arrived = Arc::new(AtomicUsize::new(0));
+
+        let coord2 = coord.clone();
+        let op_thread = thread::spawn({
+            let started_op = started_op.clone();
+            let arrived = arrived.clone();
+            move || {
+                arrived.store(1, Ordering::Release);
+                let _guard = coord2.begin_operation();
+                started_op.store(1, Ordering::Release);
+            }
+        });
+
+        // Wait until the worker is alive and about to call begin_operation.
+        // We can't directly observe it entering begin_operation (its
+        // fetch_add is transient — it backs out and parks before we can
+        // sample), but since we hold `phase` the worker provably cannot
+        // set started_op=1 from anywhere inside begin_operation. So
+        // observing started_op==0 after the worker is running and on its
+        // way into begin_operation is a real check, not a vacuous one.
+        while arrived.load(Ordering::Acquire) == 0 {
+            thread::yield_now();
+        }
+        assert_eq!(started_op.load(Ordering::Acquire), 0);
+
+        drop(phase);
+        op_thread.join().unwrap();
+        assert_eq!(started_op.load(Ordering::Acquire), 1);
+    }
+
+    #[test]
+    fn suspend_point_lets_snapshot_proceed() {
+        let coord = Arc::new(SnapshotCoordinator::<Op>::new());
+        let g = coord.begin_operation();
+
+        let snapshotter_done = Arc::new(AtomicUsize::new(0));
+        let coord_snap = coord.clone();
+
+        let snap_thread = thread::spawn({
+            let snapshotter_done = snapshotter_done.clone();
+            move || {
+                let phase = coord_snap.begin_snapshot();
+                assert_eq!(phase.suspended_operations().len(), 1);
+                snapshotter_done.store(1, Ordering::Release);
+                // Hold the snapshot for a moment so the suspend_point thread
+                // observes `snapshot_requested == true` after waking.
+                thread::sleep(Duration::from_millis(20));
+            }
+        });
+
+        wait_for_snapshot_pending(&coord);
+        // Snapshotter is now waiting for our operation to drain. Calling
+        // suspend_point should let it proceed.
+        coord.suspend_point(|| 42u32);
+        // suspend_point returns once the snapshot is finished.
+        assert_eq!(snapshotter_done.load(Ordering::Acquire), 1);
+
+        snap_thread.join().unwrap();
+        drop(g);
+    }
+
+    /// Run `body` on a worker thread and wait up to `timeout` for it to
+    /// finish.
+    fn run_with_timeout(
+        label: &'static str,
+        timeout: Duration,
+        body: impl FnOnce() + Send + 'static,
+    ) {
+        let (tx, rx) = mpsc::channel::<()>();
+        let handle = thread::spawn(move || {
+            body();
+            let _ = tx.send(());
+        });
+        match rx.recv_timeout(timeout) {
+            // Worker either finished normally or panicked (dropping the
+            // sender). Either way it's no longer running, so join to
+            // propagate any panic.
+            Ok(()) | Err(RecvTimeoutError::Disconnected) => {
+                handle.join().unwrap();
+            }
+            Err(RecvTimeoutError::Timeout) => {
+                panic!(
+                    "[watchdog] {label}: timed out after {timeout:?}, missed-wakeup race likely"
+                );
+            }
+        }
+    }
+
+    /// Targeted stress test that reproduces the parking_lot notify-all
+    /// fast-path missed-wakeup race when `OperationGuard::drop` does NOT
+    /// take the state mutex.
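+    /// Without that lock a drained notification can be lost, leaving
+    /// `begin_snapshot` parked forever; the watchdog in `run_with_timeout`
+    /// then fails the test instead of letting CI hang.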
+    #[test]
+    fn stress_no_missed_wakeups() {
+        run_with_timeout("stress_no_missed_wakeups", Duration::from_secs(60), || {
+            let coord = Arc::new(SnapshotCoordinator::<Op>::new());
+            let snapshot_lock = Arc::new(Mutex::new(()));
+            let stop = Arc::new(AtomicBool::new(false));
+            let snap_count = Arc::new(AtomicUsize::new(0));
+
+            let mut op_handles = Vec::new();
+            for _ in 0..8 {
+                let coord = coord.clone();
+                op_handles.push(thread::spawn({
+                    let stop = stop.clone();
+                    move || {
+                        while !stop.load(Ordering::Relaxed) {
+                            let _g = coord.begin_operation();
+                        }
+                    }
+                }));
+            }
+            let mut snap_handles = Vec::new();
+            for _ in 0..2 {
+                snap_handles.push(thread::spawn({
+                    let coord = coord.clone();
+                    let snapshot_lock = snapshot_lock.clone();
+                    let snap_count = snap_count.clone();
+                    move || {
+                        for _ in 0..200 {
+                            let _ser = snapshot_lock.lock();
+                            let _phase = coord.begin_snapshot();
+                            snap_count.fetch_add(1, Ordering::Relaxed);
+                        }
+                    }
+                }));
+            }
+
+            // Progress watchdog: print the snapshot count every second so we
+            // can see whether the test is making progress or actually wedged.
+            let stop_progress = Arc::new(AtomicBool::new(false));
+
+            let progress = thread::spawn({
+                let stop_progress = stop_progress.clone();
+                let snap_count = snap_count.clone();
+                move || {
+                    while !stop_progress.load(Ordering::Relaxed) {
+                        thread::sleep(Duration::from_secs(1));
+                        eprintln!(
+                            "[stress] snapshots completed: {}",
+                            snap_count.load(Ordering::Relaxed),
+                        );
+                    }
+                }
+            });
+
+            for h in snap_handles {
+                h.join().unwrap();
+            }
+            stop.store(true, Ordering::Relaxed);
+            for h in op_handles {
+                h.join().unwrap();
+            }
+            stop_progress.store(true, Ordering::Relaxed);
+            let _ = progress.join();
+
+            assert_eq!(coord.in_progress_operations.load(Ordering::Acquire), 0);
+        });
+    }
+
+    #[test]
+    fn many_concurrent_ops_and_snapshots() {
+        // Stress test: hammer the protocol from many threads.
+        // The coordinator does not serialize concurrent snapshotters (callers
+        // are expected to do that with their own mutex), so we use one here.
+        let coord = Arc::new(SnapshotCoordinator::<Op>::new());
+        let snapshot_lock = Arc::new(Mutex::new(()));
+        let counter = Arc::new(AtomicUsize::new(0));
+
+        let mut handles = Vec::new();
+        for _ in 0..8 {
+            handles.push(thread::spawn({
+                let coord = coord.clone();
+                let counter = counter.clone();
+                move || {
+                    for _ in 0..200 {
+                        let _g = coord.begin_operation();
+                        counter.fetch_add(1, Ordering::Relaxed);
+                    }
+                }
+            }));
+        }
+        for _ in 0..2 {
+            handles.push(thread::spawn({
+                let coord = coord.clone();
+                let snapshot_lock = snapshot_lock.clone();
+                move || {
+                    for _ in 0..50 {
+                        let _ser = snapshot_lock.lock();
+                        let _phase = coord.begin_snapshot();
+                        // Pretend to do snapshot work.
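+                        // The sleep keeps the snapshot window open briefly so
+                        // begin_operation calls actually contend with it.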
+                        thread::sleep(Duration::from_micros(10));
+                    }
+                }
+            }));
+        }
+
+        for h in handles {
+            h.join().unwrap();
+        }
+        assert_eq!(counter.load(Ordering::Relaxed), 8 * 200);
+        assert_eq!(
+            coord.in_progress_operations.load(Ordering::Acquire),
+            0,
+            "in_progress_operations should be 0 after all ops and snapshots done"
+        );
+    }
+}
diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs
index 1032c3d2dbc6..eeb12fb270ec 100644
--- a/turbopack/crates/turbo-tasks/src/manager.rs
+++ b/turbopack/crates/turbo-tasks/src/manager.rs
@@ -4,7 +4,9 @@ use std::{
     future::Future,
     hash::{BuildHasher, BuildHasherDefault},
     mem::take,
+    panic::AssertUnwindSafe,
     pin::Pin,
+    process::abort,
     sync::{
         Arc, Mutex, RwLock, Weak,
         atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -16,7 +18,7 @@ use anyhow::{Result, anyhow};
 use auto_hash_map::AutoMap;
 use bincode::{Decode, Encode};
 use either::Either;
-use futures::stream::FuturesUnordered;
+use futures::{FutureExt, stream::FuturesUnordered};
 use rustc_hash::{FxBuildHasher, FxHasher};
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
@@ -1186,6 +1188,25 @@ impl<B: Backend + 'static> TurboTasks<B> {
 
 struct TurboTasksExecutor;
 
+/// Run a future and abort the process if a panic is reported.
+///
+/// Turbo-tasks catches panics from user code and propagates them through the
+/// task tree, but if a panic happens as part of state management we have to
+/// abort.
+async fn abort_on_panic<F: Future>(f: F) -> F::Output {
+    match AssertUnwindSafe(f).catch_unwind().await {
+        Ok(r) => r,
+        Err(_) => {
+            eprintln!(
+                "\nturbo-tasks: an internal panic occurred outside the per-task panic \
+                 boundary. This is a bug in turbo-tasks/Turbopack — please report it at \
+                 https://github.com/vercel/next.js/discussions and include the panic message \
+                 and stack trace above.\n\nAborting."
+            );
+            abort();
+        }
+    }
+}
+
 impl<B: Backend + 'static> Executor<Arc<TurboTasks<B>>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
     type Future = impl Future + Send + 'static;
 
@@ -1200,63 +1221,69 @@ impl Executor<Arc<TurboTasks<B>>, ScheduledTask, TaskPriority> for TurboT
         let this2 = this.clone();
         let this = this.clone();
         let future = async move {
-            let mut schedule_again = true;
-            while schedule_again {
-                // it's okay for execution ids to overflow and wrap, they're just used for
-                // an assert
-                let execution_id = this.execution_id_factory.wrapping_get();
-                let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
-                    task_id,
-                    execution_id,
-                    priority,
-                    false, // in_top_level_task
-                )));
-                let single_execution_future = async {
-                    if this.stopped.load(Ordering::Acquire) {
-                        this.backend.task_execution_canceled(task_id, &*this);
-                        return false;
-                    }
-
-                    let Some(TaskExecutionSpec { future, span }) = this
-                        .backend
-                        .try_start_task_execution(task_id, priority, &*this)
-                    else {
-                        return false;
-                    };
-
-                    async {
-                        let result = CaptureFuture::new(future).await;
-
-                        // wait for all spawned local tasks using `local` to finish
-                        wait_for_local_tasks().await;
-
-                        let result = match result {
-                            Ok(Ok(raw_vc)) => Ok(raw_vc),
-                            Ok(Err(err)) => Err(err.into()),
-                            Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
-                        };
-
-                        let finished_state = this.finish_current_task_state();
-                        let cell_counters = CURRENT_TASK_STATE
-                            .with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
-                        this.backend.task_execution_completed(
-                            task_id,
-                            result,
-                            &cell_counters,
-                            #[cfg(feature = "verify_determinism")]
-                            finished_state.stateful,
-                            finished_state.has_invalidator,
-                            &*this,
-                        )
-                    }
-                    .instrument(span)
-                    .await
-                };
-                schedule_again = CURRENT_TASK_STATE
-                    .scope(current_task_state, single_execution_future)
-                    .await;
-            }
-            this.finish_foreground_job();
+            abort_on_panic(async {
+                let mut schedule_again = true;
+                while schedule_again {
+                    // it's okay for execution ids to overflow and wrap, they're just used
+                    // for an assert
+                    let execution_id = this.execution_id_factory.wrapping_get();
+                    let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
+                        task_id,
+                        execution_id,
+                        priority,
+                        false, // in_top_level_task
+                    )));
+                    let single_execution_future = async {
+                        if this.stopped.load(Ordering::Acquire) {
+                            this.backend.task_execution_canceled(task_id, &*this);
+                            return false;
+                        }
+
+                        let Some(TaskExecutionSpec { future, span }) = this
+                            .backend
+                            .try_start_task_execution(task_id, priority, &*this)
+                        else {
+                            return false;
+                        };
+
+                        async {
+                            let result = CaptureFuture::new(future).await;
+
+                            // wait for all spawned local tasks using `local` to finish
+                            wait_for_local_tasks().await;
+
+                            let result = match result {
+                                Ok(Ok(raw_vc)) => Ok(raw_vc),
+                                Ok(Err(err)) => Err(err.into()),
+                                Err(err) => {
+                                    Err(TurboTasksExecutionError::Panic(Arc::new(err)))
+                                }
+                            };
+
+                            let finished_state = this.finish_current_task_state();
+                            let cell_counters = CURRENT_TASK_STATE.with(|ts| {
+                                ts.write().unwrap().cell_counters.take().unwrap()
+                            });
+                            this.backend.task_execution_completed(
+                                task_id,
+                                result,
+                                &cell_counters,
+                                #[cfg(feature = "verify_determinism")]
+                                finished_state.stateful,
+                                finished_state.has_invalidator,
+                                &*this,
+                            )
+                        }
+                        .instrument(span)
+                        .await
+                    };
+                    schedule_again = CURRENT_TASK_STATE
+                        .scope(current_task_state, single_execution_future)
+                        .await;
+                }
+                this.finish_foreground_job();
+            })
+            .await
         };
 
         Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
@@ -1280,54 +1307,58 @@ impl Executor<Arc<TurboTasks<B>>, ScheduledTask, TaskPriority> for TurboT
                 trait_method.resolve_span(priority)
             }
         };
-        async move {
-            let result = match ty.task_type {
-                LocalTaskType::ResolveNative { native_fn } => {
-                    LocalTaskType::run_resolve_native(
-                        native_fn,
-                        ty.this,
-                        &*ty.arg,
-                        persistence,
-                        this,
-                    )
-                    .await
-                }
-                LocalTaskType::ResolveTrait { trait_method } => {
-                    LocalTaskType::run_resolve_trait(
-                        trait_method,
-                        ty.this.unwrap(),
-                        &*ty.arg,
-                        persistence,
-                        this,
-                    )
-                    .await
-                }
-            };
-
-            let output = match result {
-                Ok(raw_vc) => OutputContent::Link(raw_vc),
-                Err(err) => OutputContent::Error(
-                    TurboTasksExecutionError::from(err)
-                        .with_local_task_context(task_type.to_string()),
-                ),
-            };
-
-            let local_task = LocalTask::Done { output };
-
-            let done_event = CURRENT_TASK_STATE.with(move |gts| {
-                let mut gts_write = gts.write().unwrap();
-                let scheduled_task = std::mem::replace(
-                    gts_write.get_mut_local_task(local_task_id),
-                    local_task,
-                );
-                let LocalTask::Scheduled { done_event } = scheduled_task else {
-                    panic!("local task finished, but was not in the scheduled state?");
-                };
-                done_event
-            });
-            done_event.notify(usize::MAX)
-        }
-        .instrument(span)
+        abort_on_panic(
+            async move {
+                let result = match ty.task_type {
+                    LocalTaskType::ResolveNative { native_fn } => {
+                        LocalTaskType::run_resolve_native(
+                            native_fn,
+                            ty.this,
+                            &*ty.arg,
+                            persistence,
+                            this,
+                        )
+                        .await
+                    }
+                    LocalTaskType::ResolveTrait { trait_method } => {
+                        LocalTaskType::run_resolve_trait(
+                            trait_method,
+                            ty.this.unwrap(),
+                            &*ty.arg,
+                            persistence,
+                            this,
+                        )
+                        .await
+                    }
+                };
+
+                let output = match result {
+                    Ok(raw_vc) => OutputContent::Link(raw_vc),
+                    Err(err) => OutputContent::Error(
+                        TurboTasksExecutionError::from(err)
+                            .with_local_task_context(task_type.to_string()),
+                    ),
+                };
+
+                let local_task = LocalTask::Done { output };
+
+                let done_event = CURRENT_TASK_STATE.with(move |gts| {
+                    let mut gts_write = gts.write().unwrap();
+                    let scheduled_task = std::mem::replace(
+                        gts_write.get_mut_local_task(local_task_id),
+                        local_task,
+                    );
+                    let LocalTask::Scheduled { done_event } = scheduled_task else {
+                        panic!(
+                            "local task finished, but was not in the scheduled state?"
+                        );
+                    };
+                    done_event
+                });
+                done_event.notify(usize::MAX)
+            }
+            .instrument(span),
+        )
+        .await
     };
     let future = CURRENT_TASK_STATE.scope(global_task_state, future);