Skip to content

Commit 831cbfb

Browse files
committed
Turbopack: aggregate server HMR into one subscription
Replace per-chunk Server HMR fan-out with a single firehose subscription that diffs every HMR-eligible chunk under the target root in one tick. This significantly cuts the number of tokio task churn on projects with many server chunks and centralizes the diff/clear logic. It leads to a multi-second saving in both cold and warm builds in a large app. A following PR will bring this to client chunks. Rust: - New `aggregate_hmr` module: `AggregateHmrVersion` keyed by chunk path, `merged_partial_update` builder, and `is_hmr_eligible_chunk` (excludes `.map` files, which would force every diff to `Total`). - `Project::all_hmr_version_state` / `all_hmr_update` aggregate over the whole `hmr_root_path`. The seed transition emits an empty `Partial` so the JS consumer doesn't treat it as a restart and wipe handlers the triggering request just populated. Any chunk requiring `Total`/`Missing` escalates the batch. - `VersionedContentMap::hmr_chunks_in_path` lists eligible chunks with their `VersionedContent`. NAPI: - `projectAllHmrEvents(target)` returns a single subscription. JS: - `setupServerHmr` subscribes once via `allHmrEvents` instead of fanning out over `hmrChunkNamesSubscribe`. - `clear()` evicts every chunk under `server/chunks/` from `require.cache` directly rather than tracking subscriptions. This is a bit fragile as it relies on the path prefix and scanning require.cache.
1 parent beee190 commit 831cbfb

9 files changed

