Skip to content

Commit 2904599

Browse files
authored
Optimize how we track data for persistence (#89370)
## Summary Reworks how the turbo-tasks backend tracks modified tasks for persistence snapshots, reducing overhead and simplifying the snapshot lifecycle. **Key changes:** - **Replace `modified` DashMap with per-shard atomic counters + inline flags.** Instead of maintaining a separate `FxDashMap<TaskId, ModifiedState>` that mirrors every modification, track modifications via flags already on `TaskStorage` and use per-shard `AtomicU64` counters to skip unmodified shards during snapshot iteration. This eliminates a major source of memory overhead. - The downside here is needing to scan the map. per shard counters enable early exits but we will still need to scan entire shards. For a large site that means scanning thousands of - **Merge task cache writes into the snapshot pipeline.** New tasks now carry a `new_task` flag and their type hash in `SnapshotItem`, so task cache entries are written in the same batch as task data/meta — removing the separate `persisted_task_cache_log` (`Sharded<ChunkedVec<...>>`) and its associated locking. - **Remove `local_is_partial` optimization.** The backing storage layer already short-circuits on empty databases, and new tasks eagerly set `restored` flags at allocation time, making this redundant. - **Simplify `end_snapshot`.** Instead of a multi-pass retain/iterate/update cycle over the `modified` map, `end_snapshot` now just drains the small `snapshots` map (only tasks concurrently accessed during snapshot mode) and promotes their `modified_during_snapshot` flags. - **Delete unused utilities.** Removes `Sharded`, `ChunkedVec` (from backing_storage), and `swap_retain` import now that they're no longer needed. **Other cleanups:** - `initialize_new_task` sets restored + new_task flags at allocation time for both persistent and transient tasks - Fuzz test updated to use `active_tracking: true` and `StorageMode::ReadWrite` - New KV storage test for batch write+flush+reopen pattern - Minor fix: `SmallVec::into_boxed_slice()` instead of `into_vec().into_boxed_slice()` ## Build Benchmark Results Measured over 9 runs (1 warm-up discarded), macOS, `TURBOPACK_PERSISTENT_CACHE=1`. ### Cold build (`rm -rf .next/`) | | Time (avg) | Time (stddev) | MaxRSS (avg) | |---|---|---|---| | HEAD | 75.48s | 0.72s | 24,231 MiB | | This PR | 75.20s | 1.98s | 23,829 MiB | | **Delta** | −0.28s | — | **−402 MiB (−1.7%)** | ### Warm build (single file edit) | | Time (avg) | MaxRSS (avg) | |---|---|---| | HEAD | 23.79s | 8,764 MiB | | This PR | 23.67s | 8,766 MiB | | **Delta** | −0.12s | flat | Cold build time difference is within noise (< 1 stddev). The meaningful improvement is a **~400 MiB reduction in peak memory on cold builds**, consistent with the fixed overhead this PR targets. Warm builds are unaffected as expected.
1 parent 18430c7 commit 2904599

16 files changed

Lines changed: 539 additions & 570 deletions

File tree

test/e2e/filesystem-cache/next.config.js

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,10 @@ const nextConfig = {
1717
},
1818
},
1919
},
20-
experimental: enableCaching
21-
? {
22-
turbopackFileSystemCacheForBuild: true,
23-
}
24-
: {
25-
turbopackFileSystemCacheForDev: false,
26-
turbopackFileSystemCacheForBuild: false,
27-
},
20+
experimental: {
21+
turbopackFileSystemCacheForBuild: enableCaching,
22+
turbopackFileSystemCacheForDev: enableCaching,
23+
},
2824
env: {
2925
NEXT_PUBLIC_CONFIG_ENV: 'hello world',
3026
},

