Skip to content

Commit 62ad6d1

Browse files
committed
fix(platform): improve handling of bursts in running scripts
1 parent c818305 commit 62ad6d1

2 files changed

Lines changed: 213 additions & 99 deletions

File tree

src/js_runtime.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use deno_core::{JsRuntimeForSnapshot, PollEventLoopOptions, RuntimeOptions, scop
1818
use serde::Deserialize;
1919
use std::{
2020
sync::{
21-
Arc, OnceLock,
21+
Arc, Once, OnceLock,
2222
atomic::{AtomicBool, AtomicUsize, Ordering},
2323
},
2424
time::{Duration, Instant},
@@ -73,14 +73,34 @@ globalThis.secutils = {
7373
/// which is exactly what we want.
7474
static STARTUP_SNAPSHOT: OnceLock<&'static [u8]> = OnceLock::new();
7575

76+
/// Guards a single `deno_core::JsRuntime::init_platform` call for the whole process.
77+
static PLATFORM_INIT: Once = Once::new();
78+
79+
/// Initialises the V8 platform exactly once, no matter which entry point gets there first.
80+
/// `JsRuntime::init_platform` (the server boot path) and the lazy snapshot build (the path tests
81+
/// and benches hit when they call `execute_script` without an explicit platform init) both funnel
82+
/// through here. The V8 platform must be installed *before* the first isolate is created, otherwise
83+
/// concurrent isolate creation across worker threads races on V8's implicit default-platform setup
84+
/// and trips a fatal `v8::HandleScope::CreateHandle` ("Cannot create a handle without a
85+
/// HandleScope") during GC.
86+
fn ensure_platform() {
87+
PLATFORM_INIT.call_once(|| {
88+
deno_core::JsRuntime::init_platform(None);
89+
});
90+
}
91+
7692
fn build_startup_snapshot() -> &'static [u8] {
77-
// The snapshot intentionally does not bake in `secutils_ext` or any JS
78-
// modules: baking ops into a snapshot requires the snapshotting runtime
79-
// to register the exact same op layout at runtime (a minefield of subtle
80-
// version/feature mismatches). What we capture here is the expensive
81-
// part - the V8 context setup and builtin JS globals. `secutils_ext` is
82-
// still registered at `JsRuntime::new` time per invocation, but on top
83-
// of a warm, pre-initialised context.
93+
// The V8 platform has to be live before we create the snapshotting isolate (the first isolate
94+
// in the process on the lazy path). Doing it here means every consumer - server, benches, and
95+
// the test binary - gets a properly initialised platform regardless of whether
96+
// `JsRuntime::init_platform` was called explicitly.
97+
ensure_platform();
98+
99+
// The snapshot intentionally does not bake in `secutils_ext` or any JS modules: baking ops into
100+
// a snapshot requires the snapshotting runtime to register the exact same op layout at runtime
101+
// (a minefield of subtle version/feature mismatches). What we capture here is the expensive
102+
// part - the V8 context setup and builtin JS globals. `secutils_ext` is still registered at
103+
// `JsRuntime::new` time per invocation, but on top of a warm, pre-initialised context.
84104
let runtime = JsRuntimeForSnapshot::new(RuntimeOptions::default());
85105
let snapshot = runtime.snapshot();
86106
Box::leak(snapshot) as &[u8]
@@ -198,7 +218,7 @@ impl JsRuntime {
198218
/// snapshot, and eagerly spins up the worker pool. Should be called exactly
199219
/// once, from the main thread, during server startup.
200220
pub fn init_platform() {
201-
deno_core::JsRuntime::init_platform(None);
221+
ensure_platform();
202222
// Build the snapshot on the main thread before any worker boots so the
203223
// first script execution on each worker does not pay for it. V8 requires
204224
// the snapshotting isolate to run on a single thread, which is why we

src/js_runtime/worker_pool.rs

Lines changed: 184 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,51 @@
1-
//! A pool of long-lived worker threads, each owning a persistent
2-
//! `CurrentThread` tokio runtime + `LocalSet`. Script executions are
3-
//! dispatched round-robin to workers over an unbounded mpsc channel and run
4-
//! *concurrently* within each worker (each task is `spawn_local`-ed), so a
5-
//! script that parks on an async op - e.g. a `secutils.kv.watch` long-poll
6-
//! that can idle for tens of seconds - never blocks the other scripts sharing
7-
//! its worker thread. CPU-bound work is still cooperative: a script doing
8-
//! synchronous V8 work holds the thread until its next await point.
1+
//! An elastic pool of worker threads, each owning a persistent `CurrentThread` tokio runtime. A
2+
//! submitted script runs on exactly one worker thread and is driven to completion there
3+
//! (`block_on`) before that worker accepts the next task - i.e. **at most one V8 isolate is ever
4+
//! live per worker thread at a time**.
95
//!
10-
//! This replaces the previous per-call `spawn_blocking` + `new_current_thread`
11-
//! pattern, which paid the full cost of building a fresh tokio runtime (and
12-
//! its I/O driver, which consumes a kqueue/epoll fd) on every invocation.
6+
//! ## Why one isolate per thread
137
//!
14-
//! Each task still creates a fresh V8 isolate for strong isolation between
15-
//! scripts; reusing the worker thread and its tokio runtime is what yields
16-
//! the steady-state win. Future work (V8 startup snapshot, isolate pooling)
17-
//! can further reduce per-task cost on top of this foundation.
8+
//! A V8 isolate is pinned to the thread that created it, and V8 tracks the "current isolate" and
9+
//! the active `HandleScope` in *thread-local* state. Interleaving the event loops of two isolates
10+
//! on a single OS thread (e.g. by `spawn_local`-ing several script futures onto one `LocalSet`)
11+
//! corrupts that thread-local state: when one isolate parks on an `await` and another isolate
12+
//! resumes on the same thread, V8 can try to create a handle while the thread's current
13+
//! `HandleScope` belongs to the *other* isolate, aborting the process with a fatal
14+
//! `v8::HandleScope::CreateHandle` ("Cannot create a handle without a HandleScope"). Debug V8
15+
//! asserts this eagerly; release builds elide the check, but the underlying state corruption is
16+
//! still UB.
1817
//!
19-
//! Note: we deliberately keep an unbounded channel here. Back-pressure for
20-
//! user-visible workloads is already applied upstream (e.g., the
21-
//! `max_concurrent_responder_requests` semaphore in the responder handler),
22-
//! and queueing cheap `ScriptTask`s is preferable to blocking the producing
23-
//! future on `mpsc::Sender::send`.
18+
//! So concurrency is achieved by running each concurrent script on its own thread, never by sharing
19+
//! a thread between two isolates.
20+
//!
21+
//! ## Elasticity
22+
//!
23+
//! A baseline of `min_workers` threads is pre-spawned and kept warm so the common case pays no
24+
//! thread-creation cost. When every worker is busy (for instance because several scripts are parked
25+
//! on a `secutils.kv.watch` long-poll that can idle for tens of seconds), additional workers are
26+
//! spawned on demand up to `max_workers`. Overflow workers above the baseline exit after
27+
//! `IDLE_TIMEOUT` of inactivity so a burst of long-polls does not leave threads lingering forever.
28+
//!
29+
//! Back-pressure for user-visible workloads is already applied upstream (e.g. the
30+
//! `max_concurrent_responder_requests` semaphore in the responder handler), so the task queue is
31+
//! unbounded: queueing a cheap `ScriptTask` is preferable to blocking the producing future.
2432
2533
use std::{
34+
collections::VecDeque,
2635
future::Future,
2736
pin::Pin,
28-
sync::{
29-
OnceLock,
30-
atomic::{AtomicUsize, Ordering},
31-
},
37+
sync::{Condvar, Mutex, OnceLock},
38+
time::Duration,
3239
};
33-
use tokio::{runtime::Builder, sync::mpsc, task::LocalSet};
40+
use tokio::runtime::Builder;
3441

35-
/// A boxed, thread-movable closure that, when invoked on a worker thread,
36-
/// yields a (!Send) future performing the actual script work. The future is
37-
/// `!Send` because V8 isolates are tied to the thread that created them;
38-
/// running it inside a `LocalSet` is sufficient.
42+
/// How long an overflow worker (one spawned above `min_workers`) waits for new work before exiting
43+
/// and releasing its thread + tokio runtime.
44+
const IDLE_TIMEOUT: Duration = Duration::from_secs(30);
45+
46+
/// A boxed, thread-movable closure that, when invoked on a worker thread, yields a (!Send) future
47+
/// performing the actual script work. The future is `!Send` because V8 isolates are tied to the
48+
/// thread that created them; it is `block_on`-ed to completion on the worker that picked up the task.
3949
type TaskBuilder = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + 'static>> + Send>;
4050

4151
/// A unit of work dispatched to a worker. Owns everything it needs, including
@@ -56,82 +66,146 @@ impl ScriptTask {
5666
}
5767
}
5868

59-
/// A round-robin pool of worker threads. Each worker has its own tokio
60-
/// `CurrentThread` runtime and `LocalSet`; tasks submitted to a worker run
61-
/// sequentially in FIFO order, so the pool provides up to `workers.len()`-way
62-
/// parallelism across workers.
69+
/// Mutable pool bookkeeping, guarded by [`Shared::lock`].
70+
struct State {
71+
/// FIFO queue of tasks waiting for a free worker.
72+
tasks: VecDeque<ScriptTask>,
73+
/// Total number of live worker threads (busy + idle).
74+
total: usize,
75+
/// Number of worker threads currently parked waiting for a task.
76+
idle: usize,
77+
}
78+
79+
struct Shared {
80+
lock: Mutex<State>,
81+
/// Signalled when a task is enqueued without spawning a new worker (wakes one parked worker).
82+
available: Condvar,
83+
}
84+
85+
/// An elastic pool (no round-robin): any idle worker pops the next queued task, so work is
86+
/// naturally balanced across whatever workers are free.
6387
pub struct WorkerPool {
64-
workers: Vec<mpsc::UnboundedSender<ScriptTask>>,
65-
next: AtomicUsize,
88+
shared: &'static Shared,
89+
min_workers: usize,
90+
max_workers: usize,
6691
}
6792

6893
impl WorkerPool {
69-
fn new(num_workers: usize) -> Self {
70-
let num_workers = num_workers.max(1);
71-
let mut workers = Vec::with_capacity(num_workers);
72-
for idx in 0..num_workers {
73-
workers.push(spawn_worker(idx));
74-
}
75-
Self {
76-
workers,
77-
next: AtomicUsize::new(0),
94+
fn new(min_workers: usize, max_workers: usize) -> Self {
95+
let min_workers = min_workers.max(1);
96+
let max_workers = max_workers.max(min_workers);
97+
let shared: &'static Shared = Box::leak(Box::new(Shared {
98+
lock: Mutex::new(State {
99+
tasks: VecDeque::new(),
100+
total: 0,
101+
idle: 0,
102+
}),
103+
available: Condvar::new(),
104+
}));
105+
106+
let pool = Self {
107+
shared,
108+
min_workers,
109+
max_workers,
110+
};
111+
112+
// Pre-spawn the warm baseline so the common path pays no thread-creation latency.
113+
{
114+
let mut state = shared.lock.lock().expect("worker pool mutex poisoned");
115+
for _ in 0..min_workers {
116+
state.total += 1;
117+
spawn_worker(shared, min_workers);
118+
}
78119
}
120+
121+
pool
79122
}
80123

81-
/// Submit a task to the next worker in round-robin order. Fails only if
82-
/// every worker thread has panicked and its receiver has been dropped.
124+
/// Submit a task. The task is enqueued and either handed to an already-idle worker or, if every
125+
/// worker is busy and the pool has not hit its ceiling, picked up by a freshly spawned worker.
126+
/// Currently always returns `Ok(())`: a failed thread spawn panics rather than returning
127+
/// `Err(task)`. The `Result` return type only preserves the previous fallible signature.
83128
pub fn submit(&self, task: ScriptTask) -> Result<(), ScriptTask> {
84-
let len = self.workers.len();
85-
let start = self.next.fetch_add(1, Ordering::Relaxed) % len;
86-
// Try workers starting at the round-robin index; fall through to the
87-
// next one only if a worker has crashed (sender closed).
88-
let mut task = Some(task);
89-
for offset in 0..len {
90-
let idx = (start + offset) % len;
91-
let t = task.take().expect("task slot must be populated");
92-
match self.workers[idx].send(t) {
93-
Ok(()) => return Ok(()),
94-
Err(err) => task = Some(err.0),
95-
}
129+
let mut state = self.shared.lock.lock().expect("worker pool mutex poisoned");
130+
state.tasks.push_back(task);
131+
132+
// Grow the pool when there is no idle worker ready to take the task and we still have
133+
// headroom. Otherwise wake a parked worker.
134+
if state.idle == 0 && state.total < self.max_workers {
135+
state.total += 1;
136+
spawn_worker(self.shared, self.min_workers);
137+
} else {
138+
self.shared.available.notify_one();
96139
}
97-
Err(task.expect("task slot must be populated on failure path"))
140+
141+
Ok(())
98142
}
99143
}
100144

101-
fn spawn_worker(index: usize) -> mpsc::UnboundedSender<ScriptTask> {
102-
let (tx, mut rx) = mpsc::unbounded_channel::<ScriptTask>();
103-
let name = format!("js-runtime-worker-{index}");
145+
fn spawn_worker(shared: &'static Shared, min_workers: usize) {
104146
std::thread::Builder::new()
105-
.name(name)
106-
.spawn(move || {
107-
let rt = Builder::new_current_thread()
108-
.enable_all()
109-
.build()
110-
.expect("Failed to build JS runtime worker tokio runtime");
111-
let local = LocalSet::new();
112-
local.spawn_local(async move {
113-
while let Some(task) = rx.recv().await {
114-
// Spawn each task onto the LocalSet so independent scripts
115-
// make progress concurrently on this single thread. A task
116-
// that awaits (DB round-trip, `kv.watch` long-poll, timer)
117-
// yields the thread to its peers instead of blocking them.
118-
let future = (task.build)();
119-
tokio::task::spawn_local(future);
120-
}
121-
});
122-
rt.block_on(local);
123-
})
147+
.name("js-runtime-worker".to_string())
148+
.spawn(move || worker_loop(shared, min_workers))
124149
.expect("Failed to spawn JS runtime worker thread");
125-
tx
150+
}
151+
152+
fn worker_loop(shared: &'static Shared, min_workers: usize) {
153+
let rt = Builder::new_current_thread()
154+
.enable_all()
155+
.build()
156+
.expect("Failed to build JS runtime worker tokio runtime");
157+
158+
loop {
159+
// Acquire the next task, parking while the queue is empty. Overflow workers (those above
160+
// the warm baseline) exit after an idle period.
161+
let task = {
162+
let mut state = shared.lock.lock().expect("worker pool mutex poisoned");
163+
loop {
164+
if let Some(task) = state.tasks.pop_front() {
165+
break Some(task);
166+
}
167+
168+
state.idle += 1;
169+
let (guard, timed_out) = {
170+
let (guard, wait_result) = shared
171+
.available
172+
.wait_timeout(state, IDLE_TIMEOUT)
173+
.expect("worker pool mutex poisoned");
174+
(guard, wait_result.timed_out())
175+
};
176+
state = guard;
177+
state.idle -= 1;
178+
179+
// If a task showed up while we were waking, loop and take it.
180+
if !state.tasks.is_empty() {
181+
continue;
182+
}
183+
184+
// No work, and we timed out: retire this worker, but always keep the warm baseline
185+
// alive.
186+
if timed_out && state.total > min_workers {
187+
state.total -= 1;
188+
break None;
189+
}
190+
}
191+
};
192+
193+
match task {
194+
// `block_on` drives the script to completion on this thread; no other isolate can run
195+
// here until it returns.
196+
Some(task) => rt.block_on((task.build)()),
197+
None => return,
198+
}
199+
}
126200
}
127201

128202
/// Process-wide worker pool shared by every `JsRuntime::execute_script` call.
129203
static POOL: OnceLock<WorkerPool> = OnceLock::new();
130204

131-
/// Worker count for the global pool. Overridable via `SECUTILS_JS_WORKERS` for
132-
/// local experimentation and CI; defaults to the parallelism reported by the
133-
/// OS, with a floor of 2 so even tiny CI boxes keep some concurrency.
134-
fn default_worker_count() -> usize {
205+
/// Warm baseline worker count. Overridable via `SECUTILS_JS_WORKERS` for local experimentation and
206+
/// CI; defaults to the parallelism reported by the OS, with a floor of 2 so even tiny CI boxes keep
207+
/// some concurrency.
208+
fn min_worker_count() -> usize {
135209
if let Ok(raw) = std::env::var("SECUTILS_JS_WORKERS")
136210
&& let Ok(parsed) = raw.parse::<usize>()
137211
&& parsed > 0
@@ -145,14 +219,34 @@ fn default_worker_count() -> usize {
145219
.max(2)
146220
}
147221

222+
/// Hard ceiling on worker threads. The pool grows past the warm baseline to absorb bursts of
223+
/// long-parked scripts (`kv.watch` long-polls), but never beyond this. Overridable via the
224+
/// `SECUTILS_JS_MAX_WORKERS` environment variable.
225+
fn max_worker_count(min: usize) -> usize {
226+
if let Ok(raw) = std::env::var("SECUTILS_JS_MAX_WORKERS")
227+
&& let Ok(parsed) = raw.parse::<usize>()
228+
&& parsed > 0
229+
{
230+
return parsed.max(min);
231+
}
232+
233+
// Generous headroom for concurrent long-polls without risking unbounded thread growth; upstream
234+
// semaphores bound real concurrency well below this.
235+
min.max(512)
236+
}
237+
238+
fn build_pool() -> WorkerPool {
239+
let min = min_worker_count();
240+
WorkerPool::new(min, max_worker_count(min))
241+
}
242+
148243
/// Eagerly initialise the pool (called once from `JsRuntime::init_platform`).
149244
/// Safe to call multiple times: subsequent calls are no-ops.
150245
pub fn init() {
151-
POOL.get_or_init(|| WorkerPool::new(default_worker_count()));
246+
POOL.get_or_init(build_pool);
152247
}
153248

154-
/// Return the shared pool, lazily initialising it with `default_worker_count`
155-
/// workers if `init` has not been called yet.
249+
/// Return the shared pool, lazily initialising it if `init` has not been called.
156250
pub fn global() -> &'static WorkerPool {
157-
POOL.get_or_init(|| WorkerPool::new(default_worker_count()))
251+
POOL.get_or_init(build_pool)
158252
}

0 commit comments

Comments
 (0)