diff --git a/docs/content/docs/cookbook/common-patterns/distributed-abort-controller.mdx b/docs/content/docs/cookbook/common-patterns/distributed-abort-controller.mdx new file mode 100644 index 0000000000..ebe3108cab --- /dev/null +++ b/docs/content/docs/cookbook/common-patterns/distributed-abort-controller.mdx @@ -0,0 +1,318 @@ +--- +title: Distributed Abort Controller +description: A distributed AbortController that uses durable workflows for cross-process cancellation signaling. +type: guide +summary: Build a distributed abort controller that uses workflow streams and hooks to propagate cancellation signals across process boundaries. +--- + +Use this pattern when you need an `AbortController`-like interface that works across distributed systems. The controller uses a durable workflow to coordinate cancellation — calling `.abort()` on one machine triggers the `.signal` on any other machine. + +## When to use this + +- **Cross-process cancellation** — Cancel a long-running operation from a different server, worker, or edge function +- **Durable cancellation** — The abort signal persists even if the process that created it crashes +- **UI stop buttons** — Let users cancel operations running on the server from the browser +- **Timeout coordination** — The built-in TTL auto-expires stale controllers + +## Pattern + +The `DistributedAbortController` class encapsulates a workflow that: +1. Accepts a user-provided unique ID (like a chat ID or task ID) +2. Creates or reconnects to an existing workflow using that ID +3. Waits for a hook signal OR TTL expiration +4. 
Writes a cancellation message to the run's stream when triggered
+
+### Core Implementation
+
+```typescript lineNumbers
+import { defineHook, getWritable, sleep } from "workflow";
+import { start, getRun, getHookByToken } from "workflow/api";
+
+// Default TTL: 24 hours
+const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000;
+// Default grace period: 1 hour (keeps the hook registered after TTL expiration
+// so late subscribers can still look up the run)
+const DEFAULT_GRACE_MS = 60 * 60 * 1000;
+
+// Hook to trigger the abort signal
+export const abortHook = defineHook<{ reason?: string }>();
+
+// The abort message written to the stream
+export type AbortMessage = {
+  type: "abort";
+  reason?: string;
+  expired?: boolean;
+};
+
+// Helper to create a consistent hook token from the user ID
+function getAbortToken(id: string): string {
+  return `abort:${id}`;
+}
+
+// Step function that writes the abort message to the stream, then closes it
+async function writeAbortSignal(reason?: string, expired?: boolean) {
+  "use step";
+
+  const writable = getWritable();
+  const writer = writable.getWriter();
+  try {
+    await writer.write({ type: "abort", reason, expired });
+  } finally {
+    // Release the lock even if the write fails, so close() below can run
+    writer.releaseLock();
+  }
+  await writable.close();
+}
+
+// Workflow that waits for a manual abort or TTL expiration, whichever comes first
+export async function abortControllerWorkflow(
+  id: string,
+  ttlMs: number,
+  graceMs: number
+) {
+  "use workflow";
+
+  const startTime = Date.now();
+  const hook = abortHook.create({ token: getAbortToken(id) });
+
+  // Race: manual abort OR TTL expiration // [!code highlight]
+  const result = await Promise.race([
+    hook.then((payload) => ({
+      reason: payload.reason,
+      expired: false,
+    })),
+    sleep(`${ttlMs}ms`).then(() => ({
+      reason: "Controller expired",
+      expired: true,
+    })),
+  ]);
+
+  await writeAbortSignal(result.reason, result.expired);
+
+  // Only sleep through grace period on TTL expiration (keeps hook alive for late subscribers). // [!code highlight]
+  // Manual aborts complete immediately.
+  if (result.expired) {
+    // (elapsed - ttlMs) is the time already spent past the TTL, e.g. writing the message
+    const elapsed = Date.now() - startTime;
+    const remainingTime = graceMs - (elapsed - ttlMs);
+    if (remainingTime > 0) {
+      await sleep(`${remainingTime}ms`); // [!code highlight]
+    }
+  }
+
+  return { aborted: true, reason: result.reason, expired: result.expired };
+}
+
+/**
+ * A distributed abort controller that works across process boundaries.
+ * Uses a semantically meaningful ID (like a chat ID or task ID) to coordinate.
+ */
+export class DistributedAbortController {
+  private id: string;
+  readonly runId: string;
+
+  private constructor(id: string, runId: string) {
+    this.id = id;
+    this.runId = runId;
+  }
+
+  /**
+   * Creates or reconnects to a distributed abort controller.
+   * If a hook for this ID is currently registered, reconnects to its run.
+   * Otherwise, starts a new workflow.
+   *
+   * @param id - A unique, semantically meaningful ID (e.g., "chat:123")
+   * @param options.ttlMs - Time-to-live in ms (default: 24 hours)
+   * @param options.graceMs - Grace period after TTL expiration (default: 1 hour)
+   * @returns A controller bound to the existing or newly started run
+   */
+  static async create( // [!code highlight]
+    id: string,
+    options: { ttlMs?: number; graceMs?: number } = {}
+  ): Promise<DistributedAbortController> {
+    const { ttlMs = DEFAULT_TTL_MS, graceMs = DEFAULT_GRACE_MS } = options;
+    const token = getAbortToken(id);
+
+    // Try to find an existing run with this hook token.
+    // Any lookup failure is treated as "no existing controller".
+    const existingHook = await getHookByToken(token).catch(() => null); // [!code highlight]
+
+    if (existingHook) {
+      // Reconnect to existing controller
+      return new DistributedAbortController(id, existingHook.runId);
+    }
+
+    // Create a new workflow
+    const run = await start(abortControllerWorkflow, [id, ttlMs, graceMs]); // [!code highlight]
+    return new DistributedAbortController(id, run.runId);
+  }
+
+  /**
+   * Triggers the abort signal.
+   * Idempotent: safe to call multiple times or after the workflow has completed.
+   * Errors whose message contains "not found" or "expired" are swallowed;
+   * anything else is rethrown.
+   *
+   * @param reason - Optional reason for the cancellation
+   */
+  async abort(reason?: string): Promise<void> { // [!code highlight]
+    try {
+      await abortHook.resume(getAbortToken(this.id), { reason });
+    } catch (error) {
+      const msg = error instanceof Error ? error.message.toLowerCase() : '';
+      if (msg.includes('not found') || msg.includes('expired')) {
+        return;
+      }
+      throw error;
+    }
+  }
+
+  /**
+   * Returns an AbortSignal that fires when abort() is called or TTL expires.
+   * The signal fires with a reason indicating what triggered it.
+   * Each access opens a new reader on the run's stream; cache the result if
+   * you need the same signal more than once.
+   */
+  get signal(): AbortSignal { // [!code highlight]
+    const run = getRun<{ aborted: boolean; reason?: string; expired?: boolean }>(this.runId);
+    const controller = new AbortController();
+    const readable = run.getReadable<AbortMessage>();
+
+    // Read in the background; the getter itself returns synchronously
+    (async () => {
+      const reader = readable.getReader();
+      try {
+        while (true) {
+          const { done, value } = await reader.read();
+          if (done) break;
+          if (value.type === "abort") {
+            const reason = value.expired
+              ? `${value.reason} (expired)`
+              : value.reason;
+            controller.abort(reason);
+            break;
+          }
+        }
+      } catch (error) {
+        // Surface stream failures through the signal instead of an unhandled rejection
+        if (!controller.signal.aborted) {
+          controller.abort(
+            error instanceof Error ? error.message : "Stream read failed"
+          );
+        }
+      } finally {
+        reader.releaseLock();
+      }
+    })();
+
+    return controller.signal;
+  }
+}
+```
+
+### Usage: Single Process
+
+```typescript lineNumbers
+import { DistributedAbortController } from "./distributed-abort-controller";
+
+// Create a controller with a meaningful ID
+const controller = await DistributedAbortController.create("chat:user-123");
+
+// Get the signal and use it with fetch
+const signal = controller.signal;
+const response = await fetch("https://api.example.com/long-operation", {
+  signal,
+});
+
+// Elsewhere, while the request is still in flight (e.g. in a cancel handler): abort the operation
+await controller.abort("User cancelled");
+```
+
+### Usage: Cross-Process Coordination
+
+```typescript lineNumbers
+import { DistributedAbortController } from "./distributed-abort-controller";
+
+// Process A: Create the controller
+const controller = await DistributedAbortController.create("task:build-123");
+// start long operation using controller.signal...
+
+// Process B: Reconnect and abort (no run ID sharing needed!)
+const sameController = await DistributedAbortController.create("task:build-123"); // [!code highlight]
+await sameController.abort("Cancelled by admin");
+
+// Process C: Reconnect and listen
+// (subscribe while the controller is still active: a manual abort completes the
+// workflow, so a later create() with the same ID starts a fresh controller)
+const anotherRef = await DistributedAbortController.create("task:build-123");
+anotherRef.signal.addEventListener("abort", (e) => {
+  console.log("Task was cancelled:", (e.target as AbortSignal).reason);
+});
+```
+
+### Custom TTL
+
+```typescript lineNumbers
+import { DistributedAbortController } from "./distributed-abort-controller";
+
+// Short-lived controller for a quick operation (5 minutes)
+const shortLived = await DistributedAbortController.create("quick-task", {
+  ttlMs: 5 * 60 * 1000,
+});
+
+// Long-lived controller for batch jobs (7 days)
+const longLived = await DistributedAbortController.create("batch-job", {
+  ttlMs: 7 * 24 * 60 * 60 * 1000,
+});
+
+// When TTL expires, the signal fires with expired reason
+shortLived.signal.addEventListener("abort", (e) => {
+  const reason = (e.target as AbortSignal).reason;
+  // The reason is only a string when one was supplied; guard before calling includes()
+  if (typeof reason === "string" && reason.includes("expired")) {
+    console.log("Controller expired, cleaning up...");
+  }
+});
+```
+
+### API Route for Remote Abort
+
+```typescript lineNumbers
+import { DistributedAbortController } from "@/lib/distributed-abort-controller";
+
+export async function POST(
+  request: Request,
+  { params }: { params: Promise<{ id: string }> }
+) {
+  const { id } = await params;
+  const { reason } = await request.json();
+
+  const controller = await DistributedAbortController.create(id);
+  await controller.abort(reason || "Cancelled via API");
+
+  return Response.json({ success: true });
+}
+```
+
+### Client Cancel Button
+
+```tsx lineNumbers
+"use client";
+
+export function CancelButton({ taskId }: { taskId: string }) {
+  const handleCancel = async () => {
+    await fetch(`/api/abort/${taskId}`, {
+      method: "POST",
+      headers: { "Content-Type": "application/json" },
+      body: JSON.stringify({ reason: "User clicked cancel" }),
+    });
+  };
+
+  return (
+    <button onClick={handleCancel}>Cancel</button>
+  );
+}
+```
+
+## Tips
+
+- **Use semantic IDs** — Use meaningful IDs like `chat:123` or `task:abc` instead of random UUIDs
+- **Create reconnects** — Calling `create()` with the same ID reconnects to the existing controller while its workflow is still running; otherwise it starts a new one
+- **TTL auto-cleanup** — Workflows self-terminate once the TTL and grace period have elapsed; no manual cleanup needed
+- **Signal is a getter** — Each access to `.signal` creates a new listener; cache it if needed
+- **One-shot** — Once aborted or expired, the workflow completes; create a new controller for new operations
+
+## Key APIs
+
+- [`defineHook()`](/docs/api-reference/workflow/define-hook) — type-safe hook for the abort trigger
+- [`getWritable()`](/docs/api-reference/workflow/get-writable) — write abort messages to the stream
+- [`sleep()`](/docs/api-reference/workflow/sleep) — TTL timer for auto-expiration
+- [`start()`](/docs/api-reference/workflow-api/start) — start the abort controller workflow
+- [`getHookByToken()`](/docs/api-reference/workflow-api/get-hook-by-token) — find existing run by hook token
+- [`getRun()`](/docs/api-reference/workflow-api/get-run) — reconnect to the workflow's readable stream
diff --git a/docs/content/docs/cookbook/common-patterns/meta.json b/docs/content/docs/cookbook/common-patterns/meta.json
index 4da9959546..e8eaed4c27 100644
--- a/docs/content/docs/cookbook/common-patterns/meta.json
+++ b/docs/content/docs/cookbook/common-patterns/meta.json
@@ -10,6 +10,7 @@
     "idempotency",
     "webhooks",
     "content-router",
