Skip to content

Commit b0eb0bc

Browse files
authored
Merge pull request #1404 from OpenFn/1399-webhook-reponse-step-complete
feat: forward `state.webhookResponse` on `step:complete`
2 parents 66c7f79 + 5a6624d commit b0eb0bc

7 files changed

Lines changed: 289 additions & 2 deletions

File tree

packages/lexicon/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# lexicon
22

3+
## 2.1.0
4+
5+
### Minor Changes
6+
7+
- e453f6a: Support webhook_response in step:complete event when data available in results
8+
39
## 2.0.0
410

511
### Major Changes

packages/lexicon/lightning.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ export type StepStartPayload = {
209209
};
210210
export type StepStartReply = void;
211211

212+
export type WebhookResponse = {
213+
status?: number;
214+
body?: Record<string, any> | any[];
215+
};
216+
212217
export type StepCompletePayload = ExitReason & {
213218
run_id?: string;
214219
job_id: string;
@@ -223,6 +228,7 @@ export type StepCompletePayload = ExitReason & {
223228
};
224229
duration: number;
225230
timestamp: TimeInMicroSeconds;
231+
webhook_response?: WebhookResponse;
226232
};
227233
export type StepCompleteReply = void;
228234

packages/lexicon/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/lexicon",
3-
"version": "2.0.0",
3+
"version": "2.1.0",
44
"description": "Central repo of names and type definitions",
55
"author": "Open Function Group <admin@openfn.org>",
66
"license": "ISC",

packages/ws-worker/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# ws-worker
22

3+
## 1.25.0
4+
5+
### Minor Changes
6+
7+
- e453f6a: Support webhook_response in step:complete event when data available in results
8+
9+
### Patch Changes
10+
11+
- Updated dependencies [e453f6a]
12+
- @openfn/lexicon@2.1.0
13+
314
## 1.24.2
415

516
### Patch Changes

packages/ws-worker/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/ws-worker",
3-
"version": "1.24.2",
3+
"version": "1.25.0",
44
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
55
"main": "dist/index.js",
66
"type": "module",

packages/ws-worker/src/events/step-complete.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ export default async function onStepComplete(
5252
timestamp: timeInMicroseconds(event.time),
5353
} as StepCompletePayload;
5454

