Skip to content

Commit 7375dff

Browse files
committed
fix notifications
1 parent 3382bf1 commit 7375dff

3 files changed

Lines changed: 607 additions & 148 deletions

File tree

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 33 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod cell_data;
22
mod counter_map;
33
mod operation;
4+
mod snapshot_coordinator;
45
mod storage;
56
pub mod storage_schema;
67

@@ -13,15 +14,15 @@ use std::{
1314
pin::Pin,
1415
sync::{
1516
Arc, LazyLock,
16-
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
17+
atomic::{AtomicBool, AtomicU64, Ordering},
1718
},
1819
time::SystemTime,
1920
};
2021

2122
use anyhow::{Context, Result, bail};
2223
use auto_hash_map::{AutoMap, AutoSet};
2324
use indexmap::IndexSet;
24-
use parking_lot::{Condvar, Mutex};
25+
use parking_lot::Mutex;
2526
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
2627
use smallvec::{SmallVec, smallvec};
2728
use tokio::time::{Duration, Instant};
@@ -63,6 +64,7 @@ use crate::{
6364
connect_children, get_aggregation_number, get_uppers, is_root_node,
6465
make_task_dirty_internal, prepare_new_children,
6566
},
67+
snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
6668
storage::Storage,
6769
storage_schema::{TaskStorage, TaskStorageAccessors},
6870
},
@@ -75,17 +77,14 @@ use crate::{
7577
utils::{
7678
dash_map_drop_contents::drop_contents,
7779
dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
78-
ptr_eq_arc::PtrEqArc,
7980
shard_amount::compute_shard_amount,
8081
},
8182
};
8283

8384
/// Threshold for parallelizing making dependent tasks dirty.
8485
/// If the number of dependent tasks exceeds this threshold,
8586
/// the operation will be parallelized.
86-
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;
87-
88-
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
87+
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;
8988

9089
/// Configurable idle timeout for snapshot persistence.
9190
/// Defaults to 2 seconds if not set or if the value is invalid.
@@ -97,20 +96,6 @@ static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
9796
.unwrap_or(Duration::from_secs(2))
9897
});
9998

100-
struct SnapshotRequest {
101-
snapshot_requested: bool,
102-
suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
103-
}
104-
105-
impl SnapshotRequest {
106-
fn new() -> Self {
107-
Self {
108-
snapshot_requested: false,
109-
suspended_operations: FxHashSet::default(),
110-
}
111-
}
112-
}
113-
11499
pub enum StorageMode {
115100
/// Queries the storage for cache entries that don't exist locally.
116101
ReadOnly,
@@ -178,25 +163,14 @@ struct TurboTasksBackendInner<B: BackingStorage> {
178163

179164
storage: Storage,
180165

181-
/// Number of executing operations + Highest bit is set when snapshot is
182-
/// requested. When that bit is set, operations should pause until the
183-
/// snapshot is completed. When the bit is set and in progress counter
184-
/// reaches zero, `operations_completed_when_snapshot_requested` is
185-
/// triggered.
186-
in_progress_operations: AtomicUsize,
187-
188-
/// Serializes calls to `snapshot_and_persist`. The internal protocol
189-
/// (snapshot_mode flag, snapshot_request bit, suspended_operations) assumes
190-
/// only one snapshot runs at a time; this mutex enforces that contract.
166+
/// Coordinates the operation/snapshot interleaving protocol. See
167+
/// [`SnapshotCoordinator`] for details.
168+
snapshot_coord: SnapshotCoordinator<AnyOperation>,
169+
/// Serializes calls to `snapshot_and_persist`. The coordinator's
170+
/// `begin_snapshot` asserts that snapshots don't overlap; this mutex
171+
/// enforces that contract for our two callers (background loop and
172+
/// `stop_and_wait`).
191173
snapshot_in_progress: Mutex<()>,
192-
snapshot_request: Mutex<SnapshotRequest>,
193-
/// Condition Variable that is triggered when `in_progress_operations`
194-
/// reaches zero while snapshot is requested. All operations are either
195-
/// completed or suspended.
196-
operations_suspended: Condvar,
197-
/// Condition Variable that is triggered when a snapshot is completed and
198-
/// operations can continue.
199-
snapshot_completed: Condvar,
200174
/// The timestamp of the last started snapshot since [`Self::start_time`].
201175
last_snapshot: AtomicU64,
202176

@@ -251,11 +225,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
251225
),
252226
task_cache: FxDashMap::default(),
253227
storage: Storage::new(shard_amount, small_preallocation),
254-
in_progress_operations: AtomicUsize::new(0),
228+
snapshot_coord: SnapshotCoordinator::new(),
255229
snapshot_in_progress: Mutex::new(()),
256-
snapshot_request: Mutex::new(SnapshotRequest::new()),
257-
operations_suspended: Condvar::new(),
258-
snapshot_completed: Condvar::new(),
259230
last_snapshot: AtomicU64::new(0),
260231
stopping: AtomicBool::new(false),
261232
stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
@@ -278,65 +249,20 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
278249
}
279250

