Skip to content

Commit dbe6ae9

Browse files
committed
[Workflows] Add step name and config to Workflows step context
1 parent 36c2c13 commit dbe6ae9

3 files changed

Lines changed: 226 additions & 1 deletion

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
---
4+
5+
Add `step` and `config` properties to the workflow step context
6+
7+
The callback passed to `step.do()` now receives `ctx.step` (with `name` and `count`) and `ctx.config` (the fully resolved step configuration with defaults merged in), in addition to the existing `ctx.attempt`.

packages/workflows-shared/src/context.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ export type StepState = {
6464
};
6565

6666
export type WorkflowStepContext = {
67+
step: {
68+
name: string;
69+
count: number;
70+
};
6771
attempt: number;
72+
config: ResolvedStepConfig;
6873
};
6974
const PAUSE_DATETIME = "PAUSE_DATETIME";
7075

@@ -508,7 +513,11 @@ export class Context extends RpcTarget {
508513
} else {
509514
timeoutTask = timeoutPromise();
510515
result = await Promise.race([
511-
doWrapperClosure({ attempt: stepState.attemptedCount }),
516+
doWrapperClosure({
517+
step: { name, count },
518+
attempt: stepState.attemptedCount,
519+
config: { ...config, retries: { ...config.retries } },
520+
}),
512521
timeoutTask,
513522
]);
514523
}