-    "child-workflows"
+    "child-workflows",
+    "distributed-abort-controller"
   ]
 }
diff --git a/docs/content/docs/cookbook/index.mdx b/docs/content/docs/cookbook/index.mdx
index 0eaaac0e1f..5624a042ec 100644
--- a/docs/content/docs/cookbook/index.mdx
+++ b/docs/content/docs/cookbook/index.mdx
@@ -17,6 +17,7 @@ A curated collection of workflow patterns with clean, copy-paste code examples f
 - [**Webhooks**](/cookbook/common-patterns/webhooks) — Receive HTTP callbacks from external services and process them durably
 - 
[**Conditional Routing**](/cookbook/common-patterns/content-router) — Route payloads to different step handlers based on content
 - [**Child Workflows**](/cookbook/common-patterns/child-workflows) — Spawn and orchestrate child workflows from a parent
+- [**Distributed Abort Controller**](/cookbook/common-patterns/distributed-abort-controller) — Build a cross-process abort controller using workflow streams and hooks
 
 ## Agent Patterns
 
diff --git a/docs/lib/cookbook-tree.ts b/docs/lib/cookbook-tree.ts
index 996d18f866..81099e7ec6 100644
--- a/docs/lib/cookbook-tree.ts
+++ b/docs/lib/cookbook-tree.ts
@@ -37,6 +37,7 @@ export const slugToCategory: Record = {
   webhooks: 'common-patterns',
   'content-router': 'common-patterns',
   'child-workflows': 'common-patterns',
+  'distributed-abort-controller': 'common-patterns',
 
   // Agent Patterns
   'durable-agent': 'agent-patterns',
@@ -124,6 +125,13 @@ export const recipes: Record = {
       'Spawn and orchestrate child workflows from a parent, polling for completion and handling partial failures.',
     category: 'common-patterns',
   },
+  'distributed-abort-controller': {
+    slug: 'distributed-abort-controller',
+    title: 'Distributed Abort Controller',
+    description:
+      'Build a cross-process abort controller using workflow streams and hooks to coordinate cancellation by semantic ID.',
+    category: 'common-patterns',
+  },
 
   // Agent Patterns
   'durable-agent': {
diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts
index c31109bbbc..1ae7a2239d 100644
--- a/packages/core/e2e/e2e.test.ts
+++ b/packages/core/e2e/e2e.test.ts
@@ -2331,4 +2331,109 @@ describe('e2e', () => {
       });
     }
   );
