Skip to content

Commit 90f4f1b

Browse files
authored
fix(core): prevent pool.close() hang on forks pool (#1285)
1 parent bf0bd71 commit 90f4f1b

6 files changed

Lines changed: 48 additions & 151 deletions

File tree

packages/core/src/pool/poolRunner.ts

Lines changed: 11 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import {
1313
import type { PoolTask } from './types';
1414

1515
const WORKER_START_TIMEOUT_MS = 90_000;
16-
const WORKER_STOP_TIMEOUT_MS = 60_000;
1716
const MAX_STDERR_MESSAGE_BYTES = 64 * 1024;
1817

1918
function formatCapturedStderr(text: string): string {
@@ -90,8 +89,6 @@ export class PoolRunner {
9089
private startDeferred: Deferred | undefined;
9190
private stopDeferred: Deferred | undefined;
9291
private startTimer: NodeJS.Timeout | undefined;
93-
private stopTimer: NodeJS.Timeout | undefined;
94-
private forceKillTimer: NodeJS.Timeout | undefined;
9592
private lastFatalError: Error | undefined;
9693
/**
9794
* Set when the worker reports `fatal_error` or a transport error. The
@@ -180,14 +177,23 @@ export class PoolRunner {
180177
return this.runTaskInternal('collect', task) as Promise<CollectTaskResult>;
181178
}
182179

180+
/**
181+
* Host owns termination — no IPC handshake. Per-task teardown runs in
182+
* `runInPool`'s own `finally` before `runFinished`, so by `stop()` there
183+
* is nothing process-level to drain. Relying on the worker's own
184+
* `process.exit()` was the rstest#1275 hang.
185+
*/
183186
stop(options?: { force?: boolean }): Promise<void> {
184187
return this.runOperation(async () => {
185188
switch (this.state) {
186189
case 'STOPPED':
187190
case 'IDLE':
188191
return;
189192
case 'STOPPING': {
190-
// Wait for the in-flight stop, then optionally force.
193+
// Wait for the in-flight stop to settle. If the caller asks for
194+
// `force` and the prior stop was graceful, escalate to SIGKILL —
195+
// the prior `await` may have resolved without actually killing
196+
// the child (e.g. SIGTERM masked).
191197
if (this.stopDeferred) {
192198
await this.stopDeferred.promise;
193199
}
@@ -216,34 +222,7 @@ export class PoolRunner {
216222
this.state = 'STOPPING';
217223
this.stopDeferred = createDeferred();
218224

219-
// Best-effort graceful stop. The worker defers its own exit until
220-
// after any in-flight task completes teardown.
221-
try {
222-
this.worker.send({ type: 'stop' });
223-
} catch {
224-
// ignore: worker may already be down
225-
}
226-
227-
// Escalate to SIGTERM after the budget expires, then SIGKILL shortly
228-
// after. On Windows, SIGTERM is unconditionally fatal (the process
229-
// cannot trap it), so sending it immediately would kill workers with
230-
// in-flight tasks. Instead, rely on the IPC `stop` message for
231-
// graceful shutdown and only signal when the worker fails to exit in
232-
// time.
233-
this.stopTimer = setTimeout(() => {
234-
void this.worker.stop().catch(() => undefined);
235-
}, WORKER_STOP_TIMEOUT_MS);
236-
this.stopTimer.unref();
237-
// Arm a hard SIGKILL fallback independently so it fires even if
238-
// SIGTERM is trapped/ignored and `worker.stop()` never resolves.
239-
this.forceKillTimer = setTimeout(() => {
240-
void this.worker.stop({ force: true }).catch(() => undefined);
241-
}, WORKER_STOP_TIMEOUT_MS + 5_000);
242-
this.forceKillTimer.unref();
243-
244-
if (options?.force) {
245-
await this.worker.stop({ force: true });
246-
}
225+
await this.worker.stop({ force: options?.force ?? false });
247226
await this.stopDeferred.promise;
248227
});
249228
}
@@ -351,10 +330,6 @@ export class PoolRunner {
351330
case 'collectFinished':
352331
this.resolveTask('collect', response.taskId, response.result);
353332
return;
354-
case 'stopped':
355-
// Worker acknowledged graceful shutdown — actual transition happens
356-
// in `handleExit`.
357-
return;
358333
case 'fatal_error': {
359334
const error = deserializeError(response.error);
360335
// Mark as crashed BEFORE rejecting. The host's dispatch unwinds via
@@ -384,7 +359,6 @@ export class PoolRunner {
384359
}
385360

386361
private handleExit(code: number | null, signal: NodeJS.Signals | null): void {
387-
this.clearStopTimer();
388362
this.clearStartTimer();
389363

390364
const wasStopping = this.state === 'STOPPING';
@@ -470,15 +444,4 @@ export class PoolRunner {
470444
clearTimeout(this.startTimer);
471445
this.startTimer = undefined;
472446
}
473-
474-
private clearStopTimer(): void {
475-
if (this.stopTimer) {
476-
clearTimeout(this.stopTimer);
477-
this.stopTimer = undefined;
478-
}
479-
if (this.forceKillTimer) {
480-
clearTimeout(this.forceKillTimer);
481-
this.forceKillTimer = undefined;
482-
}
483-
}
484447
}

packages/core/src/pool/protocol.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ export type WorkerRequest =
2020
type: 'collect';
2121
taskId: number;
2222
options: RunWorkerOptions['options'];
23-
}
24-
| { type: 'stop' };
23+
};
2524

2625
export type CollectTaskResult = {
2726
tests: Test[];
@@ -49,7 +48,6 @@ export type WorkerResponse =
4948
result: CollectTaskResult;
5049
memory?: WorkerMemoryReport;
5150
}
52-
| { type: 'stopped' }
5351
| {
5452
type: 'fatal_error';
5553
error: SerializedError;

packages/core/src/pool/workers/forksPoolWorker.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ const BENIGN_IPC_ERROR_CODES = new Set([
1010
'ERR_STREAM_WRITE_AFTER_END',
1111
]);
1212

13+
/**
14+
* SIGKILL escalation budget after SIGTERM. Defensive guard for native
15+
* modules that mask SIGTERM.
16+
*/
17+
const SIGKILL_FALLBACK_MS = 500;
18+
1319
/**
1420
* IPC errors that surface during shutdown but reflect the channel already
1521
* going away, not a genuine failure. Windows additionally surfaces
@@ -93,8 +99,7 @@ export class ForksPoolWorker extends BasePoolWorker {
9399
// Use `exit`, not `close`. `close` waits for all processes holding
94100
// the stdio pipes to release them — if a test spawns a subprocess
95101
// that inherits the worker's stdout/stderr, `close` blocks until
96-
// that grandchild exits too, stalling slot reclaim and potentially
97-
// hanging the pool until WORKER_STOP_TIMEOUT_MS force-kills.
102+
// that grandchild exits too, stalling slot reclaim.
98103
child.on('exit', (code, signal) => {
99104
this.exited = true;
100105
this.emitter.emit('exit', code, signal);
@@ -113,10 +118,11 @@ export class ForksPoolWorker extends BasePoolWorker {
113118

114119
async stop(options?: { force?: boolean }): Promise<void> {
115120
if (!this.hasLiveChild()) return;
116-
await killAndWait(
117-
this.childProcess!,
118-
options?.force ? 'SIGKILL' : 'SIGTERM',
119-
);
121+
if (options?.force) {
122+
await killAndWait(this.childProcess!, 'SIGKILL');
123+
return;
124+
}
125+
await killAndWait(this.childProcess!, 'SIGTERM', SIGKILL_FALLBACK_MS);
120126
}
121127

122128
sendRaw(envelope: Envelope): void {

packages/core/src/runtime/worker/index.ts

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,6 @@ const send = (response: WorkerResponse): void => {
1414
};
1515

1616
let currentTaskId: number | undefined;
17-
let stopRequested = false;
18-
/**
19-
* Set when a stop arrived mid-task. The task handler drains teardown in its
20-
* own `finally` first; only then do we flush `stopped` and exit. Without the
21-
* deferral `process.exit(0)` would orphan env/coverage/mock cleanup and the
22-
* `process.exit` / `process.kill` restoration.
23-
*/
24-
let exitOnTaskIdle = false;
2517
/**
2618
* Set when a task handler has reported `fatal_error` and is on its way to
2719
* exit. Suppresses the bottom-of-the-stack `fatalExit` from racing in with a
@@ -36,11 +28,6 @@ const sendFatalError = (err: unknown): void => {
3628
});
3729
};
3830

39-
const finalizeStop = (): void => {
40-
send({ type: 'stopped' });
41-
setImmediate(() => process.exit(0));
42-
};
43-
4431
/**
4532
* Last-resort handlers. The runtime's `runInPool` registers its own
4633
* uncaught/unhandled handlers that capture errors thrown WHILE a test is
@@ -110,30 +97,12 @@ const runTask = async (
11097
}
11198

11299
currentTaskId = undefined;
113-
if (exitOnTaskIdle) finalizeStop();
114-
};
115-
116-
const requestGracefulStop = (): void => {
117-
if (stopRequested) return;
118-
stopRequested = true;
119-
if (currentTaskId !== undefined) {
120-
// Defer the ack + exit until the task's `finally { await teardown(); }`
121-
// runs. PoolRunner's WORKER_STOP_TIMEOUT_MS escalates to SIGKILL if
122-
// teardown truly hangs, so this deferral is bounded.
123-
exitOnTaskIdle = true;
124-
return;
125-
}
126-
finalizeStop();
127100
};
128101

129-
// SIGTERM shares the same shutdown path under the forks pool. `PoolRunner.stop`
130-
// sends SIGTERM alongside the `stop` envelope; without this handler Node's
131-
// default SIGTERM behavior would terminate the process immediately and skip
132-
// teardown. Under the threads pool the host uses `worker.terminate()` instead
133-
// of signals, so this handler is a no-op there. `setup.ts` may install a
134-
// profiling-specific SIGTERM handler (`--cpu-prof` et al.) that runs first and
135-
// preempts ours, preserving existing profiling semantics.
136-
process.on('SIGTERM', requestGracefulStop);
102+
// No SIGTERM handler — the host owns termination and SIGTERM (default action:
103+
// exit) is what gets us out. Any handler that didn't unconditionally exit
104+
// would defeat that contract (rstest#1275). `setup.ts` may install a
105+
// profiling-specific handler that calls `process.exit()`, which is compatible.
137106

138107
channel.on((message: unknown) => {
139108
if (!isWorkerRequestEnvelope(message)) {
@@ -152,8 +121,5 @@ channel.on((message: unknown) => {
152121
case 'collect':
153122
void runTask('collect', request);
154123
break;
155-
case 'stop':
156-
requestGracefulStop();
157-
break;
158124
}
159125
});

packages/core/tests/pool/fixtures/testWorker.mjs

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -79,32 +79,18 @@ const makeRunResult = (request, extra) => ({
7979
...extra,
8080
});
8181

82-
// Mirrors the real worker: track whether a task is in-flight and defer
83-
// stop until the task completes.
84-
let taskInFlight = false;
85-
let exitOnTaskIdle = false;
86-
87-
const finalizeStop = () => {
88-
send({ type: 'stopped' });
89-
setTimeout(() => process.exit(0), 10);
90-
};
91-
9282
const handleRun = (request) => {
9383
const mode = request.options?.__testMode;
94-
taskInFlight = true;
9584

9685
const finish = (extra) => {
9786
send({
9887
type: 'runFinished',
9988
taskId: request.taskId,
10089
result: makeRunResult(request, extra),
10190
});
102-
taskInFlight = false;
103-
if (exitOnTaskIdle) finalizeStop();
10491
};
10592

10693
if (mode === 'fatal') {
107-
taskInFlight = false;
10894
send({
10995
type: 'fatal_error',
11096
error: {
@@ -189,18 +175,6 @@ const handleCollect = (request) => {
189175
});
190176
};
191177

192-
let stopRequested = false;
193-
const requestGracefulStop = () => {
194-
if (stopRequested) return;
195-
stopRequested = true;
196-
if (taskInFlight) {
197-
// Defer until the in-flight task finishes, just like the real worker.
198-
exitOnTaskIdle = true;
199-
return;
200-
}
201-
finalizeStop();
202-
};
203-
204178
onHostMessage((message) => {
205179
if (!message || message[REQ_TAG] !== true) return;
206180
const request = message.request;
@@ -216,13 +190,7 @@ onHostMessage((message) => {
216190
case 'collect':
217191
handleCollect(request);
218192
break;
219-
case 'stop':
220-
requestGracefulStop();
221-
break;
222193
}
223194
});
224195

225-
// SIGTERM is meaningful only under the forks pool. In threads mode signals
226-
// are delivered to the parent process, not the worker thread, so this
227-
// handler is a harmless no-op there.
228-
process.on('SIGTERM', requestGracefulStop);
196+
// No SIGTERM handler — mirrors worker/index.ts.

packages/core/tests/pool/pool.test.ts

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ describe('Pool - exit-based lifecycle (not close)', () => {
244244
// Task 1: worker spawns a 30s grandchild that inherits stdout/stderr,
245245
// then sends its result and exits. With a `close`-based lifecycle this
246246
// would block slot reclaim until the grandchild exits (30s), causing
247-
// task 2 to hang until WORKER_STOP_TIMEOUT_MS.
247+
// task 2 to hang.
248248
const start = Date.now();
249249
const r1 = await pool.runTest(
250250
createTask('run', { __testMode: 'spawn-orphan' }),
@@ -312,33 +312,29 @@ describe('Pool - failure recovery', () => {
312312
// ── close() behavior ──────────────────────────────────────────────────────
313313

314314
describe('Pool - close()', () => {
315-
it('should not drop in-flight task result', async () => {
316-
// Use isolate: false so the warm-up run establishes a reusable worker.
317-
// The slow task then dispatches instantly on the existing process,
318-
// eliminating the fork+handshake race that makes timer-based
319-
// synchronization unreliable on slow CI.
320-
const pool = new Pool(createPoolOptions({ isolate: false, minWorkers: 1 }));
321-
// Warm up: ensure the worker is started and idle.
322-
await pool.runTest(createTask());
323-
324-
const taskPromise = pool.runTest(
325-
createTask('run', { __testMode: 'slow', __delayMs: 500 }),
326-
);
327-
// The reused worker receives the run request on an already-open IPC
328-
// channel, so a short delay is enough for the message to arrive.
329-
await new Promise((r) => setTimeout(r, 50));
330-
const closePromise = pool.close();
331-
// The task should still resolve (worker sends result before stopping).
332-
const result = await taskPromise;
333-
expect(result.status).toBe('pass');
334-
await closePromise;
335-
});
336-
337315
it('should reject subsequent submissions after close()', async () => {
338316
const pool = new Pool(createPoolOptions());
339317
await pool.close();
340318
await expect(pool.runTest(createTask())).rejects.toThrow(/closed/);
341319
});
320+
321+
// Regression: rstest#1275. The host owns worker termination via SIGTERM
322+
// and does not wait for any IPC ack. Previously the host waited the full
323+
// 60s WORKER_STOP_TIMEOUT_MS plus a 5s SIGKILL escalation for workers
324+
// that couldn't self-exit (e.g. rspack tokio threads ref'ing the loop),
325+
// producing the 60–65s hang reported in the issue.
326+
it('should close promptly under the forks pool (rstest#1275)', async () => {
327+
const pool = new Pool(createPoolOptions({ isolate: false, minWorkers: 1 }));
328+
await pool.runTest(createTask()); // warm up: worker is now alive and idle
329+
330+
const start = Date.now();
331+
await pool.close();
332+
const elapsed = Date.now() - start;
333+
334+
// Idle close should be SIGTERM-fast (sub-second). Allow generous CI
335+
// headroom but require it to be well below the 60s graceful budget.
336+
expect(elapsed).toBeLessThan(5000);
337+
});
342338
});
343339

344340
// ── maxWorkers capacity ─────────────────────────────────────────────────────

0 commit comments

Comments
 (0)