Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 33 additions & 134 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod cell_data;
mod counter_map;
mod operation;
mod snapshot_coordinator;
mod storage;
pub mod storage_schema;

Expand All @@ -13,15 +14,15 @@ use std::{
pin::Pin,
sync::{
Arc, LazyLock,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::SystemTime,
};

use anyhow::{Context, Result, bail};
use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use parking_lot::Mutex;
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::{SmallVec, smallvec};
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -63,6 +64,7 @@ use crate::{
connect_children, get_aggregation_number, get_uppers, is_root_node,
make_task_dirty_internal, prepare_new_children,
},
snapshot_coordinator::{OperationGuard, SnapshotCoordinator},
storage::Storage,
storage_schema::{TaskStorage, TaskStorageAccessors},
},
Expand All @@ -75,17 +77,14 @@ use crate::{
utils::{
dash_map_drop_contents::drop_contents,
dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
ptr_eq_arc::PtrEqArc,
shard_amount::compute_shard_amount,
},
};

/// Threshold for parallelizing making dependent tasks dirty.
/// If the number of dependent tasks exceeds this threshold,
/// the operation will be parallelized.
const DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD: usize = 10000;

const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);
const DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD: usize = 10000;

/// Configurable idle timeout for snapshot persistence.
/// Defaults to 2 seconds if not set or if the value is invalid.
Expand All @@ -97,20 +96,6 @@ static IDLE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
.unwrap_or(Duration::from_secs(2))
});

struct SnapshotRequest {
snapshot_requested: bool,
suspended_operations: FxHashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
fn new() -> Self {
Self {
snapshot_requested: false,
suspended_operations: FxHashSet::default(),
}
}
}

pub enum StorageMode {
/// Queries the storage for cache entries that don't exist locally.
ReadOnly,
Expand Down Expand Up @@ -178,21 +163,14 @@ struct TurboTasksBackendInner<B: BackingStorage> {

storage: Storage,

/// Number of executing operations + Highest bit is set when snapshot is
/// requested. When that bit is set, operations should pause until the
/// snapshot is completed. When the bit is set and in progress counter
/// reaches zero, `operations_completed_when_snapshot_requested` is
/// triggered.
in_progress_operations: AtomicUsize,

snapshot_request: Mutex<SnapshotRequest>,
/// Condition Variable that is triggered when `in_progress_operations`
/// reaches zero while snapshot is requested. All operations are either
/// completed or suspended.
operations_suspended: Condvar,
/// Condition Variable that is triggered when a snapshot is completed and
/// operations can continue.
snapshot_completed: Condvar,
/// Coordinates the operation/snapshot interleaving protocol. See
/// [`SnapshotCoordinator`] for details.
snapshot_coord: SnapshotCoordinator,
/// Serializes calls to `snapshot_and_persist`. The coordinator's
/// `begin_snapshot` asserts that snapshots don't overlap; this mutex
/// enforces that contract for our two callers (background loop and
/// `stop_and_wait`).
snapshot_in_progress: Mutex<()>,
/// The timestamp of the last started snapshot since [`Self::start_time`].
last_snapshot: AtomicU64,

Expand Down Expand Up @@ -247,10 +225,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
),
task_cache: FxDashMap::default(),
storage: Storage::new(shard_amount, small_preallocation),
in_progress_operations: AtomicUsize::new(0),
snapshot_request: Mutex::new(SnapshotRequest::new()),
operations_suspended: Condvar::new(),
snapshot_completed: Condvar::new(),
snapshot_coord: SnapshotCoordinator::new(),
snapshot_in_progress: Mutex::new(()),
last_snapshot: AtomicU64::new(0),
stopping: AtomicBool::new(false),
stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
Expand All @@ -273,65 +249,20 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}

fn suspending_requested(&self) -> bool {
self.should_persist()
&& (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0
self.should_persist() && self.snapshot_coord.snapshot_pending()
}

fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
#[cold]
fn operation_suspend_point_cold<B: BackingStorage>(
this: &TurboTasksBackendInner<B>,
suspend: impl FnOnce() -> AnyOperation,
) {
let operation = Arc::new(suspend());
let mut snapshot_request = this.snapshot_request.lock();
if snapshot_request.snapshot_requested {
snapshot_request
.suspended_operations
.insert(operation.clone().into());
let value = this.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
if value == SNAPSHOT_REQUESTED_BIT {
this.operations_suspended.notify_all();
}
this.snapshot_completed
.wait_while(&mut snapshot_request, |snapshot_request| {
snapshot_request.snapshot_requested
});
this.in_progress_operations.fetch_add(1, Ordering::AcqRel);
snapshot_request
.suspended_operations
.remove(&operation.into());
}
}

if self.suspending_requested() {
operation_suspend_point_cold(self, suspend);
if self.should_persist() {
self.snapshot_coord.suspend_point(suspend);
}
}

pub(crate) fn start_operation(&self) -> OperationGuard<'_, B> {
pub(crate) fn start_operation(&self) -> OperationGuard<'_, AnyOperation> {
if !self.should_persist() {
return OperationGuard { backend: None };
}
let fetch_add = self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
if (fetch_add & SNAPSHOT_REQUESTED_BIT) != 0 {
let mut snapshot_request = self.snapshot_request.lock();
if snapshot_request.snapshot_requested {
let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel) - 1;
if value == SNAPSHOT_REQUESTED_BIT {
self.operations_suspended.notify_all();
}
self.snapshot_completed
.wait_while(&mut snapshot_request, |snapshot_request| {
snapshot_request.snapshot_requested
});
self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
}
}
OperationGuard {
backend: Some(self),
return OperationGuard::noop();
}
self.snapshot_coord.begin_operation()
}

