diff --git a/packages/eslint-plugin-runner/src/worker-pool.ts b/packages/eslint-plugin-runner/src/worker-pool.ts index 56ef66835..df5f78f01 100644 --- a/packages/eslint-plugin-runner/src/worker-pool.ts +++ b/packages/eslint-plugin-runner/src/worker-pool.ts @@ -170,7 +170,43 @@ const WORKER_FILE = (() => { const WORKER_EXIT_GRACE_MS = 5_000; /** - * Terminate a worker, closing its piped stdout/stderr FIRST. + * Upper bound on how long `terminateWorker` waits for a piped stream to + * emit `'close'` after `destroy()` before giving up and terminating + * anyway. A normal close lands within a tick; this only guards a wedged + * worker whose stream never closes, so teardown can never stall. + */ +const PIPE_CLOSE_GRACE_MS = 1_000; + +/** + * Destroy one worker stdio pipe and resolve once it has ACTUALLY closed + * — or immediately if the stream is absent / already destroyed, and at + * the latest after `PIPE_CLOSE_GRACE_MS` so a stream that never emits + * `'close'` can't stall the caller. + */ +async function closePipe(stream: Worker['stdout']): Promise<void> { + if (!stream || stream.destroyed) return; + await new Promise<void>((resolveClosed) => { + const done = (): void => { + clearTimeout(timer); + stream.off('close', done); + resolveClosed(); + }; + const timer = setTimeout(done, PIPE_CLOSE_GRACE_MS); + stream.once('close', done); + try { + stream.destroy(); + } catch { + // `destroy()` is not expected to throw synchronously, but guard so + // a faulty stream still settles (clearing the timer + listener) + // instead of leaking the promise, the timer, and the listener. + done(); + } + }); +} + +/** + * Terminate a worker, closing its piped stdout/stderr FIRST and WAITING + * for them to actually close before killing the thread. 
* * Mitigates a windows-latest-only crash: `worker.terminate()` abruptly * kills the thread while its stdio named pipes are still live, and @@ -179,16 +215,21 @@ const WORKER_EXIT_GRACE_MS = 5_000; * `uncaughtException` / `unhandledRejection` / `process.exit`, so the * only way that child can "exit unexpectedly" is a native fault). It * surfaced only in the high-terminate-churn `worker-pool-e2e` suite. - * Closing our read end first makes the pipe teardown deterministic and - * removes that race — and is sound teardown hygiene on every platform. + * + * `destroy()` alone is NOT enough: it only *initiates* the close, which + * completes on a later tick, so a `terminate()` called synchronously + * after it still races the in-flight teardown. Awaiting each pipe's + * `'close'` SERIALIZES the two — by the time we terminate, the pipes are + * already gone and there is no concurrent teardown left to fault. + * Bounded by `PIPE_CLOSE_GRACE_MS` so a wedged worker can't stall here. + * * Trailing worker output is intentionally dropped (the worker is being * killed); `destroy()` errors are swallowed by the `ignorePipeError` * listeners wired in `spawnWorker`. Returns `terminate()`'s promise so * callers can chain `.then` / `.catch` / `.finally` unchanged. 
*/ -async function terminateWorker(worker: Worker): Promise<number> { - worker.stdout?.destroy(); - worker.stderr?.destroy(); +export async function terminateWorker(worker: Worker): Promise<number> { + await Promise.all([closePipe(worker.stdout), closePipe(worker.stderr)]); return worker.terminate(); } diff --git a/packages/eslint-plugin-runner/tests/regression-coverage.test.ts b/packages/eslint-plugin-runner/tests/regression-coverage.test.ts index 3e277ae92..10ce15623 100644 --- a/packages/eslint-plugin-runner/tests/regression-coverage.test.ts +++ b/packages/eslint-plugin-runner/tests/regression-coverage.test.ts @@ -16,7 +16,7 @@ import path from 'node:path'; import * as espree from 'espree'; import { parseSync } from 'oxc-parser'; -import { WorkerPool } from '../src/worker-pool.js'; +import { WorkerPool, terminateWorker } from '../src/worker-pool.js'; import { createSourceCode } from '../src/source-code/source-code.js'; import { lintFile } from '../src/linter/ecma-language-plugin.js'; import type { LoadedPlugins } from '../src/plugin/plugin-loader.js'; @@ -96,7 +96,7 @@ describe('WorkerPool respawn rejection drains pendingQueue', () => { // Pre-fix: pendingQueue stays full forever. // Post-fix: drainQueueIfAllSlotsDegraded() in the reject // callback resolves siblings with pool_degraded. - await internals.workers[0].worker.terminate(); + await terminateWorker(internals.workers[0].worker); // 5. Race sibling against a generous-but-bounded timeout. Pre-fix // the race resolves with 'timeout'; post-fix sibling settles @@ -1167,7 +1167,7 @@ describe('WorkerPool shutdown does not wait for already-exited workers', () => { const internals = pool as any; // Terminate worker[0]; its 'exit' event fires now. With retryCap=0 // the slot is NOT respawned — it stays in this.workers as dead. - await internals.workers[0].worker.terminate(); + await terminateWorker(internals.workers[0].worker); // Drain pendingQueue cascade. 
await new Promise((r) => setTimeout(r, 100)); @@ -1913,8 +1913,8 @@ describe('drainQueueIfAllSlotsDegraded does not fire during mid-respawn', () => // Terminate both nearly simultaneously to trigger respawn race. await Promise.all([ - internals.workers[0].worker.terminate(), - internals.workers[1].worker.terminate(), + terminateWorker(internals.workers[0].worker), + terminateWorker(internals.workers[1].worker), ]); void spawnCallNum; @@ -2525,7 +2525,7 @@ describe('WorkerPool: worker crashing mid-init is correctly respawned', () => { const internals = pool as any; const originalSpawn = internals.spawnWorker.bind(internals); let firstSlot: { - worker: { terminate(): Promise }; + worker: import('node:worker_threads').Worker; id: number; } | null = null; let callIdx = 0; @@ -2563,7 +2563,7 @@ describe('WorkerPool: worker crashing mid-init is correctly respawned', () => { // slot 0 in it. Pre-fix `this.workers` was empty until the final // assignment, so the respawn's findIndex returned -1 and the // replacement got terminated as an orphan. - await firstSlot!.worker.terminate(); + await terminateWorker(firstSlot!.worker); // Give the respawn a moment to complete. await new Promise((r) => setTimeout(r, 500)); @@ -3006,7 +3006,7 @@ describe('WorkerPool: shutdown awaits in-flight respawn', () => { }; // Crash the only worker → exit handler kicks off the (slow) respawn. - await internals.workers[0].worker.terminate(); + await terminateWorker(internals.workers[0].worker); // Give the exit handler a tick to enter the respawn branch. 
await new Promise((r) => setTimeout(r, 50)); expect(respawnStarted).toBe(true); diff --git a/packages/eslint-plugin-runner/tests/worker-pool-e2e-queue.test.ts b/packages/eslint-plugin-runner/tests/worker-pool-e2e-queue.test.ts index 7af8a84e1..5b7e40a87 100644 --- a/packages/eslint-plugin-runner/tests/worker-pool-e2e-queue.test.ts +++ b/packages/eslint-plugin-runner/tests/worker-pool-e2e-queue.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect } from '@rstest/core'; -import { WorkerPool } from '../src/worker-pool.js'; +import { WorkerPool, terminateWorker } from '../src/worker-pool.js'; import type { LintTask } from '../src/worker-pool.js'; import { @@ -122,7 +122,7 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => { // path is to terminate the worker (triggers a real 'exit') after // setting crashCount to the cap. internals.workers[0].crashCount = internals.opts.retryCap; - await internals.workers[0].worker.terminate(); + await terminateWorker(internals.workers[0].worker); const result = await batchP; expect(result).toHaveLength(2); @@ -293,7 +293,7 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => { const firstBatch = pool.lintBatch([task('first.ts', 'const x = null;\n')]); await new Promise((r) => setTimeout(r, 30)); internals.workers[0].crashCount = internals.opts.retryCap; - await internals.workers[0].worker.terminate(); + await terminateWorker(internals.workers[0].worker); const firstResult = await firstBatch; expect(firstResult).toHaveLength(1); diff --git a/packages/eslint-plugin-runner/tests/worker-pool-e2e-resilience.test.ts b/packages/eslint-plugin-runner/tests/worker-pool-e2e-resilience.test.ts index 94fda46af..9d48e023f 100644 --- a/packages/eslint-plugin-runner/tests/worker-pool-e2e-resilience.test.ts +++ b/packages/eslint-plugin-runner/tests/worker-pool-e2e-resilience.test.ts @@ -1,7 +1,8 @@ import { describe, test, expect } from '@rstest/core'; import path from 'node:path'; +import type { Worker } from 
'node:worker_threads'; -import { WorkerPool } from '../src/worker-pool.js'; +import { WorkerPool, terminateWorker } from '../src/worker-pool.js'; import type { LintTask } from '../src/worker-pool.js'; import { @@ -124,11 +125,9 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => { await pool.init(); // eslint-disable-next-line @typescript-eslint/no-explicit-any - const workers = (pool as any).workers as Array<{ - worker: { terminate(): Promise }; - }>; + const workers = (pool as any).workers as Array<{ worker: Worker }>; expect(workers.length).toBeGreaterThan(0); - void workers[0].worker.terminate(); + void terminateWorker(workers[0].worker); const shutdownStart = Date.now(); await pool.shutdown();