280251
fn suspending_requested(&self) -> bool {
281-
self.should_persist()
282-
&& (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
252+
self.should_persist() && self.snapshot_coord.snapshot_pending()
283253
}
284254

285255
fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
286-
#[cold]
287-
fn operation_suspend_point_cold<B: BackingStorage>(
288-
this: &TurboTasksBackendInner<B>,
289-
suspend: impl FnOnce() -> AnyOperation,
290-
) {
291-
let operation = Arc::new(suspend());
292-
let mut snapshot_request = this.snapshot_request.lock();
293-
if snapshot_request.snapshot_requested {
294-
snapshot_request
295-
.suspended_operations
296-
.insert(operation.clone().into());
297-
let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
298-
assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
299-
if value == SNAPSHOT_REQUESTED_BIT {
300-
this.operations_suspended.notify_all();
301-
}
302-
this.snapshot_completed
303-
.wait_while(&mut snapshot_request, |snapshot_request| {
304-
snapshot_request.snapshot_requested
305-
});
306-
this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
307-
snapshot_request
308-
.suspended_operations
309-
.remove(&operation.into());
310-
}
311-
}
312-
313-
if self.suspending_requested() {
314-
operation_suspend_point_cold(self, suspend);
256+
if self.should_persist() {
257+
self.snapshot_coord.suspend_point(suspend);
315258
}
316259
}
317260

318-
pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
261+
pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
319262
if !self.should_persist() {
320-
return OperationGuard { backend: None };
321-
}
322-
let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
323-
if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
324-
let mut snapshot_request = self.snapshot_request.lock();
325-
if snapshot_request.snapshot_requested {
326-
let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
327-
if value == SNAPSHOT_REQUESTED_BIT {
328-
self.operations_suspended.notify_all();
329-
}
330-
self.snapshot_completed
331-
.wait_while(&mut snapshot_request, |snapshot_request| {
332-
snapshot_request.snapshot_requested
333-
});
334-
self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
335-
}
336-
}
337-
OperationGuard {
338-
backend: Some(self),
263+
return OperationGuard::noop();
339264
}
265+
self.snapshot_coord.begin_operation()
340266
}
341267

342268
fn should_persist(&self) -> bool {
@@ -432,28 +358,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
432358
}
433359
}
434360