fn should_persist(&self) -> bool {
Expand Down Expand Up @@ -427,23 +358,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

pub(crate) struct OperationGuard<'a, B: BackingStorage> {
backend: Option<&'a TurboTasksBackendInner<B>>,
}

impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
fn drop(&mut self) {
if let Some(backend) = self.backend {
let fetch_sub = backend
.in_progress_operations
.fetch_sub(1, Ordering::AcqRel);
if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
backend.operations_suspended.notify_all();
}
}
}
}

/// Intermediate result of step 1 of task execution completion.
struct TaskExecutionCompletePrepareResult {
pub new_children: FxHashSet<TaskId>,
Expand Down Expand Up @@ -992,44 +906,29 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let snapshot_span =
tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
.entered();
// Serialize snapshots. The internal protocol (snapshot_mode, snapshot
// request bit, suspended_operations) assumes only one snapshot runs at
// a time. Held for the entire snapshot lifecycle.
let _snapshot_in_progress = self.snapshot_in_progress.lock();
let start = Instant::now();
// SystemTime for wall-clock timestamps in trace events (milliseconds
// since epoch). Instant is monotonic but has no defined epoch, so it
// can't be used for cross-process trace correlation.
let wall_start = SystemTime::now();
debug_assert!(self.should_persist());

let suspended_operations;
{
let mut snapshot_phase = {
let _span = tracing::info_span!("blocking").entered();
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = true;
let active_operations = self
.in_progress_operations
.fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
if active_operations != 0 {
self.operations_suspended
.wait_while(&mut snapshot_request, |_| {
self.in_progress_operations.load(Ordering::Relaxed)
!= SNAPSHOT_REQUESTED_BIT
});
}
suspended_operations = snapshot_request
.suspended_operations
.iter()
.map(|op| op.arc().clone())
.collect::<Vec<_>>();
}
self.snapshot_coord.begin_snapshot()
};
// Enter snapshot mode, which atomically reads and resets the modified count.
// Checking after start_snapshot ensures no concurrent increments can race.
let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = false;
self.in_progress_operations
.fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
self.snapshot_completed.notify_all();

let suspended_operations = snapshot_phase.take_suspended_operations();

let snapshot_time = Instant::now();
drop(snapshot_request);
drop(snapshot_phase);

if !has_modifications {
// No tasks modified since the last snapshot — drop the guard (which
Expand Down Expand Up @@ -2448,7 +2347,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
span.record("result", "marked dirty");
}

if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLIZATION_THRESHOLD {
if output_dependent_tasks.len() > DEPENDENT_TASKS_DIRTY_PARALLELIZATION_THRESHOLD {
let chunk_size = good_chunk_size(output_dependent_tasks.len());
let chunks = into_chunks(output_dependent_tasks.to_vec(), chunk_size);
let _ = scope_and_block(chunks.len(), |scope| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use turbo_tasks::{
use self::aggregation_update::ComputeDirtyAndCleanUpdate;
use crate::{
backend::{
EventDescription, OperationGuard, TaskDataCategory, TurboTasksBackend,
TurboTasksBackendInner,
EventDescription, TaskDataCategory, TurboTasksBackend, TurboTasksBackendInner,
snapshot_coordinator::OperationGuard,
storage::{SpecificTaskDataCategory, StorageWriteGuard},
storage_schema::{TaskStorage, TaskStorageAccessors},
},
Expand Down Expand Up @@ -168,7 +168,7 @@ impl TaskLockCounter {
pub struct ExecuteContextImpl<'e, B: BackingStorage> {
backend: &'e TurboTasksBackendInner<B>,
turbo_tasks: &'e dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
_operation_guard: Option<OperationGuard<'e, B>>,
_operation_guard: Option<OperationGuard<'e, AnyOperation>>,
task_lock_counter: TaskLockCounter,
}

Expand Down
Loading
Loading