diff --git a/.changeset/five-months-dance.md b/.changeset/five-months-dance.md new file mode 100644 index 000000000..311b2ec54 --- /dev/null +++ b/.changeset/five-months-dance.md @@ -0,0 +1,6 @@ +--- +'@openfn/engine-multi': patch +'@openfn/ws-worker': patch +--- + +Enforce run memory limits at the child_process level diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index a89832f41..c3a062407 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -12,6 +12,7 @@ type WorkerOptions = { maxWorkers?: number; env?: any; timeout?: number; // ms + memoryLimitMb?: number; proxyStdout?: boolean; // print internal stdout to console }; @@ -22,13 +23,19 @@ export default function initWorkers( options: WorkerOptions = {}, logger: Logger ) { - const { env = {}, maxWorkers = 5, proxyStdout = false } = options; + const { + env = {}, + maxWorkers = 5, + memoryLimitMb, + proxyStdout = false, + } = options; const workers = createPool( workerPath, { maxWorkers, env, + memoryLimitMb, proxyStdout, }, logger diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 630e334ab..31ecc0774 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -139,6 +139,7 @@ const createEngine = async ( resolvedWorkerPath, { maxWorkers: options.maxWorkers, + memoryLimitMb: defaultMemoryLimit, proxyStdout: options.proxyStdout, }, options.logger diff --git a/packages/engine-multi/src/test/worker-functions.ts b/packages/engine-multi/src/test/worker-functions.ts index 7c73af6d1..dca335cea 100644 --- a/packages/engine-multi/src/test/worker-functions.ts +++ b/packages/engine-multi/src/test/worker-functions.ts @@ -24,6 +24,7 @@ const tasks = { }, threadId: async () => threadId, processId: async () => process.pid, + getExecArgv: async () => process.execArgv, // very very simple intepretation of a run function // Most tests should use the mock-worker instead run: async (plan: ExecutionPlan, _input: any, _adaptorPaths: any) => { diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 7a306b974..48a268803 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -18,6 +18,7 @@ export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? env?: Record; // default environment for workers + memoryLimitMb?: number; // --max-old-space-size for child processes proxyStdout?: boolean; // print internal stdout to console }; @@ -83,8 +84,13 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { let child: ChildProcess; if (!maybeChild) { // create a new child process and load the module script into it + const execArgv = ['--experimental-vm-modules', '--no-warnings']; + if (options.memoryLimitMb) { + execArgv.push(`--max-old-space-size=${options.memoryLimitMb}`); + } + child = fork(envPath, [script], { - execArgv: ['--experimental-vm-modules', '--no-warnings'], + execArgv, env: options.env || {}, diff --git a/packages/engine-multi/test/worker/pool.test.ts b/packages/engine-multi/test/worker/pool.test.ts index fe162d249..5a3b8d9dc 100644 --- a/packages/engine-multi/test/worker/pool.test.ts +++ b/packages/engine-multi/test/worker/pool.test.ts @@ -185,6 +185,30 @@ test('throw if memory limit is exceeded', async (t) => { } }); +test('child process should have --max-old-space-size when memoryLimitMb is set', async (t) => { + const pool = createPool(workerPath, { memoryLimitMb: 200 }, logger); + const execArgv = await pool.exec('getExecArgv', []); + t.true(execArgv.some((a: string) => a === '--max-old-space-size=200')); +}); + +test('child process should not have --max-old-space-size when memoryLimitMb is not set', async (t) => { + const pool = createPool(workerPath, {}, logger); + const execArgv = await pool.exec('getExecArgv', []); + t.false(execArgv.some((a: string) => a.includes('--max-old-space-size'))); +}); + +test('pool recovers after process-level OOM', async (t) => { + const pool = createPool(workerPath, { memoryLimitMb: 50 }, logger); + + await t.throwsAsync(() => pool.exec('blowMemory', [], { memoryLimitMb: 20 }), { + name: 'OOMError', + }); + + // Pool should still be functional after the OOM + const result = await pool.exec('test', [42]); + t.is(result, 42); +}); + test('handle weird exit', async (t) => { const pool = createPool(workerPath, {}, logger);