Skip to content

Commit f34b281

Browse files
authored
Merge pull request #1107 from OpenFn/async-child-process-kill
Async child process kill
2 parents 8632629 + 7e12e4b commit f34b281

File tree

7 files changed

+68
-29
lines changed

7 files changed

+68
-29
lines changed

.changeset/late-sloths-agree.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openfn/engine-multi': patch
3+
---
4+
5+
Wait for child processes to actually exit before shutdown completes (should not affect production environments)

packages/engine-multi/src/engine.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ const createEngine = async (
229229
// How does this work if deferred?
230230
};
231231

232-
const destroy = (instant?: boolean) => closeWorkers(instant);
232+
const destroy = async (instant?: boolean) => closeWorkers(instant);
233233

234234
return Object.assign(engine, {
235235
options,

packages/engine-multi/src/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export type ExecutionContextOptions = ExecuteOptions & EngineOptions;
5757

5858
export interface EngineAPI extends EventEmitter {
5959
callWorker: CallWorker;
60-
closeWorkers: (instant?: boolean) => void;
60+
closeWorkers: (instant?: boolean) => Promise<void>;
6161
}
6262

6363
export interface RuntimeEngine {
@@ -74,7 +74,7 @@ export interface RuntimeEngine {
7474
options?: Partial<EngineOptions>
7575
): Pick<EventEmitter, 'on' | 'off' | 'once'>;
7676

77-
destroy(): void;
77+
destroy(instant?: boolean): Promise<void>;
7878

7979
on: (evt: string, fn: (...args: any[]) => void) => void;
8080
}

packages/engine-multi/src/worker/pool.ts

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,18 +265,53 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
265265
}
266266
};
267267

268-
const destroy = (immediate = false) => {
268+
const waitForWorkerExit = (
269+
worker: ChildProcess,
270+
forceKillTimeout = 5000
271+
): Promise<void> => {
272+
return new Promise((resolve) => {
273+
if (!worker || worker.killed || !worker.connected) {
274+
resolve();
275+
return;
276+
}
277+
278+
const timeout = setTimeout(() => {
279+
logger.debug('pool: force killing worker', worker.pid);
280+
worker.kill('SIGKILL');
281+
resolve();
282+
}, forceKillTimeout);
283+
284+
worker.once('exit', () => {
285+
clearTimeout(timeout);
286+
resolve();
287+
});
288+
289+
worker.kill();
290+
});
291+
};
292+
293+
const destroy = async (immediate = false): Promise<void> => {
269294
destroyed = true;
270295

296+
const killPromises: Promise<void>[] = [];
297+
271298
// Drain the pool
272299
while (pool.length) {
273-
killWorker(pool.pop()!);
300+
const worker = pool.pop();
301+
if (worker) {
302+
killPromises.push(waitForWorkerExit(worker));
303+
delete allWorkers[worker.pid!];
304+
}
274305
}
275306

276307
if (immediate) {
277-
Object.values(allWorkers).forEach(killWorker);
308+
Object.values(allWorkers).forEach((worker) => {
309+
killPromises.push(waitForWorkerExit(worker, 1000));
310+
delete allWorkers[worker.pid!];
311+
});
278312
}
279-
// TODO set a timeout and force any outstanding workers to die
313+
314+
await Promise.all(killPromises);
280315
};
281316

282317
const api = {

packages/engine-multi/test/api.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import loadVersions from '../src/util/load-versions';
1212
const logger = createMockLogger(undefined, { level: 'debug' });
1313
let api: RuntimeEngine;
1414

15-
test.afterEach(() => {
15+
test.afterEach(async () => {
1616
logger._reset();
17-
api?.destroy();
17+
await api?.destroy();
1818
});
1919

2020
test.serial('create a default engine api without throwing', async (t) => {

packages/engine-multi/test/integration.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ let api: RuntimeEngine;
1111

1212
const emptyState = {};
1313

14-
test.afterEach(() => {
14+
test.afterEach(async () => {
1515
logger._reset();
16-
api.destroy();
16+
await api.destroy();
1717
});
1818

1919
// this tests the full API with the actual runtime

packages/engine-multi/test/worker/pool.test.ts

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ test('throw if memory limit is exceeded', async (t) => {
187187

188188
test('destroy should handle un-initialised workers', async (t) => {
189189
const pool = createPool(workerPath, { capacity: 10 }, logger);
190-
pool.destroy();
190+
await pool.destroy();
191191
t.is(pool._pool.length, 0);
192192
});
193193

@@ -201,7 +201,7 @@ test('destroy should close all child processes', async (t) => {
201201
const workers = Object.values(pool._allWorkers);
202202

203203
// now destroy it
204-
pool.destroy();
204+
await pool.destroy();
205205

206206
// check that every child has been sent a kill signal
207207
t.true(workers.every((child) => child.killed));
@@ -210,27 +210,26 @@ test('destroy should close all child processes', async (t) => {
210210
t.is(pool._pool.length, 0);
211211
});
212212

213-
test('destroy gracefully', (t) => {
214-
return new Promise((done) => {
215-
const pool = createPool(workerPath, {}, logger);
216-
const workers = Object.values(pool._allWorkers);
213+
test('destroy gracefully', async (t) => {
214+
const pool = createPool(workerPath, {}, logger);
215+
const workers = Object.values(pool._allWorkers);
217216

218-
t.is(pool._pool.length, 5);
217+
t.is(pool._pool.length, 5);
219218

220-
pool.exec('wait', [100]).then((result) => {
221-
t.is(result, 1);
222-
setTimeout(() => {
223-
t.true(workers.every((child) => child.killed));
224-
t.is(pool._pool.length, 0);
219+
const taskPromise = pool.exec('wait', [100]);
225220

226-
done();
227-
}, 1);
228-
});
221+
await pool.destroy();
229222

230-
pool.destroy();
223+
t.is(pool._pool.length, 0);
231224

232-
t.is(pool._pool.length, 0);
233-
});
225+
// The task should complete gracefully
226+
const result = await taskPromise;
227+
t.is(result, 1);
228+
229+
// Give a moment for cleanup to finish
230+
await new Promise((resolve) => setTimeout(resolve, 10));
231+
232+
t.true(workers.every((child) => child.killed));
234233
});
235234

236235
// TODO: should the worker throw on SIGTERM?

0 commit comments

Comments
 (0)