Skip to content

Commit 904ee37

Browse files
committed
fix(workflow-engine): only commit step state after success
1 parent 2afe212 commit 904ee37

6 files changed

Lines changed: 81 additions & 91 deletions

File tree

rivetkit-typescript/packages/rivetkit/src/actor/config.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export const DEFAULT_SLEEP_GRACE_PERIOD = 15_000;
2626
export const ACTOR_CONTEXT_INTERNAL_SYMBOL = Symbol(
2727
"rivetkit.actor_context_internal",
2828
);
29+
export const RAW_STATE_SYMBOL = Symbol("rivetkit.raw_state");
2930
export const CONN_DRIVER_SYMBOL = Symbol("rivetkit.conn_driver");
3031
export const CONN_STATE_MANAGER_SYMBOL = Symbol("rivetkit.conn_state_manager");
3132

@@ -310,6 +311,8 @@ export interface ActorContext<
310311
TQueues extends QueueSchemaConfig = Record<never, never>,
311312
> {
312313
[ACTOR_CONTEXT_INTERNAL_SYMBOL]?: unknown;
314+
/** Returns the raw unwrapped state without the write-through proxy. */
315+
[RAW_STATE_SYMBOL](): TState;
313316
state: TState;
314317
vars: TVars;
315318
readonly kv: ActorKv;

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { VirtualWebSocket } from "@rivetkit/virtual-websocket";
22
import {
33
ACTOR_CONTEXT_INTERNAL_SYMBOL,
44
CONN_STATE_MANAGER_SYMBOL,
5+
RAW_STATE_SYMBOL,
56
getRunFunction,
67
getRunInspectorConfig,
78
type WorkflowInspectorConfig,
@@ -1162,6 +1163,10 @@ class NativeConnAdapter {
11621163
);
11631164
}
11641165

1166+
[RAW_STATE_SYMBOL](): unknown {
1167+
return this.#readState();
1168+
}
1169+
11651170
get state(): unknown {
11661171
const nextState = this.#readState();
11671172
return createWriteThroughProxy(nextState, (nextValue) => {
@@ -2455,6 +2460,13 @@ export class ActorContextHandleAdapter {
24552460
throw databaseClientNotReadyError();
24562461
}
24572462

2463+
[RAW_STATE_SYMBOL](): unknown {
2464+
if (!this.#stateEnabled) {
2465+
throw stateNotEnabledError();
2466+
}
2467+
return this.#readState();
2468+
}
2469+
24582470
get state(): unknown {
24592471
if (!this.#stateEnabled) {
24602472
throw stateNotEnabledError();

rivetkit-typescript/packages/rivetkit/src/workflow/context.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// @ts-nocheck
2-
import type { RunContext } from "@/actor/config";
2+
import { RAW_STATE_SYMBOL, type RunContext } from "@/actor/config";
33
import type {
44
QueueFilterName,
55
QueueNextBatchOptions,
@@ -247,14 +247,14 @@ export class ActorWorkflowContext<
247247
}
248248
return await this.#wrapActive(() =>
249249
this.#inner.step(nameOrConfig, () =>
250-
this.#withActorAccess(run),
250+
this.#withActorAccessAndStateRollback(run),
251251
),
252252
);
253253
}
254254
const stepConfig = nameOrConfig as StepConfig<T>;
255255
const config: StepConfig<T> = {
256256
...stepConfig,
257-
run: () => this.#withActorAccess(stepConfig.run),
257+
run: () => this.#withActorAccessAndStateRollback(stepConfig.run),
258258
};
259259
return await this.#wrapActive(() => this.#inner.step(config));
260260
}
@@ -271,14 +271,14 @@ export class ActorWorkflowContext<
271271
}
272272
return await this.#wrapActive(() =>
273273
this.#inner.tryStep(nameOrConfig, () =>
274-
this.#withActorAccess(run),
274+
this.#withActorAccessAndStateRollback(run),
275275
),
276276
);
277277
}
278278
const stepConfig = nameOrConfig as TryStepConfig<T>;
279279
const config: TryStepConfig<T> = {
280280
...stepConfig,
281-
run: () => this.#withActorAccess(stepConfig.run),
281+
run: () => this.#withActorAccessAndStateRollback(stepConfig.run),
282282
};
283283
return await this.#wrapActive(() => this.#inner.tryStep(config));
284284
}
@@ -612,6 +612,20 @@ export class ActorWorkflowContext<
612612
}
613613
}
614614

