Skip to content

Commit f7a668d

Browse files
committed
test(rivetkit): expect workflow step rollback
1 parent 2d81eaa commit f7a668d

3 files changed

Lines changed: 72 additions & 2 deletions

File tree

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ import {
166166
workflowSleepActor,
167167
workflowSpawnChildActor,
168168
workflowSpawnParentActor,
169+
workflowStepRollbackActor,
169170
workflowStopTeardownActor,
170171
workflowTryActor,
171172
} from "./workflow";
@@ -322,6 +323,7 @@ export const registry = setup({
322323
workflowRunningStepActor,
323324
workflowReplayActor,
324325
workflowSleepActor,
326+
workflowStepRollbackActor,
325327
workflowTryActor,
326328
warmupActor,
327329
workflowStopTeardownActor,

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,12 +455,17 @@ export const workflowTryActor = actor({
455455
} | null,
456456
tryJoinFailure: null as string | null,
457457
},
458+
vars: {
459+
innerWrites: 0,
460+
recoveryWrites: 0,
461+
},
458462
run: workflow(async (ctx) => {
459463
const stepResult = await ctx.tryStep({
460464
name: "charge-card",
461465
maxRetries: 0,
462466
run: async () => {
463467
ctx.state.innerWrites += 1;
468+
ctx.vars.innerWrites += 1;
464469
throw new Error("card declined");
465470
},
466471
});
@@ -479,6 +484,7 @@ export const workflowTryActor = actor({
479484
});
480485

481486
await ctx.step("store-try-results", async () => {
487+
ctx.vars.recoveryWrites += 1;
482488
if (!stepResult.ok) {
483489
ctx.state.tryStepFailure = {
484490
kind: stepResult.failure.kind,
@@ -492,7 +498,43 @@ export const workflowTryActor = actor({
492498
});
493499
}),
494500
actions: {
495-
getState: (c) => c.state,
501+
getState: (c) => ({ ...c.state, vars: c.vars }),
502+
},
503+
options: {
504+
sleepTimeout: 50,
505+
},
506+
});
507+
508+
export const workflowStepRollbackActor = actor({
509+
state: {
510+
failedStateWrites: 0,
511+
recoveryStateWrites: 0,
512+
failureCaught: false,
513+
},
514+
vars: {
515+
failedVarsWrites: 0,
516+
recoveryVarsWrites: 0,
517+
},
518+
run: workflow(async (ctx) => {
519+
const stepResult = await ctx.try(
520+
"recover-failed-step",
521+
async (tryCtx) => {
522+
await tryCtx.step("failing-step", async () => {
523+
tryCtx.state.failedStateWrites += 1;
524+
tryCtx.vars.failedVarsWrites += 1;
525+
throw new Error("step rollback");
526+
});
527+
},
528+
);
529+
530+
await ctx.step("record-recovery", async () => {
531+
ctx.state.recoveryStateWrites += 1;
532+
ctx.vars.recoveryVarsWrites += 1;
533+
ctx.state.failureCaught = !stepResult.ok;
534+
});
535+
}),
536+
actions: {
537+
getSnapshot: (c) => ({ state: c.state, vars: c.vars }),
496538
},
497539
options: {
498540
sleepTimeout: 50,

rivetkit-typescript/packages/rivetkit/tests/driver/actor-workflow.test.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,10 @@ describeDriverMatrix("Actor Workflow", (driverTestConfig) => {
231231
state = await actor.getState();
232232
}
233233

234-
expect(state.innerWrites).toBe(1);
234+
// Failed workflow steps roll back actor state mutations.
235+
expect(state.innerWrites).toBe(0);
236+
expect(state.vars.innerWrites).toBe(0);
237+
expect(state.vars.recoveryWrites).toBe(1);
235238
expect(state.tryStepFailure).toEqual({
236239
kind: "exhausted",
237240
message: "card declined",
@@ -240,6 +243,29 @@ describeDriverMatrix("Actor Workflow", (driverTestConfig) => {
240243
expect(state.tryJoinFailure).toBe("join:parallel");
241244
});
242245

246+
test("failed workflow step rolls back state and vars", async (c) => {
247+
const { client } = await setupDriverTest(c, driverTestConfig);
248+
const actor = client.workflowStepRollbackActor.getOrCreate([
249+
"workflow-step-rollback",
250+
]);
251+
252+
let snapshot = await actor.getSnapshot();
253+
for (let i = 0; i < 40 && !snapshot.state.failureCaught; i++) {
254+
await waitFor(driverTestConfig, 50);
255+
snapshot = await actor.getSnapshot();
256+
}
257+
258+
expect(snapshot.state).toMatchObject({
259+
failedStateWrites: 0,
260+
recoveryStateWrites: 1,
261+
failureCaught: true,
262+
});
263+
expect(snapshot.vars).toMatchObject({
264+
failedVarsWrites: 0,
265+
recoveryVarsWrites: 1,
266+
});
267+
});
268+
243269
test("sleeps and resumes between ticks", async (c) => {
244270
const { client } = await setupDriverTest(c, driverTestConfig);
245271
const actor = client.workflowSleepActor.getOrCreate([

0 commit comments

Comments
 (0)