Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ WORKDIR /app

# TODO: remove simple build once prod optimized build is working ---------------
FROM base AS ws-worker
RUN apk add --no-cache git
RUN apk add --no-cache git util-linux
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile
RUN pnpm build
WORKDIR /app/packages/ws-worker
Expand Down
9 changes: 8 additions & 1 deletion packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export type WorkerEvent = {

type WorkerOptions = {
maxWorkers?: number;
maxWorkerMemoryMb?: number; // kernel-level memory limit per child process (cgroup v2)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just re-use the existing memoryLimitMb option

How that limit gets used in cgroups or heap memory settings or whatever is an implementation detail. The admin just needs to say "don't let any given job take more than 500mb"

env?: any;
timeout?: number; // ms
proxyStdout?: boolean; // print internal stdout to console
Expand All @@ -22,12 +23,18 @@ export default function initWorkers(
options: WorkerOptions = {},
logger: Logger
) {
const { env = {}, maxWorkers = 5, proxyStdout = false } = options;
const {
env = {},
maxWorkers = 5,
maxWorkerMemoryMb,
proxyStdout = false,
} = options;

const workers = createPool(
workerPath,
{
maxWorkers,
maxWorkerMemoryMb,
env,
proxyStdout,
},
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ const createEngine = async (
resolvedWorkerPath,
{
maxWorkers: options.maxWorkers,
maxWorkerMemoryMb: defaultMemoryLimit,
proxyStdout: options.proxyStdout,
},
options.logger
Expand Down
21 changes: 21 additions & 0 deletions packages/engine-multi/src/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import {
import { HANDLED_EXIT_CODE } from '../events';
import { Logger } from '@openfn/logger';
import type { PayloadLimits } from './thread/runtime';
import { detectPrlimitSupport, applyMemoryLimit } from './rlimit';

export type PoolOptions = {
capacity?: number; // defaults to 5
maxWorkers?: number; // alias for capacity. Which is best?
maxWorkerMemoryMb?: number; // process-level memory limit via RLIMIT_AS
env?: Record<string, string>; // default environment for workers

proxyStdout?: boolean; // print internal stdout to console
Expand Down Expand Up @@ -71,6 +73,14 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`);
let destroyed = false;

const hasPrlimit = detectPrlimitSupport(logger);

if (hasPrlimit && options.maxWorkerMemoryMb) {
logger.info(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would append this to the previous startup log - it'll be way more useful there

`pool: prlimit memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child`
);
}

// a pool of processes
const pool: ChildProcessPool = new Array(capacity).fill(false);

Expand Down Expand Up @@ -101,6 +111,17 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {

logger.debug('pool: Created new child process', child.pid);
allWorkers[child.pid!] = child;

if (hasPrlimit && options.maxWorkerMemoryMb) {
// RLIMIT_AS counts virtual address space, not RSS.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't belong here. We should just pass the mb limit into the applyMemoryLimit function

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd also say: take the memory limit used in the run, add 10%, (20%?) and set that as the hard process limit.

I don't really know why - I just feel like we should let node control the exit itself, and use the limit as a hard fallback

// Node/V8 routinely reserves 4-8GB of virtual memory at startup
// (page table entries are cheap on 64-bit). We set a generous limit
// that only catches truly runaway allocations.
const limitBytes = Math.ceil(
(options.maxWorkerMemoryMb * 10 + 2048) * 1024 * 1024
);
applyMemoryLimit(child.pid!, limitBytes, logger);
}
} else {
child = maybeChild as ChildProcess;
logger.debug('pool: Using existing child process', child.pid);
Expand Down
55 changes: 55 additions & 0 deletions packages/engine-multi/src/worker/rlimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { execFileSync } from 'node:child_process';
import type { Logger } from '@openfn/logger';

// Tri-state cache: null = not probed yet, otherwise the outcome of the probe.
let prlimitAvailable: boolean | null = null;

/**
 * Report whether the `prlimit` executable can be run on this host.
 *
 * The first call probes by running `prlimit --version` synchronously with
 * its stdio discarded; the outcome is stored in a module-level variable and
 * returned as-is by every later call until the cache is reset.
 *
 * @param logger - receives one info line when the probe succeeds, or one
 *   debug line when it fails; nothing is logged on cached calls
 * @returns true if the probe command ran successfully, false otherwise
 */
export function detectPrlimitSupport(logger: Logger): boolean {
  if (prlimitAvailable === null) {
    try {
      // execFileSync throws if the binary is missing or exits non-zero
      execFileSync('prlimit', ['--version'], { stdio: 'ignore' });
      prlimitAvailable = true;
      logger.info('prlimit: memory enforcement available');
    } catch {
      prlimitAvailable = false;
      logger.debug('prlimit: not available (util-linux not installed)');
    }
  }

  return prlimitAvailable;
}

/**
 * Apply RLIMIT_AS (virtual address space limit) to a child process.
 * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash.
 *
 * Implemented by synchronously running
 * `prlimit --pid <pid> --as=<limitBytes>:<limitBytes>`; the same value is
 * passed on both sides of the colon (presumably soft:hard — confirm against
 * the prlimit(1) man page).
 *
 * @param pid - id of the process whose limit is changed
 * @param limitBytes - limit value in bytes, interpolated into the `--as`
 *   argument unchanged
 * @param logger - receives a debug line (limit rounded to MB) on success and
 *   a warning carrying the error message on failure
 * @returns true if the prlimit command ran without throwing; false if it
 *   threw (the error is logged, never rethrown)
 */
export function applyMemoryLimit(
pid: number,
limitBytes: number,
logger: Logger
): boolean {
try {
execFileSync('prlimit', [
'--pid',
String(pid),
`--as=${limitBytes}:${limitBytes}`,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to look into:

  • should the soft and hard limit be the same?
  • should we be setting address space or RSS? or both?

]);
logger.debug(
`prlimit: worker ${pid} RLIMIT_AS set to ${Math.round(
limitBytes / 1024 / 1024
)}MB`
);
return true;
} catch (e: any) {
logger.warn(`prlimit: failed to set limit for worker ${pid}:`, e.message);
return false;
}
}

/**
 * Exported for testing only.
 * Sets the cached detection result back to null so that the next
 * detectPrlimitSupport call probes for prlimit again.
 */
export function _resetCache(): void {
prlimitAvailable = null;
}
51 changes: 51 additions & 0 deletions packages/engine-multi/test/worker/rlimit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import test from 'ava';
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't know what these tests are going to tell us. Maybe this is something to do at the integration test level. Maybe it's more appropriate that we don't test this at all?

import { createMockLogger } from '@openfn/logger';

import {
detectPrlimitSupport,
applyMemoryLimit,
_resetCache,
} from '../../src/worker/rlimit';

// Shared mock logger passed to the functions under test.
const logger = createMockLogger();

// Clear the module-level detection cache before every test so each one
// starts from the "not probed yet" state.
test.beforeEach(() => {
_resetCache();
});

// NOTE(review): this asserts only that two consecutive calls return the same
// value; it would also pass if the result were recomputed on every call.
test('detectPrlimitSupport caches the result across calls', (t) => {
const result1 = detectPrlimitSupport(logger);
const result2 = detectPrlimitSupport(logger);
t.is(result1, result2);
});

// On macOS, prlimit is not available
// (the check below treats every non-Linux platform the same way)
const isLinux = process.platform === 'linux';

// Registered only when the test run is not on Linux.
if (!isLinux) {
test('detectPrlimitSupport returns false on non-Linux', (t) => {
const result = detectPrlimitSupport(logger);
t.false(result);
});
}

// Negative path: with no prlimit available, applyMemoryLimit must report
// failure by returning false. When prlimit IS available the test passes
// immediately without calling applyMemoryLimit at all.
test('applyMemoryLimit returns false when prlimit is not available', (t) => {
if (detectPrlimitSupport(logger)) {
t.pass('prlimit is available — skipping negative test');
return;
}
const result = applyMemoryLimit(99999, 500 * 1024 * 1024, logger);
t.false(result);
});

// Integration tests — only run on Linux with prlimit available
// This probe runs once at module load, while tests are being registered.
const hasPrlimit = isLinux && detectPrlimitSupport(createMockLogger());
_resetCache(); // reset after the check so tests start clean

// Register as a real test when prlimit was detected, otherwise as skipped.
const prlimitTest = hasPrlimit ? test : test.skip;

// NOTE(review): the target pid is the test process itself, so the 8 GiB
// value is applied to this process for real — confirm that this cannot
// affect tests that run afterwards in the same process.
prlimitTest('applyMemoryLimit succeeds on own process', (t) => {
// Apply a very generous limit to our own process (won't interfere with test)
const result = applyMemoryLimit(process.pid, 8 * 1024 * 1024 * 1024, logger);
t.true(result);
});