Skip to content

Commit 6e5d9a1

Browse files
committed
two reliability improvements
1 parent 65340b2 commit 6e5d9a1

2 files changed

Lines changed: 148 additions & 102 deletions

File tree

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ struct TurboTasksBackendInner<B: BackingStorage> {
185185
/// triggered.
186186
in_progress_operations: AtomicUsize,
187187

188+
/// Serializes calls to `snapshot_and_persist`. The internal protocol
189+
/// (snapshot_mode flag, snapshot_request bit, suspended_operations) assumes
190+
/// only one snapshot runs at a time; this mutex enforces that contract.
191+
snapshot_in_progress: Mutex<()>,
188192
snapshot_request: Mutex<SnapshotRequest>,
189193
/// Condition Variable that is triggered when `in_progress_operations`
190194
/// reaches zero while snapshot is requested. All operations are either
@@ -248,6 +252,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
248252
task_cache: FxDashMap::default(),
249253
storage: Storage::new(shard_amount, small_preallocation),
250254
in_progress_operations: AtomicUsize::new(0),
255+
snapshot_in_progress: Mutex::new(()),
251256
snapshot_request: Mutex::new(SnapshotRequest::new()),
252257
operations_suspended: Condvar::new(),
253258
snapshot_completed: Condvar::new(),
@@ -437,6 +442,11 @@ impl<B: BackingStorage> Drop for OperationGuard<'_, B> {
437442
let fetch_sub = backend
438443
.in_progress_operations
439444
.fetch_sub(1, Ordering::AcqRel);
445+
debug_assert!(
446+
(fetch_sub & !SNAPSHOT_REQUESTED_BIT) > 0,
447+
"OperationGuard::drop underflow: in_progress_operations was {fetch_sub:#x} before \
448+
decrement"
449+
);
440450
if fetch_sub - 1 == SNAPSHOT_REQUESTED_BIT {
441451
backend.operations_suspended.notify_all();
442452
}
@@ -992,6 +1002,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
9921002
let snapshot_span =
9931003
tracing::trace_span!(parent: parent_span.clone(), "snapshot", reason = reason)
9941004
.entered();
1005+
// Serialize snapshots. The internal protocol (snapshot_mode, snapshot
1006+
// request bit, suspended_operations) assumes only one snapshot runs at
1007+
// a time. Held for the entire snapshot lifecycle.
1008+
let _snapshot_in_progress = self.snapshot_in_progress.lock();
9951009
let start = Instant::now();
9961010
// SystemTime for wall-clock timestamps in trace events (milliseconds
9971011
// since epoch). Instant is monotonic but has no defined epoch, so it

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 134 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use std::{
44
future::Future,
55
hash::{BuildHasher, BuildHasherDefault},
66
mem::take,
7+
panic::AssertUnwindSafe,
78
pin::Pin,
9+
process::abort,
810
sync::{
911
Arc, Mutex, RwLock, Weak,
1012
atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -1186,6 +1188,26 @@ impl<B: Backend + 'static> TurboTasks<B> {
11861188

11871189
struct TurboTasksExecutor;
11881190

1191+
/// Run a future that drives backend hooks (`task_execution_canceled`,
1192+
/// `try_start_task_execution`, `task_execution_completed`) without their own
1193+
/// panic boundary. A panic here corrupts backend invariants (per-task locks,
1194+
/// restoration flags, aggregation queues, etc.) and historically manifested as
1195+
/// a silent hang. Abort the process so the panic is loud and immediate.
1196+
async fn abort_on_panic<F: Future>(f: F) -> F::Output {
1197+
use futures::FutureExt;
1198+
match AssertUnwindSafe(f).catch_unwind().await {
1199+
Ok(r) => r,
1200+
Err(_) => {
1201+
eprintln!(
1202+
"[turbo-tasks] task execution loop panicked outside the user-task panic boundary. \
1203+
Backend state is now inconsistent; aborting to avoid a silent hang. See the \
1204+
panic message above."
1205+
);
1206+
abort();
1207+
}
1208+
}
1209+
}
1210+
11891211
impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboTasksExecutor {
11901212
type Future = impl Future<Output = ()> + Send + 'static;
11911213

@@ -1200,63 +1222,69 @@ impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboT
12001222
let this2 = this.clone();
12011223
let this = this.clone();
12021224
let future = async move {
1203-
let mut schedule_again = true;
1204-
while schedule_again {
1205-
// it's okay for execution ids to overflow and wrap, they're just used for
1206-
// an assert
1207-
let execution_id = this.execution_id_factory.wrapping_get();
1208-
let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
1209-
task_id,
1210-
execution_id,
1211-
priority,
1212-
false, // in_top_level_task
1213-
)));
1214-
let single_execution_future = async {
1215-
if this.stopped.load(Ordering::Acquire) {
1216-
this.backend.task_execution_canceled(task_id, &*this);
1217-
return false;
1218-
}
1219-
1220-
let Some(TaskExecutionSpec { future, span }) = this
1221-
.backend
1222-
.try_start_task_execution(task_id, priority, &*this)
1223-
else {
1224-
return false;
1225-
};
1226-
1227-
async {
1228-
let result = CaptureFuture::new(future).await;
1229-
1230-
// wait for all spawned local tasks using `local` to finish
1231-
wait_for_local_tasks().await;
1232-
1233-
let result = match result {
1234-
Ok(Ok(raw_vc)) => Ok(raw_vc),
1235-
Ok(Err(err)) => Err(err.into()),
1236-
Err(err) => Err(TurboTasksExecutionError::Panic(Arc::new(err))),
1225+
abort_on_panic(async {
1226+
let mut schedule_again = true;
1227+
while schedule_again {
1228+
// it's okay for execution ids to overflow and wrap, they're just used
1229+
// for an assert
1230+
let execution_id = this.execution_id_factory.wrapping_get();
1231+
let current_task_state = Arc::new(RwLock::new(CurrentTaskState::new(
1232+
task_id,
1233+
execution_id,
1234+
priority,
1235+
false, // in_top_level_task
1236+
)));
1237+
let single_execution_future = async {
1238+
if this.stopped.load(Ordering::Acquire) {
1239+
this.backend.task_execution_canceled(task_id, &*this);
1240+
return false;
1241+
}
1242+
1243+
let Some(TaskExecutionSpec { future, span }) = this
1244+
.backend
1245+
.try_start_task_execution(task_id, priority, &*this)
1246+
else {
1247+
return false;
12371248
};
12381249

1239-
let finished_state = this.finish_current_task_state();
1240-
let cell_counters = CURRENT_TASK_STATE
1241-
.with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
1242-
this.backend.task_execution_completed(
1243-
task_id,
1244-
result,
1245-
&cell_counters,
1246-
#[cfg(feature = "verify_determinism")]
1247-
finished_state.stateful,
1248-
finished_state.has_invalidator,
1249-
&*this,
1250-
)
1251-
}
1252-
.instrument(span)
1253-
.await
1254-
};
1255-
schedule_again = CURRENT_TASK_STATE
1256-
.scope(current_task_state, single_execution_future)
1257-
.await;
1258-
}
1259-
this.finish_foreground_job();
1250+
async {
1251+
let result = CaptureFuture::new(future).await;
1252+
1253+
// wait for all spawned local tasks using `local` to finish
1254+
wait_for_local_tasks().await;
1255+
1256+
let result = match result {
1257+
Ok(Ok(raw_vc)) => Ok(raw_vc),
1258+
Ok(Err(err)) => Err(err.into()),
1259+
Err(err) => {
1260+
Err(TurboTasksExecutionError::Panic(Arc::new(err)))
1261+
}
1262+
};
1263+
1264+
let finished_state = this.finish_current_task_state();
1265+
let cell_counters = CURRENT_TASK_STATE.with(|ts| {
1266+
ts.write().unwrap().cell_counters.take().unwrap()
1267+
});
1268+
this.backend.task_execution_completed(
1269+
task_id,
1270+
result,
1271+
&cell_counters,
1272+
#[cfg(feature = "verify_determinism")]
1273+
finished_state.stateful,
1274+
finished_state.has_invalidator,
1275+
&*this,
1276+
)
1277+
}
1278+
.instrument(span)
1279+
.await
1280+
};
1281+
schedule_again = CURRENT_TASK_STATE
1282+
.scope(current_task_state, single_execution_future)
1283+
.await;
1284+
}
1285+
this.finish_foreground_job();
1286+
})
1287+
.await
12601288
};
12611289

12621290
Either::Left(TURBO_TASKS.scope(this2, future).instrument(span))
@@ -1280,54 +1308,58 @@ impl<B: Backend> Executor<TurboTasks<B>, ScheduledTask, TaskPriority> for TurboT
12801308
trait_method.resolve_span(priority)
12811309
}
12821310
};
1283-
async move {
1284-
let result = match ty.task_type {
1285-
LocalTaskType::ResolveNative { native_fn } => {
1286-
LocalTaskType::run_resolve_native(
1287-
native_fn,
1288-
ty.this,
1289-
&*ty.arg,
1290-
persistence,
1291-
this,
1292-
)
1293-
.await
1294-
}
1295-
LocalTaskType::ResolveTrait { trait_method } => {
1296-
LocalTaskType::run_resolve_trait(
1297-
trait_method,
1298-
ty.this.unwrap(),
1299-
&*ty.arg,
1300-
persistence,
1301-
this,
1302-
)
1303-
.await
1304-
}
1305-
};
1306-
1307-
let output = match result {
1308-
Ok(raw_vc) => OutputContent::Link(raw_vc),
1309-
Err(err) => OutputContent::Error(
1310-
TurboTasksExecutionError::from(err)
1311-
.with_local_task_context(task_type.to_string()),
1312-
),
1313-
};
1314-
1315-
let local_task = LocalTask::Done { output };
1316-
1317-
let done_event = CURRENT_TASK_STATE.with(move |gts| {
1318-
let mut gts_write = gts.write().unwrap();
1319-
let scheduled_task = std::mem::replace(
1320-
gts_write.get_mut_local_task(local_task_id),
1321-
local_task,
1322-
);
1323-
let LocalTask::Scheduled { done_event } = scheduled_task else {
1324-
panic!("local task finished, but was not in the scheduled state?");
1311+
abort_on_panic(
1312+
async move {
1313+
let result = match ty.task_type {
1314+
LocalTaskType::ResolveNative { native_fn } => {
1315+
LocalTaskType::run_resolve_native(
1316+
native_fn,
1317+
ty.this,
1318+
&*ty.arg,
1319+
persistence,
1320+
this,
1321+
)
1322+
.await
1323+
}
1324+
LocalTaskType::ResolveTrait { trait_method } => {
1325+
LocalTaskType::run_resolve_trait(
1326+
trait_method,
1327+
ty.this.unwrap(),
1328+
&*ty.arg,
1329+
persistence,
1330+
this,
1331+
)
1332+
.await
1333+
}
13251334
};
1326-
done_event
1327-
});
1328-
done_event.notify(usize::MAX)
1329-
}
1330-
.instrument(span)
1335+
1336+
let output = match result {
1337+
Ok(raw_vc) => OutputContent::Link(raw_vc),
1338+
Err(err) => OutputContent::Error(
1339+
TurboTasksExecutionError::from(err)
1340+
.with_local_task_context(task_type.to_string()),
1341+
),
1342+
};
1343+
1344+
let local_task = LocalTask::Done { output };
1345+
1346+
let done_event = CURRENT_TASK_STATE.with(move |gts| {
1347+
let mut gts_write = gts.write().unwrap();
1348+
let scheduled_task = std::mem::replace(
1349+
gts_write.get_mut_local_task(local_task_id),
1350+
local_task,
1351+
);
1352+
let LocalTask::Scheduled { done_event } = scheduled_task else {
1353+
panic!(
1354+
"local task finished, but was not in the scheduled state?"
1355+
);
1356+
};
1357+
done_event
1358+
});
1359+
done_event.notify(usize::MAX)
1360+
}
1361+
.instrument(span),
1362+
)
13311363
.await
13321364
};
13331365
let future = CURRENT_TASK_STATE.scope(global_task_state, future);

0 commit comments

Comments
 (0)