Skip to content

Commit 90eee49

Browse files
authored
Merge pull request #1110 from OpenFn/fix-childprocess-exceptions
Fix child process exceptions
2 parents b18e8a6 + fb5c232 commit 90eee49

6 files changed

Lines changed: 42 additions & 15 deletions

File tree

.changeset/fifty-pears-strive.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openfn/engine-multi': patch
3+
---
4+
5+
Fix an issue where an empty stderr buffer from a crashed run could fail to return, triggering Lost

packages/engine-multi/src/test/worker-functions.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ const tasks = {
5656
throw new Error('test_error');
5757
},
5858

59+
weirdExit: async () => {
60+
// https://www.youtube.com/watch?v=Z2cXRtblqjQ
61+
process.exit(72);
62+
},
63+
5964
// Experiments with freezing the global scope
6065
// We may do this in the actual worker
6166
freeze: async () => {

packages/engine-multi/src/worker/pool.ts

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ export const returnToPool = (
6767
// creates a new pool of workers which use the same script
6868
function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
6969
const capacity = options.capacity || options.maxWorkers || 5;
70-
7170
logger.debug(`pool: Creating new child process pool | capacity: ${capacity}`);
7271
let destroyed = false;
7372

@@ -144,7 +143,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
144143
// TODO what should we do if a process in the pool dies, perhaps due to OOM?
145144
const onExit = async (code: number) => {
146145
if (code !== HANDLED_EXIT_CODE) {
147-
logger.debug('pool: Worker exited unexpectedly');
146+
logger.debug(`pool: Worker exited unexpectedly with code ${code}`);
148147
clearTimeout(timeout);
149148

150149
// Read the stderr stream from the worked to see if this looks like an OOM error
@@ -154,22 +153,22 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
154153
});
155154

156155
try {
157-
for await (const line of rl) {
158-
logger.debug(line);
159-
if (line.match(/JavaScript heap out of memory/)) {
160-
reject(new OOMError());
161-
162-
killWorker(worker);
163-
// restore a placeholder to the queue
164-
finish(false);
165-
return;
156+
if (worker.stderr && worker.stderr?.readableLength > 0) {
157+
for await (const line of rl) {
158+
if (line.match(/JavaScript heap out of memory/)) {
159+
killWorker(worker);
160+
// restore a placeholder to the queue
161+
finish(false);
162+
reject(new OOMError());
163+
return;
164+
}
166165
}
167166
}
168167
} catch (e) {
169168
// do nothing
170169
}
171-
reject(new ExitError(code));
172170
finish(worker);
171+
reject(new ExitError(code));
173172
}
174173
};
175174

@@ -224,8 +223,6 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
224223
worker.on('exit', onExit);
225224

226225
worker.on('message', (evt: any) => {
227-
// TODO I think here we may have to decode the payload
228-
229226
// forward the message out of the pool
230227
opts.on?.(evt);
231228

packages/engine-multi/src/worker/thread/runtime.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,16 @@ const run = (task: string, args: any[], options: Options = {}) => {
8080
});
8181
};
8282

83+
process.on('exit', (code) => {
84+
publish(ENGINE_REJECT_TASK, {
85+
error: {
86+
name: 'UNEXPECTED_EXIT',
87+
message: `worker thread exited with code ${code}`,
88+
severity: 'crash',
89+
},
90+
});
91+
});
92+
8393
parentPort!.on('message', async (evt) => {
8494
if (evt.type === ENGINE_RUN_TASK) {
8595
const args = evt.args || [];

packages/engine-multi/test/worker/pool.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,16 @@ test('throw if memory limit is exceeded', async (t) => {
185185
}
186186
});
187187

188+
test('handle weird exit', async (t) => {
189+
const pool = createPool(workerPath, {}, logger);
190+
191+
try {
192+
await pool.exec('weirdExit', []);
193+
} catch (e: any) {
194+
t.is(e.message, 'worker thread exited with code 72');
195+
}
196+
});
197+
188198
test('destroy should handle un-initialised workers', async (t) => {
189199
const pool = createPool(workerPath, { capacity: 10 }, logger);
190200
await pool.destroy();

packages/ws-worker/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ This will wrap a real runtime engine into the server. It will rebuild when the W
5454

5555
### Disabling auto-fetch
5656

57-
When working in dev it is convinient to disable the workloop. To switch it off, run:
57+
When working in dev it is convenient to disable the workloop. To switch it off, run:
5858

5959
```
6060
pnpm start --no-loop

0 commit comments

Comments
 (0)