+
+  // ============================================================
+  // Distributed Abort Controller
+  // ============================================================
+  test(
+    'distributedAbortController - manual abort triggers signal',
+    { timeout: 60_000 },
+    async () => {
+      const controllerId = `test-abort-${Math.random().toString(36).slice(2)}`;
+
+      // Start the abort controller workflow with short TTL for testing
+      const run = await start(
+        await e2e('distributedAbortControllerWorkflow'),
+        [controllerId, 60_000, 10_000] // 60s TTL, 10s grace
+      );
+
+      // Wait for the hook to be registered
+      await sleep(3_000);
+
+      // Get the abort signal (reads from stream)
+      const readable = await run.getReadable();
+      const reader = readable.getReader();
+
+      // Trigger abort via hook
+      const token = `distributed-abort:${controllerId}`;
+      const hook = await getHookByToken(token);
+      expect(hook.runId).toBe(run.runId);
+      await resumeHook(token, { reason: 'User cancelled' });
+
+      // Read the abort message from the stream
+      const { value } = await reader.read();
+      reader.releaseLock();
+
+      expect(value).toEqual({
+        type: 'abort',
+        reason: 'User cancelled',
+        expired: false,
+      });
+    }
+  );
+
+  test(
+    'distributedAbortController - TTL expiration triggers signal',
+    { timeout: 30_000 },
+    async () => {
+      const controllerId = `test-expire-${Math.random().toString(36).slice(2)}`;
+
+      // Start with very short TTL (3 seconds)
+      const run = await start(
+        await e2e('distributedAbortControllerWorkflow'),
+        [controllerId, 3_000, 1_000] // 3s TTL, 1s grace
+      );
+
+      // Get the abort signal stream
+      const readable = await run.getReadable();
+      const reader = readable.getReader();
+
+      // Wait for TTL to expire and read the abort message
+      const { value } = await reader.read();
+      reader.releaseLock();
+
+      expect(value).toEqual({
+        type: 'abort',
+        reason: 'Controller expired',
+        expired: true,
+      });
+    }
+  );
+
+  test(
+    'distributedAbortController - reconnect to existing controller',
+    { timeout: 60_000 },
+    async () => {
+      const controllerId = `test-reconnect-${Math.random().toString(36).slice(2)}`;
+      const token = `distributed-abort:${controllerId}`;
+
+      // Start first controller
+      const run1 = await start(
+        await e2e('distributedAbortControllerWorkflow'),
+        [controllerId, 60_000, 10_000]
+      );
+
+      // Wait for hook to be registered
+      await sleep(3_000);
+
+      // Look up the hook - should find the same run
+      const hook = await getHookByToken(token);
+      expect(hook.runId).toBe(run1.runId);
+
+      // A second lookup should still find the same run (hook persists)
+      const hook2 = await getHookByToken(token);
+      expect(hook2.runId).toBe(run1.runId);
+
+      // Abort the controller
+      await resumeHook(token, { reason: 'Test complete' });
+
+      // A manual abort completes the workflow immediately (the grace period only
+      // applies to TTL expiration), so the run resolves with the abort result.
+      const returnValue = await run1.returnValue;
+      expect(returnValue.aborted).toBe(true);
+      expect(returnValue.reason).toBe('Test complete');
+      expect(returnValue.expired).toBe(false);
+    }
+  );
 });
diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts
index cfbfd65b3d..b698e0a292 100644
--- a/workbench/example/workflows/99_e2e.ts
+++ b/workbench/example/workflows/99_e2e.ts
@@ -13,7 +13,7 @@ import {
   RetryableError,
   sleep,
 } from 'workflow';
-import { getRun, Run, resumeHook, start } from 'workflow/api';
+import { getHookByToken, getRun, resumeHook, Run, start } from 'workflow/api';
 import { importedStepOnly } from './_imported_step_only';
 import { callThrower, stepThatThrowsFromHelper } from './helpers';
 
@@ -1716,3 +1716,165 @@ export async function fibonacciWorkflow(n: number): Promise {
   const [a, b] = await Promise.all([runA.returnValue, runB.returnValue]);
   return a + b;
 }
+
+//////////////////////////////////////////////////////////
+// Distributed Abort Controller
+//////////////////////////////////////////////////////////
+
+// Message type written to the stream when abort fires
+export type AbortMessage = {
+  type: 'abort';
+  reason?: string;
+  expired: boolean;
+};
+
+// Helper to derive abort hook token from user-provided ID
+function getAbortToken(id: string): string {
+  return `distributed-abort:${id}`;
+}
+
+// Step
function that writes the abort message to the stream
+async function writeAbortSignal(reason?: string, expired = false) {
+  'use step';
+  const writable = getWritable();
+  const writer = writable.getWriter();
+  try {
+    await writer.write({ type: 'abort', reason, expired });
+  } finally {
+    // Release the lock even if the write fails, so close() below can run
+    writer.releaseLock();
+  }
+  await writable.close();
+}
+
+/**
+ * Workflow that backs the DistributedAbortController.
+ * Waits for either:
+ * 1. Manual abort via hook trigger
+ * 2. TTL expiration
+ *
+ * After TTL expiration, sleeps through the grace period to keep the hook
+ * alive for late subscribers. Manual aborts complete immediately.
+ *
+ * @param id - User-provided ID used to derive the hook token
+ * @param ttlMs - Time in ms to wait for a manual abort before expiring
+ * @param graceMs - Time in ms to keep running after TTL expiration
+ * @returns The abort result: `{ aborted: true, reason, expired }`
+ */
+export async function distributedAbortControllerWorkflow(
+  id: string,
+  ttlMs: number,
+  graceMs: number
+) {
+  'use workflow';
+
+  const startTime = Date.now();
+  const hook = createHook<{ reason?: string }>({ token: getAbortToken(id) });
+
+  // Race: manual abort OR TTL expiration
+  const result = await Promise.race([
+    hook.then((payload) => ({
+      reason: payload.reason,
+      expired: false,
+    })),
+    sleep(ttlMs).then(() => ({
+      reason: 'Controller expired',
+      expired: true,
+    })),
+  ]);
+
+  // Write the abort signal to the stream
+  await writeAbortSignal(result.reason, result.expired);
+
+  // Only sleep through grace period on TTL expiration (keeps hook alive for late subscribers).
+  // Manual aborts complete immediately.
+  if (result.expired) {
+    // (elapsed - ttlMs) is the time already spent past the TTL, e.g. writing the message
+    const elapsed = Date.now() - startTime;
+    const remainingTime = graceMs - (elapsed - ttlMs);
+    if (remainingTime > 0) {
+      await sleep(remainingTime);
+    }
+  }
+
+  return { aborted: true, reason: result.reason, expired: result.expired };
+}
+
+/**
+ * DistributedAbortController - a cross-process abort controller backed by a durable workflow.
+ *
+ * Usage:
+ *   const controller = await DistributedAbortController.create('my-task-123');
+ *   // In any process:
+ *   await controller.abort('User cancelled');
+ *   // In any other process:
+ *   const signal = await controller.signal;
+ *   signal.addEventListener('abort', () => console.log('Aborted!'));
+ */
+export class DistributedAbortController {
+  private constructor(
+    public readonly id: string,
+    public readonly runId: string
+  ) {}
+
+  /**
+   * Creates or reconnects to a distributed abort controller.
+   * If a hook for this ID is currently registered, reconnects to its run.
+   * Otherwise, starts a new workflow.
+   *
+   * @param id - User-provided ID used to derive the hook token
+   * @param options.ttlMs - Time-to-live in ms (default: 24 hours)
+   * @param options.graceMs - Grace period after TTL expiration in ms (default: 1 hour)
+   */
+  static async create(
+    id: string,
+    options: { ttlMs?: number; graceMs?: number } = {}
+  ): Promise<DistributedAbortController> {
+    const { ttlMs = 24 * 60 * 60 * 1000, graceMs = 60 * 60 * 1000 } = options;
+    const token = getAbortToken(id);
+
+    // Try to find an existing run with this hook token.
+    // Any lookup failure is treated as "no existing controller".
+    const existingHook = await getHookByToken(token).catch(() => null);
+
+    if (existingHook) {
+      // Reconnect to existing controller
+      return new DistributedAbortController(id, existingHook.runId);
+    }
+
+    // Create a new workflow
+    const run = await start(distributedAbortControllerWorkflow, [
+      id,
+      ttlMs,
+      graceMs,
+    ]);
+    return new DistributedAbortController(id, run.runId);
+  }
+
+  /**
+   * Triggers the abort signal across all processes by resuming the hook.
+   * Errors from resuming the hook propagate to the caller.
+   *
+   * @param reason - Optional reason delivered with the abort message
+   */
+  async abort(reason?: string): Promise<void> {
+    const token = getAbortToken(this.id);
+    await resumeHook(token, { reason });
+  }
+
+  /**
+   * Returns an AbortSignal that fires when the controller is aborted.
+   * Listens to the workflow's output stream. Each access opens a new reader.
+   * If reading the stream fails, the signal is aborted with the error message.
+   */
+  get signal(): Promise<AbortSignal> {
+    return (async () => {
+      const controller = new AbortController();
+      const run = getRun(this.runId);
+      const readable = await run.getReadable<AbortMessage>();
+      const reader = readable.getReader();
+
+      // Read from stream in background. Failures are surfaced by aborting the
+      // signal instead of leaving an unhandled promise rejection.
+      (async () => {
+        try {
+          while (true) {
+            const { done, value } = await reader.read();
+            if (done) break;
+            if (value && value.type === 'abort') {
+              controller.abort(value.reason);
+              break;
+            }
+          }
+        } catch (error) {
+          if (!controller.signal.aborted) {
+            controller.abort(
+              error instanceof Error ? error.message : 'Stream read failed'
+            );
+          }
+        } finally {
+          reader.releaseLock();
+        }
+      })();
+
+      return controller.signal;
+    })();
+  }
+}
diff --git a/workbench/vitest/test/cookbook-common.test.ts b/workbench/vitest/test/cookbook-common.test.ts
index 08fb7e1d0c..6d00b0903e 100644
--- a/workbench/vitest/test/cookbook-common.test.ts
+++ b/workbench/vitest/test/cookbook-common.test.ts
@@ -13,6 +13,11 @@ import {
   parentWorkflow,
   childWorkflow,
 } from '../workflows/cookbook/child-workflows.js';