packages/workflows-shared/tests/context.test.ts

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,215 @@ describe("Context", () => {
164164
// Without disableRetryDelays, this would take 20+ seconds (10s + 10s)
165165
expect(elapsed).toBeLessThan(5000);
166166
});
167+
168+
// NOTE: The `step` and `config` fields on WorkflowStepContext are new additions
169+
// that haven't been published to @cloudflare/workers-types yet. The runtime
170+
// provides them, but TypeScript doesn't know about them, so we capture ctx as
171+
// `unknown` and assert on the shape.
172+
173+
it("should provide step name and count in context", async ({ expect }) => {
174+
let receivedCtx: unknown;
175+
176+
const engineStub = await runWorkflow(
177+
"MOCK-INSTANCE-STEP-CTX",
178+
async (_event, step) => {
179+
const result = await step.do("my step", async (ctx) => {
180+
receivedCtx = ctx;
181+
return "done";
182+
});
183+
return result;
184+
}
185+
);
186+
187+
await vi.waitUntil(
188+
async () => {
189+
const logs = (await engineStub.readLogs()) as EngineLogs;
190+
return logs.logs.some(
191+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
192+
);
193+
},
194+
{ timeout: 1000 }
195+
);
196+
197+
expect(receivedCtx).toMatchObject({
198+
step: { name: "my step", count: 1 },
199+
});
200+
});
201+
202+
it("should increment step count for steps with the same name", async ({
203+
expect,
204+
}) => {
205+
const receivedContexts: unknown[] = [];
206+
207+
const engineStub = await runWorkflow(
208+
"MOCK-INSTANCE-STEP-COUNT",
209+
async (_event, step) => {
210+
await step.do("repeated step", async (ctx) => {
211+
receivedContexts.push(ctx);
212+
return "first";
213+
});
214+
await step.do("repeated step", async (ctx) => {
215+
receivedContexts.push(ctx);
216+
return "second";
217+
});
218+
await step.do("different step", async (ctx) => {
219+
receivedContexts.push(ctx);
220+
return "third";
221+
});
222+
return "done";
223+
}
224+
);
225+
226+
await vi.waitUntil(
227+
async () => {
228+
const logs = (await engineStub.readLogs()) as EngineLogs;
229+
return logs.logs.some(
230+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
231+
);
232+
},
233+
{ timeout: 1000 }
234+
);
235+
236+
expect(receivedContexts[0]).toMatchObject({
237+
step: { name: "repeated step", count: 1 },
238+
});
239+
expect(receivedContexts[1]).toMatchObject({
240+
step: { name: "repeated step", count: 2 },
241+
});
242+
expect(receivedContexts[2]).toMatchObject({
243+
step: { name: "different step", count: 1 },
244+
});
245+
});
246+
247+
it("should provide resolved config with defaults in context", async ({
248+
expect,
249+
}) => {
250+
let receivedCtx: unknown;
251+
252+
const engineStub = await runWorkflow(
253+
"MOCK-INSTANCE-CTX-CONFIG-DEFAULTS",
254+
async (_event, step) => {
255+
// No config provided — ctx.config should have all defaults
256+
const result = await step.do("default config step", async (ctx) => {
257+
receivedCtx = ctx;
258+
return "done";
259+
});
260+
return result;
261+
}
262+
);
263+
264+
await vi.waitUntil(
265+
async () => {
266+
const logs = (await engineStub.readLogs()) as EngineLogs;
267+
return logs.logs.some(
268+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
269+
);
270+
},
271+
{ timeout: 1000 }
272+
);
273+
274+
expect(receivedCtx).toMatchObject({
275+
config: {
276+
retries: {
277+
limit: 5,
278+
delay: 1000,
279+
backoff: "exponential",
280+
},
281+
timeout: "10 minutes",
282+
},
283+
});
284+
});
285+
286+
it("should provide resolved config with user overrides merged in context", async ({
287+
expect,
288+
}) => {
289+
let receivedCtx: unknown;
290+
291+
const engineStub = await runWorkflow(
292+
"MOCK-INSTANCE-CTX-CONFIG-OVERRIDES",
293+
async (_event, step) => {
294+
const result = await step.do(
295+
"custom config step",
296+
{
297+
retries: {
298+
limit: 3,
299+
delay: 500,
300+
},
301+
timeout: "5 minutes",
302+
},
303+
async (ctx) => {
304+
receivedCtx = ctx;
305+
return "done";
306+
}
307+
);
308+
return result;
309+
}
310+
);
311+
312+
await vi.waitUntil(
313+
async () => {
314+
const logs = (await engineStub.readLogs()) as EngineLogs;
315+
return logs.logs.some(
316+
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
317+
);
318+
},
319+
{ timeout: 1000 }
320+
);
321+
322+
// User overrides should be merged with defaults
323+
expect(receivedCtx).toMatchObject({
324+
config: {
325+
retries: {
326+
limit: 3,
327+
delay: 500,
328+
backoff: "exponential",
329+
},
330+
timeout: "5 minutes",
331+
},
332+
});
333+
});
334+
335+
it("should not allow user callback to mutate engine retry config", async ({
336+
expect,
337+
}) => {
338+
const receivedAttempts: number[] = [];
339+
340+
const engineStub = await runWorkflow(
341+
"MOCK-INSTANCE-CTX-CONFIG-MUTATION",
342+
async (_event, step) => {
343+
const result = await step.do(
344+
"mutating step",
345+
{
346+
retries: {
347+
limit: 1,
348+
delay: 0,
349+
},
350+
},
351+
async (ctx) => {
352+
receivedAttempts.push(ctx.attempt);
353+
// Attempt to escalate retries from 1 to 100
354+
(ctx as unknown as { config: { retries: { limit: number } } }).config.retries.limit = 100;
355+
throw new Error("retry me");
356+
}
357+
);
358+
return result;
359+
}
360+
);
361+
362+
await vi.waitUntil(
363+
async () => {
364+
const logs = (await engineStub.readLogs()) as EngineLogs;
365+
return logs.logs.some(
366+
(val) => val.event === InstanceEvent.WORKFLOW_FAILURE
367+
);
368+
},
369+
{ timeout: 1000 }
370+
);
371+
372+
// With limit: 1, the engine should execute attempt 1 + 1 retry = 2 attempts total.
373+
// If the mutation leaked, this would keep going far beyond 2.
374+
expect(receivedAttempts).toEqual([1, 2]);
375+
});
167376
});
168377

169378
// ── Helpers for stream tests ────────────────────────────────────────────────

0 commit comments

Comments
 (0)