Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rivetkit-typescript/packages/workflow-engine/docs/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ await ctx.step({

- `maxRetries` defaults to 3.
- `retryBackoffBase` and `retryBackoffMax` control exponential delay.
- `timeout` limits how long the step can run; timeouts are treated as critical failures.
- `timeout` limits how long the step can run; by default a timeout is treated as a critical failure with no retry. Set `retryOnTimeout: true` on the step to retry timeouts like any other error.

## Unrecoverable Errors

Expand All @@ -46,7 +46,7 @@ await ctx.step("halt", async () => {
});
```

`StepTimeoutError` is also treated as critical, so timeouts bypass retries.
`StepTimeoutError` is treated as critical by default and bypasses retries. Opt into normal retry behavior with `retryOnTimeout: true`; when retries exhaust on a timeout the `try`-step failure `kind` is `"timeout"`.

## Exhaustion and Recovery

Expand Down
14 changes: 10 additions & 4 deletions rivetkit-typescript/packages/workflow-engine/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,9 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
}
entry.dirty = true;

// Timeout errors are treated as critical (no retry)
if (error instanceof StepTimeoutError) {
// Timeout errors are treated as critical by default. Steps opt
// into retrying on timeout with retryOnTimeout: true.
if (error instanceof StepTimeoutError && !config.retryOnTimeout) {
metadata.status = "exhausted";
metadata.error = String(error);
await this.notifyStepError(config, metadata.attempts, error, {
Expand Down Expand Up @@ -1010,7 +1011,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
attachTryStepFailure(
new StepExhaustedError(config.name, String(error)),
{
kind: "exhausted",
kind:
error instanceof StepTimeoutError
? "timeout"
: "exhausted",
stepName: config.name,
attempts: metadata.attempts,
error: extractErrorInfo(error),
Expand All @@ -1029,7 +1033,9 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
*
* Note: This does NOT cancel the underlying operation. JavaScript Promises
* cannot be cancelled once started. When a timeout occurs:
* - The step is marked as failed with StepTimeoutError
* - The step is rejected with StepTimeoutError. By default this is treated
* as a critical failure with no retry. Set retryOnTimeout: true on the
* step config to retry timeouts like any other error.
* - The underlying async operation continues running in the background
* - Any side effects from the operation may still occur
*
Expand Down
2 changes: 2 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ export interface StepConfig<T> {
retryBackoffMax?: number;
/** Timeout in ms for step execution (default: 30000). Set to 0 to disable. */
timeout?: number;
/** If true, step timeouts retry like any other error instead of failing immediately as critical. Default: false. */
retryOnTimeout?: boolean;
}

export type TryStepCatchKind =
Expand Down
48 changes: 48 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,54 @@ for (const mode of modes) {
).rejects.toThrow(CriticalError);
});

it("should retry steps that exceed timeout when retryOnTimeout is set", async () => {
let attempts = 0;
const workflow = async (ctx: WorkflowContextInterface) => {
return await ctx.step({
name: "timeout-step",
timeout: 5,
retryOnTimeout: true,
maxRetries: 2,
retryBackoffBase: 1,
retryBackoffMax: 1,
run: async () => {
attempts++;
await new Promise((resolve) => setTimeout(resolve, 25));
return "late";
},
});
};

if (mode === "yield") {
const firstResult = await runWorkflow(
"wf-1",
workflow,
undefined,
driver,
{ mode },
).result;
expect(firstResult.state).toBe("sleeping");
await new Promise((resolve) => setTimeout(resolve, 10));

const secondResult = await runWorkflow(
"wf-1",
workflow,
undefined,
driver,
{ mode },
).result;
expect(secondResult.state).toBe("sleeping");
await new Promise((resolve) => setTimeout(resolve, 10));
}

await expect(
runWorkflow("wf-1", workflow, undefined, driver, { mode })
.result,
).rejects.toThrow(StepExhaustedError);

expect(attempts).toBe(3);
});

it("should fail when a step is not awaited", async () => {
const workflow = async (ctx: WorkflowContextInterface) => {
const first = ctx.step("step-a", async () => "a");
Expand Down
Loading