55+
// Feed through the webhook response if it's on state
56+
// We do this on the event so that Lightning
57+
// doesn't have the parse the dataclip
58+
// (which may not be sent in zero persistence mode!)
59+
if (outputState.webhookResponse) {
60+
evt.webhook_response = outputState.webhookResponse;
61+
}
62+
5563
if (event.redacted) {
5664
state.withheldDataclips[dataclipId] = true;
5765
evt.output_dataclip_error = 'DATACLIP_TOO_LARGE';

packages/ws-worker/test/events/step-complete.test.ts

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,262 @@ test('accumulate multiple leaf dataclips for branching workflow', async (t) => {
365365
t.not(state.leafDataclipIds[0], state.leafDataclipIds[1]);
366366
});
367367

368+
test('includes webhook_response in step:complete payload when webhookResponse is set on state', async (t) => {
369+
const plan = createPlan();
370+
const jobId = 'job-1';
371+
372+
const state = createRunState(plan);
373+
state.activeJob = jobId;
374+
state.activeStep = 'b';
375+
376+
let lightningEvent: any;
377+
const channel = mockChannel({
378+
[STEP_COMPLETE]: (evt) => {
379+
lightningEvent = evt;
380+
},
381+
});
382+
383+
const event = {
384+
state: { x: 10, webhookResponse: { status: 201, body: { ok: true } } },
385+
} as any;
386+
await handleStepComplete({ channel, state } as any, event);
387+
388+
t.deepEqual(lightningEvent.webhook_response, {
389+
status: 201,
390+
body: { ok: true },
391+
});
392+
});
393+
394+
test('includes webhook_response in step:complete payload when webhookResponse is set on state (status only)', async (t) => {
395+
const plan = createPlan();
396+
const jobId = 'job-1';
397+
398+
const state = createRunState(plan);
399+
state.activeJob = jobId;
400+
state.activeStep = 'b';
401+
402+
let lightningEvent: any;
403+
const channel = mockChannel({
404+
[STEP_COMPLETE]: (evt) => {
405+
lightningEvent = evt;
406+
},
407+
});
408+
409+
const event = {
410+
state: { x: 10, webhookResponse: { status: 201 } },
411+
} as any;
412+
await handleStepComplete({ channel, state } as any, event);
413+
414+
t.deepEqual(lightningEvent.webhook_response, {
415+
status: 201,
416+
});
417+
});
418+
419+
test('includes webhook_response in step:complete payload when webhookResponse is set on state (body only)', async (t) => {
420+
const plan = createPlan();
421+
const jobId = 'job-1';
422+
423+
const state = createRunState(plan);
424+
state.activeJob = jobId;
425+
state.activeStep = 'b';
426+
427+
let lightningEvent: any;
428+
const channel = mockChannel({
429+
[STEP_COMPLETE]: (evt) => {
430+
lightningEvent = evt;
431+
},
432+
});
433+
434+
const event = {
435+
state: { x: 10, webhookResponse: { body: { ok: true } } },
436+
} as any;
437+
await handleStepComplete({ channel, state } as any, event);
438+
439+
t.deepEqual(lightningEvent.webhook_response, {
440+
body: { ok: true },
441+
});
442+
});
443+
444+
test('includes webhook_response in step:complete payload when webhookResponse is set on state (no keys)', async (t) => {
445+
const plan = createPlan();
446+
const jobId = 'job-1';
447+
448+
const state = createRunState(plan);
449+
state.activeJob = jobId;
450+
state.activeStep = 'b';
451+
452+
let lightningEvent: any;
453+
const channel = mockChannel({
454+
[STEP_COMPLETE]: (evt) => {
455+
lightningEvent = evt;
456+
},
457+
});
458+
459+
const event = {
460+
state: { x: 10, webhookResponse: {} },
461+
} as any;
462+
await handleStepComplete({ channel, state } as any, event);
463+
464+
t.deepEqual(lightningEvent.webhook_response, {});
465+
});
466+
467+
test('omits webhook_response from payload when webhookResponse is not set on state', async (t) => {
468+
const plan = createPlan();
469+
const jobId = 'job-1';
470+
471+
const state = createRunState(plan);
472+
state.activeJob = jobId;
473+
state.activeStep = 'b';
474+
475+
let lightningEvent: any;
476+
const channel = mockChannel({
477+
[STEP_COMPLETE]: (evt) => {
478+
lightningEvent = evt;
479+
},
480+
});
481+
482+
const event = { state: { x: 10 } } as any;
483+
await handleStepComplete({ channel, state } as any, event);
484+
485+
t.is(lightningEvent.webhook_response, undefined);
486+
});
487+
488+
test('webhookResponse is included in dataclip', async (t) => {
489+
const plan = createPlan();
490+
const jobId = 'job-1';
491+
492+
const state = createRunState(plan);
493+
state.activeJob = jobId;
494+
state.activeStep = 'b';
495+
496+
const channel = mockChannel({
497+
[STEP_COMPLETE]: () => true,
498+
});
499+
500+
const event = {
501+
state: { x: 10, webhookResponse: { status: 201, body: {} } },
502+
} as any;
503+
await handleStepComplete({ channel, state } as any, event);
504+
505+
const [dataclip] = Object.values(state.dataclips);
506+
t.deepEqual(dataclip, { x: 10, webhookResponse: { status: 201, body: {} } });
507+
});
508+
509+
test('webhookResponse included in the serialized output_dataclip sent to Lightning', async (t) => {
510+
const plan = createPlan();
511+
const jobId = 'job-1';
512+
513+
const state = createRunState(plan);
514+
state.activeJob = jobId;
515+
state.activeStep = 'b';
516+
517+
let lightningEvent: any;
518+
const channel = mockChannel({
519+
[STEP_COMPLETE]: (evt) => {
520+
lightningEvent = evt;
521+
},
522+
});
523+
524+
const event = {
525+
state: { x: 10, webhookResponse: { status: 201, body: {} } },
526+
} as any;
527+
await handleStepComplete({ channel, state } as any, event);
528+
529+
t.deepEqual(JSON.parse(lightningEvent.output_dataclip), {
530+
x: 10,
531+
webhookResponse: { status: 201, body: {} },
532+
});
533+
});
534+
535+
test('handles webhookResponse with only a body', async (t) => {
536+
const plan = createPlan();
537+
const jobId = 'job-1';
538+
539+
const state = createRunState(plan);
540+
state.activeJob = jobId;
541+
state.activeStep = 'b';
542+
543+
let lightningEvent: any;
544+
const channel = mockChannel({
545+
[STEP_COMPLETE]: (evt) => {
546+
lightningEvent = evt;
547+
},
548+
});
549+
550+
const event = {
551+
state: { webhookResponse: { body: { message: 'hello' } } },
552+
} as any;
553+
await handleStepComplete({ channel, state } as any, event);
554+
555+
t.deepEqual(lightningEvent.webhook_response, { body: { message: 'hello' } });
556+
});
557+
558+
test('handles webhookResponse body as a JSON array', async (t) => {
559+
const plan = createPlan();
560+
const jobId = 'job-1';
561+
562+
const state = createRunState(plan);
563+
state.activeJob = jobId;
564+
state.activeStep = 'b';
565+
566+
let lightningEvent: any;
567+
const channel = mockChannel({
568+
[STEP_COMPLETE]: (evt) => {
569+
lightningEvent = evt;
570+
},
571+
});
572+
573+
const event = {
574+
state: { webhookResponse: { status: 200, body: [{ id: 1 }, { id: 2 }] } },
575+
} as any;
576+
await handleStepComplete({ channel, state } as any, event);
577+
578+
t.deepEqual(lightningEvent.webhook_response, {
579+
status: 200,
580+
body: [{ id: 1 }, { id: 2 }],
581+
});
582+
});
583+
584+
test('does nothing with webhookResponse when event.state is empty', async (t) => {
585+
const plan = createPlan();
586+
const jobId = 'job-1';
587+
588+
const state = createRunState(plan);
589+
state.activeJob = jobId;
590+
state.activeStep = 'b';
591+
592+
let lightningEvent: any;
593+
const channel = mockChannel({
594+
[STEP_COMPLETE]: (evt) => {
595+
lightningEvent = evt;
596+
},
597+
});
598+
599+
await handleStepComplete({ channel, state } as any, { state: {} } as any);
600+
t.is(lightningEvent.webhook_response, undefined);
601+
});
602+
603+
test('does nothing with webhookResponse when event.state is undefined', async (t) => {
604+
const plan = createPlan();
605+
const jobId = 'job-1';
606+
607+
const state = createRunState(plan);
608+
state.activeJob = jobId;
609+
state.activeStep = 'b';
610+
611+
let lightningEvent: any;
612+
const channel = mockChannel({
613+
[STEP_COMPLETE]: (evt) => {
614+
lightningEvent = evt;
615+
},
616+
});
617+
618+
await t.notThrowsAsync(() =>
619+
handleStepComplete({ channel, state } as any, {} as any)
620+
);
621+
t.is(lightningEvent.webhook_response, undefined);
622+
});
623+
368624
// Single leaf reached by two paths: start → a → x, start → b → x
369625
// x executes twice, both times with no downstream
370626
test('accumulate two leaf dataclips when same node reached by two paths', async (t) => {

0 commit comments

Comments
 (0)