From 8bae87264401b1149262292854dbdbddc4dcac27 Mon Sep 17 00:00:00 2001 From: Peter Kasarda Date: Wed, 20 May 2026 17:23:04 +0200 Subject: [PATCH] feat(pool): add pool.memoryLimit to recycle workers by RSS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reused workers (`isolate: false`) accumulate heap monotonically across test files — vendor module caches, JSDOM nodes, React fiber trees, accumulated closures. On long suites this drifts into multi-GB GC-thrash territory and individual tests start running 2-3× slower than they would in a fresh worker. This adds `pool.memoryLimit` (mirrors Vitest's `VmOptions.memoryLimit`) so users can bound the pressure without giving up worker reuse: ```ts defineConfig({ isolate: false, pool: { memoryLimit: '1GB' }, }); ``` Mechanism (zero protocol changes — the worker already reports `memoryUsage().rss` after each task for the existing `MemoryGate` spawn gate): - `PoolRunner` captures `response.memory.rss` from each `runFinished` / `collectFinished` message. - `isUsable()` returns `false` when the last sample exceeds the parsed byte cap. The pool's existing release path then disposes the worker instead of returning it to the idle pool, and the next dispatch spawns a fresh one. - `isolate: true` opts out of the check (workers are single-use, so the cap is dead code there). API: - `pool.memoryLimit?: number | string` — matches Vitest's accepted forms so the contract is familiar: - `number >= 1` → bytes - `number in (0, 1]` → fraction of total system memory - `string` with a unit suffix → `"512MB"`, `"1.5GB"`, `"20%"`, `"1GiB"`, … (case-insensitive). Unknown units, unparseable strings, or non-positive numbers disable the cap rather than erroring out — keeps CI green when a config option is misspelled. Files: - `packages/core/src/pool/parseMemoryLimit.ts` — new, parses the user-facing union to bytes - `packages/core/src/pool/poolRunner.ts` — `lastReportedRssBytes`, `memoryLimitBytes` opt, `isUsable()` cap check - `packages/core/src/pool/pool.ts` — wires the bytes through to `PoolRunner` only when `isolate: false` - `packages/core/src/pool/index.ts` — calls `parseMemoryLimit` once at pool construction - `packages/core/src/pool/types.ts` — `PoolOptions.memoryLimitBytes` - `packages/core/src/types/config.ts` — `RstestPoolOptions.memoryLimit` - `packages/core/tests/pool/parseMemoryLimit.test.ts` — 31 cases covering byte/fraction/unit/percent/whitespace/invalid inputs - `e2e/memory-limit/` — fixture with `memoryLimit: 2` (2 bytes) + `maxWorkers: 1` + 4 files; driver asserts unique `process.pid` per file, proving the cap recycles end-to-end --- e2e/memory-limit/fixtures/rstest.config.mts | 17 ++++ e2e/memory-limit/fixtures/test/f1.test.ts | 10 ++ e2e/memory-limit/fixtures/test/f2.test.ts | 10 ++ e2e/memory-limit/fixtures/test/f3.test.ts | 10 ++ e2e/memory-limit/fixtures/test/f4.test.ts | 10 ++ e2e/memory-limit/fixtures/test/record-pid.ts | 17 ++++ e2e/memory-limit/index.test.ts | 50 ++++++++++ packages/core/src/pool/index.ts | 8 ++ packages/core/src/pool/parseMemoryLimit.ts | 72 ++++++++++++++ packages/core/src/pool/pool.ts | 12 ++- packages/core/src/pool/poolRunner.ts | 33 ++++++- packages/core/src/pool/types.ts | 6 ++ packages/core/src/types/config.ts | 22 +++++ .../core/tests/pool/parseMemoryLimit.test.ts | 99 +++++++++++++++++++ 14 files changed, 374 insertions(+), 2 deletions(-) create mode 100644 e2e/memory-limit/fixtures/rstest.config.mts create mode 100644 e2e/memory-limit/fixtures/test/f1.test.ts create mode 100644 e2e/memory-limit/fixtures/test/f2.test.ts create mode 100644 e2e/memory-limit/fixtures/test/f3.test.ts create mode 100644 e2e/memory-limit/fixtures/test/f4.test.ts create mode 100644 e2e/memory-limit/fixtures/test/record-pid.ts create mode 100644 e2e/memory-limit/index.test.ts create mode 100644 packages/core/src/pool/parseMemoryLimit.ts create mode 100644 packages/core/tests/pool/parseMemoryLimit.test.ts diff --git a/e2e/memory-limit/fixtures/rstest.config.mts b/e2e/memory-limit/fixtures/rstest.config.mts new file mode 100644 index 000000000..af1393de7 --- /dev/null +++ b/e2e/memory-limit/fixtures/rstest.config.mts @@ -0,0 +1,17 @@ +import { defineConfig } from '@rstest/core'; + +export default defineConfig({ + // `isolate: false` is the only mode where `memoryLimit` is consulted + // — workers must be reused for the recycle decision to matter. + isolate: false, + // `memoryLimit: 2` parses as 2 bytes (values > 1 are bytes; values in + // `(0, 1]` would be a fraction of total system memory). Every worker + // is over-limit the instant it reports its first RSS sample, so the + // pool must dispose it before reusing. The fixture then asserts each + // test file ran in a distinct process, proving the cap was honored + // end-to-end. + pool: { + maxWorkers: 1, + memoryLimit: 2, + }, +}); diff --git a/e2e/memory-limit/fixtures/test/f1.test.ts b/e2e/memory-limit/fixtures/test/f1.test.ts new file mode 100644 index 000000000..f1cb6f9cb --- /dev/null +++ b/e2e/memory-limit/fixtures/test/f1.test.ts @@ -0,0 +1,10 @@ +import { describe, expect, it } from '@rstest/core'; +import { recordPid } from './record-pid'; + +recordPid('f1'); + +describe('memory limit recycle — file 1', () => { + it('runs and records its pid', () => { + expect(true).toBe(true); + }); +}); diff --git a/e2e/memory-limit/fixtures/test/f2.test.ts b/e2e/memory-limit/fixtures/test/f2.test.ts new file mode 100644 index 000000000..e724ac436 --- /dev/null +++ b/e2e/memory-limit/fixtures/test/f2.test.ts @@ -0,0 +1,10 @@ +import { describe, expect, it } from '@rstest/core'; +import { recordPid } from './record-pid'; + +recordPid('f2'); + +describe('memory limit recycle — file 2', () => { + it('runs and records its pid', () => { + expect(true).toBe(true); + }); +}); diff --git a/e2e/memory-limit/fixtures/test/f3.test.ts b/e2e/memory-limit/fixtures/test/f3.test.ts new file mode 100644 index 000000000..741d28818 --- /dev/null +++ b/e2e/memory-limit/fixtures/test/f3.test.ts @@ -0,0 +1,10 @@ +import { describe, expect, it } from '@rstest/core'; +import { recordPid } from './record-pid'; + +recordPid('f3'); + +describe('memory limit recycle — file 3', () => { + it('runs and records its pid', () => { + expect(true).toBe(true); + }); +}); diff --git a/e2e/memory-limit/fixtures/test/f4.test.ts b/e2e/memory-limit/fixtures/test/f4.test.ts new file mode 100644 index 000000000..06a2529de --- /dev/null +++ b/e2e/memory-limit/fixtures/test/f4.test.ts @@ -0,0 +1,10 @@ +import { describe, expect, it } from '@rstest/core'; +import { recordPid } from './record-pid'; + +recordPid('f4'); + +describe('memory limit recycle — file 4', () => { + it('runs and records its pid', () => { + expect(true).toBe(true); + }); +}); diff --git a/e2e/memory-limit/fixtures/test/record-pid.ts b/e2e/memory-limit/fixtures/test/record-pid.ts new file mode 100644 index 000000000..5ce310b3e --- /dev/null +++ b/e2e/memory-limit/fixtures/test/record-pid.ts @@ -0,0 +1,17 @@ +import { appendFileSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; + +/** + * Record this file's `process.pid` into a shared log. The driver reads + * the log after the fixture finishes and asserts unique pids — proving + * the worker was recycled between tasks (the configured `memoryLimit: + * 2` byte cap is blown on every task, so every runner is disposed and + * a fresh one spawned). + */ +export const PID_LOG_PATH = + process.env.RSTEST_MEMLIMIT_LOG ?? join(tmpdir(), 'rstest-memory-limit.log'); + +export const recordPid = (fileTag: string): void => { + appendFileSync(PID_LOG_PATH, `${process.pid}\t${fileTag}\n`); +}; diff --git a/e2e/memory-limit/index.test.ts b/e2e/memory-limit/index.test.ts new file mode 100644 index 000000000..02a038abc --- /dev/null +++ b/e2e/memory-limit/index.test.ts @@ -0,0 +1,50 @@ +import { existsSync, readFileSync, unlinkSync } from 'node:fs'; +import { dirname, join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { fileURLToPath } from 'node:url'; +import { describe, expect, it } from '@rstest/core'; +import { runRstestCli } from '../scripts/'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +describe('pool.memoryLimit — heap-based worker recycle', () => { + it('recycles the worker when its RSS exceeds the cap', async ({ + onTestFinished, + }) => { + const logPath = join( + tmpdir(), + `rstest-memory-limit-${process.pid}-${Date.now()}.log`, + ); + onTestFinished(() => { + if (existsSync(logPath)) unlinkSync(logPath); + }); + + const { expectExecSuccess } = await runRstestCli({ + command: 'rstest', + args: ['run'], + onTestFinished, + options: { + nodeOptions: { + cwd: join(__dirname, './fixtures'), + env: { + RSTEST_MEMLIMIT_LOG: logPath, + }, + }, + }, + }); + await expectExecSuccess(); + + // 4 files × `maxWorkers: 1` × `memoryLimit: 1` byte → every worker's + // first reported RSS is over-limit, so the pool must dispose it + // before the next task. Result: each file lands in a distinct + // process.pid. If the cap was ignored (regression), all 4 lines + // would share a pid and this assertion would catch it. + const lines = readFileSync(logPath, 'utf8') + .split('\n') + .filter((l) => l.trim().length > 0); + expect(lines.length).toBe(4); + const pids = new Set(lines.map((line) => line.split('\t')[0])); + expect(pids.size).toBe(4); + }); +}); diff --git a/packages/core/src/pool/index.ts b/packages/core/src/pool/index.ts index 5d4fc66cc..779f20ae6 100644 --- a/packages/core/src/pool/index.ts +++ b/packages/core/src/pool/index.ts @@ -29,6 +29,7 @@ import { import type { TraceEvent } from '../utils/trace'; import { isMemorySufficient } from '../utils/memory'; import { createDefaultMemoryGate } from './memoryGate'; +import { parseMemoryLimit } from './parseMemoryLimit'; import { Pool } from './pool'; import type { PoolWorkerKind } from './types'; @@ -369,11 +370,18 @@ export const createPool = async ({ throw `Invalid pool configuration: maxWorkers(${maxWorkers}) cannot be less than minWorkers(${minWorkers}).`; } + // `memoryLimit` only matters when runners are reused (`isolate: false`); + // otherwise workers are single-use, so the recycle check would be dead + // code. Parse to bytes here so the pool stores a single int. + const memoryLimitBytes = + isolate === false ? parseMemoryLimit(poolOptions.memoryLimit) : undefined; + const pool = new Pool({ workerEntry: resolve(__dirname, './worker.js'), isolate, maxWorkers, minWorkers, + memoryLimitBytes, execArgv: [ ...(poolOptions?.execArgv ?? []), ...execArgv, diff --git a/packages/core/src/pool/parseMemoryLimit.ts b/packages/core/src/pool/parseMemoryLimit.ts new file mode 100644 index 000000000..473c6da78 --- /dev/null +++ b/packages/core/src/pool/parseMemoryLimit.ts @@ -0,0 +1,72 @@ +import os from 'node:os'; + +/** + * Parse a user-supplied `pool.memoryLimit` value into a byte count. + * + * Returns `0` when: + * - input is `undefined`, `null`, or non-positive + * - the string cannot be interpreted (caller may want to log/warn + * separately; here we just disable the cap rather than crashing CI) + * + * Accepted forms (mirrors Vitest's `VmOptions.memoryLimit`): + * - `number >= 1` — interpreted as bytes + * - `number` in `(0, 1]` — fraction of total system memory + * - `string` with a unit suffix — case-insensitive: + * - `kb`/`k` (1e3), `kib` (2^10) + * - `mb`/`m` (1e6), `mib` (2^20) + * - `gb`/`g` (1e9), `gib` (2^30) + * - `%` — fraction of total system memory (e.g. `"20%"`) + * - `string` without a unit suffix — parsed as a number (same rules as + * the number form above) + */ +export const parseMemoryLimit = ( + input: number | string | undefined, +): number => { + if (input === undefined || input === null) return 0; + + const totalMemory = os.totalmem(); + + if (typeof input === 'string') { + // Pull a trailing unit suffix off (e.g. "1.5GB" → "1.5" + "GB"). + const match = input.trim().match(/^(-?\d+(?:\.\d+)?)\s*([a-z%]+)?$/i); + if (!match) return 0; + const numeric = Number.parseFloat(match[1]!); + if (!Number.isFinite(numeric) || numeric <= 0) return 0; + const unit = (match[2] ?? '').toLowerCase(); + + switch (unit) { + case '': + return fromNumber(numeric, totalMemory); + case '%': + return Math.floor((numeric / 100) * totalMemory); + case 'k': + case 'kb': + return Math.floor(numeric * 1e3); + case 'kib': + return Math.floor(numeric * 1024); + case 'm': + case 'mb': + return Math.floor(numeric * 1e6); + case 'mib': + return Math.floor(numeric * 1024 * 1024); + case 'g': + case 'gb': + return Math.floor(numeric * 1e9); + case 'gib': + return Math.floor(numeric * 1024 * 1024 * 1024); + default: + return 0; + } + } + + if (typeof input !== 'number' || !Number.isFinite(input) || input <= 0) { + return 0; + } + return fromNumber(input, totalMemory); +}; + +const fromNumber = (value: number, totalMemory: number): number => { + // `(0, 1]` → fraction of system memory; `> 1` → bytes verbatim. + if (value <= 1) return Math.floor(value * totalMemory); + return Math.floor(value); +}; diff --git a/packages/core/src/pool/pool.ts b/packages/core/src/pool/pool.ts index 731501e88..492c7d20e 100644 --- a/packages/core/src/pool/pool.ts +++ b/packages/core/src/pool/pool.ts @@ -113,7 +113,17 @@ export class Pool { const workerId = this.acquireWorkerId(); const worker = createPoolWorker(task, this.options, workerId); gate?.attachWorker(worker); - const runner = new PoolRunner(worker, { workerId }); + // `memoryLimitBytes` only matters when runners are reused + // (`isolate: false`). For `isolate: true` the runner is single-use + // so the cap is irrelevant and we omit it to keep `isUsable()` + // hot-path free of the (cheap) comparison. + const runner = new PoolRunner(worker, { + workerId, + memoryLimitBytes: + this.options.isolate === false + ? this.options.memoryLimitBytes + : undefined, + }); this.activeRunners.add(runner); try { await runner.start(); diff --git a/packages/core/src/pool/poolRunner.ts b/packages/core/src/pool/poolRunner.ts index 497ab8c33..c8fcac3bf 100644 --- a/packages/core/src/pool/poolRunner.ts +++ b/packages/core/src/pool/poolRunner.ts @@ -64,6 +64,17 @@ let nextTaskSeq = 0; type PoolRunnerOptions = { workerId: number; + /** + * Recycle this runner after it reports an RSS above this many bytes. + * Disabled when `0` or omitted. Only meaningful when the pool reuses + * runners (`isolate: false`); `isolate: true` workers are single-use. + * + * The worker reports its `process.memoryUsage().rss` at the end of + * each task. If a value exceeds the limit, `isUsable()` flips false + * and the pool's existing dispose path spawns a fresh runner instead + * of returning this one to the idle pool. + */ + memoryLimitBytes?: number; }; /** @@ -99,8 +110,19 @@ export class PoolRunner { */ private crashed = false; + /** + * Last RSS (bytes) reported by the worker. Updated on each + * `runFinished` / `collectFinished` response carrying a `memory` + * field. Compared against `memoryLimitBytes` in `isUsable()` so a + * bloated worker is recycled on next dispatch attempt instead of + * returning to the idle pool. + */ + private lastReportedRssBytes = 0; + private readonly memoryLimitBytes: number; + constructor(worker: PoolWorker, options: PoolRunnerOptions) { this.workerId = options.workerId; + this.memoryLimitBytes = Math.max(0, options.memoryLimitBytes ?? 0); this.worker = worker; this.handleMessage = this.handleMessage.bind(this); @@ -113,7 +135,14 @@ export class PoolRunner { } isUsable(): boolean { - return this.state === 'STARTED' && !this.crashed; + if (this.state !== 'STARTED' || this.crashed) return false; + if ( + this.memoryLimitBytes > 0 && + this.lastReportedRssBytes > this.memoryLimitBytes + ) { + return false; + } + return true; } start(): Promise { @@ -325,9 +354,11 @@ export class PoolRunner { this.startDeferred = undefined; return; case 'runFinished': + if (response.memory) this.lastReportedRssBytes = response.memory.rss; this.resolveTask('run', response.taskId, response.result); return; case 'collectFinished': + if (response.memory) this.lastReportedRssBytes = response.memory.rss; this.resolveTask('collect', response.taskId, response.result); return; case 'fatal_error': { diff --git a/packages/core/src/pool/types.ts b/packages/core/src/pool/types.ts index 1f0fde24c..a9bc0d510 100644 --- a/packages/core/src/pool/types.ts +++ b/packages/core/src/pool/types.ts @@ -15,6 +15,12 @@ export type PoolOptions = { maxWorkers: number; minWorkers: number; isolate: boolean; + /** + * Recycle a reused runner once its last-reported RSS exceeds this + * many bytes. Disabled when omitted or `0`. See + * `RstestPoolOptions.memoryLimit` for the user-facing knob. + */ + memoryLimitBytes?: number; env?: Record; execArgv?: string[]; /** diff --git a/packages/core/src/types/config.ts b/packages/core/src/types/config.ts index deb3a7a39..a7762edae 100644 --- a/packages/core/src/types/config.ts +++ b/packages/core/src/types/config.ts @@ -25,6 +25,28 @@ export type RstestPoolOptions = { minWorkers?: number | string; /** Pass additional arguments to node process in the child processes. */ execArgv?: string[]; + /** + * Recycle a reused worker once its RSS exceeds this threshold. Only + * meaningful when `isolate: false` (workers are reused across files); + * a no-op under the default `isolate: true` since workers are + * single-use anyway. + * + * Accepts: + * - `number` >= 1 — bytes (e.g. `1_500_000_000` for 1.5 GB) + * - `number` in `(0, 1]` — fraction of total system memory + * - `string` with a unit suffix — `"512MB"`, `"1.5GB"`, `"20%"`, + * `"1GiB"`, … (`%` is fraction of total system memory) + * + * After each test file the worker reports `process.memoryUsage().rss` + * to the host. When that exceeds the parsed limit, the pool disposes + * the worker and spawns a fresh one. Heap accumulated from prior + * files (vendor modules, JSDOM nodes, React fiber trees) is + * reclaimed by process exit. + * + * Useful for long suites under `isolate: false` where a single + * worker's heap can grow into GC-thrash territory. + */ + memoryLimit?: number | string; }; export type BundleDependencyPattern = string | RegExp; diff --git a/packages/core/tests/pool/parseMemoryLimit.test.ts b/packages/core/tests/pool/parseMemoryLimit.test.ts new file mode 100644 index 000000000..6ef881c89 --- /dev/null +++ b/packages/core/tests/pool/parseMemoryLimit.test.ts @@ -0,0 +1,99 @@ +import os from 'node:os'; +import { parseMemoryLimit } from '../../src/pool/parseMemoryLimit'; + +const MB = 1024 * 1024; +const SI_MB = 1_000_000; + +describe('parseMemoryLimit', () => { + describe('disable / falsy', () => { + it.each([ + ['undefined', undefined], + ['empty string', ''], + ['gibberish', 'not-a-number'], + ['negative number', -1], + ['zero', 0], + ['NaN', Number.NaN], + ['infinity', Number.POSITIVE_INFINITY], + ] as const)('returns 0 for %s', (_label, input) => { + // `as any` because some inputs are deliberately outside the public type. + expect(parseMemoryLimit(input as any)).toBe(0); + }); + }); + + describe('number form', () => { + it('treats values > 1 as a byte count', () => { + expect(parseMemoryLimit(2_000_000_000)).toBe(2_000_000_000); + }); + + it('treats values in (0, 1] as a fraction of total system memory', () => { + const half = Math.floor(0.5 * os.totalmem()); + expect(parseMemoryLimit(0.5)).toBe(half); + }); + + it('treats exactly 1 as 100% of system memory', () => { + expect(parseMemoryLimit(1)).toBe(os.totalmem()); + }); + + it('floors fractional bytes (no half-bytes downstream)', () => { + // 1.7 > 1 → interpreted as bytes; floored. + expect(parseMemoryLimit(1.7)).toBe(1); + }); + }); + + describe('string with unit suffix', () => { + it.each([ + // [input, expected bytes] + ['512KB', 512 * 1e3], + ['512kb', 512 * 1e3], + ['512k', 512 * 1e3], + ['512KiB', 512 * 1024], + ['256MB', 256 * SI_MB], + ['256mb', 256 * SI_MB], + ['256m', 256 * SI_MB], + ['256MiB', 256 * MB], + ['1GB', 1e9], + ['1.5GB', Math.floor(1.5 * 1e9)], + ['1.5g', Math.floor(1.5 * 1e9)], + ['1GiB', 1024 * 1024 * 1024], + ])('parses %s', (input, expected) => { + expect(parseMemoryLimit(input)).toBe(expected); + }); + + it('handles surrounding whitespace', () => { + expect(parseMemoryLimit(' 1GB ')).toBe(1e9); + }); + + it('returns 0 for unknown units', () => { + expect(parseMemoryLimit('5XB')).toBe(0); + }); + + it('returns 0 for negative numeric prefix', () => { + expect(parseMemoryLimit('-5MB')).toBe(0); + }); + }); + + describe('string with percent suffix', () => { + it('parses "20%" as a fraction of total system memory', () => { + expect(parseMemoryLimit('20%')).toBe(Math.floor(0.2 * os.totalmem())); + }); + + it('parses "100%" as exactly 100% of system memory', () => { + // floor of total since totalMemory is always an integer + expect(parseMemoryLimit('100%')).toBe(os.totalmem()); + }); + + it('parses fractional percent', () => { + expect(parseMemoryLimit('12.5%')).toBe(Math.floor(0.125 * os.totalmem())); + }); + }); + + describe('string without unit (treated as number)', () => { + it('parses a bare integer string as bytes', () => { + expect(parseMemoryLimit('2000000000')).toBe(2_000_000_000); + }); + + it('parses a fractional bare string as a system-memory fraction', () => { + expect(parseMemoryLimit('0.25')).toBe(Math.floor(0.25 * os.totalmem())); + }); + }); +});