615+
async #withActorAccessAndStateRollback<T>(
616+
run: () => Promise<T>,
617+
): Promise<T> {
618+
const stateSnapshot = structuredClone(this.#runCtx[RAW_STATE_SYMBOL]());
619+
const varsSnapshot = structuredClone(this.#runCtx.vars);
620+
try {
621+
return await this.#withActorAccess(run);
622+
} catch (error) {
623+
this.#runCtx.state = stateSnapshot;
624+
this.#runCtx.vars = varsSnapshot;
625+
throw error;
626+
}
627+
}
628+
615629
#ensureActorAccess(feature: string): void {
616630
if (!this.#allowActorAccess) {
617631
this.#guardViolation = true;

rivetkit-typescript/packages/workflow-engine/src/context.ts

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -825,10 +825,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
825825
const maxRetries = config.maxRetries ?? DEFAULT_MAX_RETRIES;
826826

827827
if (metadata.attempts > maxRetries) {
828-
// Prefer step history error, but fall back to metadata since
829-
// driver implementations may persist metadata without the history
830-
// entry error (e.g. partial writes/crashes between attempts).
831-
const lastError = stepData.error ?? metadata.error;
828+
const lastError = metadata.error;
832829
const exhaustedError = new StepExhaustedError(
833830
config.name,
834831
lastError,
@@ -941,15 +938,15 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
941938
});
942939
return output;
943940
} catch (error) {
941+
if (entry.kind.type === "step") {
942+
entry.kind.data.error = String(error);
943+
}
944+
entry.dirty = true;
945+
944946
// Timeout errors are treated as critical (no retry)
945947
if (error instanceof StepTimeoutError) {
946-
if (entry.kind.type === "step") {
947-
entry.kind.data.error = String(error);
948-
}
949-
entry.dirty = true;
950948
metadata.status = "exhausted";
951949
metadata.error = String(error);
952-
await this.flushStorage();
953950
await this.notifyStepError(config, metadata.attempts, error, {
954951
willRetry: false,
955952
});
@@ -967,13 +964,8 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
967964
error instanceof CriticalError ||
968965
error instanceof RollbackError
969966
) {
970-
if (entry.kind.type === "step") {
971-
entry.kind.data.error = String(error);
972-
}
973-
entry.dirty = true;
974967
metadata.status = "exhausted";
975968
metadata.error = String(error);
976-
await this.flushStorage();
977969
await this.notifyStepError(config, metadata.attempts, error, {
978970
willRetry: false,
979971
});
@@ -990,15 +982,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
990982
);
991983
}
992984

993-
if (entry.kind.type === "step") {
994-
entry.kind.data.error = String(error);
995-
}
996-
entry.dirty = true;
997985
const willRetry = metadata.attempts <= maxRetries;
998986
metadata.status = willRetry ? "failed" : "exhausted";
999987
metadata.error = String(error);
1000988

1001-
await this.flushStorage();
1002989
if (willRetry) {
1003990
const retryDelay = calculateBackoff(
1004991
metadata.attempts,

rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -597,8 +597,8 @@ for (const mode of modes) {
597597
}
598598

599599
// Crash at iteration 3 during first run. State was
600-
// persisted at iteration 2 (deferred) and awaited at
601-
// the start of iteration 3, so state should be saved.
600+
// persisted at the end of iteration 2 after
601+
// Loop.continue, so state should be saved.
602602
if (state.count === 3 && firstRun) {
603603
throw new Error("Crash after state save");
604604
}
@@ -628,8 +628,8 @@ for (const mode of modes) {
628628
expect(result.state).toBe("completed");
629629
expect(result.output).toBe(5);
630630

631-
// Should resume from saved state at iteration 2, not from 0
632-
expect(iterationsExecuted[0]).toBe(2);
631+
// Should resume from saved state at iteration 3, not from 0
632+
expect(iterationsExecuted[0]).toBe(3);
633633
});
634634

635635
it("should handle loop that breaks before first prune interval", async () => {

rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts

Lines changed: 37 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import {
1010
type WorkflowErrorEvent,
1111
type WorkflowContextInterface,
1212
} from "../src/testing.js";
13-
import { buildHistoryPrefixAll, keyStartsWith } from "../src/keys.js";
14-
import { deserializeEntry, serializeEntry } from "../schemas/serde.js";
13+
import { buildHistoryPrefixAll } from "../src/keys.js";
14+
import { deserializeEntry } from "../schemas/serde.js";
1515

1616
const modes = ["yield", "live"] as const;
1717

@@ -26,30 +26,6 @@ class CountingDriver extends InMemoryDriver {
2626
}
2727
}
2828

29-
class StripStepHistoryErrorDriver extends InMemoryDriver {
30-
override async batch(
31-
writes: { key: Uint8Array; value: Uint8Array }[],
32-
): Promise<void> {
33-
const historyPrefix = buildHistoryPrefixAll();
34-
const rewritten = writes.map((write) => {
35-
if (!keyStartsWith(write.key, historyPrefix)) {
36-
return write;
37-
}
38-
39-
const entry = deserializeEntry(write.value);
40-
if (entry.kind.type === "step") {
41-
// Simulate a driver/crash scenario where the step error is not persisted
42-
// to the history entry, even though retries/exhaustion metadata is.
43-
entry.kind.data.error = undefined;
44-
return { key: write.key, value: serializeEntry(entry) };
45-
}
46-
47-
return write;
48-
});
49-
50-
return await super.batch(rewritten);
51-
}
52-
}
5329

5430
for (const mode of modes) {
5531
describe(`Workflow Engine Steps (${mode})`, { sequential: true }, () => {
@@ -485,43 +461,6 @@ for (const mode of modes) {
485461
).rejects.toThrow(StepExhaustedError);
486462
});
487463

488-
it("should surface the last error even if step history is missing the error", async () => {
489-
const driver = new StripStepHistoryErrorDriver();
490-
driver.latency = 0;
491-
492-
const workflow = async (ctx: WorkflowContextInterface) => {
493-
return await ctx.step({
494-
name: "always-fails",
495-
maxRetries: 1,
496-
retryBackoffBase: 0,
497-
retryBackoffMax: 0,
498-
run: async () => {
499-
throw new Error("Always fails");
500-
},
501-
});
502-
};
503-
504-
if (mode === "yield") {
505-
const firstResult = await runWorkflow(
506-
"wf-1",
507-
workflow,
508-
undefined,
509-
driver,
510-
{ mode },
511-
).result;
512-
expect(firstResult.state).toBe("sleeping");
513-
}
514-
515-
await expect(
516-
runWorkflow("wf-1", workflow, undefined, driver, { mode })
517-
.result,
518-
).rejects.toThrow(StepExhaustedError);
519-
await expect(
520-
runWorkflow("wf-1", workflow, undefined, driver, { mode })
521-
.result,
522-
).rejects.toThrow(/Always fails/);
523-
});
524-
525464
it("should recover exhausted retries", async () => {
526465
let attempts = 0;
527466

@@ -634,5 +573,40 @@ for (const mode of modes) {
634573
expect(result.output).toBe("done");
635574
expect(driver.batchCalls).toBe(2);
636575
});
576+
577+
it("should not commit step error data to entry on failure", async () => {
578+
const workflow = async (ctx: WorkflowContextInterface) => {
579+
return await ctx.step({
580+
name: "fail-once",
581+
maxRetries: 3,
582+
retryBackoffBase: 0,
583+
retryBackoffMax: 0,
584+
run: async () => {
585+
throw new Error("step failed");
586+
},
587+
});
588+
};
589+
590+
// Run once so the step fails and state is flushed
591+
try {
592+
await runWorkflow("wf-1", workflow, undefined, driver, {
593+
mode,
594+
}).result;
595+
} catch {}
596+
597+
// Inspect all history entries in KV
598+
const historyPrefix = buildHistoryPrefixAll();
599+
const entries = await driver.list(historyPrefix);
600+
601+
for (const kv of entries) {
602+
const entry = deserializeEntry(kv.value);
603+
if (entry.kind.type === "step") {
604+
// The step entry should have no output committed on failure.
605+
expect(entry.kind.data.output).toBeUndefined();
606+
// The error should be committed for inspection.
607+
expect(entry.kind.data.error).toBe("Error: step failed");
608+
}
609+
}
610+
});
637611
});
638612
}

0 commit comments

Comments
 (0)