diff --git a/Dockerfile b/Dockerfile
index 72805704c..028e2b1f1 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -18,7 +18,7 @@ WORKDIR /app
 
 # TODO: remove simple build once prod optimized build is working ---------------
 FROM base AS ws-worker
-RUN apk add --no-cache git
+RUN apk add --no-cache git util-linux
 RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile
 RUN pnpm build
 WORKDIR /app/packages/ws-worker
diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts
index a89832f41..4965da09f 100644
--- a/packages/engine-multi/src/api/call-worker.ts
+++ b/packages/engine-multi/src/api/call-worker.ts
@@ -10,6 +10,7 @@ export type WorkerEvent = {
 
 type WorkerOptions = {
   maxWorkers?: number;
+  maxWorkerMemoryMb?: number; // per-child memory limit, enforced via prlimit (RLIMIT_AS)
   env?: any;
   timeout?: number; // ms
   proxyStdout?: boolean; // print internal stdout to console
@@ -22,12 +23,18 @@ export default function initWorkers(
   options: WorkerOptions = {},
   logger: Logger
 ) {
-  const { env = {}, maxWorkers = 5, proxyStdout = false } = options;
+  const {
+    env = {},
+    maxWorkers = 5,
+    maxWorkerMemoryMb,
+    proxyStdout = false,
+  } = options;
 
   const workers = createPool(
     workerPath,
     {
       maxWorkers,
+      maxWorkerMemoryMb,
       env,
       proxyStdout,
     },
diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts
index 630e334ab..f4584949b 100644
--- a/packages/engine-multi/src/engine.ts
+++ b/packages/engine-multi/src/engine.ts
@@ -139,6 +139,7 @@ const createEngine = async (
     resolvedWorkerPath,
     {
       maxWorkers: options.maxWorkers,
+      maxWorkerMemoryMb: defaultMemoryLimit,
       proxyStdout: options.proxyStdout,
     },
     options.logger
diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts
index 7a306b974..a8bf26812 100644
--- a/packages/engine-multi/src/worker/pool.ts
+++ b/packages/engine-multi/src/worker/pool.ts
@@ -13,10 +13,12 @@ import {
 import { HANDLED_EXIT_CODE } from '../events';
 import { Logger } from '@openfn/logger';
 import type { PayloadLimits } from './thread/runtime';
+import { detectPrlimitSupport, applyMemoryLimit } from './rlimit';
 
 export type PoolOptions = {
   capacity?: number; // defaults to 5
   maxWorkers?: number; // alias for capacity. Which is best?
+  maxWorkerMemoryMb?: number; // per-child memory budget; RLIMIT_AS is derived from it
   env?: Record<string, string>; // default environment for workers
 
   proxyStdout?: boolean; // print internal stdout to console
@@ -71,6 +73,24 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
   logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`);
   let destroyed = false;
 
+  // RLIMIT_AS counts virtual address space, not RSS.
+  // Node/V8 routinely reserves 4-8GB of virtual memory at startup
+  // (page table entries are cheap on 64-bit). We set a generous limit
+  // that only catches truly runaway allocations.
+  // prlimit is only probed when a limit has actually been requested.
+  const requestedMemoryMb = options.maxWorkerMemoryMb ?? 0;
+  const workerLimitBytes =
+    requestedMemoryMb > 0 && detectPrlimitSupport(logger)
+      ? Math.ceil((requestedMemoryMb * 10 + 2048) * 1024 * 1024)
+      : undefined;
+
+  if (workerLimitBytes !== undefined) {
+    const limitMb = Math.round(workerLimitBytes / 1024 / 1024);
+    logger.info(
+      `pool: prlimit memory enforcement enabled | memory limit: ${requestedMemoryMb}MB | RLIMIT_AS: ${limitMb}MB per child`
+    );
+  }
+
   // a pool of processes
   const pool: ChildProcessPool = new Array(capacity).fill(false);
 
@@ -101,6 +121,10 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
 
       logger.debug('pool: Created new child process', child.pid);
       allWorkers[child.pid!] = child;
+
+      if (workerLimitBytes !== undefined && child.pid !== undefined) {
+        applyMemoryLimit(child.pid, workerLimitBytes, logger);
+      }
     } else {
       child = maybeChild as ChildProcess;
       logger.debug('pool: Using existing child process', child.pid);
diff --git a/packages/engine-multi/src/worker/rlimit.ts b/packages/engine-multi/src/worker/rlimit.ts
new file mode 100644
index 000000000..2966fa684
--- /dev/null
+++ b/packages/engine-multi/src/worker/rlimit.ts
@@ -0,0 +1,81 @@
+import { execFileSync } from 'node:child_process';
+import type { Logger } from '@openfn/logger';
+
+// Upper bound for each synchronous prlimit call, so that a hung binary
+// cannot block the event loop indefinitely.
+const PRLIMIT_TIMEOUT_MS = 5000;
+
+let prlimitAvailable: boolean | null = null;
+
+/**
+ * Check if the prlimit command is available (Linux with util-linux).
+ * Result is cached for the process lifetime.
+ */
+export function detectPrlimitSupport(logger: Logger): boolean {
+  if (prlimitAvailable !== null) return prlimitAvailable;
+
+  try {
+    execFileSync('prlimit', ['--version'], {
+      stdio: 'ignore',
+      timeout: PRLIMIT_TIMEOUT_MS,
+    });
+    prlimitAvailable = true;
+    logger.info('prlimit: memory enforcement available');
+  } catch {
+    prlimitAvailable = false;
+    logger.debug('prlimit: not available (util-linux not installed)');
+  }
+
+  return prlimitAvailable;
+}
+
+/**
+ * Apply RLIMIT_AS (virtual address space limit) to a child process.
+ * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash.
+ *
+ * @param pid - id of the process to limit
+ * @param limitBytes - value used for both the soft and the hard limit, in bytes
+ * @param logger - receives a debug line on success and a warning on failure
+ * @returns true if prlimit applied the limit, false otherwise (never throws)
+ */
+export function applyMemoryLimit(
+  pid: number,
+  limitBytes: number,
+  logger: Logger
+): boolean {
+  // Reject values that would yield a malformed prlimit argument
+  // (e.g. "--as=NaN:NaN") before spawning anything.
+  if (!Number.isSafeInteger(pid) || pid <= 0) {
+    logger.warn(`prlimit: invalid pid ${pid}, limit not applied`);
+    return false;
+  }
+  if (!Number.isSafeInteger(limitBytes) || limitBytes <= 0) {
+    logger.warn(`prlimit: invalid limit ${limitBytes} for worker ${pid}`);
+    return false;
+  }
+
+  try {
+    // stdio 'pipe' keeps prlimit's stderr out of this process's stderr;
+    // on failure execFileSync includes it in the thrown error instead.
+    execFileSync(
+      'prlimit',
+      ['--pid', String(pid), `--as=${limitBytes}:${limitBytes}`],
+      { stdio: 'pipe', timeout: PRLIMIT_TIMEOUT_MS }
+    );
+    logger.debug(
+      `prlimit: worker ${pid} RLIMIT_AS set to ${Math.round(
+        limitBytes / 1024 / 1024
+      )}MB`
+    );
+    return true;
+  } catch (e: unknown) {
+    const reason = e instanceof Error ? e.message : String(e);
+    logger.warn(`prlimit: failed to set limit for worker ${pid}:`, reason);
+    return false;
+  }
+}
+
+// Exported for testing only
+export function _resetCache(): void {
+  prlimitAvailable = null;
+}
diff --git a/packages/engine-multi/test/worker/rlimit.test.ts b/packages/engine-multi/test/worker/rlimit.test.ts
new file mode 100644
index 000000000..71ade8b0e
--- /dev/null
+++ b/packages/engine-multi/test/worker/rlimit.test.ts
@@ -0,0 +1,69 @@
+import test from 'ava';
+import { spawn } from 'node:child_process';
+import { createMockLogger } from '@openfn/logger';
+
+import {
+  detectPrlimitSupport,
+  applyMemoryLimit,
+  _resetCache,
+} from '../../src/worker/rlimit';
+
+const logger = createMockLogger();
+
+test.beforeEach(() => {
+  _resetCache();
+});
+
+test('detectPrlimitSupport caches the result across calls', (t) => {
+  const result1 = detectPrlimitSupport(logger);
+  const result2 = detectPrlimitSupport(logger);
+  t.is(result1, result2);
+});
+
+// On macOS, prlimit is not available
+const isLinux = process.platform === 'linux';
+
+if (!isLinux) {
+  test('detectPrlimitSupport returns false on non-Linux', (t) => {
+    const result = detectPrlimitSupport(logger);
+    t.false(result);
+  });
+}
+
+test('applyMemoryLimit returns false when prlimit is not available', (t) => {
+  if (detectPrlimitSupport(logger)) {
+    t.pass('prlimit is available — skipping negative test');
+    return;
+  }
+  const result = applyMemoryLimit(99999, 500 * 1024 * 1024, logger);
+  t.false(result);
+});
+
+test('applyMemoryLimit rejects invalid pids and limits', (t) => {
+  // Validation happens before prlimit is invoked, so this is safe everywhere
+  t.false(applyMemoryLimit(process.pid, Number.NaN, logger));
+  t.false(applyMemoryLimit(process.pid, 0, logger));
+  t.false(applyMemoryLimit(-1, 500 * 1024 * 1024, logger));
+});
+
+// Integration tests — only run on Linux with prlimit available
+const hasPrlimit = isLinux && detectPrlimitSupport(createMockLogger());
+_resetCache(); // reset after the check so tests start clean
+
+const prlimitTest = hasPrlimit ? test : test.skip;
+
+prlimitTest('applyMemoryLimit succeeds on a child process', (t) => {
+  // RLIMIT_AS is set as a hard limit, which an unprivileged process cannot
+  // raise again, so target a disposable child instead of the test runner.
+  const child = spawn(
+    process.execPath,
+    ['-e', 'setTimeout(() => {}, 30000)'],
+    { stdio: 'ignore' }
+  );
+  t.teardown(() => {
+    child.kill();
+  });
+
+  const result = applyMemoryLimit(child.pid!, 8 * 1024 * 1024 * 1024, logger);
+  t.true(result);
+});