Skip to content

Commit 267cc15

Browse files
deploy: support webhook_reply and cron_cursor_job (#1344)
* deploy: support webhook_reply and cron_cursor_job * oops, did not handle scenario where both state and spec trigger exist * versions --------- Co-authored-by: Joe Clark <jclark@openfn.org>
1 parent 6e539f9 commit 267cc15

7 files changed

Lines changed: 308 additions & 2 deletions

File tree

packages/cli/CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
# @openfn/cli
22

3+
## 1.31.0
4+
5+
### Minor Changes
6+
7+
- In deploy v1, add support for support for new `webhook_reply` and `cron_cursor_job` keys on triggers.
8+
9+
When syncing projects with these keys, versions of lightning prior to `2.16.0` will return errors. In other words, projects pulled from the latest cloud may fail when deployed to older instances.
10+
11+
To workaround, update the target lightning version or use an older CLI version.
12+
13+
### Patch Changes
14+
15+
- Updated dependencies
16+
- @openfn/deploy@0.12.0
17+
318
## 1.30.6
419

520
### Patch Changes

packages/cli/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/cli",
3-
"version": "1.30.6",
3+
"version": "1.31.0",
44
"description": "CLI devtools for the OpenFn toolchain",
55
"engines": {
66
"node": ">=18",

packages/deploy/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# @openfn/deploy
22

3+
## 0.12.0
4+
5+
### Minor Changes
6+
7+
- Support webhook_reply and cron_cursor_job.
8+
39
## 0.11.7
410

511
### Patch Changes

packages/deploy/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openfn/deploy",
3-
"version": "0.11.7",
3+
"version": "0.12.0",
44
"description": "Deploy projects to Lightning instances",
55
"type": "module",
66
"exports": {

packages/deploy/src/stateTransform.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,20 @@ function getStateJobCredential(
6363
return stateCredentials[specJobCredential].id;
6464
}
6565

66+
function getStateJob(
67+
specJob: string,
68+
stateJobs: WorkflowState['jobs']
69+
): string {
70+
if (!stateJobs[specJob]) {
71+
throw new DeployError(
72+
`Could not find a job with key: ${specJob}`,
73+
'VALIDATION_ERROR'
74+
);
75+
}
76+
77+
return stateJobs[specJob].id;
78+
}
79+
6680
function mergeJobs(
6781
credentials: ProjectState['project_credentials'],
6882
stateJobs: WorkflowState['jobs'],
@@ -134,6 +148,7 @@ function pickValue(
134148
}
135149

136150
function mergeTriggers(
151+
jobs: WorkflowState['jobs'],
137152
stateTriggers: WorkflowState['triggers'],
138153
specTriggers: WorkflowSpec['triggers']
139154
): WorkflowState['triggers'] {
@@ -146,8 +161,19 @@ function mergeTriggers(
146161
...pickKeys(specTrigger, ['type', 'enabled']),
147162
};
148163

164+
if (specTrigger.type === 'webhook' && specTrigger.webhook_reply) {
165+
trigger.webhook_reply = specTrigger.webhook_reply;
166+
}
167+
149168
if (specTrigger.type === 'cron') {
150169
trigger.cron_expression = specTrigger.cron_expression;
170+
171+
if (specTrigger.cron_cursor_job) {
172+
trigger.cron_cursor_job_id = getStateJob(
173+
specTrigger.cron_cursor_job,
174+
jobs
175+
);
176+
}
151177
}
152178

153179
if (specTrigger.type === 'kafka') {
@@ -173,8 +199,19 @@ function mergeTriggers(
173199
enabled: pickValue(specTrigger!, stateTrigger!, 'enabled', true),
174200
};
175201

202+
if (specTrigger!.type === 'webhook' && specTrigger!.webhook_reply) {
203+
trigger.webhook_reply = specTrigger!.webhook_reply;
204+
}
205+
176206
if (specTrigger!.type === 'cron') {
177207
trigger.cron_expression = specTrigger!.cron_expression;
208+
209+
if (specTrigger!.cron_cursor_job) {
210+
trigger.cron_cursor_job_id = getStateJob(
211+
specTrigger!.cron_cursor_job,
212+
jobs
213+
);
214+
}
178215
}
179216

180217
if (specTrigger!.type === 'kafka') {
@@ -334,6 +371,7 @@ export function mergeSpecIntoState(
334371
);
335372

336373
const nextTriggers = mergeTriggers(
374+
nextJobs,
337375
stateWorkflow?.triggers || {},
338376
specWorkflow?.triggers || {}
339377
);

packages/deploy/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,13 @@ export type SpecKafkaConfiguration = {
3838
connect_timeout: number;
3939
};
4040

41+
export type WebhookReply = 'before_start' | 'after_completion';
42+
4143
export type SpecTrigger = {
4244
type: string;
4345
cron_expression?: string;
46+
cron_cursor_job?: string;
47+
webhook_reply?: WebhookReply;
4448
enabled?: boolean;
4549
kafka_configuration?: SpecKafkaConfiguration;
4650
};
@@ -49,6 +53,8 @@ export type StateTrigger = {
4953
id: string;
5054
type: string;
5155
cron_expression?: string;
56+
cron_cursor_job_id?: string | null;
57+
webhook_reply?: WebhookReply;
5258
delete?: boolean;
5359
enabled?: boolean;
5460
kafka_configuration?: StateKafkaConfiguration;

packages/deploy/test/stateTransform.test.ts

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,247 @@ test('toNextState with for kafka trigger', (t) => {
475475
});
476476
});
477477

478+
test('toNextState sets webhook_reply when specified', (t) => {
479+
const state = { workflows: {} };
480+
const spec = {
481+
name: 'my project',
482+
workflows: {
483+
w: {
484+
name: 'workflow',
485+
jobs: {},
486+
triggers: {
487+
t: { type: 'webhook', webhook_reply: 'before_start' },
488+
},
489+
edges: {},
490+
},
491+
},
492+
};
493+
494+
const result = mergeSpecIntoState(state, spec);
495+
t.is(result.workflows.w.triggers.t.webhook_reply, 'before_start');
496+
});
497+
498+
test('toNextState omits webhook_reply when not specified', (t) => {
499+
const state = { workflows: {} };
500+
const spec = {
501+
name: 'my project',
502+
workflows: {
503+
w: {
504+
name: 'workflow',
505+
jobs: {},
506+
triggers: {
507+
t: { type: 'webhook' },
508+
},
509+
edges: {},
510+
},
511+
},
512+
};
513+
514+
const result = mergeSpecIntoState(state, spec);
515+
t.false('webhook_reply' in result.workflows.w.triggers.t);
516+
});
517+
518+
test('toNextState sets cron_cursor_job_id when specified', (t) => {
519+
const state = { workflows: {} };
520+
const spec = {
521+
name: 'my project',
522+
workflows: {
523+
w: {
524+
name: 'workflow',
525+
jobs: {
526+
'job-a': {
527+
name: 'job a',
528+
adaptor: '@openfn/language-http',
529+
body: 'fn()',
530+
},
531+
},
532+
triggers: {
533+
t: {
534+
type: 'cron',
535+
cron_expression: '0 * * * *',
536+
cron_cursor_job: 'job-a',
537+
},
538+
},
539+
edges: {},
540+
},
541+
},
542+
};
543+
544+
const result = mergeSpecIntoState(state, spec);
545+
const jobId = result.workflows.w.jobs['job-a'].id;
546+
t.is(result.workflows.w.triggers.t.cron_cursor_job_id, jobId);
547+
});
548+
549+
test('toNextState omits cron_cursor_job_id when not specified', (t) => {
550+
const state = { workflows: {} };
551+
const spec = {
552+
name: 'my project',
553+
workflows: {
554+
w: {
555+
name: 'workflow',
556+
jobs: {
557+
'job-a': {
558+
name: 'job a',
559+
adaptor: '@openfn/language-http',
560+
body: 'fn()',
561+
},
562+
},
563+
triggers: {
564+
t: { type: 'cron', cron_expression: '0 * * * *' },
565+
},
566+
edges: {},
567+
},
568+
},
569+
};
570+
571+
const result = mergeSpecIntoState(state, spec);
572+
t.false('cron_cursor_job_id' in result.workflows.w.triggers.t);
573+
});
574+
575+
test('toNextState sets webhook_reply on existing trigger', (t) => {
576+
const triggerId = 'aaa-bbb-ccc';
577+
const state = {
578+
workflows: {
579+
w: {
580+
id: 'wf-1',
581+
name: 'workflow',
582+
jobs: {},
583+
triggers: {
584+
t: { id: triggerId, type: 'webhook', enabled: true },
585+
},
586+
edges: {},
587+
},
588+
},
589+
};
590+
const spec = {
591+
name: 'my project',
592+
workflows: {
593+
w: {
594+
name: 'workflow',
595+
jobs: {},
596+
triggers: {
597+
t: { type: 'webhook', webhook_reply: 'after_completion' },
598+
},
599+
edges: {},
600+
},
601+
},
602+
};
603+
604+
const result = mergeSpecIntoState(state, spec);
605+
t.is(result.workflows.w.triggers.t.id, triggerId);
606+
t.is(result.workflows.w.triggers.t.webhook_reply, 'after_completion');
607+
});
608+
609+
test('toNextState omits webhook_reply on existing trigger when not specified', (t) => {
610+
const triggerId = 'aaa-bbb-ccc';
611+
const state = {
612+
workflows: {
613+
w: {
614+
id: 'wf-1',
615+
name: 'workflow',
616+
jobs: {},
617+
triggers: {
618+
t: { id: triggerId, type: 'webhook', enabled: true },
619+
},
620+
edges: {},
621+
},
622+
},
623+
};
624+
const spec = {
625+
name: 'my project',
626+
workflows: {
627+
w: {
628+
name: 'workflow',
629+
jobs: {},
630+
triggers: {
631+
t: { type: 'webhook' },
632+
},
633+
edges: {},
634+
},
635+
},
636+
};
637+
638+
const result = mergeSpecIntoState(state, spec);
639+
t.false('webhook_reply' in result.workflows.w.triggers.t);
640+
});
641+
642+
test('toNextState sets cron_cursor_job_id on existing trigger', (t) => {
643+
const triggerId = 'aaa-bbb-ccc';
644+
const jobId = 'job-uuid-111';
645+
const state = {
646+
workflows: {
647+
w: {
648+
id: 'wf-1',
649+
name: 'workflow',
650+
jobs: {
651+
'job-a': { id: jobId, name: 'job a' },
652+
},
653+
triggers: {
654+
t: { id: triggerId, type: 'cron', enabled: true },
655+
},
656+
edges: {},
657+
},
658+
},
659+
};
660+
const spec = {
661+
name: 'my project',
662+
workflows: {
663+
w: {
664+
name: 'workflow',
665+
jobs: {
666+
'job-a': { name: 'job a', adaptor: '@openfn/language-http', body: 'fn()' },
667+
},
668+
triggers: {
669+
t: { type: 'cron', cron_expression: '0 * * * *', cron_cursor_job: 'job-a' },
670+
},
671+
edges: {},
672+
},
673+
},
674+
};
675+
676+
const result = mergeSpecIntoState(state, spec);
677+
t.is(result.workflows.w.triggers.t.id, triggerId);
678+
t.is(result.workflows.w.triggers.t.cron_cursor_job_id, jobId);
679+
});
680+
681+
test('toNextState omits cron_cursor_job_id on existing trigger when not specified', (t) => {
682+
const triggerId = 'aaa-bbb-ccc';
683+
const jobId = 'job-uuid-111';
684+
const state = {
685+
workflows: {
686+
w: {
687+
id: 'wf-1',
688+
name: 'workflow',
689+
jobs: {
690+
'job-a': { id: jobId, name: 'job a' },
691+
},
692+
triggers: {
693+
t: { id: triggerId, type: 'cron', enabled: true },
694+
},
695+
edges: {},
696+
},
697+
},
698+
};
699+
const spec = {
700+
name: 'my project',
701+
workflows: {
702+
w: {
703+
name: 'workflow',
704+
jobs: {
705+
'job-a': { name: 'job a', adaptor: '@openfn/language-http', body: 'fn()' },
706+
},
707+
triggers: {
708+
t: { type: 'cron', cron_expression: '0 * * * *' },
709+
},
710+
edges: {},
711+
},
712+
},
713+
};
714+
715+
const result = mergeSpecIntoState(state, spec);
716+
t.false('cron_cursor_job_id' in result.workflows.w.triggers.t);
717+
});
718+
478719
test('mergeProjectIntoState with no changes', (t) => {
479720
let existingState = fullExampleState();
480721
const workflowOne = existingState.workflows['workflow-one'];

0 commit comments

Comments
 (0)