Lines changed: 578 additions & 99 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! Aggregated HMR: one [`VersionState`] covering every chunk under a target's
2+
//! root, so the dev server can subscribe once instead of per chunk.
3+
//!
4+
//! [`VersionState`]: turbopack_core::version::VersionState
5+
6+
use std::sync::Arc;
7+
8+
use anyhow::Result;
9+
use rustc_hash::FxHashMap;
10+
use turbo_rcstr::RcStr;
11+
use turbo_tasks::{FxIndexMap, ResolvedVc, TraitRef, TryJoinIterExt, Vc};
12+
use turbo_tasks_fs::FileSystemPath;
13+
use turbo_tasks_hash::{Xxh3Hash64Hasher, encode_base64};
14+
use turbopack_core::version::{NotFoundVersion, PartialUpdate, Update, Version, VersionedContent};
15+
16+
use crate::versioned_content_map::VersionedContentMap;
17+
18+
/// One chunk's contribution to an [`AggregateHmrVersion`]: its output path and
19+
/// the versioned content backing it.
20+
pub struct HmrChunkWithContent {
21+
pub path: RcStr,
22+
pub content: ResolvedVc<Box<dyn VersionedContent>>,
23+
}
24+
25+
/// Whether an emitted chunk participates in HMR. Source map (`.map`) files do
26+
/// not: their content fully rewrites on any source change, which would force
27+
/// per-chunk diffs to escalate to `Total`.
28+
pub fn is_hmr_eligible_chunk(name: &str) -> bool {
29+
!name.ends_with(".map")
30+
}
31+
32+
/// Per-chunk versions keyed by path. `id()` hashes sorted entries so it's
33+
/// stable across `FxIndexMap` iteration order. Mirrors `EcmascriptDevChunkListVersion`.
34+
#[turbo_tasks::value(serialization = "skip", shared)]
35+
pub struct AggregateHmrVersion {
36+
#[turbo_tasks(trace_ignore)]
37+
pub versions: FxIndexMap<RcStr, TraitRef<Box<dyn Version>>>,
38+
}
39+
40+
#[turbo_tasks::value_impl]
41+
impl Version for AggregateHmrVersion {
42+
#[turbo_tasks::function]
43+
async fn id(&self) -> Result<Vc<RcStr>> {
44+
let mut entries = self
45+
.versions
46+
.iter()
47+
.map(|(path, version)| {
48+
let path = path.clone();
49+
let version = TraitRef::cell(version.clone());
50+
async move {
51+
let id = version.id().owned().await?;
52+
Ok::<_, anyhow::Error>((path, id))
53+
}
54+
})
55+
.try_join()
56+
.await?;
57+
entries.sort_by(|a, b| a.0.cmp(&b.0));
58+
59+
let mut hasher = Xxh3Hash64Hasher::new();
60+
hasher.write_value(entries.len());
61+
for (path, id) in entries {
62+
hasher.write_value(path.as_str());
63+
hasher.write_value(id.as_str());
64+
}
65+
Ok(Vc::cell(encode_base64(hasher.finish()).into()))
66+
}
67+
}
68+
69+
impl AggregateHmrVersion {
70+
/// Snapshots every HMR-eligible chunk under `root` in `map` into a new
71+
/// [`Version`]. Returns a [`NotFoundVersion`] when no chunks exist yet
72+
/// (e.g. before any endpoints have been written).
73+
pub async fn from_map(
74+
map: Vc<VersionedContentMap>,
75+
root: &FileSystemPath,
76+
) -> Result<Vc<Box<dyn Version>>> {
77+
let chunks = map.hmr_chunks_in_path(root).await?;
78+
if chunks.is_empty() {
79+
return Ok(Vc::upcast(NotFoundVersion::new()));
80+
}
81+
Ok(Vc::upcast(Self::from_chunks(&chunks).await?))
82+
}
83+
84+
/// Snapshots each [`HmrChunkWithContent`]'s [`Version`] into a new
85+
/// [`AggregateHmrVersion`].
86+
pub async fn from_chunks(chunks: &[HmrChunkWithContent]) -> Result<Vc<Self>> {
87+
let versions = chunks
88+
.iter()
89+
.map(|HmrChunkWithContent { path, content }| {
90+
let path = path.clone();
91+
let content = *content;
92+
async move {
93+
let version = content.version().into_trait_ref().await?;
94+
Ok::<_, anyhow::Error>((path, version))
95+
}
96+
})
97+
.try_join()
98+
.await?
99+
.into_iter()
100+
.collect();
101+
Ok(Self { versions }.cell())
102+
}
103+
}
104+
105+
/// Unions one chunk's `EcmascriptMergedUpdate` into the combined `{entries, chunks}`.
106+
/// Both maps are keyed by globally-unique ids, so plain insertion is safe.
107+
pub fn merge_ecmascript_merged_update(
108+
combined_entries: &mut FxHashMap<String, serde_json::Value>,
109+
combined_chunks: &mut FxHashMap<String, serde_json::Value>,
110+
instruction: &serde_json::Value,
111+
) {
112+
let Some(obj) = instruction.as_object() else {
113+
return;
114+
};
115+
if let Some(entries) = obj.get("entries").and_then(|v| v.as_object()) {
116+
for (k, v) in entries {
117+
combined_entries.insert(k.clone(), v.clone());
118+
}
119+
}
120+
if let Some(chunks) = obj.get("chunks").and_then(|v| v.as_object()) {
121+
for (k, v) in chunks {
122+
combined_chunks.insert(k.clone(), v.clone());
123+
}
124+
}
125+
}
126+
127+
/// Builds an `Update::Partial` whose instruction is a combined
128+
/// `EcmascriptMergedUpdate` covering `entries` and `chunks`. Empty maps are
129+
/// omitted so an empty `entries`/`chunks` field never appears in the payload.
130+
///
131+
/// Passing empty maps produces an instruction with only `type:
132+
/// "EcmascriptMergedUpdate"`, used to advance `VersionState` to `to` without
133+
/// the JS consumer applying anything: it sees a `partial` event with nothing
134+
/// to apply and short-circuits.
135+
pub fn merged_partial_update(
136+
to: TraitRef<Box<dyn Version>>,
137+
entries: FxHashMap<String, serde_json::Value>,
138+
chunks: FxHashMap<String, serde_json::Value>,
139+
) -> Update {
140+
let mut instruction = serde_json::Map::new();
141+
instruction.insert(
142+
"type".to_string(),
143+
serde_json::Value::String("EcmascriptMergedUpdate".to_string()),
144+
);
145+
if !entries.is_empty() {
146+
instruction.insert(
147+
"entries".to_string(),
148+
serde_json::Value::Object(entries.into_iter().collect()),
149+
);
150+
}
151+
if !chunks.is_empty() {
152+
instruction.insert(
153+
"chunks".to_string(),
154+
serde_json::Value::Object(chunks.into_iter().collect()),
155+
);
156+
}
157+
Update::Partial(PartialUpdate {
158+
to,
159+
instruction: Arc::new(serde_json::Value::Object(instruction)),
160+
})
161+
}

crates/next-api/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![feature(arbitrary_self_types_pointers)]
33
#![feature(impl_trait_in_assoc_type)]
44

5+
mod aggregate_hmr;
56
pub mod analyze;
67
mod app;
78
mod asset_hashes_manifest;

crates/next-api/src/project.rs

