diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 7fa09afad..089bd0dbd 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,12 @@ # @openfn/cli +## 1.18.2 + +### Patch Changes + +- Updated dependencies [fe06f44] + - @openfn/runtime@1.7.5 + ## 1.18.1 ### Patch Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index fd008977b..251a1ecd7 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.18.1", + "version": "1.18.2", "description": "CLI devtools for the OpenFn toolchain", "engines": { "node": ">=18", diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index ddb04f467..1c2be2f91 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,12 @@ # engine-multi +## 1.8.3 + +### Patch Changes + +- Updated dependencies [fe06f44] + - @openfn/runtime@1.7.5 + ## 1.8.2 ### Patch Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index cec240182..e87f3ec3e 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.8.2", + "version": "1.8.3", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index 8548c8591..7708b3fa2 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/lightning-mock +## 2.3.6 + +### Patch Changes + +- Updated dependencies [fe06f44] + - @openfn/runtime@1.7.5 + - @openfn/engine-multi@1.8.3 + ## 2.3.5 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 9c618d918..52379008f 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.3.5", + "version": "2.3.6", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/runtime/CHANGELOG.md b/packages/runtime/CHANGELOG.md index 95476186d..62b24465a 100644 --- a/packages/runtime/CHANGELOG.md +++ b/packages/runtime/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/runtime +## 1.7.5 + +### Patch Changes + +- fe06f44: Expose Buffer to runtime context. Quite safe in node 20+ + ## 1.7.4 ### Patch Changes diff --git a/packages/runtime/package.json b/packages/runtime/package.json index bb432c725..0064dbbb1 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/runtime", - "version": "1.7.4", + "version": "1.7.5", "description": "Job processing runtime.", "type": "module", "exports": { diff --git a/packages/runtime/src/execute/context.ts b/packages/runtime/src/execute/context.ts index ab3b9f76b..5a644ed34 100644 --- a/packages/runtime/src/execute/context.ts +++ b/packages/runtime/src/execute/context.ts @@ -21,6 +21,19 @@ export default ( ) => { const logger = options.jobLogger ?? console; const globals = options.globals || {}; + + // Workaround for https://github.com/nodejs/node/issues/4660 + // The Buffer class we share directly with users will throw if used + // All _internal_ Buffer references use the original nodejs interface + class SafeBuffer extends Buffer { + constructor(x: any) { + throw new Error( + 'Do not call Buffer() constructor directly. Use Buffer.from() instead.' + ); + super(x); // keeps types happy + } + } + const context = vm.createContext( freezeAll( { @@ -34,6 +47,7 @@ export default ( setInterval, setTimeout, state, // TODO will be dropped as a global one day, see https://github.com/OpenFn/kit/issues/17 + Buffer: SafeBuffer, }, { state: true } ), diff --git a/packages/runtime/test/context.test.ts b/packages/runtime/test/context.test.ts index 4583837cb..b56d4b989 100644 --- a/packages/runtime/test/context.test.ts +++ b/packages/runtime/test/context.test.ts @@ -60,3 +60,27 @@ test("doesn't allow eval inside a job", async (t) => { message: /Illegal eval statement detected/, }); }); + +test('Buffer.from() works inside a job', async (t) => { + const expression = ` + export default [ + (s) => { s.data = Buffer.from('6a6f65', 'hex').toString(); return s; } + ];`; + const input = {}; + + const result = await run(expression, input); + t.is(result.data as any, 'joe'); +}); + +test('Buffer constructor throws inside a job', async (t) => { + const expression = ` + export default [ + (s) => { s.data = new Buffer('6a6f65', 'hex').toString(); return s; } + ];`; + const input = {}; + + const result = await run(expression, input); + const err = result.errors!['job-1']; + t.is(err.name, 'JobError'); + t.regex(err.message, /do not call Buffer\(\) constructor/i); +}); diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index a9a83669e..bb623a62b 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,14 @@ # ws-worker +## 1.19.3 + +### Patch Changes + +- 62f24b0: Fix an edge case where the server can shut down in-between claim and start (lost runs edge case) +- Updated dependencies [fe06f44] + - @openfn/runtime@1.7.5 + - @openfn/engine-multi@1.8.3 + ## 1.19.2 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index d66917a02..9dd3f7a27 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.19.2", + "version": "1.19.3", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/api/claim.ts b/packages/ws-worker/src/api/claim.ts index bcda93f25..19c215538 100644 --- a/packages/ws-worker/src/api/claim.ts +++ b/packages/ws-worker/src/api/claim.ts @@ -113,7 +113,7 @@ const claim = ( demand, worker_name: NAME || null, }) - .receive('ok', ({ runs }: ClaimReply) => { + .receive('ok', async ({ runs }: ClaimReply) => { delete app.openClaims[claimId]; const duration = Date.now() - start; logger.debug( @@ -128,7 +128,7 @@ const claim = ( return reject(new Error('No runs returned')); } - runs.forEach(async (run) => { + for (const run of runs) { if (app.options?.runPublicKey) { try { await verifyToken(run.token, app.options.runPublicKey); @@ -146,7 +146,8 @@ const claim = ( logger.debug(`${podName} starting run ${run.id}`); app.execute(run); - }); + } + // Don't trigger claim complete until all runs are registered resolve(); app.events.emit(INTERNAL_CLAIM_COMPLETE, { runs }); diff --git a/packages/ws-worker/test/api/destroy.test.ts b/packages/ws-worker/test/api/destroy.test.ts index fcbc344ed..c9cf58e5e 100644 --- a/packages/ws-worker/test/api/destroy.test.ts +++ b/packages/ws-worker/test/api/destroy.test.ts @@ -1,6 +1,9 @@ import test from 'ava'; import crypto from 'node:crypto'; -import createLightningServer from '@openfn/lightning-mock'; +import createLightningServer, { + generateKeys, + toBase64, +} from '@openfn/lightning-mock'; import { createMockLogger } from '@openfn/logger'; import { LightningPlan } from '@openfn/lexicon/lightning'; @@ -18,8 +21,8 @@ const lightningPort = workerPort + 1; const logger = createMockLogger(); let lightning: any; - let worker: any; +let keys = { private: '.', public: '.' }; const initLightning = (options = {}) => { lightning = createLightningServer({ port: lightningPort, ...options }); @@ -43,6 +46,10 @@ const initWorker = (options = {}) => { }); }; +test.before(async () => { + keys = await generateKeys(); +}); + test.afterEach(async () => { await lightning.destroy(); logger._reset(); @@ -107,11 +114,8 @@ test.serial('destroy a worker with no active runs', async (t) => { t.false(await ping()); // should not be claiming t.false(await waitForClaim()); - - // TODO how can I test the socket is closed? }); -// WARNING this might be flaky in CI test.serial('destroy a worker while one run is active', async (t) => { initLightning(); await initWorker(); @@ -203,7 +207,7 @@ test.serial( const doDestroy = async () => { await destroy(worker, logger); - t.true(didFinish); + t.true(didFinish, 'Run did not finish'); // should not respond to get t.false(await ping()); @@ -212,7 +216,56 @@ test.serial( done(); }; - // As soon as the claim starts, kill the worker + // As soon as the claim starts, kill the worker + worker.events.once(INTERNAL_CLAIM_START, () => { + doDestroy(); + }); + + // We still expect the run to complete + lightning.once('run:complete', () => { + didFinish = true; + }); + + // By the time the claim is complete, the worker should be marked destroyed + worker.events.once(INTERNAL_CLAIM_COMPLETE, () => { + t.true(worker.destroyed); + }); + + lightning.enqueueRun(createRun()); + worker.claim().catch(); + }); + } +); + +// The async bit is important because we can actually lose a run between claim and start +test.serial( + 'destroy a worker while a claim is outstanding and wait for the run to complete with async token validation', + async (t) => { + t.plan(4); + + initLightning({ + socketDelay: 50, + runPrivateKey: toBase64(keys.private), + }); + await initWorker({ noLoop: true, runPublicKey: keys.public }); + + return new Promise((done) => { + let didFinish = false; + + const doDestroy = async () => { + await destroy(worker, logger); + + t.true(didFinish, 'Run did not finish'); + + // should not respond to get + t.false(await ping()); + // should not be claiming + t.false(await waitForClaim()); + + done(); + }; + + // As soon as the claim starts, kill the worker worker.events.once(INTERNAL_CLAIM_START, () => { doDestroy(); });