Skip to content

Commit b61bf9b

Browse files
authored
Attempt to stabilise worker memory (#1099)
* purge states cache * remove state entirely - just use context * tidy up and tweak test * restructure * add timestamps to events * performance tests * tidy and changeset * comments
1 parent 3b80c47 commit b61bf9b

File tree

14 files changed

+603
-84
lines changed

14 files changed

+603
-84
lines changed

.changeset/ripe-wolves-sniff.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@openfn/engine-multi': patch
3+
'@openfn/ws-worker': patch
4+
---
5+
6+
Fix an issue where memory may not be released after runs

packages/engine-multi/src/api/lifecycle.ts

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,9 @@ export const workflowComplete = (
5252
const { workflowId, state: result, threadId } = event;
5353

5454
logger.success('complete workflow ', workflowId);
55-
//logger.info(event.state);
56-
57-
// TODO I don't know how we'd get here in this architecture
58-
// if (!allWorkflows.has(workflowId)) {
59-
// throw new Error(`Workflow with id ${workflowId} is not defined`);
60-
// }
61-
6255
state.status = 'done';
6356
state.duration = Date.now() - state.startTime!;
6457

65-
// Important! We do NOT write the result back to this state object
66-
// It has a tendency to not get garbage collected and causing memory problems
67-
68-
// TODO do we have to remove this from the active workflows array?
69-
// const idx = activeWorkflows.findIndex((id) => id === workflowId);
70-
// activeWorkflows.splice(idx, 1);
71-
7258
// forward the event on to any external listeners
7359
context.emit(externalEvents.WORKFLOW_COMPLETE, {
7460
threadId,

packages/engine-multi/src/engine.ts

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import type {
2323
EventHandler,
2424
ExecuteOptions,
2525
RuntimeEngine,
26-
WorkflowState,
2726
} from './types';
2827
import type { AutoinstallOptions } from './api/autoinstall';
2928

@@ -100,7 +99,6 @@ const createEngine = async (
10099
options: EngineOptions,
101100
workerPath?: string
102101
): Promise<InternalEngine> => {
103-
const states: Record<string, WorkflowState> = {};
104102
const contexts: Record<string, ExecutionContext> = {};
105103
const deferredListeners: Record<string, Record<string, EventHandler>[]> = {};
106104

@@ -144,17 +142,6 @@ const createEngine = async (
144142
retries: options.workerValidationRetries,
145143
});
146144

147-
const registerWorkflow = (plan: ExecutionPlan, input: State) => {
148-
// TODO throw if already registered?
149-
const state = createState(plan, input);
150-
states[state.id] = state;
151-
return state;
152-
};
153-
154-
const getWorkflowState = (workflowId: string) => states[workflowId];
155-
156-
const getWorkflowStatus = (workflowId: string) => states[workflowId]?.status;
157-
158145
// TODO too much logic in this execute function, needs farming out
159146
// I don't mind having a wrapper here but it must be super thin
160147
// TODO maybe engine options is too broad?
@@ -165,13 +152,9 @@ const createEngine = async (
165152
) => {
166153
options.logger!.debug('executing plan ', plan?.id ?? '<no id>');
167154
const workflowId = plan.id!;
168-
// TODO throw if plan is invalid
169-
// Wait, don't throw because the server will die
170-
// Maybe return null instead
171-
const state = registerWorkflow(plan, input);
172155

173156
const context = new ExecutionContext({
174-
state,
157+
state: createState(plan, input),
175158
logger: options.logger!,
176159
callWorker,
177160
options: {
@@ -244,9 +227,6 @@ const createEngine = async (
244227
options,
245228
workerPath: resolvedWorkerPath,
246229
logger: options.logger,
247-
registerWorkflow,
248-
getWorkflowState,
249-
getWorkflowStatus,
250230
execute: executeWrapper,
251231
listen,
252232
destroy,

packages/engine-multi/test/engine.test.ts

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import test from 'ava';
22
import path from 'node:path';
33
import { createMockLogger } from '@openfn/logger';
4-
import type { ExecutionPlan } from '@openfn/lexicon';
54

65
import createEngine, { InternalEngine } from '../src/engine';
76
import * as e from '../src/events';
@@ -53,28 +52,6 @@ test.serial('create an engine', async (t) => {
5352

5453
test.todo('throw if the worker is invalid');
5554

56-
test.serial('register a workflow', async (t) => {
57-
const plan = { id: 'z' };
58-
engine = await createEngine(options);
59-
60-
const state = engine.registerWorkflow(plan);
61-
62-
t.is(state.status, 'pending');
63-
t.is(state.id, plan.id);
64-
t.deepEqual(state.plan, plan);
65-
});
66-
67-
test.serial('get workflow state', async (t) => {
68-
const plan = { id: 'z' } as ExecutionPlan;
69-
engine = await createEngine(options);
70-
71-
const s = engine.registerWorkflow(plan);
72-
73-
const state = engine.getWorkflowState(plan.id);
74-
75-
t.deepEqual(state, s);
76-
});
77-
7855
test.serial('use the default worker path', async (t) => {
7956
engine = await createEngine({ logger, repoDir: '.' });
8057
t.true(engine.workerPath.endsWith('worker/thread/run.js'));

packages/engine-multi/test/memtest.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// run this file with limited memory
22
// --max-old-space-size=50 or whatever
33
// NODE_OPTIONS="--max-old-space-size=50"
4-
4+
import { getHeapStatistics } from 'node:v8';
55
import { createMockLogger } from '@openfn/logger';
66
import { randomUUID } from 'node:crypto';
77
import createAPI from '../src/api';
@@ -11,10 +11,16 @@ const logger = createMockLogger();
1111

1212
let api: any;
1313

14+
function heap(reason: string) {
15+
const { used_heap_size } = getHeapStatistics();
16+
const mb = used_heap_size / 1024 / 1024;
17+
console.log(`>> [${reason}] Used heap at ${mb.toFixed(2)}mb`);
18+
}
19+
1420
function run() {
1521
const job = `
1622
export default [(state) => {
17-
state.data = new Array(1024 * 1024 * 4).fill('z').join('')
23+
state.data = new Array(1024 * 1024 * 7).fill('z').join('')
1824
return state
1925
}]`;
2026

@@ -29,29 +35,39 @@ function run() {
2935
},
3036
options: {},
3137
};
32-
console.log('>> running', plan.id);
38+
// console.log('>> running', plan.id);
3339

3440
api.execute(plan, {});
3541

3642
api.listen(plan.id!, {
3743
'workflow-complete': () => {
3844
completedCount++;
45+
heap('workflow-complete');
3946
console.log('>> Finished', completedCount);
4047

41-
setTimeout(() => {
42-
run();
43-
}, 10);
48+
// setTimeout(() => {
49+
// run();
50+
// }, 10);
4451
},
4552
});
4653
}
4754

55+
const runBatch = () => {
56+
for (let i = 0; i < 4; i++) {
57+
run();
58+
}
59+
};
60+
4861
async function start() {
4962
api = await createAPI({
5063
logger,
51-
maxWorkers: 1,
64+
maxWorkers: 4,
5265
});
5366

54-
run();
67+
runBatch();
68+
setInterval(() => {
69+
runBatch();
70+
}, 200);
5571
}
5672

5773
start();
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// WORKER_DISABLE_EXIT_LISTENERS=true clinic heapprofiler -- node clinic.js 1 --open false --collect-only
2+
// start lightning: pnpm start --port 9991
3+
import { getHeapStatistics } from 'node:v8';
4+
import createWorker from './dist/index.js';
5+
import createRTE from '@openfn/engine-multi';
6+
import createLightningServer from '@openfn/lightning-mock';
7+
8+
import payload from './payload.json' with { type: 'json' };
9+
import { createMockLogger } from '@openfn/logger';
10+
const obj = JSON.stringify({
11+
data: payload.data,
12+
13+
// Only send part of the payload or the mock will throw an error
14+
// This is enough data to prove the point
15+
references: payload.references.slice(0,10)
16+
})
17+
18+
let lng;
19+
let worker;
20+
let idgen = 1;
21+
22+
const WORKFLOW_COUNT = 1e6;
23+
let workflowsFinished = 0;
24+
25+
setInterval(() => {
26+
heap('POLL');
27+
}, 1000);
28+
29+
await setup();
30+
heap('SETUP');
31+
await test();
32+
33+
34+
function heap(reason) {
35+
const { used_heap_size } = getHeapStatistics();
36+
const mb = used_heap_size / 1024 / 1024;
37+
console.log(`>> [${reason}] Used heap at ${mb.toFixed(2)}mb`);
38+
}
39+
40+
// start the server
41+
async function setup() {
42+
const engineOptions = {
43+
repoDir: process.env.OPENFN_REPO_DIR,
44+
maxWorkers: 4,
45+
logger: createMockLogger()
46+
};
47+
48+
const engine = await createRTE(engineOptions);
49+
50+
worker = createWorker(engine, {
51+
port: 9992,
52+
lightning: 'ws://localhost:9991/worker',
53+
maxWorkflows: 4,
54+
backoff: {
55+
min: 10,
56+
max: 5e4,
57+
},
58+
logger: createMockLogger()
59+
});
60+
61+
worker.on('workflow-complete', (evt) => {
62+
heap('WORKFLOW:COMPLETE');
63+
if (++workflowsFinished === WORKFLOW_COUNT) {
64+
console.log('>> all done!');
65+
// console.log('>> Hit CTRL+C to exit and generate heap profile');
66+
// process.send('SIGINT');
67+
// process.abort();
68+
process.exit(0);
69+
}
70+
});
71+
}
72+
73+
// send a bunch of jobs through
74+
async function test() {
75+
const sleep = (duration = 100) =>
76+
new Promise((resolve) => setTimeout(resolve, duration));
77+
78+
let count = 0;
79+
const max = 1;
80+
81+
while (count++ < WORKFLOW_COUNT) {
82+
const w = wf();
83+
await fetch(`http://localhost:9991/run`, {
84+
method: 'POST',
85+
body: JSON.stringify(w),
86+
headers: {
87+
'content-type': 'application/json',
88+
},
89+
keepalive: true,
90+
91+
});
92+
// await sleep(5 * 1000);
93+
await sleep(500);
94+
}
95+
}
96+
97+
98+
99+
function wf() {
100+
const step = `
101+
export default [() => {
102+
return ${obj};
103+
}]`;
104+
return {
105+
id: `run-${idgen++}`,
106+
triggers: [],
107+
edges: [],
108+
jobs: [
109+
{
110+
id: 'a',
111+
body: step,
112+
},
113+
],
114+
};
115+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { getHeapStatistics } from 'node:v8';
2+
3+
function heap(reason) {
4+
const { used_heap_size } = getHeapStatistics();
5+
const mb = used_heap_size / 1024 / 1024;
6+
console.log(`>> [${reason}] Used heap at ${mb.toFixed(2)}mb`);
7+
}
8+
9+
heap('start');
10+
setTimeout(() => {
11+
heap('end');
12+
}, 1000);

packages/ws-worker/perf/mem-large.js

Lines changed: 219 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)