Skip to content

Commit 836433d

Browse files
authored
Merge pull request #1112 from OpenFn/lost-run-on-claim
worker: fix an edge case where a run can be lost between claim and start
2 parents 5d2ccdf + 9e553db commit 836433d

3 files changed

Lines changed: 69 additions & 10 deletions

File tree

.changeset/bumpy-sloths-act.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openfn/ws-worker': patch
3+
---
4+
5+
Fix an edge case where the server can shut down in-between claim and start (lost runs edge case)

packages/ws-worker/src/api/claim.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ const claim = (
113113
demand,
114114
worker_name: NAME || null,
115115
})
116-
.receive('ok', ({ runs }: ClaimReply) => {
116+
.receive('ok', async ({ runs }: ClaimReply) => {
117117
delete app.openClaims[claimId];
118118
const duration = Date.now() - start;
119119
logger.debug(
@@ -128,7 +128,7 @@ const claim = (
128128
return reject(new Error('No runs returned'));
129129
}
130130

131-
runs.forEach(async (run) => {
131+
for (const run of runs) {
132132
if (app.options?.runPublicKey) {
133133
try {
134134
await verifyToken(run.token, app.options.runPublicKey);
@@ -146,7 +146,8 @@ const claim = (
146146

147147
logger.debug(`${podName} starting run ${run.id}`);
148148
app.execute(run);
149-
});
149+
}
150+
150151
// Don't trigger claim complete until all runs are registered
151152
resolve();
152153
app.events.emit(INTERNAL_CLAIM_COMPLETE, { runs });

packages/ws-worker/test/api/destroy.test.ts

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import test from 'ava';
22
import crypto from 'node:crypto';
3-
import createLightningServer from '@openfn/lightning-mock';
3+
import createLightningServer, {
4+
generateKeys,
5+
toBase64,
6+
} from '@openfn/lightning-mock';
47
import { createMockLogger } from '@openfn/logger';
58
import { LightningPlan } from '@openfn/lexicon/lightning';
69

@@ -18,8 +21,8 @@ const lightningPort = workerPort + 1;
1821

1922
const logger = createMockLogger();
2023
let lightning: any;
21-
2224
let worker: any;
25+
let keys = { private: '.', public: '.' };
2326

2427
const initLightning = (options = {}) => {
2528
lightning = createLightningServer({ port: lightningPort, ...options });
@@ -43,6 +46,10 @@ const initWorker = (options = {}) => {
4346
});
4447
};
4548

49+
test.before(async () => {
50+
keys = await generateKeys();
51+
});
52+
4653
test.afterEach(async () => {
4754
await lightning.destroy();
4855
logger._reset();
@@ -107,11 +114,8 @@ test.serial('destroy a worker with no active runs', async (t) => {
107114
t.false(await ping());
108115
// should not be claiming
109116
t.false(await waitForClaim());
110-
111-
// TODO how can I test the socket is closed?
112117
});
113118

114-
// WARNING this might be flaky in CI
115119
test.serial('destroy a worker while one run is active', async (t) => {
116120
initLightning();
117121
await initWorker();
@@ -203,7 +207,7 @@ test.serial(
203207
const doDestroy = async () => {
204208
await destroy(worker, logger);
205209

206-
t.true(didFinish);
210+
t.true(didFinish, 'Run did not finish');
207211

208212
// should not respond to get
209213
t.false(await ping());
@@ -212,7 +216,56 @@ test.serial(
212216

213217
done();
214218
};
215-
// As soon as the claim starts, kill the worker
219+
// As soon as the claim starts, kill the worker
220+
worker.events.once(INTERNAL_CLAIM_START, () => {
221+
doDestroy();
222+
});
223+
224+
// We still expect the run to complete
225+
lightning.once('run:complete', () => {
226+
didFinish = true;
227+
});
228+
229+
// By the time the claim is complete, the worker should be marked destroyed
230+
worker.events.once(INTERNAL_CLAIM_COMPLETE, () => {
231+
t.true(worker.destroyed);
232+
});
233+
234+
lightning.enqueueRun(createRun());
235+
worker.claim().catch();
236+
});
237+
}
238+
);
239+
240+
// The async bit is important because we can actually lose a run between claim and start
241+
test.serial(
242+
'destroy a worker while a claim is outstanding and wait for the run to complete with async token validation',
243+
async (t) => {
244+
t.plan(4);
245+
246+
initLightning({
247+
socketDelay: 50,
248+
runPrivateKey: toBase64(keys.private),
249+
});
250+
await initWorker({ noLoop: true, runPublicKey: keys.public });
251+
252+
return new Promise((done) => {
253+
let didFinish = false;
254+
255+
const doDestroy = async () => {
256+
await destroy(worker, logger);
257+
258+
t.true(didFinish, 'Run did not finish');
259+
260+
// should not respond to get
261+
t.false(await ping());
262+
// should not be claiming
263+
t.false(await waitForClaim());
264+
265+
done();
266+
};
267+
268+
// As soon as the claim starts, kill the worker
216269
worker.events.once(INTERNAL_CLAIM_START, () => {
217270
doDestroy();
218271
});

0 commit comments

Comments
 (0)