Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions packages/eslint-plugin-runner/src/worker-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,43 @@ const WORKER_FILE = (() => {
const WORKER_EXIT_GRACE_MS = 5_000;

/**
* Terminate a worker, closing its piped stdout/stderr FIRST.
* Upper bound on how long `terminateWorker` waits for a piped stream to
* emit `'close'` after `destroy()` before giving up and terminating
* anyway. A normal close lands within a tick; this only guards a wedged
* worker whose stream never closes, so teardown can never stall.
*/
const PIPE_CLOSE_GRACE_MS = 1_000;

/**
* Destroy one worker stdio pipe and resolve once it has ACTUALLY closed
* — or immediately if the stream is absent / already destroyed, and at
* the latest after `PIPE_CLOSE_GRACE_MS` so a stream that never emits
* `'close'` can't stall the caller.
*/
async function closePipe(stream: Worker['stdout']): Promise<void> {
if (!stream || stream.destroyed) return;
await new Promise<void>((resolveClosed) => {
const done = (): void => {
clearTimeout(timer);
stream.off('close', done);
resolveClosed();
};
const timer = setTimeout(done, PIPE_CLOSE_GRACE_MS);
stream.once('close', done);
try {
stream.destroy();
} catch {
// `destroy()` is not expected to throw synchronously, but guard so
// a faulty stream still settles (clearing the timer + listener)
// instead of leaking the promise, the timer, and the listener.
done();
}
});
}

/**
* Terminate a worker, closing its piped stdout/stderr FIRST and WAITING
* for them to actually close before killing the thread.
*
* Mitigates a windows-latest-only crash: `worker.terminate()` abruptly
* kills the thread while its stdio named pipes are still live, and
Expand All @@ -179,16 +215,21 @@ const WORKER_EXIT_GRACE_MS = 5_000;
* `uncaughtException` / `unhandledRejection` / `process.exit`, so the
* only way that child can "exit unexpectedly" is a native fault). It
* surfaced only in the high-terminate-churn `worker-pool-e2e` suite.
* Closing our read end first makes the pipe teardown deterministic and
* removes that race — and is sound teardown hygiene on every platform.
*
* `destroy()` alone is NOT enough: it only *initiates* the close, which
* completes on a later tick, so a `terminate()` called synchronously
* after it still races the in-flight teardown. Awaiting each pipe's
* `'close'` SERIALIZES the two — by the time we terminate, the pipes are
* already gone and there is no concurrent teardown left to fault.
* Bounded by `PIPE_CLOSE_GRACE_MS` so a wedged worker can't stall here.
*
* Trailing worker output is intentionally dropped (the worker is being
* killed); `destroy()` errors are swallowed by the `ignorePipeError`
* listeners wired in `spawnWorker`. Returns `terminate()`'s promise so
* callers can chain `.then` / `.catch` / `.finally` unchanged.
*/
async function terminateWorker(worker: Worker): Promise<number> {
worker.stdout?.destroy();
worker.stderr?.destroy();
export async function terminateWorker(worker: Worker): Promise<number> {
await Promise.all([closePipe(worker.stdout), closePipe(worker.stderr)]);
return worker.terminate();
}

Expand Down
16 changes: 8 additions & 8 deletions packages/eslint-plugin-runner/tests/regression-coverage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import path from 'node:path';
import * as espree from 'espree';
import { parseSync } from 'oxc-parser';

import { WorkerPool } from '../src/worker-pool.js';
import { WorkerPool, terminateWorker } from '../src/worker-pool.js';
import { createSourceCode } from '../src/source-code/source-code.js';
import { lintFile } from '../src/linter/ecma-language-plugin.js';
import type { LoadedPlugins } from '../src/plugin/plugin-loader.js';
Expand Down Expand Up @@ -96,7 +96,7 @@ describe('WorkerPool respawn rejection drains pendingQueue', () => {
// Pre-fix: pendingQueue stays full forever.
// Post-fix: drainQueueIfAllSlotsDegraded() in the reject
// callback resolves siblings with pool_degraded.
await internals.workers[0].worker.terminate();
await terminateWorker(internals.workers[0].worker);

// 5. Race sibling against a generous-but-bounded timeout. Pre-fix
// the race resolves with 'timeout'; post-fix sibling settles
Expand Down Expand Up @@ -1167,7 +1167,7 @@ describe('WorkerPool shutdown does not wait for already-exited workers', () => {
const internals = pool as any;
// Terminate worker[0]; its 'exit' event fires now. With retryCap=0
// the slot is NOT respawned — it stays in this.workers as dead.
await internals.workers[0].worker.terminate();
await terminateWorker(internals.workers[0].worker);
// Drain pendingQueue cascade.
await new Promise((r) => setTimeout(r, 100));

Expand Down Expand Up @@ -1913,8 +1913,8 @@ describe('drainQueueIfAllSlotsDegraded does not fire during mid-respawn', () =>

// Terminate both nearly simultaneously to trigger respawn race.
await Promise.all([
internals.workers[0].worker.terminate(),
internals.workers[1].worker.terminate(),
terminateWorker(internals.workers[0].worker),
terminateWorker(internals.workers[1].worker),
]);
void spawnCallNum;

Expand Down Expand Up @@ -2525,7 +2525,7 @@ describe('WorkerPool: worker crashing mid-init is correctly respawned', () => {
const internals = pool as any;
const originalSpawn = internals.spawnWorker.bind(internals);
let firstSlot: {
worker: { terminate(): Promise<number> };
worker: import('node:worker_threads').Worker;
id: number;
} | null = null;
let callIdx = 0;
Expand Down Expand Up @@ -2563,7 +2563,7 @@ describe('WorkerPool: worker crashing mid-init is correctly respawned', () => {
// slot 0 in it. Pre-fix `this.workers` was empty until the final
// assignment, so the respawn's findIndex returned -1 and the
// replacement got terminated as an orphan.
await firstSlot!.worker.terminate();
await terminateWorker(firstSlot!.worker);

// Give the respawn a moment to complete.
await new Promise((r) => setTimeout(r, 500));
Expand Down Expand Up @@ -3006,7 +3006,7 @@ describe('WorkerPool: shutdown awaits in-flight respawn', () => {
};

// Crash the only worker → exit handler kicks off the (slow) respawn.
await internals.workers[0].worker.terminate();
await terminateWorker(internals.workers[0].worker);
// Give the exit handler a tick to enter the respawn branch.
await new Promise((r) => setTimeout(r, 50));
expect(respawnStarted).toBe(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, test, expect } from '@rstest/core';

import { WorkerPool } from '../src/worker-pool.js';
import { WorkerPool, terminateWorker } from '../src/worker-pool.js';
import type { LintTask } from '../src/worker-pool.js';

import {
Expand Down Expand Up @@ -122,7 +122,7 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => {
// path is to terminate the worker (triggers a real 'exit') after
// setting crashCount to the cap.
internals.workers[0].crashCount = internals.opts.retryCap;
await internals.workers[0].worker.terminate();
await terminateWorker(internals.workers[0].worker);

const result = await batchP;
expect(result).toHaveLength(2);
Expand Down Expand Up @@ -293,7 +293,7 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => {
const firstBatch = pool.lintBatch([task('first.ts', 'const x = null;\n')]);
await new Promise((r) => setTimeout(r, 30));
internals.workers[0].crashCount = internals.opts.retryCap;
await internals.workers[0].worker.terminate();
await terminateWorker(internals.workers[0].worker);

const firstResult = await firstBatch;
expect(firstResult).toHaveLength(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { describe, test, expect } from '@rstest/core';
import path from 'node:path';
import type { Worker } from 'node:worker_threads';

import { WorkerPool } from '../src/worker-pool.js';
import { WorkerPool, terminateWorker } from '../src/worker-pool.js';
import type { LintTask } from '../src/worker-pool.js';

import {
Expand Down Expand Up @@ -124,11 +125,9 @@ describe('WorkerPool end-to-end with a local fixture plugin', () => {
await pool.init();

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const workers = (pool as any).workers as Array<{
worker: { terminate(): Promise<number> };
}>;
const workers = (pool as any).workers as Array<{ worker: Worker }>;
expect(workers.length).toBeGreaterThan(0);
void workers[0].worker.terminate();
void terminateWorker(workers[0].worker);

const shutdownStart = Date.now();
await pool.shutdown();
Expand Down
Loading