-
Notifications
You must be signed in to change notification settings - Fork 16
Possible k8s OOM Kill prevention pill 2 - rlimit #1370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,10 +13,12 @@ import { | |
| import { HANDLED_EXIT_CODE } from '../events'; | ||
| import { Logger } from '@openfn/logger'; | ||
| import type { PayloadLimits } from './thread/runtime'; | ||
| import { detectPrlimitSupport, applyMemoryLimit } from './rlimit'; | ||
|
|
||
| export type PoolOptions = { | ||
| capacity?: number; // defaults to 5 | ||
| maxWorkers?: number; // alias for capacity. Which is best? | ||
| maxWorkerMemoryMb?: number; // process-level memory limit via RLIMIT_AS | ||
| env?: Record<string, string>; // default environment for workers | ||
|
|
||
| proxyStdout?: boolean; // print internal stdout to console | ||
|
|
@@ -71,6 +73,14 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { | |
| logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`); | ||
| let destroyed = false; | ||
|
|
||
| const hasPrlimit = detectPrlimitSupport(logger); | ||
|
|
||
| if (hasPrlimit && options.maxWorkerMemoryMb) { | ||
| logger.info( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would append this to the previous startup log - it'll be way more useful there |
||
| `pool: prlimit memory enforcement enabled | limit: ${options.maxWorkerMemoryMb}MB per child` | ||
| ); | ||
| } | ||
|
|
||
| // a pool of processes | ||
| const pool: ChildProcessPool = new Array(capacity).fill(false); | ||
|
|
||
|
|
@@ -101,6 +111,17 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { | |
|
|
||
| logger.debug('pool: Created new child process', child.pid); | ||
| allWorkers[child.pid!] = child; | ||
|
|
||
| if (hasPrlimit && options.maxWorkerMemoryMb) { | ||
| // RLIMIT_AS counts virtual address space, not RSS. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This comment doesn't belong here. We should just pass the mb limit into the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I'd also say: take the memory limit used in the run, add 10% (20%?) and set that as the hard process limit. I don't really know why - I just feel like we should let node control the exit itself, and use |
||
| // Node/V8 routinely reserves 4-8GB of virtual memory at startup | ||
| // (page table entries are cheap on 64-bit). We set a generous limit | ||
| // that only catches truly runaway allocations. | ||
| const limitBytes = Math.ceil( | ||
| (options.maxWorkerMemoryMb * 10 + 2048) * 1024 * 1024 | ||
| ); | ||
| applyMemoryLimit(child.pid!, limitBytes, logger); | ||
| } | ||
| } else { | ||
| child = maybeChild as ChildProcess; | ||
| logger.debug('pool: Using existing child process', child.pid); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| import { execFileSync } from 'node:child_process'; | ||
| import type { Logger } from '@openfn/logger'; | ||
|
|
||
| let prlimitAvailable: boolean | null = null; | ||
|
|
||
| /** | ||
| * Check if the prlimit command is available (Linux with util-linux). | ||
| * Result is cached for the process lifetime. | ||
| * | ||
| * The probe runs `prlimit --version` synchronously with stdio ignored. | ||
| * If that call throws for any reason, prlimit is treated as unavailable. | ||
| * Once a result is cached, later calls return it without probing or logging. | ||
| * | ||
| * @param logger - receives an info message when the probe succeeds and a | ||
| *   debug message when it fails. | ||
| * @returns true if the probe succeeded, false otherwise. | ||
| */ | ||
| export function detectPrlimitSupport(logger: Logger): boolean { | ||
| if (prlimitAvailable !== null) return prlimitAvailable; | ||
|
|
||
| try { | ||
| execFileSync('prlimit', ['--version'], { stdio: 'ignore' }); | ||
| prlimitAvailable = true; | ||
| logger.info('prlimit: memory enforcement available'); | ||
| } catch { | ||
| prlimitAvailable = false; | ||
| logger.debug('prlimit: not available (util-linux not installed)'); | ||
| } | ||
|
|
||
| return prlimitAvailable; | ||
| } | ||
|
|
||
| /** | ||
| * Apply RLIMIT_AS (virtual address space limit) to a child process. | ||
| * When exceeded, mmap/brk fails with ENOMEM, causing the process to crash. | ||
| * | ||
| * Runs `prlimit --pid <pid> --as=<limitBytes>:<limitBytes>` synchronously, | ||
| * so the same value is passed on both sides of the colon (presumably the | ||
| * soft and hard limits; confirm against prlimit(1)). | ||
| * | ||
| * @param pid - process id handed to `prlimit --pid`. | ||
| * @param limitBytes - limit value interpolated into `--as`; the debug log | ||
| *   reports it divided by 1024 twice and rounded, labelled as MB. | ||
| * @param logger - receives a debug message on success and a warning with the | ||
| *   error message on failure. | ||
| * @returns true if the prlimit call completed without throwing; false if it | ||
| *   threw. The error is logged and not rethrown. | ||
| * | ||
| * NOTE(review): unlike the detection probe, this execFileSync call passes no | ||
| * stdio option. Confirm whether prlimit's stderr should be suppressed here. | ||
| */ | ||
| export function applyMemoryLimit( | ||
| pid: number, | ||
| limitBytes: number, | ||
| logger: Logger | ||
| ): boolean { | ||
| try { | ||
| execFileSync('prlimit', [ | ||
| '--pid', | ||
| String(pid), | ||
| `--as=${limitBytes}:${limitBytes}`, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to look into:
|
||
| ]); | ||
| logger.debug( | ||
| `prlimit: worker ${pid} RLIMIT_AS set to ${Math.round( | ||
| limitBytes / 1024 / 1024 | ||
| )}MB` | ||
| ); | ||
| return true; | ||
| } catch (e: any) { | ||
| logger.warn(`prlimit: failed to set limit for worker ${pid}:`, e.message); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| // Exported for testing only: clears the cached prlimit detection result so the next detectPrlimitSupport() call probes again | ||
| export function _resetCache(): void { | ||
| prlimitAvailable = null; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| import test from 'ava'; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah I don't know what these tests are going to tell us. Maybe this is something to do at the integration test level. Maybe it's more appropriate that we don't test this at all? |
||
| import { createMockLogger } from '@openfn/logger'; | ||
|
|
||
| import { | ||
| detectPrlimitSupport, | ||
| applyMemoryLimit, | ||
| _resetCache, | ||
| } from '../../src/worker/rlimit'; | ||
|
|
||
| const logger = createMockLogger(); | ||
|
|
||
| test.beforeEach(() => { | ||
| _resetCache(); | ||
| }); | ||
|
|
||
| test('detectPrlimitSupport caches the result across calls', (t) => { | ||
| const result1 = detectPrlimitSupport(logger); | ||
| const result2 = detectPrlimitSupport(logger); | ||
| t.is(result1, result2); | ||
| }); | ||
|
|
||
| // On macOS, prlimit is not available | ||
| const isLinux = process.platform === 'linux'; | ||
|
|
||
| if (!isLinux) { | ||
| test('detectPrlimitSupport returns false on non-Linux', (t) => { | ||
| const result = detectPrlimitSupport(logger); | ||
| t.false(result); | ||
| }); | ||
| } | ||
|
|
||
| test('applyMemoryLimit returns false when prlimit is not available', (t) => { | ||
| if (detectPrlimitSupport(logger)) { | ||
| t.pass('prlimit is available — skipping negative test'); | ||
| return; | ||
| } | ||
| const result = applyMemoryLimit(99999, 500 * 1024 * 1024, logger); | ||
| t.false(result); | ||
| }); | ||
|
|
||
| // Integration tests — only run on Linux with prlimit available | ||
| const hasPrlimit = isLinux && detectPrlimitSupport(createMockLogger()); | ||
| _resetCache(); // reset after the check so tests start clean | ||
|
|
||
| const prlimitTest = hasPrlimit ? test : test.skip; | ||
|
|
||
| prlimitTest('applyMemoryLimit succeeds on own process', (t) => { | ||
| // Apply a very generous limit to our own process (won't interfere with test) | ||
| const result = applyMemoryLimit(process.pid, 8 * 1024 * 1024 * 1024, logger); | ||
| t.true(result); | ||
| }); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would just re-use the existing
`memoryLimitMb` option. How that limit gets used in rgroups or heap memory settings or whatever is an implementation detail. The admin just needs to say "don't let any given job take more than 500mb"