Skip to content

Commit 68f6538

Browse files
authored
Merge pull request #1074 from OpenFn/send_final_state
Send final state to lightning
2 parents a860081 + cf48e7d commit 68f6538

19 files changed

Lines changed: 259 additions & 92 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ Before merging to main, check out the release branch locally and run the followi
8383
1. Run `pnpm changeset tag` to generate tags
8484
1. Push tags `git push --tags`
8585

86-
Rememebr tags may need updating if commits come in after the tags are first generated.
86+
Remember tags may need updating if commits come in after the tags are first generated.
8787

8888
## TypeSync
8989

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
export const REDACTED_STATE = {
2-
data: '[REDACTED_STATE]',
3-
_$REDACTED$_: true,
4-
};
1+
// This specifies which keys of an event payload to potentially redact
2+
// if they are too big
3+
const KEYS_TO_VERIFY = ['state', 'final_state', 'log'];
54

6-
export const REDACTED_LOG = {
7-
message: ['[REDACTED: Message length exceeds payload limit]'],
8-
_$REDACTED$_: true,
5+
const replacements: Record<string, any> = {
6+
log: {
7+
message: ['[REDACTED: Message length exceeds payload limit]'],
8+
},
9+
default: {
10+
data: '[REDACTED]',
11+
},
912
};
1013

1114
export const verify = (value: any, limit_mb: number = 10) => {
@@ -32,19 +35,14 @@ export const verify = (value: any, limit_mb: number = 10) => {
3235
export default (payload: any, limit_mb: number = 10) => {
3336
const newPayload = { ...payload };
3437

35-
// The payload could be any of the runtime events
36-
// The bits we might want to redact are state and message
37-
try {
38-
verify(payload.state, limit_mb);
39-
} catch (e) {
40-
newPayload.state = REDACTED_STATE;
41-
newPayload.redacted = true;
42-
}
43-
try {
44-
verify(payload.log, limit_mb);
45-
} catch (e) {
46-
Object.assign(newPayload.log, REDACTED_LOG);
47-
newPayload.redacted = true;
38+
for (const key of KEYS_TO_VERIFY) {
39+
try {
40+
verify(payload[key], limit_mb);
41+
} catch (e) {
42+
Object.assign(newPayload[key], replacements[key] ?? replacements.default);
43+
newPayload.redacted = true;
44+
}
4845
}
46+
4947
return newPayload;
5048
};

packages/engine-multi/test/integration.test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import type { ExecutionPlan } from '@openfn/lexicon';
55

66
import createAPI from '../src/api';
77
import type { RuntimeEngine } from '../src';
8-
import { REDACTED_STATE, REDACTED_LOG } from '../src/util/ensure-payload-size';
98

109
const logger = createMockLogger(undefined, { level: 'debug' });
1110
let api: RuntimeEngine;
@@ -536,7 +535,7 @@ export default [(state) => {
536535
.execute(plan, emptyState, options)
537536
.on('workflow-complete', ({ state }) => {
538537
t.log(state);
539-
t.deepEqual(REDACTED_STATE, state);
538+
t.is(state.data, '[REDACTED]');
540539
done();
541540
});
542541
});
@@ -566,9 +565,8 @@ export default [(state) => {
566565
api
567566
.execute(plan, emptyState, options)
568567
.on('workflow-log', (evt) => {
569-
console.log(evt);
570568
if (evt.name === 'JOB') {
571-
t.deepEqual(evt.message, REDACTED_LOG.message);
569+
t.true(/redacted/i.test(evt.message[0]));
572570
}
573571
})
574572
.on('workflow-complete', () => {

packages/engine-multi/test/util/ensure-payload-size.test.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
import test from 'ava';
2-
import ensurePayloadSize, {
3-
REDACTED_LOG,
4-
REDACTED_STATE,
5-
verify,
6-
} from '../../src/util/ensure-payload-size';
2+
import ensurePayloadSize, { verify } from '../../src/util/ensure-payload-size';
73

84
const mb = (bytes: number) => bytes / 1024 / 1024;
95

@@ -62,7 +58,9 @@ test('redact payload with state', (t) => {
6258
};
6359

6460
const newPayload = ensurePayloadSize(payload, 1);
65-
t.deepEqual(newPayload.state, REDACTED_STATE);
61+
t.deepEqual(newPayload.state, {
62+
data: '[REDACTED]',
63+
});
6664
t.true(newPayload.redacted);
6765
});
6866

@@ -74,6 +72,22 @@ test('redact payload with log message', (t) => {
7472
};
7573

7674
const newPayload = ensurePayloadSize(payload, 1);
77-
t.deepEqual(newPayload.log, REDACTED_LOG);
75+
t.deepEqual(newPayload.log, {
76+
message: ['[REDACTED: Message length exceeds payload limit]'],
77+
});
78+
t.true(newPayload.redacted);
79+
});
80+
81+
test('redact payload with final_state', (t) => {
82+
const payload = {
83+
final_state: {
84+
data: new Array(1024 * 1024).fill('z').join(''),
85+
},
86+
};
87+
88+
const newPayload = ensurePayloadSize(payload, 1);
89+
t.deepEqual(newPayload.final_state, {
90+
data: '[REDACTED]',
91+
});
7892
t.true(newPayload.redacted);
7993
});

packages/lexicon/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# lexicon
22

3+
## 1.2.5
4+
5+
### Patch Changes
6+
7+
- 09dd4b2: - Add `final_state` object to `workflow:complete` event
8+
- Remove unused `final_dataclip_id` from `workflow:complete` payload
9+
310
## 1.2.4
411

512
### Patch Changes

packages/lexicon/lightning.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ export type RunStartReply = {}; // no payload
168168

169169
export type RunCompletePayload = ExitReason & {
170170
timestamp: TimeInMicroSeconds;
171-
final_dataclip_id?: string; // TODO this will be removed soon
171+
final_state?: any; // The aggregated final state from the workflow (handles branching)
172172
};
173173
export type RunCompleteReply = undefined;
174174

packages/lexicon/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/lexicon",
3-
"version": "1.2.4",
3+
"version": "1.2.5",
44
"description": "Central repo of names and type definitions",
55
"author": "Open Function Group <admin@openfn.org>",
66
"license": "ISC",

packages/lightning-mock/CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
# @openfn/lightning-mock
22

3+
## 2.3.0
4+
5+
### Minor Changes
6+
7+
- 09dd4b2: - Add `final_state` object to `workflow:complete` event
8+
- Remove unused `final_dataclip_id` from `workflow:complete` payload
9+
10+
### Patch Changes
11+
12+
- Updated dependencies [09dd4b2]
13+
- @openfn/lexicon@1.2.4
14+
315
## 2.2.9
416

517
### Patch Changes

packages/lightning-mock/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/lightning-mock",
3-
"version": "2.2.9",
3+
"version": "2.3.0",
44
"private": true,
55
"description": "A mock Lightning server",
66
"main": "dist/index.js",

packages/lightning-mock/src/api-dev.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ const setupDevAPI = (
7575
}) => {
7676
if (evt.runId === runId) {
7777
state.events.removeListener(RUN_COMPLETE, handler);
78-
const result = state.dataclips[evt.payload.final_dataclip_id!];
79-
resolve(result);
78+
resolve(evt.payload.final_state);
8079
}
8180
};
8281
state.events.addListener(RUN_COMPLETE, handler);

0 commit comments

Comments
 (0)