435-
pub(crate) struct OperationGuard<'a, B: BackingStorage> {
436-
backend: Option<&'a TurboTasksBackendInner<B>>,
437-
}
438-
439-
impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
440-
fn drop(&mut self) {
441-
if let Some(backend) = self.backend {
442-
let fetch_sub = backend
443-
.in_progress_operations
444-
.fetch_sub(1, Ordering::AcqRel);
445-
debug_assert!(
446-
(fetch_sub & !SNAPSHOT_REQUESTED_BIT) > 0,
447-
"OperationGuard::drop underflow: in_progress_operations was {fetch_sub:#x} before \
448-
decrement"
449-
);
450-
if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
451-
backend.operations_suspended.notify_all();
452-
}
453-
}
454-
}
455-
}
456-
457361
/// Intermediate result of step 1 of task execution completion.
458362
struct TaskExecutionCompletePrepareResult {
459363
pub new_children: FxHashSet<TaskId>,
@@ -1013,37 +917,21 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
1013917
let wall_start = SystemTime::now();
1014918
debug_assert!(self.should_persist());
1015919

1016-
let suspended_operations;
1017-
{
920+
// Enter the snapshot phase: blocks until all in-flight operations
921+
// drain or suspend, then collects the suspended-operations list.
922+
// Holding the phase keeps new operations queued; we drop it after
923+
// `start_snapshot()` so the snapshot mode is entered atomically with
924+
// operations still suspended (preventing missed modifications).
925+
let (snapshot_guard, has_modifications, suspended_operations) = {
1018926
let _span = tracing::info_span!("blocking").entered();
1019-
let mut snapshot_request = self.snapshot_request.lock();
1020-
snapshot_request.snapshot_requested = true;
1021-
let active_operations = self
1022-
.in_progress_operations
1023-
.fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1024-
if active_operations != 0 {
1025-
self.operations_suspended
1026-
.wait_while(&mut snapshot_request, |_| {
1027-
self.in_progress_operations.load(Ordering::Relaxed)
1028-
!= SNAPSHOT_REQUESTED_BIT
1029-
});
1030-
}
1031-
suspended_operations = snapshot_request
1032-
.suspended_operations
1033-
.iter()
1034-
.map(|op| op.arc().clone())
1035-
.collect::<Vec<_>>();
1036-
}
1037-
// Enter snapshot mode, which atomically reads and resets the modified count.
1038-
// Checking after start_snapshot ensures no concurrent increments can race.
1039-
let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
1040-
let mut snapshot_request = self.snapshot_request.lock();
1041-
snapshot_request.snapshot_requested = false;
1042-
self.in_progress_operations
1043-
.fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
1044-
self.snapshot_completed.notify_all();
927+
let mut phase = self.snapshot_coord.begin_snapshot();
928+
let suspended_operations = phase.take_suspended_operations();
929+
let (g, hm) = self.storage.start_snapshot();
930+
// `phase` drops here, releasing the snapshot bit and waking any
931+
// operations parked on `snapshot_completed`.
932+
(g, hm, suspended_operations)
933+
};
1045934
let snapshot_time = Instant::now();
1046-
drop(snapshot_request);
1047935

1048936
if !has_modifications {
1049937
// No tasks modified since the last snapshot — drop the guard (which
@@ -2462,7 +2350,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
24622350
span.record("result", "marked dirty");
24632351
}
24642352

2465-
if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
2353+
if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
24662354
let chunk_size = good_chunk_size(output_dependent_tasks.len());
24672355
let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
24682356
let _ = scope_and_block(chunks.len(), |scope| {

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use turbo_tasks::{
2626
use self::aggregation_update::ComputeDirtyAndCleanUpdate;
2727
use crate::{
2828
backend::{
29-
EventDescription, OperationGuard, TaskDataCategory, TurboTasksBackend,
30-
TurboTasksBackendInner,
29+
EventDescription, TaskDataCategory, TurboTasksBackend, TurboTasksBackendInner,
30+
snapshot_coordinator::OperationGuard,
3131
storage::{SpecificTaskDataCategory, StorageWriteGuard},
3232
storage_schema::{TaskStorage, TaskStorageAccessors},
3333
},
@@ -168,7 +168,7 @@ impl TaskLockCounter {
168168
pub struct ExecuteContextImpl<'e, B: BackingStorage> {
169169
backend: &'e TurboTasksBackendInner<B>,
170170
turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
171-
_operation_guard: Option<OperationGuard<'e, B>>,
171+
_operation_guard: Option<OperationGuard<'e, AnyOperation>>,
172172
task_lock_counter: TaskLockCounter,
173173
}
174174

0 commit comments

Comments
 (0)