Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/step-context-parameters.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@cloudflare/workflows-shared": minor
---

Add `step` and `config` properties to the workflow step context

The callback passed to `step.do()` now receives `ctx.step` (with `name` and `count`) and `ctx.config` (the fully resolved step configuration with defaults merged in), in addition to the existing `ctx.attempt`.
11 changes: 10 additions & 1 deletion packages/workflows-shared/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ export type StepState = {
};

export type WorkflowStepContext = {
step: {
name: string;
count: number;
};
attempt: number;
config: ResolvedStepConfig;
};
const PAUSE_DATETIME = "PAUSE_DATETIME";

Expand Down Expand Up @@ -508,7 +513,11 @@ export class Context extends RpcTarget {
} else {
timeoutTask = timeoutPromise();
result = await Promise.race([
doWrapperClosure({ attempt: stepState.attemptedCount }),
doWrapperClosure({
step: { name, count },
Comment thread
avenceslau marked this conversation as resolved.
attempt: stepState.attemptedCount,
config: structuredClone(config),
}),
Comment thread
pombosilva marked this conversation as resolved.
timeoutTask,
]);
}
Expand Down
212 changes: 212 additions & 0 deletions packages/workflows-shared/tests/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,218 @@ describe("Context", () => {
// Without disableRetryDelays, this would take 20+ seconds (10s + 10s)
expect(elapsed).toBeLessThan(5000);
});

// NOTE: The `step` and `config` fields on WorkflowStepContext are new additions
// that haven't been published to @cloudflare/workers-types yet. The runtime
// provides them, but TypeScript doesn't know about them, so we capture ctx as
// `unknown` and assert on the shape.
Comment thread
pombosilva marked this conversation as resolved.
// TODO: Remove these workarounds once https://github.com/cloudflare/workerd/pull/6523 lands.

it("should provide step name and count in context", async ({ expect }) => {
let receivedCtx: unknown;

const engineStub = await runWorkflow(
"MOCK-INSTANCE-STEP-CTX",
async (_event, step) => {
const result = await step.do("my step", async (ctx) => {
receivedCtx = ctx;
return "done";
});
return result;
}
);

await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return logs.logs.some(
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
);
},
{ timeout: 1000 }
);

expect(receivedCtx).toMatchObject({
step: { name: "my step", count: 1 },
});
});

it("should increment step count for steps with the same name", async ({
expect,
}) => {
const receivedContexts: unknown[] = [];

const engineStub = await runWorkflow(
"MOCK-INSTANCE-STEP-COUNT",
async (_event, step) => {
await step.do("repeated step", async (ctx) => {
receivedContexts.push(ctx);
return "first";
});
await step.do("repeated step", async (ctx) => {
receivedContexts.push(ctx);
return "second";
});
await step.do("different step", async (ctx) => {
receivedContexts.push(ctx);
return "third";
});
return "done";
}
);

await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return logs.logs.some(
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
);
},
{ timeout: 1000 }
);

expect(receivedContexts[0]).toMatchObject({
step: { name: "repeated step", count: 1 },
});
expect(receivedContexts[1]).toMatchObject({
step: { name: "repeated step", count: 2 },
});
expect(receivedContexts[2]).toMatchObject({
step: { name: "different step", count: 1 },
});
});

it("should provide resolved config with defaults in context", async ({
expect,
}) => {
let receivedCtx: unknown;

const engineStub = await runWorkflow(
"MOCK-INSTANCE-CTX-CONFIG-DEFAULTS",
async (_event, step) => {
// No config provided — ctx.config should have all defaults
const result = await step.do("default config step", async (ctx) => {
receivedCtx = ctx;
return "done";
});
return result;
}
);

await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return logs.logs.some(
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
);
},
{ timeout: 1000 }
);

expect(receivedCtx).toMatchObject({
config: {
retries: {
limit: 5,
delay: 1000,
backoff: "exponential",
},
timeout: "10 minutes",
},
});
});

it("should provide resolved config with user overrides merged in context", async ({
expect,
}) => {
let receivedCtx: unknown;

const engineStub = await runWorkflow(
"MOCK-INSTANCE-CTX-CONFIG-OVERRIDES",
async (_event, step) => {
const result = await step.do(
"custom config step",
{
retries: {
limit: 3,
delay: 500,
},
timeout: "5 minutes",
},
async (ctx) => {
receivedCtx = ctx;
return "done";
}
);
return result;
}
);

await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return logs.logs.some(
(val) => val.event === InstanceEvent.WORKFLOW_SUCCESS
);
},
{ timeout: 1000 }
);

// User overrides should be merged with defaults
expect(receivedCtx).toMatchObject({
config: {
retries: {
limit: 3,
delay: 500,
backoff: "exponential",
},
timeout: "5 minutes",
},
});
});

it("should not allow user callback to mutate engine retry config", async ({
expect,
}) => {
const receivedAttempts: number[] = [];

const engineStub = await runWorkflow(
"MOCK-INSTANCE-CTX-CONFIG-MUTATION",
async (_event, step) => {
const result = await step.do(
"mutating step",
{
retries: {
limit: 1,
delay: 0,
},
},
async (ctx) => {
receivedAttempts.push(ctx.attempt);
// Attempt to escalate retries from 1 to 100
(
ctx as unknown as { config: { retries: { limit: number } } }
).config.retries.limit = 100;
throw new Error("retry me");
}
);
return result;
}
);

await vi.waitUntil(
async () => {
const logs = (await engineStub.readLogs()) as EngineLogs;
return logs.logs.some(
(val) => val.event === InstanceEvent.WORKFLOW_FAILURE
);
},
{ timeout: 1000 }
);

// With limit: 1, the engine should execute attempt 1 + 1 retry = 2 attempts total.
// If the mutation leaked, this would keep going far beyond 2.
expect(receivedAttempts).toEqual([1, 2]);
});
});

// ── Helpers for stream tests ────────────────────────────────────────────────
Expand Down
Loading