Lines changed: 153 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use tracing::{Instrument, field::Empty};
3737
use turbo_rcstr::{RcStr, rcstr};
3838
use turbo_tasks::{
3939
Completion, Completions, FxIndexMap, NonLocalValue, OperationValue, OperationVc, ReadRef,
40-
ResolvedVc, State, TransientInstance, TryFlatJoinIterExt, TryJoinIterExt, Vc,
40+
ResolvedVc, State, TraitRef, TransientInstance, TryFlatJoinIterExt, TryJoinIterExt, Vc,
4141
debug::ValueDebugFormat, fxindexmap, trace::TraceRawVcs,
4242
};
4343
use turbo_tasks_env::{EnvMap, ProcessEnv};
@@ -80,7 +80,8 @@ use turbopack_core::{
8080
reference_type::{CommonJsReferenceSubType, ReferenceType},
8181
resolve::{FindContextFileResult, find_context_file},
8282
version::{
83-
NotFoundVersion, OptionVersionedContent, Update, Version, VersionState, VersionedContent,
83+
NotFoundVersion, OptionVersionedContent, PartialUpdate, TotalUpdate, Update, Version,
84+
VersionState, VersionedContent,
8485
},
8586
};
8687
#[cfg(feature = "process_pool")]
@@ -91,6 +92,10 @@ use turbopack_node::worker_threads_backend;
9192
use turbopack_nodejs::NodeJsChunkingContext;
9293

9394
use crate::{
95+
aggregate_hmr::{
96+
AggregateHmrVersion, HmrChunkWithContent, is_hmr_eligible_chunk,
97+
merge_ecmascript_merged_update, merged_partial_update,
98+
},
9499
app::{AppProject, OptionAppProject},
95100
empty::EmptyEndpoint,
96101
entrypoints::Entrypoints,
@@ -2498,14 +2503,158 @@ impl Project {
24982503
}
24992504
}
25002505

