Skip to content

Commit c7e1d78

Browse files
committed
fix(eslint-plugin-runner): await stdio pipe close before terminating workers
On windows-latest, terminating a worker thread while its stdio pipes are still live lets libuv's concurrent pipe teardown fault below the JS layer (a native abort). rstest's forked test child intercepts uncaughtException/unhandledRejection/process.exit, so a native fault is the only way that child can exit without reporting; that is why this surfaced as "Worker exited unexpectedly" in the high-terminate-churn worker-pool-e2e suite. terminateWorker already destroyed the pipes first, but destroy() only initiates the close; a terminate() right after still races the in-flight teardown. Await each pipe's 'close' (bounded by a 1s grace) so the two are serialized — by the time we terminate, the pipes are gone and there is no concurrent teardown left to fault. Also route the tests' raw worker.terminate() calls through terminateWorker (now exported for that purpose) so they exercise the same safe teardown as the production timeout/crash/shutdown paths.
1 parent a9f0099 commit c7e1d78

4 files changed

Lines changed: 62 additions & 22 deletions

File tree

packages/eslint-plugin-runner/src/worker-pool.ts

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,43 @@ const WORKER_FILE = (() => {
170170
const WORKER_EXIT_GRACE_MS = 5_000;
171171

172172
/**
173-
* Terminate a worker, closing its piped stdout/stderr FIRST.
173+
* Upper bound on how long `terminateWorker` waits for a piped stream to
174+
* emit `'close'` after `destroy()` before giving up and terminating
175+
* anyway. A normal close lands within a tick; this only guards a wedged
176+
* worker whose stream never closes, so teardown can never stall.
177+
*/
178+
const PIPE_CLOSE_GRACE_MS = 1_000;
179+
180+
/**
 * Destroy one worker stdio pipe and resolve once it has ACTUALLY closed.
 *
 * Resolves immediately when the stream is absent or has already finished
 * closing (`stream.closed`). A stream that is merely `destroyed` is still
 * waited on: `destroyed` flips synchronously inside `destroy()`, while the
 * close itself completes later, so a second caller arriving mid-teardown
 * (e.g. two overlapping `terminateWorker` calls on the same worker) must
 * not skip the wait and race the in-flight close.
 *
 * The wait is bounded by `graceMs`, so a stream that never emits `'close'`
 * cannot stall the caller.
 *
 * NOTE(review): `stream.closed` is available on Node.js >= 18 — confirm
 * this matches the package's supported runtime range.
 *
 * @param stream - The worker's piped stdout/stderr; may be absent.
 * @param graceMs - Upper bound, in milliseconds, on the wait for `'close'`.
 *   Defaults to `PIPE_CLOSE_GRACE_MS`.
 * @returns A promise that settles (it never rejects) once the pipe has
 *   closed or the grace period has elapsed, whichever comes first.
 */
async function closePipe(
  stream: Worker['stdout'],
  graceMs: number = PIPE_CLOSE_GRACE_MS,
): Promise<void> {
  // Only a stream whose teardown has COMPLETED may skip the wait.
  if (!stream || stream.closed) return;
  await new Promise<void>((resolveClosed) => {
    const done = (): void => {
      clearTimeout(timer);
      stream.off('close', done);
      resolveClosed();
    };
    const timer = setTimeout(done, graceMs);
    stream.once('close', done);
    // A destroyed-but-not-closed stream already has a close in flight;
    // just wait for it instead of initiating another teardown.
    if (stream.destroyed) return;
    try {
      stream.destroy();
    } catch {
      // `destroy()` is not expected to throw synchronously, but guard so
      // a faulty stream still settles (clearing the timer + listener)
      // instead of leaking the promise, the timer, and the listener.
      done();
    }
  });
}
206+
207+
/**
208+
* Terminate a worker, closing its piped stdout/stderr FIRST and WAITING
209+
* for them to actually close before killing the thread.
174210
*
175211
* Mitigates a windows-latest-only crash: `worker.terminate()` abruptly
176212
* kills the thread while its stdio named pipes are still live, and
@@ -179,16 +215,21 @@ const WORKER_EXIT_GRACE_MS = 5_000;
179215
* `uncaughtException` / `unhandledRejection` / `process.exit`, so the
180216
* only way that child can "exit unexpectedly" is a native fault). It
181217
* surfaced only in the high-terminate-churn `worker-pool-e2e` suite.
182-
* Closing our read end first makes the pipe teardown deterministic and
183-
* removes that race — and is sound teardown hygiene on every platform.
218+
*
219+
* `destroy()` alone is NOT enough: it only *initiates* the close, which
220+
* completes on a later tick, so a `terminate()` called synchronously
221+
* after it still races the in-flight teardown. Awaiting each pipe's
222+
* `'close'` SERIALIZES the two — by the time we terminate, the pipes are
223+
* already gone and there is no concurrent teardown left to fault.
224+
* Bounded by `PIPE_CLOSE_GRACE_MS` so a wedged worker can't stall here.
225+
*
184226
* Trailing worker output is intentionally dropped (the worker is being
185227
* killed); `destroy()` errors are swallowed by the `ignorePipeError`
186228
* listeners wired in `spawnWorker`. Returns `terminate()`'s promise so
187229
* callers can chain `.then` / `.catch` / `.finally` unchanged.
188230
*/
189-
async function terminateWorker(worker: Worker): Promise<number> {
190-
worker.stdout?.destroy();
191-
worker.stderr?.destroy();
231+
export async function terminateWorker(worker: Worker): Promise<number> {
232+
await Promise.all([closePipe(worker.stdout), closePipe(worker.stderr)]);
192233
return worker.terminate();
193234
}
194235

packages/eslint-plugin-runner/tests/regression-coverage.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import path from 'node:path';
1616
import * as espree from 'espree';
1717
import { parseSync } from 'oxc-parser';
1818

19-
import { WorkerPool } from '../src/worker-pool.js';
19+
import { WorkerPool, terminateWorker } from '../src/worker-pool.js';
2020
import { createSourceCode } from '../src/source-code/source-code.js';
2121
import { lintFile } from '../src/linter/ecma-language-plugin.js';
2222
import type { LoadedPlugins } from '../src/plugin/plugin-loader.js';
@@ -96,7 +96,7 @@ describe('WorkerPool respawn rejection drains pendingQueue', () => {
9696
// Pre-fix: pendingQueue stays full forever.
9797
// Post-fix: drainQueueIfAllSlotsDegraded() in the reject
9898
// callback resolves siblings with pool_degraded.
99-
await internals.workers[0].worker.terminate();
99+
await terminateWorker(internals.workers[0].worker);
100100

101101
// 5. Race sibling against a generous-but-bounded timeout. Pre-fix
102102
// the race resolves with 'timeout'; post-fix sibling settles
@@ -1167,7 +1167,7 @@ describe('WorkerPool shutdown does not wait for already-exited workers', () => {
11671167
const internals = pool as any;
11681168
// Terminate worker[0]; its 'exit' event fires now. With retryCap=0
11691169
// the slot is NOT respawned — it stays in this.workers as dead.
1170-
await internals.workers[0].worker.terminate();
1170+
await terminateWorker(internals.workers[0].worker);
11711171
// Drain pendingQueue cascade.
11721172
await new Promise((r) => setTimeout(r, 100));
11731173

@@ -1913,8 +1913,8 @@ describe('drainQueueIfAllSlotsDegraded does not fire during mid-respawn', () =>
19131913

19141914
// Terminate both nearly simultaneously to trigger respawn race.
19151915
await Promise.all([
1916-
internals.workers[0].worker.terminate(),
1917-
internals.workers[1].worker.terminate(),
1916+
terminateWorker(internals.workers[0].worker),
1917+
terminateWorker(internals.workers[1].worker),
19181918
]);
19191919
void spawnCallNum;
19201920

@@ -2525,7 +2525,7 @@ describe('WorkerPool: worker crashing mid-init is correctly respawned', () => {
25252525
const internals = pool as any;
25262526
const originalSpawn = internals.spawnWorker.bind(internals);
25272527
let firstSlot: {
2528-
worker: { terminate(): Promise<number> };
2528+
worker: import('node:worker_threads').Worker;
25292529
id: number;
25302530
} | null = null;
25312531
let callIdx = 0;
@@ -2563,7 +2563,7 @@ describe('WorkerPool: worker crashing mid-init is correctly respawned', () => {
25632563
// slot 0 in it. Pre-fix `this.workers` was empty until the final
25642564
// assignment, so the respawn's findIndex returned -1 and the
25652565
// replacement got terminated as an orphan.
2566-
await firstSlot!.worker.terminate();
2566+
await terminateWorker(firstSlot!.worker);
25672567

25682568
// Give the respawn a moment to complete.
25692569
await new Promise((r) => setTimeout(r, 500));
@@ -3006,7 +3006,7 @@ describe('WorkerPool: shutdown awaits in-flight respawn', () => {
30063006
};
30073007

30083008
// Crash the only worker → exit handler kicks off the (slow) respawn.
3009-
await internals.workers[0].worker.terminate();
3009+
await terminateWorker(internals.workers[0].worker);
30103010
// Give the exit handler a tick to enter the respawn branch.
30113011
await new Promise((r) => setTimeout(r, 50));
30123012
expect(respawnStarted).toBe(true);

packages/eslint-plugin-runner/tests/worker-pool-e2e-queue.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { describe, test, expect } from '@rstest/core';
22

3-
import { WorkerPool } from '../src/worker-pool.js';
3+
import { WorkerPool, terminateWorker } from '../src/worker-pool.js';
44
import type { LintTask } from '../src/worker-pool.js';
55

66
import {
@@ -122,7 +122,7 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => {
122122
// path is to terminate the worker (triggers a real 'exit') after
123123
// setting crashCount to the cap.
124124
internals.workers[0].crashCount = internals.opts.retryCap;
125-
await internals.workers[0].worker.terminate();
125+
await terminateWorker(internals.workers[0].worker);
126126

127127
const result = await batchP;
128128
expect(result).toHaveLength(2);
@@ -293,7 +293,7 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => {
293293
const firstBatch = pool.lintBatch([task('first.ts', 'const x = null;\n')]);
294294
await new Promise((r) => setTimeout(r, 30));
295295
internals.workers[0].crashCount = internals.opts.retryCap;
296-
await internals.workers[0].worker.terminate();
296+
await terminateWorker(internals.workers[0].worker);
297297

298298
const firstResult = await firstBatch;
299299
expect(firstResult).toHaveLength(1);

packages/eslint-plugin-runner/tests/worker-pool-e2e-resilience.test.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { describe, test, expect } from '@rstest/core';
22
import path from 'node:path';
3+
import type { Worker } from 'node:worker_threads';
34

4-
import { WorkerPool } from '../src/worker-pool.js';
5+
import { WorkerPool, terminateWorker } from '../src/worker-pool.js';
56
import type { LintTask } from '../src/worker-pool.js';
67

78
import {
@@ -124,11 +125,9 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => {
124125
await pool.init();
125126

126127
// eslint-disable-next-line @typescript-eslint/no-explicit-any
127-
const workers = (pool as any).workers as Array<{
128-
worker: { terminate(): Promise<number> };
129-
}>;
128+
const workers = (pool as any).workers as Array<{ worker: Worker }>;
130129
expect(workers.length).toBeGreaterThan(0);
131-
void workers[0].worker.terminate();
130+
void terminateWorker(workers[0].worker);
132131

133132
const shutdownStart = Date.now();
134133
await pool.shutdown();

0 commit comments

Comments
 (0)