From d158f4f3fcf00d1935eb8f20b8dae3e75478095e Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 14:43:44 -0400 Subject: [PATCH 1/4] cgroup --- packages/engine-multi/src/api/call-worker.ts | 5 +- packages/engine-multi/src/engine.ts | 1 + packages/engine-multi/src/worker/cgroup.ts | 152 ++++++++++++++++++ packages/engine-multi/src/worker/pool.ts | 60 ++++++- .../engine-multi/test/worker/cgroup.test.ts | 98 +++++++++++ 5 files changed, 311 insertions(+), 5 deletions(-) create mode 100644 packages/engine-multi/src/worker/cgroup.ts create mode 100644 packages/engine-multi/test/worker/cgroup.test.ts diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index a89832f41..d7486705a 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -10,6 +10,7 @@ export type WorkerEvent = { type WorkerOptions = { maxWorkers?: number; + maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2) env?: any; timeout?: number; // ms proxyStdout?: boolean; // print internal stdout to console @@ -22,12 +23,14 @@ export default function initWorkers( options: WorkerOptions = {}, logger: Logger ) { - const { env = {}, maxWorkers = 5, proxyStdout = false } = options; + const { env = {}, maxWorkers = 5, maxWorkerMemoryMb, proxyStdout = false } = + options; const workers = createPool( workerPath, { maxWorkers, + maxWorkerMemoryMb, env, proxyStdout, }, diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 630e334ab..f4584949b 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -139,6 +139,7 @@ const createEngine = async ( resolvedWorkerPath, { maxWorkers: options.maxWorkers, + maxWorkerMemoryMb: defaultMemoryLimit, proxyStdout: options.proxyStdout, }, options.logger diff --git a/packages/engine-multi/src/worker/cgroup.ts b/packages/engine-multi/src/worker/cgroup.ts new file mode 100644 index 000000000..76a60b47e --- /dev/null +++ b/packages/engine-multi/src/worker/cgroup.ts @@ -0,0 +1,152 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import type { Logger } from '@openfn/logger'; + +type CgroupSupport = { + supported: boolean; + cgroupRoot: string | null; +}; + +let cachedResult: CgroupSupport | null = null; + +/** + * Detect whether cgroup v2 memory enforcement is available. + * Called once at pool creation; result is cached for the process lifetime. + */ +export function detectCgroupSupport(logger: Logger): CgroupSupport { + if (cachedResult) return cachedResult; + + try { + // Step 1: confirm cgroup v2 + if (!fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { + logger.debug('cgroup: v2 not available'); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + + // Step 2: find our cgroup path + const procCgroup = fs.readFileSync('/proc/self/cgroup', 'utf-8').trim(); + const match = procCgroup.match(/^0::(.+)$/m); + if (!match) { + logger.debug('cgroup: could not parse /proc/self/cgroup'); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + + const cgroupRoot = path.join('/sys/fs/cgroup', match[1]); + + // Step 3: ensure memory controller is delegated + const subtreeControl = fs + .readFileSync(path.join(cgroupRoot, 'cgroup.subtree_control'), 'utf-8') + .trim(); + + if (!subtreeControl.includes('memory')) { + // Try to enable memory delegation + try { + fs.writeFileSync( + path.join(cgroupRoot, 'cgroup.subtree_control'), + '+memory' + ); + } catch (e: any) { + if (e.code === 'EBUSY') { + // "no internal process" constraint — move ourselves to a child cgroup + const initPath = path.join(cgroupRoot, 'openfn-init'); + try { + fs.mkdirSync(initPath, { recursive: true }); + fs.writeFileSync( + path.join(initPath, 'cgroup.procs'), + String(process.pid) + ); + fs.writeFileSync( + path.join(cgroupRoot, 'cgroup.subtree_control'), + '+memory' + ); + } catch (inner: any) { + logger.warn('cgroup: failed to delegate memory controller:', inner.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + } else { + logger.warn('cgroup: failed to enable memory controller:', e.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + } + } + + // Step 4: smoke test — create and remove a probe cgroup + const probePath = path.join(cgroupRoot, `openfn-probe-${process.pid}`); + try { + fs.mkdirSync(probePath); + fs.writeFileSync(path.join(probePath, 'memory.max'), '1073741824'); // 1GB + fs.rmdirSync(probePath); + } catch (e: any) { + logger.warn('cgroup: smoke test failed:', e.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } + + logger.info('cgroup: memory enforcement available at', cgroupRoot); + cachedResult = { supported: true, cgroupRoot }; + return cachedResult; + } catch (e: any) { + logger.debug('cgroup: detection failed:', e.message); + cachedResult = { supported: false, cgroupRoot: null }; + return cachedResult; + } +} + +/** + * Create a cgroup for a child process and apply a memory limit. + * Returns the cgroup directory path, or null on failure. + */ +export function setupCgroup( + pid: number, + memoryLimitBytes: number, + cgroupRoot: string, + logger: Logger +): string | null { + const cgroupPath = path.join(cgroupRoot, `openfn-worker-${pid}`); + + try { + fs.mkdirSync(cgroupPath); + fs.writeFileSync( + path.join(cgroupPath, 'memory.max'), + String(memoryLimitBytes) + ); + fs.writeFileSync(path.join(cgroupPath, 'memory.swap.max'), '0'); + fs.writeFileSync(path.join(cgroupPath, 'cgroup.procs'), String(pid)); + + logger.debug( + `cgroup: worker ${pid} limited to ${Math.round(memoryLimitBytes / 1024 / 1024)}MB` + ); + return cgroupPath; + } catch (e: any) { + logger.warn(`cgroup: failed to set up cgroup for worker ${pid}:`, e.message); + // Clean up partial state + try { + fs.rmdirSync(cgroupPath); + } catch { + // ignore cleanup failure + } + return null; + } +} + +/** + * Remove a cgroup directory after the child process has exited. + */ +export function cleanupCgroup(cgroupPath: string, logger: Logger): void { + try { + fs.rmdirSync(cgroupPath); + } catch (e: any) { + if (e.code !== 'ENOENT') { + logger.warn('cgroup: cleanup failed for', cgroupPath, e.message); + } + } +} + +// Exported for testing only +export function _resetCache(): void { + cachedResult = null; +} diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 7a306b974..0e607d717 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -13,10 +13,12 @@ import { import { HANDLED_EXIT_CODE } from '../events'; import { Logger } from '@openfn/logger'; import type { PayloadLimits } from './thread/runtime'; +import { detectCgroupSupport, setupCgroup, cleanupCgroup } from './cgroup'; export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? + maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2) env?: Record; // default environment for workers proxyStdout?: boolean; // print internal stdout to console @@ -71,6 +73,15 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`); let destroyed = false; + const cgroupInfo = detectCgroupSupport(logger); + const cgroupPaths = new Map(); + + if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + logger.info( + `pool: cgroup memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` + ); + } + // a pool of processes const pool: ChildProcessPool = new Array(capacity).fill(false); @@ -101,6 +112,21 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; + + if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + const limitBytes = Math.ceil( + (options.maxWorkerMemoryMb * 1.2 + 50) * 1024 * 1024 + ); + const cgPath = setupCgroup( + child.pid!, + limitBytes, + cgroupInfo.cgroupRoot!, + logger + ); + if (cgPath) { + cgroupPaths.set(child.pid!, cgPath); + } + } } else { child = maybeChild as ChildProcess; logger.debug('pool: Using existing child process', child.pid); @@ -108,6 +134,14 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { return child; }; + const maybeCleanupCgroup = (pid: number) => { + const cgPath = cgroupPaths.get(pid); + if (cgPath) { + cleanupCgroup(cgPath, logger); + cgroupPaths.delete(pid); + } + }; + const finish = (worker: ChildProcess | false) => { if (worker) { logger.debug('pool: finished task in worker', worker.pid); @@ -142,7 +176,20 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const promise = new Promise(async (resolve, reject) => { // TODO what should we do if a process in the pool dies, perhaps due to OOM? - const onExit = async (code: number) => { + const onExit = async (code: number | null, signal: string | null) => { + // Kernel OOM kill: cgroup sends SIGKILL with null exit code + if (signal === 'SIGKILL' && code === null && !destroyed) { + logger.debug( + `pool: Worker ${worker.pid} killed by SIGKILL (probable OOM)` + ); + clearTimeout(timeout); + maybeCleanupCgroup(worker.pid!); + killWorker(worker); + finish(false); + reject(new OOMError()); + return; + } + if (code !== HANDLED_EXIT_CODE) { logger.debug(`pool: Worker exited unexpectedly with code ${code}`); clearTimeout(timeout); @@ -168,8 +215,9 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { } catch (e) { // do nothing } + maybeCleanupCgroup(worker.pid!); finish(worker); - reject(new ExitError(code)); + reject(new ExitError(code!)); } }; @@ -257,9 +305,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const killWorker = (worker: ChildProcess | false) => { if (worker) { - logger.debug('pool: destroying worker ', worker.pid); + const pid = worker.pid!; + logger.debug('pool: destroying worker ', pid); worker.kill(); - delete allWorkers[worker.pid!]; + delete allWorkers[pid]; + worker.once('exit', () => maybeCleanupCgroup(pid)); } }; @@ -276,11 +326,13 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const timeout = setTimeout(() => { logger.debug('pool: force killing worker', worker.pid); worker.kill('SIGKILL'); + maybeCleanupCgroup(worker.pid!); resolve(); }, forceKillTimeout); worker.once('exit', () => { clearTimeout(timeout); + maybeCleanupCgroup(worker.pid!); resolve(); }); diff --git a/packages/engine-multi/test/worker/cgroup.test.ts b/packages/engine-multi/test/worker/cgroup.test.ts new file mode 100644 index 000000000..f4bead0c6 --- /dev/null +++ b/packages/engine-multi/test/worker/cgroup.test.ts @@ -0,0 +1,98 @@ +import test from 'ava'; +import fs from 'node:fs'; +import path from 'node:path'; +import { createMockLogger } from '@openfn/logger'; + +import { + detectCgroupSupport, + setupCgroup, + cleanupCgroup, + _resetCache, +} from '../../src/worker/cgroup'; + +const logger = createMockLogger(); + +test.beforeEach(() => { + _resetCache(); +}); + +// On macOS / non-Linux, detection should return unsupported +test('detectCgroupSupport returns false when cgroup v2 is not available', (t) => { + // This test runs on any platform. On macOS, /sys/fs/cgroup doesn't exist. + // On Linux without cgroup v2, cgroup.controllers won't exist. + if (fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { + t.pass('cgroup v2 is available on this system — skipping negative test'); + return; + } + + const result = detectCgroupSupport(logger); + t.false(result.supported); + t.is(result.cgroupRoot, null); +}); + +test('detectCgroupSupport caches the result across calls', (t) => { + const result1 = detectCgroupSupport(logger); + const result2 = detectCgroupSupport(logger); + t.is(result1, result2); // same object reference +}); + +test('setupCgroup returns null when directory creation fails', (t) => { + // Use a nonexistent root so mkdirSync fails + const result = setupCgroup( + 99999, + 500 * 1024 * 1024, + '/nonexistent/cgroup/root', + logger + ); + t.is(result, null); +}); + +test('cleanupCgroup does not throw on ENOENT', (t) => { + t.notThrows(() => { + cleanupCgroup('/nonexistent/cgroup/path', logger); + }); +}); + +// Integration tests — only run on Linux with cgroup v2 and write access +const hasCgroupV2 = fs.existsSync('/sys/fs/cgroup/cgroup.controllers'); + +const cgroupTest = hasCgroupV2 ? test : test.skip; + +cgroupTest('detectCgroupSupport returns true on cgroup v2 system', (t) => { + const result = detectCgroupSupport(logger); + t.true(result.supported); + t.truthy(result.cgroupRoot); +}); + +cgroupTest('setupCgroup creates cgroup directory and cleanupCgroup removes it', (t) => { + const detection = detectCgroupSupport(logger); + if (!detection.supported) { + t.pass('cgroup not supported — skipping'); + return; + } + + // Use a fake PID that won't collide + const fakePid = 2147483640; + const limitBytes = 256 * 1024 * 1024; + + const cgPath = setupCgroup(fakePid, limitBytes, detection.cgroupRoot!, logger); + + if (!cgPath) { + t.pass('setupCgroup returned null — likely permission issue'); + return; + } + + t.true(fs.existsSync(cgPath)); + + // Verify memory.max was written + const memMax = fs.readFileSync(path.join(cgPath, 'memory.max'), 'utf-8').trim(); + t.is(memMax, String(limitBytes)); + + // Verify memory.swap.max was written + const swapMax = fs.readFileSync(path.join(cgPath, 'memory.swap.max'), 'utf-8').trim(); + t.is(swapMax, '0'); + + // Clean up + cleanupCgroup(cgPath, logger); + t.false(fs.existsSync(cgPath)); +}); From 85f79fe9e0742481f9aa1f81cd1cca16f950a0cc Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 14:58:10 -0400 Subject: [PATCH 2/4] fix tests --- packages/engine-multi/src/api/call-worker.ts | 8 ++++++-- packages/engine-multi/src/worker/cgroup.ts | 14 +++++++++++--- packages/engine-multi/test/worker/cgroup.test.ts | 8 ++++++-- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index d7486705a..4965da09f 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -23,8 +23,12 @@ export default function initWorkers( options: WorkerOptions = {}, logger: Logger ) { - const { env = {}, maxWorkers = 5, maxWorkerMemoryMb, proxyStdout = false } = - options; + const { + env = {}, + maxWorkers = 5, + maxWorkerMemoryMb, + proxyStdout = false, + } = options; const workers = createPool( workerPath, diff --git a/packages/engine-multi/src/worker/cgroup.ts b/packages/engine-multi/src/worker/cgroup.ts index 76a60b47e..dd08380cb 100644 --- a/packages/engine-multi/src/worker/cgroup.ts +++ b/packages/engine-multi/src/worker/cgroup.ts @@ -62,7 +62,10 @@ export function detectCgroupSupport(logger: Logger): CgroupSupport { '+memory' ); } catch (inner: any) { - logger.warn('cgroup: failed to delegate memory controller:', inner.message); + logger.warn( + 'cgroup: failed to delegate memory controller:', + inner.message + ); cachedResult = { supported: false, cgroupRoot: null }; return cachedResult; } @@ -118,11 +121,16 @@ export function setupCgroup( fs.writeFileSync(path.join(cgroupPath, 'cgroup.procs'), String(pid)); logger.debug( - `cgroup: worker ${pid} limited to ${Math.round(memoryLimitBytes / 1024 / 1024)}MB` + `cgroup: worker ${pid} limited to ${Math.round( + memoryLimitBytes / 1024 / 1024 + )}MB` ); return cgroupPath; } catch (e: any) { - logger.warn(`cgroup: failed to set up cgroup for worker ${pid}:`, e.message); + logger.warn( + `cgroup: failed to set up cgroup for worker ${pid}:`, + e.message + ); // Clean up partial state try { fs.rmdirSync(cgroupPath); diff --git a/packages/engine-multi/test/worker/cgroup.test.ts b/packages/engine-multi/test/worker/cgroup.test.ts index f4bead0c6..e1442d216 100644 --- a/packages/engine-multi/test/worker/cgroup.test.ts +++ b/packages/engine-multi/test/worker/cgroup.test.ts @@ -58,9 +58,13 @@ const hasCgroupV2 = fs.existsSync('/sys/fs/cgroup/cgroup.controllers'); const cgroupTest = hasCgroupV2 ? test : test.skip; -cgroupTest('detectCgroupSupport returns true on cgroup v2 system', (t) => { +cgroupTest('detectCgroupSupport on cgroup v2 system', (t) => { const result = detectCgroupSupport(logger); - t.true(result.supported); + if (!result.supported) { + // cgroup v2 is present but we lack permissions to delegate — that's fine + t.pass('cgroup v2 present but not writable — detection correctly returned false'); + return; + } t.truthy(result.cgroupRoot); }); From a19869d964904b8cc75f0baf4d40abb6110298f6 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 15:15:19 -0400 Subject: [PATCH 3/4] rlimit --- Dockerfile | 2 +- packages/engine-multi/src/worker/cgroup.ts | 160 ------------------ packages/engine-multi/src/worker/pool.ts | 62 ++----- packages/engine-multi/src/worker/rlimit.ts | 55 ++++++ .../engine-multi/test/worker/cgroup.test.ts | 102 ----------- .../engine-multi/test/worker/rlimit.test.ts | 51 ++++++ 6 files changed, 122 insertions(+), 310 deletions(-) delete mode 100644 packages/engine-multi/src/worker/cgroup.ts create mode 100644 packages/engine-multi/src/worker/rlimit.ts delete mode 100644 packages/engine-multi/test/worker/cgroup.test.ts create mode 100644 packages/engine-multi/test/worker/rlimit.test.ts diff --git a/Dockerfile b/Dockerfile index 72805704c..028e2b1f1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ WORKDIR /app # TODO: remove simple build once prod optimized build is working --------------- FROM base AS ws-worker -RUN apk add --no-cache git +RUN apk add --no-cache git util-linux RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile RUN pnpm build WORKDIR /app/packages/ws-worker diff --git a/packages/engine-multi/src/worker/cgroup.ts b/packages/engine-multi/src/worker/cgroup.ts deleted file mode 100644 index dd08380cb..000000000 --- a/packages/engine-multi/src/worker/cgroup.ts +++ /dev/null @@ -1,160 +0,0 @@ -import fs from 'node:fs'; -import path from 'node:path'; -import type { Logger } from '@openfn/logger'; - -type CgroupSupport = { - supported: boolean; - cgroupRoot: string | null; -}; - -let cachedResult: CgroupSupport | null = null; - -/** - * Detect whether cgroup v2 memory enforcement is available. - * Called once at pool creation; result is cached for the process lifetime. - */ -export function detectCgroupSupport(logger: Logger): CgroupSupport { - if (cachedResult) return cachedResult; - - try { - // Step 1: confirm cgroup v2 - if (!fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { - logger.debug('cgroup: v2 not available'); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - - // Step 2: find our cgroup path - const procCgroup = fs.readFileSync('/proc/self/cgroup', 'utf-8').trim(); - const match = procCgroup.match(/^0::(.+)$/m); - if (!match) { - logger.debug('cgroup: could not parse /proc/self/cgroup'); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - - const cgroupRoot = path.join('/sys/fs/cgroup', match[1]); - - // Step 3: ensure memory controller is delegated - const subtreeControl = fs - .readFileSync(path.join(cgroupRoot, 'cgroup.subtree_control'), 'utf-8') - .trim(); - - if (!subtreeControl.includes('memory')) { - // Try to enable memory delegation - try { - fs.writeFileSync( - path.join(cgroupRoot, 'cgroup.subtree_control'), - '+memory' - ); - } catch (e: any) { - if (e.code === 'EBUSY') { - // "no internal process" constraint — move ourselves to a child cgroup - const initPath = path.join(cgroupRoot, 'openfn-init'); - try { - fs.mkdirSync(initPath, { recursive: true }); - fs.writeFileSync( - path.join(initPath, 'cgroup.procs'), - String(process.pid) - ); - fs.writeFileSync( - path.join(cgroupRoot, 'cgroup.subtree_control'), - '+memory' - ); - } catch (inner: any) { - logger.warn( - 'cgroup: failed to delegate memory controller:', - inner.message - ); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - } else { - logger.warn('cgroup: failed to enable memory controller:', e.message); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - } - } - - // Step 4: smoke test — create and remove a probe cgroup - const probePath = path.join(cgroupRoot, `openfn-probe-${process.pid}`); - try { - fs.mkdirSync(probePath); - fs.writeFileSync(path.join(probePath, 'memory.max'), '1073741824'); // 1GB - fs.rmdirSync(probePath); - } catch (e: any) { - logger.warn('cgroup: smoke test failed:', e.message); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } - - logger.info('cgroup: memory enforcement available at', cgroupRoot); - cachedResult = { supported: true, cgroupRoot }; - return cachedResult; - } catch (e: any) { - logger.debug('cgroup: detection failed:', e.message); - cachedResult = { supported: false, cgroupRoot: null }; - return cachedResult; - } -} - -/** - * Create a cgroup for a child process and apply a memory limit. - * Returns the cgroup directory path, or null on failure. - */ -export function setupCgroup( - pid: number, - memoryLimitBytes: number, - cgroupRoot: string, - logger: Logger -): string | null { - const cgroupPath = path.join(cgroupRoot, `openfn-worker-${pid}`); - - try { - fs.mkdirSync(cgroupPath); - fs.writeFileSync( - path.join(cgroupPath, 'memory.max'), - String(memoryLimitBytes) - ); - fs.writeFileSync(path.join(cgroupPath, 'memory.swap.max'), '0'); - fs.writeFileSync(path.join(cgroupPath, 'cgroup.procs'), String(pid)); - - logger.debug( - `cgroup: worker ${pid} limited to ${Math.round( - memoryLimitBytes / 1024 / 1024 - )}MB` - ); - return cgroupPath; - } catch (e: any) { - logger.warn( - `cgroup: failed to set up cgroup for worker ${pid}:`, - e.message - ); - // Clean up partial state - try { - fs.rmdirSync(cgroupPath); - } catch { - // ignore cleanup failure - } - return null; - } -} - -/** - * Remove a cgroup directory after the child process has exited. - */ -export function cleanupCgroup(cgroupPath: string, logger: Logger): void { - try { - fs.rmdirSync(cgroupPath); - } catch (e: any) { - if (e.code !== 'ENOENT') { - logger.warn('cgroup: cleanup failed for', cgroupPath, e.message); - } - } -} - -// Exported for testing only -export function _resetCache(): void { - cachedResult = null; -} diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 0e607d717..f1c5add2c 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -13,12 +13,12 @@ import { import { HANDLED_EXIT_CODE } from '../events'; import { Logger } from '@openfn/logger'; import type { PayloadLimits } from './thread/runtime'; -import { detectCgroupSupport, setupCgroup, cleanupCgroup } from './cgroup'; +import { detectPrlimitSupport, applyMemoryLimit } from './rlimit'; export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? - maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2) + maxWorkerMemoryMb?: number; // process-level memory limit via RLIMIT_AS env?: Record; // default environment for workers proxyStdout?: boolean; // print internal stdout to console @@ -73,12 +73,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`); let destroyed = false; - const cgroupInfo = detectCgroupSupport(logger); - const cgroupPaths = new Map(); + const hasPrlimit = detectPrlimitSupport(logger); - if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + if (hasPrlimit && options.maxWorkerMemoryMb) { logger.info( - `pool: cgroup memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` + `pool: prlimit memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` ); } @@ -113,19 +112,14 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; - if (cgroupInfo.supported && options.maxWorkerMemoryMb) { + if (hasPrlimit && options.maxWorkerMemoryMb) { + // RLIMIT_AS counts virtual address space, not RSS. + // Node/V8 reserve ~1-2GB virtual memory at startup, so we need + // generous headroom to avoid false positives. const limitBytes = Math.ceil( - (options.maxWorkerMemoryMb * 1.2 + 50) * 1024 * 1024 + (options.maxWorkerMemoryMb * 3 + 512) * 1024 * 1024 ); - const cgPath = setupCgroup( - child.pid!, - limitBytes, - cgroupInfo.cgroupRoot!, - logger - ); - if (cgPath) { - cgroupPaths.set(child.pid!, cgPath); - } + applyMemoryLimit(child.pid!, limitBytes, logger); } } else { child = maybeChild as ChildProcess; @@ -134,14 +128,6 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { return child; }; - const maybeCleanupCgroup = (pid: number) => { - const cgPath = cgroupPaths.get(pid); - if (cgPath) { - cleanupCgroup(cgPath, logger); - cgroupPaths.delete(pid); - } - }; - const finish = (worker: ChildProcess | false) => { if (worker) { logger.debug('pool: finished task in worker', worker.pid); @@ -176,20 +162,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const promise = new Promise(async (resolve, reject) => { // TODO what should we do if a process in the pool dies, perhaps due to OOM? - const onExit = async (code: number | null, signal: string | null) => { - // Kernel OOM kill: cgroup sends SIGKILL with null exit code - if (signal === 'SIGKILL' && code === null && !destroyed) { - logger.debug( - `pool: Worker ${worker.pid} killed by SIGKILL (probable OOM)` - ); - clearTimeout(timeout); - maybeCleanupCgroup(worker.pid!); - killWorker(worker); - finish(false); - reject(new OOMError()); - return; - } - + const onExit = async (code: number) => { if (code !== HANDLED_EXIT_CODE) { logger.debug(`pool: Worker exited unexpectedly with code ${code}`); clearTimeout(timeout); @@ -215,9 +188,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { } catch (e) { // do nothing } - maybeCleanupCgroup(worker.pid!); finish(worker); - reject(new ExitError(code!)); + reject(new ExitError(code)); } }; @@ -305,11 +277,9 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const killWorker = (worker: ChildProcess | false) => { if (worker) { - const pid = worker.pid!; - logger.debug('pool: destroying worker ', pid); + logger.debug('pool: destroying worker ', worker.pid); worker.kill(); - delete allWorkers[pid]; - worker.once('exit', () => maybeCleanupCgroup(pid)); + delete allWorkers[worker.pid!]; } }; @@ -326,13 +296,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const timeout = setTimeout(() => { logger.debug('pool: force killing worker', worker.pid); worker.kill('SIGKILL'); - maybeCleanupCgroup(worker.pid!); resolve(); }, forceKillTimeout); worker.once('exit', () => { clearTimeout(timeout); - maybeCleanupCgroup(worker.pid!); resolve(); }); diff --git a/packages/engine-multi/src/worker/rlimit.ts b/packages/engine-multi/src/worker/rlimit.ts new file mode 100644 index 000000000..2966fa684 --- /dev/null +++ b/packages/engine-multi/src/worker/rlimit.ts @@ -0,0 +1,55 @@ +import { execFileSync } from 'node:child_process'; +import type { Logger } from '@openfn/logger'; + +let prlimitAvailable: boolean | null = null; + +/** + * Check if the prlimit command is available (Linux with util-linux). + * Result is cached for the process lifetime. + */ +export function detectPrlimitSupport(logger: Logger): boolean { + if (prlimitAvailable !== null) return prlimitAvailable; + + try { + execFileSync('prlimit', ['--version'], { stdio: 'ignore' }); + prlimitAvailable = true; + logger.info('prlimit: memory enforcement available'); + } catch { + prlimitAvailable = false; + logger.debug('prlimit: not available (util-linux not installed)'); + } + + return prlimitAvailable; +} + +/** + * Apply RLIMIT_AS (virtual address space limit) to a child process. + * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash. + */ +export function applyMemoryLimit( + pid: number, + limitBytes: number, + logger: Logger +): boolean { + try { + execFileSync('prlimit', [ + '--pid', + String(pid), + `--as=${limitBytes}:${limitBytes}`, + ]); + logger.debug( + `prlimit: worker ${pid} RLIMIT_AS set to ${Math.round( + limitBytes / 1024 / 1024 + )}MB` + ); + return true; + } catch (e: any) { + logger.warn(`prlimit: failed to set limit for worker ${pid}:`, e.message); + return false; + } +} + +// Exported for testing only +export function _resetCache(): void { + prlimitAvailable = null; +} diff --git a/packages/engine-multi/test/worker/cgroup.test.ts b/packages/engine-multi/test/worker/cgroup.test.ts deleted file mode 100644 index e1442d216..000000000 --- a/packages/engine-multi/test/worker/cgroup.test.ts +++ /dev/null @@ -1,102 +0,0 @@ -import test from 'ava'; -import fs from 'node:fs'; -import path from 'node:path'; -import { createMockLogger } from '@openfn/logger'; - -import { - detectCgroupSupport, - setupCgroup, - cleanupCgroup, - _resetCache, -} from '../../src/worker/cgroup'; - -const logger = createMockLogger(); - -test.beforeEach(() => { - _resetCache(); -}); - -// On macOS / non-Linux, detection should return unsupported -test('detectCgroupSupport returns false when cgroup v2 is not available', (t) => { - // This test runs on any platform. On macOS, /sys/fs/cgroup doesn't exist. - // On Linux without cgroup v2, cgroup.controllers won't exist. - if (fs.existsSync('/sys/fs/cgroup/cgroup.controllers')) { - t.pass('cgroup v2 is available on this system — skipping negative test'); - return; - } - - const result = detectCgroupSupport(logger); - t.false(result.supported); - t.is(result.cgroupRoot, null); -}); - -test('detectCgroupSupport caches the result across calls', (t) => { - const result1 = detectCgroupSupport(logger); - const result2 = detectCgroupSupport(logger); - t.is(result1, result2); // same object reference -}); - -test('setupCgroup returns null when directory creation fails', (t) => { - // Use a nonexistent root so mkdirSync fails - const result = setupCgroup( - 99999, - 500 * 1024 * 1024, - '/nonexistent/cgroup/root', - logger - ); - t.is(result, null); -}); - -test('cleanupCgroup does not throw on ENOENT', (t) => { - t.notThrows(() => { - cleanupCgroup('/nonexistent/cgroup/path', logger); - }); -}); - -// Integration tests — only run on Linux with cgroup v2 and write access -const hasCgroupV2 = fs.existsSync('/sys/fs/cgroup/cgroup.controllers'); - -const cgroupTest = hasCgroupV2 ? test : test.skip; - -cgroupTest('detectCgroupSupport on cgroup v2 system', (t) => { - const result = detectCgroupSupport(logger); - if (!result.supported) { - // cgroup v2 is present but we lack permissions to delegate — that's fine - t.pass('cgroup v2 present but not writable — detection correctly returned false'); - return; - } - t.truthy(result.cgroupRoot); -}); - -cgroupTest('setupCgroup creates cgroup directory and cleanupCgroup removes it', (t) => { - const detection = detectCgroupSupport(logger); - if (!detection.supported) { - t.pass('cgroup not supported — skipping'); - return; - } - - // Use a fake PID that won't collide - const fakePid = 2147483640; - const limitBytes = 256 * 1024 * 1024; - - const cgPath = setupCgroup(fakePid, limitBytes, detection.cgroupRoot!, logger); - - if (!cgPath) { - t.pass('setupCgroup returned null — likely permission issue'); - return; - } - - t.true(fs.existsSync(cgPath)); - - // Verify memory.max was written - const memMax = fs.readFileSync(path.join(cgPath, 'memory.max'), 'utf-8').trim(); - t.is(memMax, String(limitBytes)); - - // Verify memory.swap.max was written - const swapMax = fs.readFileSync(path.join(cgPath, 'memory.swap.max'), 'utf-8').trim(); - t.is(swapMax, '0'); - - // Clean up - cleanupCgroup(cgPath, logger); - t.false(fs.existsSync(cgPath)); -}); diff --git a/packages/engine-multi/test/worker/rlimit.test.ts b/packages/engine-multi/test/worker/rlimit.test.ts new file mode 100644 index 000000000..71ade8b0e --- /dev/null +++ b/packages/engine-multi/test/worker/rlimit.test.ts @@ -0,0 +1,51 @@ +import test from 'ava'; +import { createMockLogger } from '@openfn/logger'; + +import { + detectPrlimitSupport, + applyMemoryLimit, + _resetCache, +} from '../../src/worker/rlimit'; + +const logger = createMockLogger(); + +test.beforeEach(() => { + _resetCache(); +}); + +test('detectPrlimitSupport caches the result across calls', (t) => { + const result1 = detectPrlimitSupport(logger); + const result2 = detectPrlimitSupport(logger); + t.is(result1, result2); +}); + +// On macOS, prlimit is not available +const isLinux = process.platform === 'linux'; + +if (!isLinux) { + test('detectPrlimitSupport returns false on non-Linux', (t) => { + const result = detectPrlimitSupport(logger); + t.false(result); + }); +} + +test('applyMemoryLimit returns false when prlimit is not available', (t) => { + if (detectPrlimitSupport(logger)) { + t.pass('prlimit is available — skipping negative test'); + return; + } + const result = applyMemoryLimit(99999, 500 * 1024 * 1024, logger); + t.false(result); +}); + +// Integration tests — only run on Linux with prlimit available +const hasPrlimit = isLinux && detectPrlimitSupport(createMockLogger()); +_resetCache(); // reset after the check so tests start clean + +const prlimitTest = hasPrlimit ? test : test.skip; + +prlimitTest('applyMemoryLimit succeeds on own process', (t) => { + // Apply a very generous limit to our own process (won't interfere with test) + const result = applyMemoryLimit(process.pid, 8 * 1024 * 1024 * 1024, logger); + t.true(result); +}); From 9066dacd1f14fbda181db646c4b10f894a16c5d7 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Mon, 13 Apr 2026 15:26:57 -0400 Subject: [PATCH 4/4] bigger --- packages/engine-multi/src/worker/pool.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index f1c5add2c..a8bf26812 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -114,10 +114,11 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { if (hasPrlimit && options.maxWorkerMemoryMb) { // RLIMIT_AS counts virtual address space, not RSS. - // Node/V8 reserve ~1-2GB virtual memory at startup, so we need - // generous headroom to avoid false positives. + // Node/V8 routinely reserves 4-8GB of virtual memory at startup + // (page table entries are cheap on 64-bit). We set a generous limit + // that only catches truly runaway allocations. const limitBytes = Math.ceil( - (options.maxWorkerMemoryMb * 3 + 512) * 1024 * 1024 + (options.maxWorkerMemoryMb * 10 + 2048) * 1024 * 1024 ); applyMemoryLimit(child.pid!, limitBytes, logger); }