+import {
+  DistributedAbortController,
+  abortControllerWorkflow,
+  abortHook,
+} from '../workflows/cookbook/distributed-abort-controller.js';
 
 describe('saga', () => {
   it('should run compensations in reverse on FatalError', async () => {
@@ -140,3 +145,86 @@ describe('child-workflows', () => {
     expect(result.result).toEqual({ item: 'x', result: 'processed-x' });
   });
 });
+
+describe('distributed-abort-controller', () => {
+  const TTL_MS = 5 * 60 * 1000;
+  const GRACE_MS = 60 * 1000;
+
+  it('should abort via hook and emit message on stream', async () => {
+    const testId = `test-${Date.now()}`;
+
+    const run = await start(abortControllerWorkflow, [
+      testId,
+      TTL_MS,
+      GRACE_MS,
+    ]);
+
+    const hook = await waitForHook(run);
+    expect(hook.token).toBe(`abort:${testId}`);
+
+    await abortHook.resume(`abort:${testId}`, { reason: 'User cancelled' });
+
+    const result = await run.returnValue;
+    expect(result).toEqual({
+      aborted: true,
+      reason: 'User cancelled',
+      expired: false,
+    });
+  });
+
+  it('should emit abort message on the readable stream', async () => {
+    const testId =
`stream-test-${Date.now()}`;
+
+    const run = await start(abortControllerWorkflow, [
+      testId,
+      TTL_MS,
+      GRACE_MS,
+    ]);
+
+    // Attach the reader before resuming the hook so the abort message is observed
+    const readable = run.getReadable<{ type: string; reason?: string }>();
+    const reader = readable.getReader();
+
+    await waitForHook(run);
+    await abortHook.resume(`abort:${testId}`, { reason: 'Stream test' });
+
+    const { value, done } = await reader.read();
+    expect(done).toBe(false);
+    expect(value).toEqual({
+      type: 'abort',
+      reason: 'Stream test',
+      expired: false,
+    });
+
+    reader.releaseLock();
+
+    const result = await run.returnValue;
+    expect(result.aborted).toBe(true);
+  });
+
+  it('should work with DistributedAbortController instance', async () => {
+    const testId = `signal-test-${Date.now()}`;
+
+    const controller = await DistributedAbortController.create(testId, {
+      ttlMs: TTL_MS,
+      graceMs: GRACE_MS,
+    });
+
+    // Read the getter once: every access to `.signal` opens a new stream reader
+    const signal = controller.signal;
+    expect(signal.aborted).toBe(false);
+
+    const abortPromise = new Promise<string | undefined>((resolve) => {
+      signal.addEventListener(
+        'abort',
+        () => {
+          resolve(signal.reason as string | undefined);
+        },
+        { once: true }
+      );
+    });
+
+    // Wait for the workflow to register the hook before resuming it
+    await waitForHook(getRun(controller.runId));
+
+    await controller.abort('Signal test reason');
+
+    const reason = await abortPromise;
+    expect(reason).toBe('Signal test reason');
+    expect(signal.aborted).toBe(true);
+  });
+});
diff --git a/workbench/vitest/workflows/cookbook/distributed-abort-controller.ts b/workbench/vitest/workflows/cookbook/distributed-abort-controller.ts
new file mode 100644
index 0000000000..2258e15549
--- /dev/null
+++ b/workbench/vitest/workflows/cookbook/distributed-abort-controller.ts
@@ -0,0 +1,201 @@
+/**
+ * Cookbook: distributed-abort-controller pattern
+ *
+ * Demonstrates a distributed AbortController that uses a durable workflow
+ * to coordinate cancellation signals across process boundaries.
+ *
+ * Usage:
+ *   const controller = await DistributedAbortController.create("chat:123");
+ *   controller.signal.addEventListener("abort", () => console.log("Aborted!"));
+ *   await controller.abort("User cancelled");
+ */
+import { defineHook, getWritable, sleep } from 'workflow';
+import { getHookByToken, getRun, start } from 'workflow/api';
+
+// Default TTL: 24 hours in milliseconds
+const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000;
+// Default grace period: 1 hour (keeps the hook registered after TTL expiration
+// so late subscribers can still look up the run)
+const DEFAULT_GRACE_MS = 60 * 60 * 1000;
+
+// Hook to trigger the abort signal
+export const abortHook = defineHook<{ reason?: string }>();
+
+// The abort message written to the stream
+export type AbortMessage = {
+  type: 'abort';
+  reason?: string;
+  expired?: boolean;
+};
+
+// Helper to create a consistent hook token from the user ID
+function getAbortToken(id: string): string {
+  return `abort:${id}`;
+}
+
+/**
+ * Step function that writes the abort message to the stream, then closes it.
+ * Writing must happen inside a step, not directly in the workflow.
+ *
+ * @param reason - Optional reason included in the abort message
+ * @param expired - Whether the abort was caused by TTL expiration
+ */
+async function writeAbortSignal(reason?: string, expired?: boolean) {
+  'use step';
+
+  const writable = getWritable();
+  const writer = writable.getWriter();
+  try {
+    await writer.write({ type: 'abort', reason, expired });
+  } finally {
+    // Release the lock even if the write fails, so close() below can run
+    writer.releaseLock();
+  }
+  await writable.close();
+}
+
+/**
+ * Workflow that waits for the abort hook or TTL expiration.
+ * Accepts a user-provided ID to use as the hook token.
+ * After TTL expiration, sleeps through the grace period to keep the hook
+ * alive for late subscribers. Manual aborts complete immediately.
+ *
+ * @param id - User-provided ID used to derive the hook token
+ * @param ttlMs - Time in ms to wait for a manual abort before expiring
+ * @param graceMs - Time in ms to keep running after TTL expiration
+ * @returns The abort result: `{ aborted: true, reason, expired }`
+ */
+export async function abortControllerWorkflow(
+  id: string,
+  ttlMs: number,
+  graceMs: number
+) {
+  'use workflow';
+
+  const startTime = Date.now();
+  const hook = abortHook.create({ token: getAbortToken(id) });
+
+  // Race: manual abort OR TTL expiration
+  const result = await Promise.race([
+    hook.then((payload) => ({
+      reason: payload.reason,
+      expired: false,
+    })),
+    sleep(`${ttlMs}ms`).then(() => ({
+      reason: 'Controller expired',
+      expired: true,
+    })),
+  ]);
+
+  // Write the abort message inside a step
+  await writeAbortSignal(result.reason, result.expired);
+
+  // Only sleep through grace period on TTL expiration (keeps hook alive for late subscribers).
+  // Manual aborts complete immediately — no need to keep the workflow running.
+  if (result.expired) {
+    // (elapsed - ttlMs) is the time already spent past the TTL, e.g. writing the message
+    const elapsed = Date.now() - startTime;
+    const remainingTime = graceMs - (elapsed - ttlMs);
+    if (remainingTime > 0) {
+      await sleep(`${remainingTime}ms`);
+    }
+  }
+
+  return { aborted: true, reason: result.reason, expired: result.expired };
+}
+
+/**
+ * A distributed abort controller that works across process boundaries.
+ * Uses a semantically meaningful ID (like a chat ID or task ID) to coordinate.
+ *
+ * Unlike the standard AbortController which only works in a single process,
+ * this version uses a durable workflow to coordinate the abort signal.
+ * Any process with the same ID can create/reconnect, abort, or listen.
+ */
+export class DistributedAbortController {
+  private id: string;
+  readonly runId: string;
+
+  private constructor(id: string, runId: string) {
+    this.id = id;
+    this.runId = runId;
+  }
+
+  /**
+   * Creates or reconnects to a distributed abort controller.
+   * If a hook for this ID is currently registered, reconnects to its run.
+   * Otherwise, starts a new workflow.
+   *
+   * @param id - A unique, semantically meaningful ID (e.g., "chat:123")
+   * @param options.ttlMs - Time-to-live in ms (default: 24 hours)
+   * @param options.graceMs - Grace period after TTL expiration that keeps the hook alive (default: 1 hour)
+   * @returns A controller bound to the existing or newly started run
+   */
+  static async create(
+    id: string,
+    options: { ttlMs?: number; graceMs?: number } = {}
+  ): Promise<DistributedAbortController> {
+    const { ttlMs = DEFAULT_TTL_MS, graceMs = DEFAULT_GRACE_MS } = options;
+    const token = getAbortToken(id);
+
+    // Try to find an existing run with this hook token.
+    // Any lookup failure is treated as "no existing controller".
+    const existingHook = await getHookByToken(token).catch(() => null);
+
+    if (existingHook) {
+      // Reconnect to existing controller
+      return new DistributedAbortController(id, existingHook.runId);
+    }
+
+    // Create a new workflow
+    const run = await start(abortControllerWorkflow, [id, ttlMs, graceMs]);
+    return new DistributedAbortController(id, run.runId);
+  }
+
+  /**
+   * Triggers the abort signal.
+   * Can be called from any process with this controller instance.
+   * Idempotent: safe to call multiple times or after the workflow has completed.
+   * Errors whose message contains "not found" or "expired" are swallowed;
+   * anything else is rethrown.
+   *
+   * @param reason - Optional reason for the cancellation
+   */
+  async abort(reason?: string): Promise<void> {
+    try {
+      await abortHook.resume(getAbortToken(this.id), { reason });
+    } catch (error) {
+      const msg = error instanceof Error ? error.message.toLowerCase() : '';
+      if (msg.includes('not found') || msg.includes('expired')) {
+        return;
+      }
+      throw error;
+    }
+  }
+
+  /**
+   * Returns an AbortSignal that fires when abort() is called or TTL expires.
+   * The signal fires with a reason indicating what triggered it.
+   * Each access opens a new reader on the run's stream; cache the result if
+   * you need the same signal more than once.
+   */
+  get signal(): AbortSignal {
+    const run = getRun<{
+      aborted: boolean;
+      reason?: string;
+      expired?: boolean;
+    }>(this.runId);
+    const controller = new AbortController();
+    const readable = run.getReadable<AbortMessage>();
+
+    // Read in the background; the getter itself returns synchronously
+    (async () => {
+      const reader = readable.getReader();
+      try {
+        while (true) {
+          const { done, value } = await reader.read();
+          if (done) break;
+          if (value.type === 'abort') {
+            const reason = value.expired
+              ? `${value.reason} (expired)`
+              : value.reason;
+            controller.abort(reason);
+            break;
+          }
+        }
+      } catch (error) {
+        // Surface stream failures through the signal instead of an unhandled rejection
+        if (!controller.signal.aborted) {
+          controller.abort(
+            error instanceof Error ? error.message : 'Stream read failed'
+          );
+        }
+      } finally {
+        reader.releaseLock();
+      }
+    })();
+
+    return controller.signal;
+  }
+}