Skip to content

Commit 8e2cf1e

Browse files
committed
[Workflows] Add step name and config to Workflows step context
1 parent 36c2c13 commit 8e2cf1e

3 files changed

Lines changed: 229 additions & 1 deletion

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
---
4+
5+
Add `step` and `config` properties to the workflow step context
6+
7+
The callback passed to `step.do()` now receives `ctx.step` (with `name` and `count`) and `ctx.config` (the fully resolved step configuration with defaults merged in), in addition to the existing `ctx.attempt`.

packages/workflows-shared/src/context.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ export type StepState = {
6464
};
6565

6666
export type WorkflowStepContext = {
67+
step: {
68+
name: string;
69+
count: number;
70+
};
6771
attempt: number;
72+
config: ResolvedStepConfig;
6873
};
6974
const PAUSE_DATETIME = "PAUSE_DATETIME";
7075

@@ -508,7 +513,11 @@ export class Context extends RpcTarget {
508513
} else {
509514
timeoutTask = timeoutPromise();
510515
result = await Promise.race([
511-
doWrapperClosure({ attempt: stepState.attemptedCount }),
516+
doWrapperClosure({
517+
step: { name, count },
518+
attempt: stepState.attemptedCount,
519+
config: structuredClone(config),
520+
}),
512521
timeoutTask,
513522
]);
514523
}

