Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
---
title: Distributed Abort Controller
description: A distributed AbortController that uses durable workflows for cross-process cancellation signaling.
type: guide
summary: Build a distributed abort controller that uses workflow streams and hooks to propagate cancellation signals across process boundaries.
---

Use this pattern when you need an `AbortController`-like interface that works across distributed systems. The controller uses a durable workflow to coordinate cancellation — calling `.abort()` on one machine triggers the `.signal` on any other machine.

## When to use this

- **Cross-process cancellation** — Cancel a long-running operation from a different server, worker, or edge function
- **Durable cancellation** — The abort signal persists even if the process that created it crashes
- **UI stop buttons** — Let users cancel operations running on the server from the browser
- **Timeout coordination** — The built-in TTL auto-expires stale controllers

## Pattern

The `DistributedAbortController` class encapsulates a workflow that:
1. Accepts a user-provided unique ID (like a chat ID or task ID)
2. Creates or reconnects to an existing workflow using that ID
3. Waits for a hook signal OR TTL expiration
4. Writes a cancellation message to the run's stream when triggered

### Core Implementation

```typescript lineNumbers
import { defineHook, getWritable, sleep } from "workflow";
import { start, getRun, getHookByToken } from "workflow/api";

// Default TTL: 24 hours
const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000;
// Default grace period: 1 hour (keeps hook alive after abort for late subscribers)
const DEFAULT_GRACE_MS = 60 * 60 * 1000;

// Hook to trigger the abort signal
export const abortHook = defineHook<{ reason?: string }>();

// The abort message written to the stream
export type AbortMessage = {
type: "abort";
reason?: string;
expired?: boolean;
};

// Helper to create a consistent hook token from the user ID
function getAbortToken(id: string): string {
return `abort:${id}`;
}

// Step function that writes the abort message to the stream
async function writeAbortSignal(reason?: string, expired?: boolean) {
"use step";

const writable = getWritable<AbortMessage>();
const writer = writable.getWriter();
try {
await writer.write({ type: "abort", reason, expired });
} finally {
writer.releaseLock();
}
await writable.close();
}

// Workflow that waits for abort or TTL expiration
export async function abortControllerWorkflow(
id: string,
ttlMs: number,
graceMs: number
) {
"use workflow";

const startTime = Date.now();
const hook = abortHook.create({ token: getAbortToken(id) });

// Race: manual abort OR TTL expiration // [!code highlight]
const result = await Promise.race([
hook.then((payload) => ({
reason: payload.reason,
expired: false,
})),
sleep(`${ttlMs}ms`).then(() => ({
reason: "Controller expired",
expired: true,
})),
]);

await writeAbortSignal(result.reason, result.expired);

// Only sleep through grace period on TTL expiration (keeps hook alive for late subscribers). // [!code highlight]
// Manual aborts complete immediately.
if (result.expired) {
const elapsed = Date.now() - startTime;
const remainingTime = graceMs - (elapsed - ttlMs);
if (remainingTime > 0) {
await sleep(`${remainingTime}ms`); // [!code highlight]
}
}

return { aborted: true, reason: result.reason, expired: result.expired };
}

/**
* A distributed abort controller that works across process boundaries.
* Uses a semantically meaningful ID (like a chat ID or task ID) to coordinate.
*/
export class DistributedAbortController {
private id: string;
readonly runId: string;

private constructor(id: string, runId: string) {
this.id = id;
this.runId = runId;
}

/**
* Creates or reconnects to a distributed abort controller.
* If a controller with this ID already exists, reconnects to it.
* Otherwise, starts a new workflow.
*
* @param id - A unique, semantically meaningful ID (e.g., "chat:123")
* @param options.ttlMs - Time-to-live in ms (default: 24 hours)
* @param options.graceMs - Grace period after abort (default: 1 hour)
*/
static async create( // [!code highlight]
id: string,
options: { ttlMs?: number; graceMs?: number } = {}
): Promise<DistributedAbortController> {
const { ttlMs = DEFAULT_TTL_MS, graceMs = DEFAULT_GRACE_MS } = options;
const token = getAbortToken(id);

// Try to find an existing run with this hook token
const existingHook = await getHookByToken(token).catch(() => null); // [!code highlight]

if (existingHook) {
// Reconnect to existing controller
return new DistributedAbortController(id, existingHook.runId);
}

// Create a new workflow
const run = await start(abortControllerWorkflow, [id, ttlMs, graceMs]); // [!code highlight]
return new DistributedAbortController(id, run.runId);
}

/**
* Triggers the abort signal.
* Idempotent: safe to call multiple times or after the workflow has completed.
*/
async abort(reason?: string): Promise<void> { // [!code highlight]
try {
await abortHook.resume(getAbortToken(this.id), { reason });
} catch (error) {
const msg = error instanceof Error ? error.message.toLowerCase() : '';
if (msg.includes('not found') || msg.includes('expired')) {
return;
}
throw error;
}
}

/**
* Returns an AbortSignal that fires when abort() is called or TTL expires.
* The signal fires with a reason indicating what triggered it.
*/
get signal(): AbortSignal { // [!code highlight]
const run = getRun<{ aborted: boolean; reason?: string; expired?: boolean }>(this.runId);
const controller = new AbortController();
const readable = run.getReadable<AbortMessage>();

(async () => {
const reader = readable.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value.type === "abort") {
const reason = value.expired
? `${value.reason} (expired)`
: value.reason;
controller.abort(reason);
break;
}
}
} catch (error) {
if (!controller.signal.aborted) {
controller.abort(
error instanceof Error ? error.message : "Stream read failed"
);
}
} finally {
reader.releaseLock();
}
})();

return controller.signal;
}
}
```

