diff --git a/.changeset/step-context-parameters.md b/.changeset/step-context-parameters.md new file mode 100644 index 0000000000..11dd40f84b --- /dev/null +++ b/.changeset/step-context-parameters.md @@ -0,0 +1,7 @@ +--- +"@cloudflare/workflows-shared": minor +--- + +Add `step` and `config` properties to the workflow step context + +The callback passed to `step.do()` now receives `ctx.step` (with `name` and `count`) and `ctx.config` (the fully resolved step configuration with defaults merged in), in addition to the existing `ctx.attempt`. diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index b3e35c0fc0..57a40a58ed 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -64,7 +64,12 @@ export type StepState = { }; export type WorkflowStepContext = { + step: { + name: string; + count: number; + }; attempt: number; + config: ResolvedStepConfig; }; const PAUSE_DATETIME = "PAUSE_DATETIME"; @@ -508,7 +513,11 @@ export class Context extends RpcTarget { } else { timeoutTask = timeoutPromise(); result = await Promise.race([ - doWrapperClosure({ attempt: stepState.attemptedCount }), + doWrapperClosure({ + step: { name, count }, + attempt: stepState.attemptedCount, + config: structuredClone(config), + }), timeoutTask, ]); } diff --git a/packages/workflows-shared/tests/context.test.ts b/packages/workflows-shared/tests/context.test.ts index 22709adda6..52d10a8f21 100644 --- a/packages/workflows-shared/tests/context.test.ts +++ b/packages/workflows-shared/tests/context.test.ts @@ -164,6 +164,218 @@ describe("Context", () => { // Without disableRetryDelays, this would take 20+ seconds (10s + 10s) expect(elapsed).toBeLessThan(5000); }); + + // NOTE: The `step` and `config` fields on WorkflowStepContext are new additions + // that haven't been published to @cloudflare/workers-types yet. The runtime + // provides them, but TypeScript doesn't know about them, so we capture ctx as + // `unknown` and assert on the shape. + // TODO: Remove these workarounds once https://github.com/cloudflare/workerd/pull/6523 lands. + + it("should provide step name and count in context", async ({ expect }) => { + let receivedCtx: unknown; + + const engineStub = await runWorkflow( + "MOCK-INSTANCE-STEP-CTX", + async (_event, step) => { + const result = await step.do("my step", async (ctx) => { + receivedCtx = ctx; + return "done"; + }); + return result; + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 1000 } + ); + + expect(receivedCtx).toMatchObject({ + step: { name: "my step", count: 1 }, + }); + }); + + it("should increment step count for steps with the same name", async ({ + expect, + }) => { + const receivedContexts: unknown[] = []; + + const engineStub = await runWorkflow( + "MOCK-INSTANCE-STEP-COUNT", + async (_event, step) => { + await step.do("repeated step", async (ctx) => { + receivedContexts.push(ctx); + return "first"; + }); + await step.do("repeated step", async (ctx) => { + receivedContexts.push(ctx); + return "second"; + }); + await step.do("different step", async (ctx) => { + receivedContexts.push(ctx); + return "third"; + }); + return "done"; + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 1000 } + ); + + expect(receivedContexts[0]).toMatchObject({ + step: { name: "repeated step", count: 1 }, + }); + expect(receivedContexts[1]).toMatchObject({ + step: { name: "repeated step", count: 2 }, + }); + expect(receivedContexts[2]).toMatchObject({ + step: { name: "different step", count: 1 }, + }); + }); + + it("should provide resolved config with defaults in context", async ({ + expect, + }) => { + let receivedCtx: unknown; + + const engineStub = await runWorkflow( + "MOCK-INSTANCE-CTX-CONFIG-DEFAULTS", + async (_event, step) => { + // No config provided — ctx.config should have all defaults + const result = await step.do("default config step", async (ctx) => { + receivedCtx = ctx; + return "done"; + }); + return result; + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 1000 } + ); + + expect(receivedCtx).toMatchObject({ + config: { + retries: { + limit: 5, + delay: 1000, + backoff: "exponential", + }, + timeout: "10 minutes", + }, + }); + }); + + it("should provide resolved config with user overrides merged in context", async ({ + expect, + }) => { + let receivedCtx: unknown; + + const engineStub = await runWorkflow( + "MOCK-INSTANCE-CTX-CONFIG-OVERRIDES", + async (_event, step) => { + const result = await step.do( + "custom config step", + { + retries: { + limit: 3, + delay: 500, + }, + timeout: "5 minutes", + }, + async (ctx) => { + receivedCtx = ctx; + return "done"; + } + ); + return result; + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 1000 } + ); + + // User overrides should be merged with defaults + expect(receivedCtx).toMatchObject({ + config: { + retries: { + limit: 3, + delay: 500, + backoff: "exponential", + }, + timeout: "5 minutes", + }, + }); + }); + + it("should not allow user callback to mutate engine retry config", async ({ + expect, + }) => { + const receivedAttempts: number[] = []; + + const engineStub = await runWorkflow( + "MOCK-INSTANCE-CTX-CONFIG-MUTATION", + async (_event, step) => { + const result = await step.do( + "mutating step", + { + retries: { + limit: 1, + delay: 0, + }, + }, + async (ctx) => { + receivedAttempts.push(ctx.attempt); + // Attempt to escalate retries from 1 to 100 + ( + ctx as unknown as { config: { retries: { limit: number } } } + ).config.retries.limit = 100; + throw new Error("retry me"); + } + ); + return result; + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_FAILURE + ); + }, + { timeout: 1000 } + ); + + // With limit: 1, the engine should execute attempt 1 + 1 retry = 2 attempts total. + // If the mutation leaked, this would keep going far beyond 2. + expect(receivedAttempts).toEqual([1, 2]); + }); }); // ── Helpers for stream tests ────────────────────────────────────────────────