Skip to content

Commit b04eedf

Browse files
authored
[workflows] adding restart-from-step support to the sdk (#13565)
1 parent 6457fb3 commit b04eedf

9 files changed

Lines changed: 358 additions & 72 deletions

File tree

.changeset/restart-from-step.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
"miniflare": minor
4+
---
5+
6+
Add restart-from-step support for local Workflows development
7+
8+
Workflow instances can now be restarted from a specific step in local development. When restarting from a step, all earlier steps preserve their cached results and replay instantly, while the target step and everything after it re-execute.
9+
10+
The `WorkflowInstance.restart()` method now accepts an optional options parameter of the form `{ from: { name, count?, type? } }` that specifies which step to restart from. Within `from`, `name` is required, while `count` and `type` are optional.

fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ describe("workflow instance lifecycle methods", () => {
140140
// DISPOSE: ensured by `await using`
141141
});
142142

143+
// TODO(vaish): add restart-from-step test here once @cloudflare/workers-types ships restart options
144+
143145
it("should pause a workflow instance", async ({ expect }) => {
144146
// CONFIG:
145147
await using introspector = await introspectWorkflow(env.MODERATOR);

packages/miniflare/src/workers/local-explorer/resources/workflows.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type { AppContext } from "../common";
88
import type { Env } from "../explorer.worker";
99
import type { WorkflowsWorkflow } from "../generated";
1010
import type { zWorkflowsListInstancesData } from "../generated/zod.gen";
11+
import type { RestartFromStep } from "@cloudflare/workflows-shared/src/binding";
1112
import type { z } from "zod";
1213

1314
// ============================================================================
@@ -30,7 +31,7 @@ interface DirectoryEntry {
3031
interface WorkflowHandle {
3132
pause(): Promise<void>;
3233
resume(): Promise<void>;
33-
restart(): Promise<void>;
34+
restart(options?: { from?: RestartFromStep }): Promise<void>;
3435
terminate(): Promise<void>;
3536
sendEvent(args: { payload: unknown; type: string }): Promise<void>;
3637
status(): Promise<{ status: string; output?: unknown; error?: unknown }>;
@@ -871,7 +872,10 @@ export async function changeWorkflowInstanceStatus(
871872
}
872873

873874
try {
874-
const body = (await c.req.json()) as { action: string };
875+
const body = (await c.req.json()) as {
876+
action: string;
877+
from?: RestartFromStep;
878+
};
875879
const { action } = body;
876880

877881
if (!["pause", "resume", "restart", "terminate"].includes(action)) {
@@ -891,9 +895,19 @@ export async function changeWorkflowInstanceStatus(
891895
case "resume":
892896
await handle.resume();
893897
break;
894-
case "restart":
895-
await handle.restart();
898+
case "restart": {
899+
if (body.from && !body.from.name) {
900+
return errorResponse(
901+
400,
902+
10001,
903+
"'from.name' is required when restarting from a specific step."
904+
);
905+
}
906+
const opts = body.from ? { from: body.from } : undefined;
907+
// TODO(vaish): remove cast once @cloudflare/workers-types ships restart options
908+
await (handle as unknown as WorkflowHandle).restart(opts);
896909
break;
910+
}
897911
case "terminate":
898912
await handle.terminate();
899913
break;

packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
import type { WorkflowBinding } from "@cloudflare/workflows-shared/src/binding";
1+
import type {
2+
WorkflowBinding,
3+
WorkflowInstanceRestartOptions,
4+
} from "@cloudflare/workflows-shared/src/binding";
25

36
class WorkflowImpl implements Workflow {
47
constructor(private binding: WorkflowBinding) {}
@@ -86,9 +89,13 @@ class InstanceImpl implements WorkflowInstance {
8689
await instance.terminate();
8790
}
8891

89-
public async restart(): Promise<void> {
92+
public async restart(
93+
options?: WorkflowInstanceRestartOptions
94+
): Promise<void> {
9095
using instance = await this.getInstance();
91-
await instance.restart();
96+
// TODO(vaish): remove @ts-expect-error once @cloudflare/workers-types ships restart options
97+
// @ts-expect-error WorkflowInstance type does not include options yet
98+
await instance.restart(options);
9299
}
93100

94101
public async status(): Promise<InstanceStatus> {

packages/workflows-shared/src/binding.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@ type Env = {
2121
BINDING_NAME: string;
2222
};
2323

24+
// TODO(vaish): import from @cloudflare/workers-types once restart options are published
25+
export interface RestartFromStep {
26+
name: string;
27+
count?: number;
28+
type?: "do" | "sleep" | "waitForEvent";
29+
}
30+
31+
export interface WorkflowInstanceRestartOptions {
32+
from?: RestartFromStep;
33+
}
34+
2435
// this.env.WORKFLOW is WorkflowBinding
2536
export class WorkflowBinding extends WorkerEntrypoint<Env> {
2637
constructor(ctx: ExecutionContext, env: Env) {
@@ -208,9 +219,11 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
208219
}
209220
}
210221

211-
public async restart(): Promise<void> {
222+
public async restart(
223+
options?: WorkflowInstanceRestartOptions
224+
): Promise<void> {
212225
try {
213-
await this.stub.changeInstanceStatus("restart");
226+
await this.stub.changeInstanceStatus("restart", options?.from);
214227
} catch (e) {
215228
// restart causes instance abortion
216229
if (!isUserTriggeredRestart(e)) {

packages/workflows-shared/src/engine.ts

Lines changed: 42 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,29 @@ import {
1515
isAbortError,
1616
PreservedNonRetryableError,
1717
shouldPreserveNonRetryableError,
18+
stepNotFoundError,
1819
WorkflowFatalError,
1920
} from "./lib/errors";
2021
import {
2122
ENGINE_TIMEOUT,
2223
GracePeriodSemaphore,
2324
startGracePeriod,
2425
} from "./lib/gracePeriodSemaphore";
26+
import {
27+
readAndClearRestartFromStep,
28+
resolveGroupKeysToWipe,
29+
storeRestartFromStep,
30+
wipeRestartState,
31+
} from "./lib/restart";
2532
import {
2633
createReplayReadableStream,
2734
getInvalidStoredStreamOutputError,
2835
getStoredStreamOutputPreview,
2936
StreamOutputState,
3037
} from "./lib/streams";
3138
import { TimePriorityQueue } from "./lib/timePriorityQueue";
32-
import {
33-
isModifierKey,
34-
MODIFIER_KEYS,
35-
WorkflowInstanceModifier,
36-
} from "./modifier";
39+
import { MODIFIER_KEYS, WorkflowInstanceModifier } from "./modifier";
40+
import type { RestartFromStep } from "./binding";
3741
import type { Event } from "./context";
3842
import type { InstanceMetadata, RawInstanceLog } from "./instance";
3943
import type { StreamOutputMeta } from "./lib/streams";
@@ -753,7 +757,8 @@ export class Engine extends DurableObject<Env> {
753757
}
754758

755759
async changeInstanceStatus(
756-
newStatus: "resume" | "pause" | "terminate" | "restart"
760+
newStatus: "resume" | "pause" | "terminate" | "restart",
761+
from?: RestartFromStep
757762
) {
758763
const metadata =
759764
await this.ctx.storage.get<InstanceMetadata>(INSTANCE_METADATA);
@@ -802,6 +807,12 @@ export class Engine extends DurableObject<Env> {
802807
break;
803808
}
804809
case "restart":
810+
if (from) {
811+
if (!resolveGroupKeysToWipe(this.ctx.storage.sql, from)) {
812+
throw stepNotFoundError(from.name);
813+
}
814+
await storeRestartFromStep(this.ctx.storage, from);
815+
}
805816
await this.userTriggeredRestart();
806817
break;
807818
}
@@ -889,61 +900,27 @@ export class Engine extends DurableObject<Env> {
889900
await this.abort(ABORT_REASONS.USER_RESTART);
890901
}
891902

892-
private getMockedEventMapKeys(allKeys: Map<string, unknown>): Set<string> {
893-
const mockEventTypes = new Set<string>();
894-
for (const key of allKeys.keys()) {
895-
if (key.startsWith(MODIFIER_KEYS.MOCK_EVENT)) {
896-
mockEventTypes.add(key.slice(MODIFIER_KEYS.MOCK_EVENT.length));
897-
}
898-
}
899-
900-
if (mockEventTypes.size === 0) {
901-
return new Set();
902-
}
903+
async attemptRestart() {
904+
const restartFromStep = await readAndClearRestartFromStep(this.ctx.storage);
903905

904-
const preserved = new Set<string>();
905-
for (const key of allKeys.keys()) {
906-
if (key.startsWith(`${EVENT_MAP_PREFIX}\n`)) {
907-
// EVENT_MAP keys are formatted as "EVENT_MAP\n{type}\n{idx}"
908-
const eventType = key.split("\n")[1];
909-
if (eventType !== undefined && mockEventTypes.has(eventType)) {
910-
preserved.add(key);
911-
}
906+
let groupKeysToWipe: Set<string> | null = null;
907+
if (restartFromStep) {
908+
groupKeysToWipe = resolveGroupKeysToWipe(
909+
this.ctx.storage.sql,
910+
restartFromStep
911+
);
912+
if (!groupKeysToWipe) {
913+
throw stepNotFoundError(restartFromStep.name);
912914
}
913915
}
914916

915-
return preserved;
916-
}
917-
918-
async attemptRestart() {
919-
this.ctx.storage.sql.exec("DELETE FROM states");
920-
this.ctx.storage.sql.exec("DELETE FROM priority_queue");
921-
// Only delete non-mock streaming chunks. Mock stream outputs are stored
922-
// at attempt=0 (see modifier.ts mockStepResult) and their sentinels
923-
// survive restart via isModifierKey(), so the underlying SQL rows must
924-
// be preserved too.
925-
this.ctx.storage.sql.exec(
926-
"DELETE FROM streaming_step_chunks WHERE attempt != 0"
917+
await wipeRestartState(
918+
this.ctx.storage,
919+
ENGINE_STATUS_KEY,
920+
PAUSE_DATETIME,
921+
groupKeysToWipe
927922
);
928923

929-
const allKeys = await this.ctx.storage.list();
930-
const preservedEventMapKeys = this.getMockedEventMapKeys(allKeys);
931-
932-
// Remove all KV keys except:
933-
// - INSTANCE_METADATA (needed to re-run the workflow)
934-
// - Modifier/mock keys (so mocks survive restart)
935-
// - EVENT_MAP entries for mocked event types
936-
for (const key of allKeys.keys()) {
937-
if (
938-
key === INSTANCE_METADATA ||
939-
isModifierKey(key) ||
940-
preservedEventMapKeys.has(key)
941-
) {
942-
continue;
943-
}
944-
await this.ctx.storage.delete(key);
945-
}
946-
947924
const metadata =
948925
await this.ctx.storage.get<InstanceMetadata>(INSTANCE_METADATA);
949926

@@ -956,14 +933,16 @@ export class Engine extends DurableObject<Env> {
956933

957934
const { accountId, workflow, version, instance, event } = metadata;
958935

959-
this.writeLog(InstanceEvent.WORKFLOW_QUEUED, null, null, {
960-
params: event.payload,
961-
versionId: version.id,
962-
trigger: {
963-
source: InstanceTrigger.API,
964-
},
965-
});
966-
this.writeLog(InstanceEvent.WORKFLOW_START, null, null, {});
936+
if (!groupKeysToWipe) {
937+
this.writeLog(InstanceEvent.WORKFLOW_QUEUED, null, null, {
938+
params: event.payload,
939+
versionId: version.id,
940+
trigger: {
941+
source: InstanceTrigger.API,
942+
},
943+
});
944+
this.writeLog(InstanceEvent.WORKFLOW_START, null, null, {});
945+
}
967946

968947
void this.init(accountId, workflow, version, instance, event);
969948
}

packages/workflows-shared/src/lib/errors.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,10 @@ function getCompatFlag(name: string): boolean {
114114
export function shouldPreserveNonRetryableError(): boolean {
115115
return getCompatFlag("workflows_preserve_non_retryable_error_message");
116116
}
117+
118+
export function stepNotFoundError(name: string): WorkflowError {
119+
return createWorkflowError(
120+
`Step "${name}" not found in execution history`,
121+
"instance.cannot_restart"
122+
);
123+
}

0 commit comments

Comments
 (0)