Skip to content

Commit 14a0eca

Browse files
authored
worker: don't destroy until all outstanding claims have come home (#1044)
* worker: don't destroy until all outstanding claims have come home * version@1.15.3 * fix test * types * fix more tests
1 parent 98dce27 commit 14a0eca

11 files changed

Lines changed: 189 additions & 58 deletions

File tree

packages/lightning-mock/src/server.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,13 @@ const createLightningServer = (options: LightningOptions = {}) => {
117117

118118
app.use(createRestAPI(app as any, state, logger, api));
119119

120-
app.destroy = () => {
121-
server.close();
122-
api.close();
123-
};
120+
app.destroy = () =>
121+
new Promise<void>(async (resolve) => {
122+
api.close();
123+
server.close(() => {
124+
resolve();
125+
});
126+
});
124127
return app;
125128
};
126129

packages/lightning-mock/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export type DevServer = Koa & {
1414
addCredential(id: string, cred: Credential): void;
1515
addDataclip(id: string, data: DataClip): void;
1616
enqueueRun(run: LightningPlan): void;
17-
destroy: () => void;
17+
destroy: () => Promise<void>;
1818
getRun(id: string): LightningPlan;
1919
getCredential(id: string): Credential;
2020
getDataclip(id: string): DataClip;

packages/ws-worker/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# ws-worker
22

3+
## 1.15.3
4+
5+
### Patch Changes
6+
7+
- 5688813: Allow the worker to shutdown gracefully while claims are still in-flight. Runs will be completed before the server closes
8+
39
## 1.15.2
410

511
### Patch Changes

packages/ws-worker/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/ws-worker",
3-
"version": "1.15.2",
3+
"version": "1.15.3",
44
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
55
"main": "dist/index.js",
66
"type": "module",

packages/ws-worker/src/api/claim.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ import * as jose from 'jose';
44

55
import { Logger, createMockLogger } from '@openfn/logger';
66
import { ClaimPayload, ClaimReply } from '@openfn/lexicon/lightning';
7-
import { CLAIM } from '../events';
7+
import {
8+
CLAIM,
9+
INTERNAL_CLAIM_COMPLETE,
10+
INTERNAL_CLAIM_START,
11+
} from '../events';
812

913
import type { ServerApp } from '../server';
1014

@@ -95,6 +99,7 @@ const claim = (
9599

96100
logger.debug(`requesting run (capacity ${activeWorkers}/${maxWorkers})`);
97101

102+
app.events.emit(INTERNAL_CLAIM_START);
98103
const start = Date.now();
99104
app.queueChannel
100105
.push<ClaimPayload>(CLAIM, {
@@ -109,10 +114,9 @@ const claim = (
109114
runs.length ? runs.map((r) => r.id).join(',') : '-'
110115
})`
111116
);
112-
// TODO what if we get here after we've been cancelled?
113-
// the events have already been claimed...
114117

115118
if (!runs?.length) {
119+
app.events.emit(INTERNAL_CLAIM_COMPLETE, { runs });
116120
// throw to backoff and try again
117121
return reject(new Error('No runs returned'));
118122
}
@@ -135,8 +139,10 @@ const claim = (
135139

136140
logger.debug(`${podName} starting run ${run.id}`);
137141
app.execute(run);
138-
resolve();
139142
});
143+
// Don't trigger claim complete until all runs are registered
144+
resolve();
145+
app.events.emit(INTERNAL_CLAIM_COMPLETE, { runs });
140146
})
141147
// TODO need implementations for both of these really
142148
// What do we do if we fail to join the worker channel?

packages/ws-worker/src/api/destroy.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ServerApp } from '../server';
2-
import { INTERNAL_RUN_COMPLETE } from '../events';
2+
import { INTERNAL_CLAIM_COMPLETE, INTERNAL_RUN_COMPLETE } from '../events';
33

44
import type { Logger } from '@openfn/logger';
55

@@ -15,7 +15,6 @@ const destroy = async (app: ServerApp, logger: Logger) => {
1515

1616
// Immediately stop asking for more work
1717
app.workloop?.stop('server closed');
18-
app.queueChannel?.leave();
1918

2019
// Shut down the HTTP server
2120
app.server.close(async () => {
@@ -24,8 +23,9 @@ const destroy = async (app: ServerApp, logger: Logger) => {
2423
}),
2524
new Promise<void>(async (resolve) => {
2625
// Let any active runs complete
27-
await waitForRuns(app, logger);
26+
await waitForRunsAndClaims(app, logger);
2827

28+
app.queueChannel?.leave();
2929
// Kill the engine and socket
3030
await app.engine.destroy();
3131
app.socket?.disconnect();
@@ -37,27 +37,38 @@ const destroy = async (app: ServerApp, logger: Logger) => {
3737
logger.success('Server closed');
3838
};
3939

40-
const waitForRuns = (app: ServerApp, logger: Logger) =>
40+
const waitForRunsAndClaims = (app: ServerApp, logger: Logger) =>
4141
new Promise<void>((resolve) => {
4242
const log = () => {
4343
logger.debug(
44-
`Waiting for ${Object.keys(app.workflows).length} runs to complete...`
44+
`Waiting for ${Object.keys(app.workflows).length} runs and ${
45+
Object.keys(app.openClaims).length
46+
} claims to complete...`
4547
);
4648
};
4749

48-
const onRunComplete = () => {
49-
if (Object.keys(app.workflows).length === 0) {
50+
const checkAllClear = () => {
51+
if (
52+
Object.keys(app.workflows).length +
53+
Object.keys(app.openClaims).length ===
54+
0
55+
) {
5056
logger.debug('All runs completed!');
51-
app.events.off(INTERNAL_RUN_COMPLETE, onRunComplete);
57+
app.events.off(INTERNAL_RUN_COMPLETE, checkAllClear);
58+
app.events.off(INTERNAL_CLAIM_COMPLETE, checkAllClear);
5259
resolve();
5360
} else {
5461
log();
5562
}
5663
};
5764

58-
if (Object.keys(app.workflows).length) {
65+
if (
66+
Object.keys(app.workflows).length ||
67+
Object.keys(app.openClaims).length
68+
) {
5969
log();
60-
app.events.on(INTERNAL_RUN_COMPLETE, onRunComplete);
70+
app.events.on(INTERNAL_RUN_COMPLETE, checkAllClear);
71+
app.events.on(INTERNAL_CLAIM_COMPLETE, checkAllClear);
6172
} else {
6273
logger.debug('No active runs detected, closing immediately');
6374
resolve();

packages/ws-worker/src/events.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ export const RUN_LOG = 'run:log';
1414
export const STEP_START = 'step:start';
1515
export const STEP_COMPLETE = 'step:complete';
1616
export const INTERNAL_RUN_COMPLETE = 'server:run-complete';
17+
export const INTERNAL_CLAIM_START = 'server:claim-start';
18+
export const INTERNAL_CLAIM_COMPLETE = 'server:claim-complete';
19+
export const INTERNAL_SOCKET_READY = 'server:socket-ready';
1720

1821
export type QueueEvents = {
1922
[CLAIM]: l.ClaimPayload;

packages/ws-worker/src/server.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ import Router from '@koa/router';
1111
import { humanId } from 'human-id';
1212
import { createMockLogger, Logger } from '@openfn/logger';
1313
import { ClaimRun } from '@openfn/lexicon/lightning';
14-
import { INTERNAL_RUN_COMPLETE, WORK_AVAILABLE } from './events';
14+
import {
15+
INTERNAL_RUN_COMPLETE,
16+
INTERNAL_SOCKET_READY,
17+
WORK_AVAILABLE,
18+
} from './events';
1519
import destroy from './api/destroy';
1620
import startWorkloop, { Workloop } from './api/workloop';
1721
import claim from './api/claim';
@@ -71,6 +75,9 @@ export interface ServerApp extends Koa {
7175
execute: ({ id, token }: ClaimRun) => Promise<void>;
7276
destroy: () => void;
7377
resumeWorkloop: () => void;
78+
79+
// debug API
80+
claim: () => Promise<any>;
7481
}
7582

7683
type SocketAndChannel = {
@@ -112,6 +119,7 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
112119
logger.break();
113120
}
114121

122+
app.events.emit(INTERNAL_SOCKET_READY);
115123
app.resumeWorkloop();
116124
};
117125

@@ -147,9 +155,11 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
147155
// handles messages for the worker:queue
148156
const onMessage = (event: string) => {
149157
if (event === WORK_AVAILABLE) {
150-
claim(app, logger, { maxWorkers: options.maxWorkflows }).catch(() => {
151-
// do nothing - it's fine if claim throws here
152-
});
158+
if (!app.destroyed) {
159+
claim(app, logger, { maxWorkers: options.maxWorkflows }).catch(() => {
160+
// do nothing - it's fine if claim throws here
161+
});
162+
}
153163
}
154164
};
155165

@@ -351,6 +361,12 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
351361
});
352362
});
353363

364+
app.claim = () => {
365+
return claim(app, logger, {
366+
maxWorkers: options.maxWorkflows,
367+
});
368+
};
369+
354370
app.destroy = () => destroy(app, logger);
355371

356372
app.use(router.routes());

packages/ws-worker/test/api/claim.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { createMockLogger } from '@openfn/logger';
88
import { ServerApp } from '../../src/server';
99
import { mockChannel } from '../../src/mock/sockets';
1010
import { CLAIM } from '../../src';
11+
import EventEmitter from 'node:events';
1112

1213
let keys = { public: '.', private: '.' };
1314

@@ -144,10 +145,11 @@ const createMockApp = (opts: any) => {
144145
openClaims: {},
145146
workflows,
146147
queueChannel: channel,
147-
execute: (...args) => {
148+
execute: (...args: any) => {
148149
onExecute(...args);
149150
},
150-
} as ServerApp;
151+
events: new EventEmitter(),
152+
} as unknown as ServerApp;
151153
};
152154
const logger = createMockLogger();
153155

0 commit comments

Comments
 (0)