Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @openfn/cli

## 1.18.2

### Patch Changes

- Updated dependencies [fe06f44]
- @openfn/runtime@1.7.5

## 1.18.1

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.18.1",
"version": "1.18.2",
"description": "CLI devtools for the OpenFn toolchain",
"engines": {
"node": ">=18",
Expand Down
7 changes: 7 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# engine-multi

## 1.8.3

### Patch Changes

- Updated dependencies [fe06f44]
- @openfn/runtime@1.7.5

## 1.8.2

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.8.2",
"version": "1.8.3",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
8 changes: 8 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/lightning-mock

## 2.3.6

### Patch Changes

- Updated dependencies [fe06f44]
- @openfn/runtime@1.7.5
- @openfn/engine-multi@1.8.3

## 2.3.5

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.3.5",
"version": "2.3.6",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
6 changes: 6 additions & 0 deletions packages/runtime/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/runtime

## 1.7.5

### Patch Changes

- fe06f44: Expose Buffer to runtime context. Quite safe in node 20+

## 1.7.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/runtime/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/runtime",
"version": "1.7.4",
"version": "1.7.5",
"description": "Job processing runtime.",
"type": "module",
"exports": {
Expand Down
14 changes: 14 additions & 0 deletions packages/runtime/src/execute/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ export default (
) => {
const logger = options.jobLogger ?? console;
const globals = options.globals || {};

// Workaround for https://github.com/nodejs/node/issues/4660
// The Buffer class we share directly with users will throw if used
// All _internal_ Buffer references use the original nodejs interface
class SafeBuffer extends Buffer {
constructor(x: any) {
throw new Error(
'Do not call Buffer() constructor directly. Use Buffer.from() instead.'
);
super(x); // keeps types happy
}
}

const context = vm.createContext(
freezeAll(
{
Expand All @@ -34,6 +47,7 @@ export default (
setInterval,
setTimeout,
state, // TODO will be dropped as a global one day, see https://github.com/OpenFn/kit/issues/17
Buffer: SafeBuffer,
},
{ state: true }
),
Expand Down
24 changes: 24 additions & 0 deletions packages/runtime/test/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,27 @@ test("doesn't allow eval inside a job", async (t) => {
message: /Illegal eval statement detected/,
});
});

test('Buffer.from() works inside a job', async (t) => {
const expression = `
export default [
(s) => { s.data = Buffer.from('6a6f65', 'hex').toString(); return s; }
];`;
const input = {};

const result = await run(expression, input);
t.is(result.data as any, 'joe');
});

test('Buffer constructor throws inside a job', async (t) => {
const expression = `
export default [
(s) => { s.data = new Buffer('6a6f65', 'hex').toString(); return s; }
];`;
const input = {};

const result = await run(expression, input);
const err = result.errors!['job-1'];
t.is(err.name, 'JobError');
t.regex(err.message, /do not call Buffer\(\) constructor/i);
});
9 changes: 9 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# ws-worker

## 1.19.3

### Patch Changes

- 62f24b0: Fix an edge case where the server can shut down in-between claim and start (lost runs edge case)
- Updated dependencies [fe06f44]
- @openfn/runtime@1.7.5
- @openfn/engine-multi@1.8.3

## 1.19.2

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.19.2",
"version": "1.19.3",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
7 changes: 4 additions & 3 deletions packages/ws-worker/src/api/claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ const claim = (
demand,
worker_name: NAME || null,
})
.receive('ok', ({ runs }: ClaimReply) => {
.receive('ok', async ({ runs }: ClaimReply) => {
delete app.openClaims[claimId];
const duration = Date.now() - start;
logger.debug(
Expand All @@ -128,7 +128,7 @@ const claim = (
return reject(new Error('No runs returned'));
}

runs.forEach(async (run) => {
for (const run of runs) {
if (app.options?.runPublicKey) {
try {
await verifyToken(run.token, app.options.runPublicKey);
Expand All @@ -146,7 +146,8 @@ const claim = (

logger.debug(`${podName} starting run ${run.id}`);
app.execute(run);
});
}

// Don't trigger claim complete until all runs are registered
resolve();
app.events.emit(INTERNAL_CLAIM_COMPLETE, { runs });
Expand Down
67 changes: 60 additions & 7 deletions packages/ws-worker/test/api/destroy.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import test from 'ava';
import crypto from 'node:crypto';
import createLightningServer from '@openfn/lightning-mock';
import createLightningServer, {
generateKeys,
toBase64,
} from '@openfn/lightning-mock';
import { createMockLogger } from '@openfn/logger';
import { LightningPlan } from '@openfn/lexicon/lightning';

Expand All @@ -18,8 +21,8 @@ const lightningPort = workerPort + 1;

const logger = createMockLogger();
let lightning: any;

let worker: any;
let keys = { private: '.', public: '.' };

const initLightning = (options = {}) => {
lightning = createLightningServer({ port: lightningPort, ...options });
Expand All @@ -43,6 +46,10 @@ const initWorker = (options = {}) => {
});
};

test.before(async () => {
keys = await generateKeys();
});

test.afterEach(async () => {
await lightning.destroy();
logger._reset();
Expand Down Expand Up @@ -107,11 +114,8 @@ test.serial('destroy a worker with no active runs', async (t) => {
t.false(await ping());
// should not be claiming
t.false(await waitForClaim());

// TODO how can I test the socket is closed?
});

// WARNING this might be flaky in CI
test.serial('destroy a worker while one run is active', async (t) => {
initLightning();
await initWorker();
Expand Down Expand Up @@ -203,7 +207,7 @@ test.serial(
const doDestroy = async () => {
await destroy(worker, logger);

t.true(didFinish);
t.true(didFinish, 'Run did not finish');

// should not respond to get
t.false(await ping());
Expand All @@ -212,7 +216,56 @@ test.serial(

done();
};
// As soon as the claim starts, kill the worker
// As soon as the claim starts, kill the worker
worker.events.once(INTERNAL_CLAIM_START, () => {
doDestroy();
});

// We still expect the run to complete
lightning.once('run:complete', () => {
didFinish = true;
});

// By the time the claim is complete, the worker should be marked destroyed
worker.events.once(INTERNAL_CLAIM_COMPLETE, () => {
t.true(worker.destroyed);
});

lightning.enqueueRun(createRun());
worker.claim().catch();
});
}
);

// The async bit is important because we can actually lose a run between claim and start
test.serial(
'destroy a worker while a claim is outstanding and wait for the run to complete with async token validation',
async (t) => {
t.plan(4);

initLightning({
socketDelay: 50,
runPrivateKey: toBase64(keys.private),
});
await initWorker({ noLoop: true, runPublicKey: keys.public });

return new Promise((done) => {
let didFinish = false;

const doDestroy = async () => {
await destroy(worker, logger);

t.true(didFinish, 'Run did not finish');

// should not respond to get
t.false(await ping());
// should not be claiming
t.false(await waitForClaim());

done();
};

// As soon as the claim starts, kill the worker
worker.events.once(INTERNAL_CLAIM_START, () => {
doDestroy();
});
Expand Down