2506+
/// Aggregate counterpart to [`Self::hmr_version_state`]: one [`VersionState`]
2507+
/// covering every HMR-eligible chunk under `target`'s root. See
2508+
/// [`Self::all_hmr_update`].
2509+
#[turbo_tasks::function]
2510+
pub async fn all_hmr_version_state(
2511+
self: ResolvedVc<Self>,
2512+
target: HmrTarget,
2513+
session: TransientInstance<()>,
2514+
) -> Result<Vc<VersionState>> {
2515+
if target == HmrTarget::Client {
2516+
bail!("all_hmr_version_state is not yet implemented for the client target");
2517+
}
2518+
2519+
// The session argument keeps this from caching across sessions.
2520+
let _ = session;
2521+
2522+
#[tracing::instrument(
2523+
level = "info",
2524+
name = "get aggregate HMR version",
2525+
skip_all,
2526+
fields(target = %target),
2527+
)]
2528+
#[turbo_tasks::function(operation, root)]
2529+
async fn aggregate_hmr_version_operation(
2530+
this: ResolvedVc<Project>,
2531+
target: HmrTarget,
2532+
) -> Result<Vc<Box<dyn Version>>> {
2533+
let Some(map) = this.await?.versioned_content_map else {
2534+
bail!("must be in dev mode to hmr")
2535+
};
2536+
let root = this.hmr_root_path(target).owned().await?;
2537+
AggregateHmrVersion::from_map(*map, &root).await
2538+
}
2539+
let version_op = aggregate_hmr_version_operation(self, target);
2540+
2541+
// INVALIDATION: untracked initial read; the subscription drives invalidation.
2542+
let state = VersionState::new(
2543+
version_op
2544+
.read_trait_strongly_consistent()
2545+
.untracked()
2546+
.await?,
2547+
)
2548+
.await?;
2549+
Ok(state)
2550+
}
2551+
2552+
/// Aggregate counterpart to [`Self::hmr_update`]: a single `Update` whose
2553+
/// `EcmascriptMergedUpdate` is the union of per-chunk diffs under
2554+
/// `target`'s root.
2555+
///
2556+
/// All-or-nothing restart: any chunk needing `Total`/`Missing` escalates
2557+
/// the whole batch to `Total` (the runtime can't partially restart). New
2558+
/// chunks absent from `from` are skipped; the runtime require()s them on
2559+
/// demand.
2560+
#[turbo_tasks::function]
2561+
pub async fn all_hmr_update(
2562+
self: Vc<Self>,
2563+
target: HmrTarget,
2564+
from: Vc<VersionState>,
2565+
) -> Result<Vc<Update>> {
2566+
if target == HmrTarget::Client {
2567+
bail!("all_hmr_update is not yet implemented for the client target");
2568+
}
2569+
2570+
let Some(map) = self.await?.versioned_content_map else {
2571+
bail!("must be in dev mode to hmr")
2572+
};
2573+
let root = self.hmr_root_path(target).owned().await?;
2574+
let chunks_versioned_content = map.hmr_chunks_in_path(&root).await?;
2575+
2576+
// No chunks to diff yet (e.g. before any endpoints have been written).
2577+
if chunks_versioned_content.is_empty() {
2578+
return Ok(Update::None.cell());
2579+
}
2580+
2581+
// Build `to` up front so we can return it on every escape hatch below.
2582+
let to_aggregate = AggregateHmrVersion::from_chunks(&chunks_versioned_content).await?;
2583+
let to_ref = Vc::upcast::<Box<dyn Version>>(to_aggregate)
2584+
.into_trait_ref()
2585+
.await?;
2586+
2587+
let from_resolved = from.get().to_resolved().await?;
2588+
let Some(from_aggregate) =
2589+
ResolvedVc::try_downcast_type::<AggregateHmrVersion>(from_resolved)
2590+
else {
2591+
// If `from` is `None`, the last state was the initial state. In this case, emit an
2592+
// empty Partial update to advance the state to `to` without triggering a restart
2593+
return Ok(
2594+
merged_partial_update(to_ref, FxHashMap::default(), FxHashMap::default()).cell(),
2595+
);
2596+
};
2597+
let from_aggregate = from_aggregate.await?;
2598+
2599+
// Diff each chunk that exists in `from` against its current version.
2600+
let mut has_new_chunks = false;
2601+
let chunk_updates = chunks_versioned_content
2602+
.into_iter()
2603+
.filter_map(|HmrChunkWithContent { path, content }| {
2604+
let Some(prev) = from_aggregate.versions.get(&path).cloned() else {
2605+
has_new_chunks = true;
2606+
return None;
2607+
};
2608+
Some((path, content, TraitRef::cell(prev)))
2609+
})
2610+
.map(|(path, content, prev)| async move {
2611+
let update = content.update(prev).await?;
2612+
Ok::<_, anyhow::Error>((path, update))
2613+
})
2614+
.try_join()
2615+
.await?;
2616+
2617+
let mut combined_entries: FxHashMap<String, serde_json::Value> = FxHashMap::default();
2618+
let mut combined_chunks: FxHashMap<String, serde_json::Value> = FxHashMap::default();
2619+
for (_path, update) in chunk_updates {
2620+
match &*update {
2621+
Update::None => {}
2622+
Update::Missing | Update::Total(_) => {
2623+
return Ok(Update::Total(TotalUpdate { to: to_ref }).cell());
2624+
}
2625+
Update::Partial(PartialUpdate { instruction, .. }) => {
2626+
merge_ecmascript_merged_update(
2627+
&mut combined_entries,
2628+
&mut combined_chunks,
2629+
instruction,
2630+
);
2631+
}
2632+
}
2633+
}
2634+
2635+
if combined_entries.is_empty() && combined_chunks.is_empty() && !has_new_chunks {
2636+
return Ok(Update::None.cell());
2637+
}
2638+
2639+
Ok(merged_partial_update(to_ref, combined_entries, combined_chunks).cell())
2640+
}
2641+
25012642
/// Gets a list of all HMR chunk names that can be subscribed to for the
25022643
/// specified target. Used by the dev server to set up server-side HMR
25032644
/// subscriptions for all Node.js App Router entries (pages and route
2504-
/// handlers).
2645+
/// handlers). See [`is_hmr_eligible_chunk`] for the eligibility rule.
25052646
#[turbo_tasks::function]
25062647
pub async fn hmr_chunk_names(self: Vc<Self>, target: HmrTarget) -> Result<Vc<Vec<RcStr>>> {
25072648
if let Some(map) = self.await?.versioned_content_map {
2508-
Ok(map.keys_in_path(self.hmr_root_path(target).owned().await?))
2649+
let names = map
2650+
.keys_in_path(self.hmr_root_path(target).owned().await?)
2651+
.await?;
2652+
let filtered: Vec<RcStr> = names
2653+
.iter()
2654+
.filter(|name| is_hmr_eligible_chunk(name))
2655+
.cloned()
2656+
.collect();
2657+
Ok(Vc::cell(filtered))
25092658
} else {
25102659
bail!("must be in dev mode to hmr")
25112660
}

0 commit comments

Comments
 (0)