test/integration/config-output-export/test/index.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ const runDev = async (config: any, shouldWaitForReady = true) => {
2323
const port = await findPort()
2424
const obj = { port, stdout: '', stderr: '' }
2525
app = await launchApp(appDir, port, {
26-
stdout: false,
2726
onStdout(msg: string) {
2827
obj.stdout += msg || ''
2928
},
30-
stderr: false,
3129
onStderr(msg: string) {
3230
obj.stderr += msg || ''
3331
},

turbopack/crates/turbo-persistence/src/value_buf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ impl ValueBuffer<'_> {
1313
match self {
1414
ValueBuffer::Borrowed(b) => b.into(),
1515
ValueBuffer::Vec(v) => v.into_boxed_slice(),
16-
ValueBuffer::SmallVec(sv) => sv.into_vec().into_boxed_slice(),
16+
ValueBuffer::SmallVec(sv) => sv.into_boxed_slice(),
1717
}
1818
}
1919
}

turbopack/crates/turbo-tasks-backend/fuzz/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ libfuzzer-sys = "0.4.9"
1616
once_cell = { workspace = true }
1717
serde = { workspace = true }
1818
tokio = { workspace = true, features = ["full"] }
19-
turbo-tasks = { workspace = true }
19+
turbo-tasks = { workspace = true, features = ["non_operation_vc_strongly_consistent"] }
2020
turbo-tasks-backend = { workspace = true }
2121
turbo-tasks-malloc = { workspace = true }
2222

turbopack/crates/turbo-tasks-backend/fuzz/src/graph.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,17 @@ pub fn run(data: Vec<TaskSpec>) {
133133
struct Iteration(State<usize>);
134134

135135
fn actual_operation(spec: Arc<Vec<TaskSpec>>, iterations: usize) {
136-
let tt = TurboTasks::new(turbo_tasks_backend::TurboTasksBackend::new(
137-
turbo_tasks_backend::BackendOptions {
138-
storage_mode: None,
139-
small_preallocation: true,
140-
..Default::default()
141-
},
142-
turbo_tasks_backend::noop_backing_storage(),
143-
));
144136
RUNTIME
145137
.block_on(async {
138+
let tt = TurboTasks::new(turbo_tasks_backend::TurboTasksBackend::new(
139+
turbo_tasks_backend::BackendOptions {
140+
storage_mode: Some(turbo_tasks_backend::StorageMode::ReadWrite),
141+
small_preallocation: false,
142+
active_tracking: true,
143+
..Default::default()
144+
},
145+
turbo_tasks_backend::noop_backing_storage(),
146+
));
146147
for i in 0..iterations {
147148
let spec = spec.clone();
148149
tt.run(async move {

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,18 @@ use crate::{
6363
storage::Storage,
6464
storage_schema::{TaskStorage, TaskStorageAccessors},
6565
},
66-
backing_storage::{BackingStorage, SnapshotItem},
66+
backing_storage::{BackingStorage, SnapshotItem, compute_task_type_hash},
6767
data::{
6868
ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
6969
InProgressState, InProgressStateInner, OutputValue, TransientTask,
7070
},
7171
error::TaskError,
7272
utils::{
7373
arc_or_owned::ArcOrOwned,
74-
chunked_vec::ChunkedVec,
7574
dash_map_drop_contents::drop_contents,
7675
dash_map_raw_entry::{RawEntry, raw_entry},
7776
ptr_eq_arc::PtrEqArc,
7877
shard_amount::compute_shard_amount,
79-
sharded::Sharded,
80-
swap_retain,
8178
},
8279
};
8380

@@ -167,8 +164,6 @@ pub enum TurboTasksBackendJob {
167164

168165
pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
169166

170-
type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
171-
172167
struct TurboTasksBackendInner<B: BackingStorage> {
173168
options: BackendOptions,
174169

@@ -177,15 +172,10 @@ struct TurboTasksBackendInner<B: BackingStorage> {
177172
persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
178173
transient_task_id_factory: IdFactoryWithReuse<TaskId>,
179174

180-
persisted_task_cache_log: Option<TaskCacheLog>,
181175
task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,
182176

183177
storage: Storage,
184178

185-
/// When true, the backing_storage has data that is not in the local storage.
186-
/// This is determined once at startup and never changes.
187-
local_is_partial: bool,
188-
189179
/// Number of executing operations + Highest bit is set when snapshot is
190180
/// requested. When that bit is set, operations should pause until the
191181
/// snapshot is completed. When the bit is set and in progress counter
@@ -235,10 +225,6 @@ impl<B: BackingStorage> TurboTasksBackend<B> {
235225
impl<B: BackingStorage> TurboTasksBackendInner<B> {
236226
pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
237227
let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
238-
let need_log = matches!(
239-
options.storage_mode,
240-
Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
241-
);
242228
if !options.dependency_tracking {
243229
options.active_tracking = false;
244230
}
@@ -257,9 +243,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
257243
TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
258244
TaskId::MAX,
259245
),
260-
persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
261246
task_cache: FxDashMap::default(),
262-
local_is_partial: next_task_id != TaskId::MIN,
263247
storage: Storage::new(shard_amount, small_preallocation),
264248
in_progress_operations: AtomicUsize::new(0),
265249
snapshot_request: Mutex::new(SnapshotRequest::new()),
@@ -1036,12 +1020,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
10361020
.map(|op| op.arc().clone())
10371021
.collect::<Vec<_>>();
10381022
}
1039-
self.storage.start_snapshot();
1040-
let mut persisted_task_cache_log = self
1041-
.persisted_task_cache_log
1042-
.as_ref()
1043-
.map(|l| l.take(|i| i))
1044-
.unwrap_or_default();
1023+
// Enter snapshot mode, which atomically reads and resets the modified count.
1024+
// Checking after start_snapshot ensures no concurrent increments can race.
1025+
let (snapshot_guard, has_modifications) = self.storage.start_snapshot();
10451026
let mut snapshot_request = self.snapshot_request.lock();
10461027
snapshot_request.snapshot_requested = false;
10471028
self.in_progress_operations
@@ -1050,6 +1031,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
10501031
let snapshot_time = Instant::now();
10511032
drop(snapshot_request);
10521033

1034+
if !has_modifications {
1035+
// No tasks modified since the last snapshot — drop the guard (which
1036+
// calls end_snapshot) and skip the expensive O(N) scan.
1037+
drop(snapshot_guard);
1038+
return Some((start, false));
1039+
}
1040+
10531041
#[cfg(feature = "print_cache_item_size")]
10541042
#[derive(Default)]
10551043
struct TaskCacheStats {
@@ -1278,35 +1266,49 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
12781266
} else {
12791267
None
12801268
};
1269+
let task_type_hash = if inner.flags.new_task() {
1270+
let task_type = inner.get_persistent_task_type().expect(
1271+
"It is not possible for a new_task to not have a persistent_task_type. Task \
1272+
creation for persistent tasks uses a single ExecutionContextImpl for \
1273+
creating the task (which sets new_task) and connect_child (which sets \
1274+
persistent_task_type) and take_snapshot waits for all operations to complete \
1275+
or suspend before we start snapshotting. So task creation will always set \
1276+
the task_type.",
1277+
);
1278+
Some(compute_task_type_hash(task_type))
1279+
} else {
1280+
None
1281+
};
12811282

12821283
SnapshotItem {
12831284
task_id,
12841285
meta,
12851286
data,
1287+
task_type_hash,
12861288
}
12871289
};
12881290

12891291
// take_snapshot already filters empty items and empty shards in parallel
1290-
let task_snapshots = self.storage.take_snapshot(&process);
1291-
1292-
swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
1292+
let task_snapshots = self.storage.take_snapshot(snapshot_guard, &process);
12931293

12941294
drop(snapshot_span);
12951295
let snapshot_duration = start.elapsed();
12961296
let task_count = task_snapshots.len();
12971297

1298-
if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
1298+
if task_snapshots.is_empty() {
1299+
// This should be impossible — if we got here, modified_count was nonzero, and every
1300+
// modification that increments the count also failed during encoding.
1301+
std::hint::cold_path();
12991302
return Some((snapshot_time, false));
13001303
}
13011304

13021305
let persist_start = Instant::now();
13031306
let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
13041307
{
1305-
if let Err(err) = self.backing_storage.save_snapshot(
1306-
suspended_operations,
1307-
persisted_task_cache_log,
1308-
task_snapshots,
1309-
) {
1308+
if let Err(err) = self
1309+
.backing_storage
1310+
.save_snapshot(suspended_operations, task_snapshots)
1311+
{
13101312
eprintln!("Persisting failed: {err:?}");
13111313
return None;
13121314
}
@@ -1551,7 +1553,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15511553
let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
15521554
RawEntry::Occupied(e) => {
15531555
// Another thread beat us to creating this task - use their task_id.
1554-
// They will handle logging to persisted_task_cache_log.
1556+
// They will handle logging the new task as modified
15551557
let task_id = *e.get();
15561558
drop(e);
15571559
self.track_cache_hit(&task_type);
@@ -1562,12 +1564,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
15621564
let task_type = Arc::new(task_type);
15631565
let task_id = self.persisted_task_id_factory.get();
15641566
e.insert(task_type.clone(), task_id);
1567+
// Mark the task as new in storage.
1568+
// Do this after e.insert so we aren't holding the task_cache lock
1569+
self.storage.initialize_new_task(task_id);
15651570
// insert() consumes e, releasing the lock
15661571
self.track_cache_miss(&task_type);
15671572
is_new = true;
1568-
if let Some(log) = &self.persisted_task_cache_log {
1569-
log.lock(task_id).push((task_type.clone(), task_id));
1570-
}
15711573
(task_id, ArcOrOwned::Arc(task_type))
15721574
}
15731575
};
@@ -1637,6 +1639,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
16371639
let task_type = Arc::new(task_type);
16381640
let task_id = self.transient_task_id_factory.get();
16391641
e.insert(task_type.clone(), task_id);
1642+
self.storage.initialize_new_task(task_id);
16401643
self.track_cache_miss(&task_type);
16411644

16421645
if is_root {

turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,7 +1896,7 @@ impl AggregationUpdateQueue {
18961896
/// See detailed comments in that function, follow the STEP numbers.
18971897
fn inner_of_uppers_has_new_follower<T: TaskIdWithOptionalCount + Clone, const N: usize>(
18981898
&mut self,
1899-
ctx: &mut impl ExecuteContext,
1899+
ctx: &mut impl ExecuteContext<'_>,
19001900
new_follower_id: TaskId,
19011901
mut upper_ids: SmallVec<[T; N]>,
19021902
) {
@@ -2618,7 +2618,7 @@ impl AggregationUpdateQueue {
26182618
/// Only used when activeness is tracked.
26192619
fn increase_active_count(
26202620
&mut self,
2621-
ctx: &mut impl ExecuteContext,
2621+
ctx: &mut impl ExecuteContext<'_>,
26222622
task_id: TaskId,
26232623
task_type: Option<Arc<CachedTaskType>>,
26242624
) {
@@ -2637,7 +2637,7 @@ impl AggregationUpdateQueue {
26372637
if let Some(task_type) = task_type
26382638
&& !task.has_persistent_task_type()
26392639
{
2640-
let _ = task.set_persistent_task_type(task_type);
2640+
task.set_persistent_task_type(task_type);
26412641
}
26422642
let state = task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
26432643
let is_new = state.is_empty();

0 commit comments

Comments
 (0)