packages/workflows-shared/tests/context.test.ts

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,218 @@ describe("Context", () => {
164164
// Without disableRetryDelays, this would take 20+ seconds (10s + 10s)
165165
expect(elapsed).toBeLessThan(5000);
166166
});
167+
168+
// NOTE: The `step` and `config` fields on WorkflowStepContext are new additions
169+
// that haven't been published to @cloudflare/workers-types yet. The runtime
170+
// provides them, but TypeScript doesn't know about them, so we capture ctx as
171+
// `unknown` and assert on the shape.
172+
// TODO: Remove these workarounds once https://github.com/cloudflare/workerd/pull/6523 lands.
173+
174+
it("should provide step name and count in context", async ({ expect }) => {
175+
let receivedCtx: unknown;
176+
177+
const engineStub = await runWorkflow(
178+
"MOCK-INSTANCE-STEP-CTX",
179+
async (_event, step) => {
180+
const result = await step.do("my step", async (ctx) => {
181+
receivedCtx = ctx;
182+
return "done";
183+
});
184+
return result;
185+
}
186+
);
187+
188+
await vi.waitUntil(
189+
async () => {
190+
const logs = (await engineStub.readLogs()) as EngineLogs;
191+
return logs.logs.some(
192+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
193+
);
194+
},
195+
{ timeout: 1000 }
196+
);
197+
198+
expect(receivedCtx).toMatchObject({
199+
step: { name: "my step", count: 1 },
200+
});
201+
});
202+
203+
it("should increment step count for steps with the same name", async ({
204+
expect,
205+
}) => {
206+
const receivedContexts: unknown[] = [];
207+
208+
const engineStub = await runWorkflow(
209+
"MOCK-INSTANCE-STEP-COUNT",
210+
async (_event, step) => {
211+
await step.do("repeated step", async (ctx) => {
212+
receivedContexts.push(ctx);
213+
return "first";
214+
});
215+
await step.do("repeated step", async (ctx) => {
216+
receivedContexts.push(ctx);
217+
return "second";
218+
});
219+
await step.do("different step", async (ctx) => {
220+
receivedContexts.push(ctx);
221+
return "third";
222+
});
223+
return "done";
224+
}
225+
);
226+
227+
await vi.waitUntil(
228+
async () => {
229+
const logs = (await engineStub.readLogs()) as EngineLogs;
230+
return logs.logs.some(
231+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
232+
);
233+
},
234+
{ timeout: 1000 }
235+
);
236+
237+
expect(receivedContexts[0]).toMatchObject({
238+
step: { name: "repeated step", count: 1 },
239+
});
240+
expect(receivedContexts[1]).toMatchObject({
241+
step: { name: "repeated step", count: 2 },
242+
});
243+
expect(receivedContexts[2]).toMatchObject({
244+
step: { name: "different step", count: 1 },
245+
});
246+
});
247+
248+
it("should provide resolved config with defaults in context", async ({
249+
expect,
250+
}) => {
251+
let receivedCtx: unknown;
252+
253+
const engineStub = await runWorkflow(
254+
"MOCK-INSTANCE-CTX-CONFIG-DEFAULTS",
255+
async (_event, step) => {
256+
// No config provided — ctx.config should have all defaults
257+
const result = await step.do("default config step", async (ctx) => {
258+
receivedCtx = ctx;
259+
return "done";
260+
});
261+
return result;
262+
}
263+
);
264+
265+
await vi.waitUntil(
266+
async () => {
267+
const logs = (await engineStub.readLogs()) as EngineLogs;
268+
return logs.logs.some(
269+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
270+
);
271+
},
272+
{ timeout: 1000 }
273+
);
274+
275+
expect(receivedCtx).toMatchObject({
276+
config: {
277+
retries: {
278+
limit: 5,
279+
delay: 1000,
280+
backoff: "exponential",
281+
},
282+
timeout: "10 minutes",
283+
},
284+
});
285+
});
286+
287+
it("should provide resolved config with user overrides merged in context", async ({
288+
expect,
289+
}) => {
290+
let receivedCtx: unknown;
291+
292+
const engineStub = await runWorkflow(
293+
"MOCK-INSTANCE-CTX-CONFIG-OVERRIDES",
294+
async (_event, step) => {
295+
const result = await step.do(
296+
"custom config step",
297+
{
298+
retries: {
299+
limit: 3,
300+
delay: 500,
301+
},
302+
timeout: "5 minutes",
303+
},
304+
async (ctx) => {
305+
receivedCtx = ctx;
306+
return "done";
307+
}
308+
);
309+
return result;
310+
}
311+
);
312+
313+
await vi.waitUntil(
314+
async () => {
315+
const logs = (await engineStub.readLogs()) as EngineLogs;
316+
return logs.logs.some(
317+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
318+
);
319+
},
320+
{ timeout: 1000 }
321+
);
322+
323+
// User overrides should be merged with defaults
324+
expect(receivedCtx).toMatchObject({
325+
config: {
326+
retries: {
327+
limit: 3,
328+
delay: 500,
329+
backoff: "exponential",
330+
},
331+
timeout: "5 minutes",
332+
},
333+
});
334+
});
335+
336+
it("should not allow user callback to mutate engine retry config", async ({
337+
expect,
338+
}) => {
339+
const receivedAttempts: number[] = [];
340+
341+
const engineStub = await runWorkflow(
342+
"MOCK-INSTANCE-CTX-CONFIG-MUTATION",
343+
async (_event, step) => {
344+
const result = await step.do(
345+
"mutating step",
346+
{
347+
retries: {
348+
limit: 1,
349+
delay: 0,
350+
},
351+
},
352+
async (ctx) => {
353+
receivedAttempts.push(ctx.attempt);
354+
// Attempt to escalate retries from 1 to 100
355+
(
356+
ctx as unknown as { config: { retries: { limit: number } } }
357+
).config.retries.limit = 100;
358+
throw new Error("retry me");
359+
}
360+
);
361+
return result;
362+
}
363+
);
364+
365+
await vi.waitUntil(
366+
async () => {
367+
const logs = (await engineStub.readLogs()) as EngineLogs;
368+
return logs.logs.some(
369+
(val) => val.event === InstanceEvent.WORKFLOW_FAILURE
370+
);
371+
},
372+
{ timeout: 1000 }
373+
);
374+
375+
// With limit: 1, the engine should execute attempt 1 + 1 retry = 2 attempts total.
376+
// If the mutation leaked, this would keep going far beyond 2.
377+
expect(receivedAttempts).toEqual([1, 2]);
378+
});
167379
});
168380

169381
// ── Helpers for stream tests ────────────────────────────────────────────────

0 commit comments

Comments
 (0)