Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/five-months-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/engine-multi': patch
'@openfn/ws-worker': patch
---

Enforce run memory limits at the child_process level
9 changes: 8 additions & 1 deletion packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type WorkerOptions = {
maxWorkers?: number;
env?: any;
timeout?: number; // ms
memoryLimitMb?: number;
proxyStdout?: boolean; // print internal stdout to console
};

Expand All @@ -22,13 +23,19 @@ export default function initWorkers(
options: WorkerOptions = {},
logger: Logger
) {
const { env = {}, maxWorkers = 5, proxyStdout = false } = options;
const {
env = {},
maxWorkers = 5,
memoryLimitMb,
proxyStdout = false,
} = options;

const workers = createPool(
workerPath,
{
maxWorkers,
env,
memoryLimitMb,
proxyStdout,
},
logger
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ const createEngine = async (
resolvedWorkerPath,
{
maxWorkers: options.maxWorkers,
memoryLimitMb: defaultMemoryLimit,
proxyStdout: options.proxyStdout,
},
options.logger
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/test/worker-functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const tasks = {
},
threadId: async () => threadId,
processId: async () => process.pid,
getExecArgv: async () => process.execArgv,
// very very simple intepretation of a run function
// Most tests should use the mock-worker instead
run: async (plan: ExecutionPlan, _input: any, _adaptorPaths: any) => {
Expand Down
8 changes: 7 additions & 1 deletion packages/engine-multi/src/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type PoolOptions = {
capacity?: number; // defaults to 5
maxWorkers?: number; // alias for capacity. Which is best?
env?: Record<string, string>; // default environment for workers
memoryLimitMb?: number; // --max-old-space-size for child processes

proxyStdout?: boolean; // print internal stdout to console
};
Expand Down Expand Up @@ -83,8 +84,13 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
let child: ChildProcess;
if (!maybeChild) {
// create a new child process and load the module script into it
const execArgv = ['--experimental-vm-modules', '--no-warnings'];
if (options.memoryLimitMb) {
execArgv.push(`--max-old-space-size=${options.memoryLimitMb}`);
}

child = fork(envPath, [script], {
execArgv: ['--experimental-vm-modules', '--no-warnings'],
execArgv,

env: options.env || {},

Expand Down
24 changes: 24 additions & 0 deletions packages/engine-multi/test/worker/pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,30 @@ test('throw if memory limit is exceeded', async (t) => {
}
});

test('child process should have --max-old-space-size when memoryLimitMb is set', async (t) => {
const pool = createPool(workerPath, { memoryLimitMb: 200 }, logger);
const execArgv = await pool.exec('getExecArgv', []);
t.true(execArgv.some((a: string) => a === '--max-old-space-size=200'));
});

test('child process should not have --max-old-space-size when memoryLimitMb is not set', async (t) => {
const pool = createPool(workerPath, {}, logger);
const execArgv = await pool.exec('getExecArgv', []);
t.false(execArgv.some((a: string) => a.includes('--max-old-space-size')));
});

test('pool recovers after process-level OOM', async (t) => {
const pool = createPool(workerPath, { memoryLimitMb: 50 }, logger);

await t.throwsAsync(() => pool.exec('blowMemory', [], { memoryLimitMb: 20 }), {
name: 'OOMError',
});

// Pool should still be functional after the OOM
const result = await pool.exec('test', [42]);
t.is(result, 42);
});

test('handle weird exit', async (t) => {
const pool = createPool(workerPath, {}, logger);

Expand Down