### Usage: Single Process

```typescript lineNumbers
import { DistributedAbortController } from "./distributed-abort-controller";

// Create a controller with a meaningful ID
const controller = await DistributedAbortController.create("chat:user-123");

// Get the signal and use it with fetch
const signal = controller.signal;
const response = await fetch("https://api.example.com/long-operation", {
signal,
});

// Later: abort the operation
await controller.abort("User cancelled");
```

### Usage: Cross-Process Coordination

```typescript lineNumbers
import { DistributedAbortController } from "./distributed-abort-controller";

// Process A: Create the controller
const controller = await DistributedAbortController.create("task:build-123");
// start long operation using controller.signal...

// Process B: Reconnect and abort (no run ID sharing needed!)
const sameController = await DistributedAbortController.create("task:build-123"); // [!code highlight]
await sameController.abort("Cancelled by admin");

// Process C: Reconnect and listen
const anotherRef = await DistributedAbortController.create("task:build-123");
anotherRef.signal.addEventListener("abort", (e) => {
console.log("Task was cancelled:", (e.target as AbortSignal).reason);
});
```

### Custom TTL

```typescript lineNumbers
import { DistributedAbortController } from "./distributed-abort-controller";

// Short-lived controller for a quick operation (5 minutes)
const shortLived = await DistributedAbortController.create("quick-task", {
ttlMs: 5 * 60 * 1000,
});

// Long-lived controller for batch jobs (7 days)
const longLived = await DistributedAbortController.create("batch-job", {
ttlMs: 7 * 24 * 60 * 60 * 1000,
});

// When TTL expires, the signal fires with expired reason
shortLived.signal.addEventListener("abort", (e) => {
const reason = (e.target as AbortSignal).reason;
if (reason?.includes("expired")) {
console.log("Controller expired, cleaning up...");
}
});
```

### API Route for Remote Abort

```typescript lineNumbers
import { DistributedAbortController } from "@/lib/distributed-abort-controller";

export async function POST(
request: Request,
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
const { reason } = await request.json();

const controller = await DistributedAbortController.create(id);
await controller.abort(reason || "Cancelled via API");

return Response.json({ success: true });
}
```

### Client Cancel Button

```tsx lineNumbers
"use client";

export function CancelButton({ taskId }: { taskId: string }) {
const handleCancel = async () => {
await fetch(`/api/abort/${taskId}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ reason: "User clicked cancel" }),
});
};

return (
<button type="button" onClick={handleCancel}>
Cancel Operation
</button>
);
}
```

## Tips

- **Use semantic IDs** — Use meaningful IDs like `chat:123` or `task:abc` instead of random UUIDs
- **Create is idempotent** — Calling `create()` with the same ID reconnects to the existing controller
- **TTL auto-cleanup** — Workflows self-terminate after TTL expires; no manual cleanup needed
- **Signal is a getter** — Each access to `.signal` creates a new listener; cache it if needed
- **One-shot** — Once aborted or expired, the workflow completes; create a new controller for new operations

## Key APIs

- [`defineHook()`](/docs/api-reference/workflow/define-hook) — type-safe hook for the abort trigger
- [`getWritable()`](/docs/api-reference/workflow/get-writable) — write abort messages to the stream
- [`sleep()`](/docs/api-reference/workflow/sleep) — TTL timer for auto-expiration
- [`start()`](/docs/api-reference/workflow-api/start) — start the abort controller workflow
- [`getHookByToken()`](/docs/api-reference/workflow-api/get-hook-by-token) — find existing run by hook token
- [`getRun()`](/docs/api-reference/workflow-api/get-run) — reconnect to the workflow's readable stream
3 changes: 2 additions & 1 deletion docs/content/docs/cookbook/common-patterns/meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"idempotency",
"webhooks",
"content-router",
"child-workflows"
"child-workflows",
"distributed-abort-controller"
]
}
1 change: 1 addition & 0 deletions docs/content/docs/cookbook/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ A curated collection of workflow patterns with clean, copy-paste code examples f
- [**Webhooks**](/cookbook/common-patterns/webhooks) — Receive HTTP callbacks from external services and process them durably
- [**Conditional Routing**](/cookbook/common-patterns/content-router) — Route payloads to different step handlers based on content
- [**Child Workflows**](/cookbook/common-patterns/child-workflows) — Spawn and orchestrate child workflows from a parent
- [**Distributed Abort Controller**](/cookbook/common-patterns/distributed-abort-controller) — Build a cross-process abort controller using workflow streams and hooks

## Agent Patterns

Expand Down
8 changes: 8 additions & 0 deletions docs/lib/cookbook-tree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export const slugToCategory: Record<string, string> = {
webhooks: 'common-patterns',
'content-router': 'common-patterns',
'child-workflows': 'common-patterns',
'distributed-abort-controller': 'common-patterns',

// Agent Patterns
'durable-agent': 'agent-patterns',
Expand Down Expand Up @@ -124,6 +125,13 @@ export const recipes: Record<string, Recipe> = {
'Spawn and orchestrate child workflows from a parent, polling for completion and handling partial failures.',
category: 'common-patterns',
},
'distributed-abort-controller': {
slug: 'distributed-abort-controller',
title: 'Distributed Abort Controller',
description:
'Build a cross-process abort controller using workflow streams and hooks to coordinate cancellation by semantic ID.',
category: 'common-patterns',
},

// Agent Patterns
'durable-agent': {
Expand Down
Loading
Loading