diff --git a/.blog/durable-execution-layer.md b/.blog/durable-execution-layer.md new file mode 100644 index 0000000000..5df116a3f8 --- /dev/null +++ b/.blog/durable-execution-layer.md @@ -0,0 +1,155 @@ +# Workflow DevKit makes Agents durable + +## Thesis + +Production AI agents are not single HTTP requests. They are long-running programs that plan, call tools, wait on external systems, and keep internal state across dozens of decisions. + +Stateless compute fights that shape. A cold start or a timeout resets the process mid-loop. A retry replays side effects unless you build your own idempotency ledger. Teams end up rebuilding durable execution out of database rows, queues, and scheduled jobs. + +Workflow DevKit turns that pile of infrastructure back into code. You write an Agent as a workflow function. The runtime persists progress as an event log and deterministically replays the workflow to reconstruct state after failures, cold starts, or scale events. + +## Current state + +Most "production agent" stacks ship the same diagram with different logos: + +* An `agent_runs` table that stores conversation state, tool history, and a cursor. +* A queue that re-invokes the agent after every tool call. +* A cron job that scans for stuck runs, retries failed calls, and advances timers. +* Idempotency keys everywhere to avoid double-charging, double-emailing, or double-writing. + +This works, but it costs engineering time forever. Every tool integration becomes a mini state machine. Every new failure mode adds another column: `attempt`, `next_run_at`, `last_error`, `lock_owner`. The "agent" ends up split across handlers that must agree on invariants. + +Here's the pattern in code.
+ +**Before: DB row + queue for an Agent tool-calling loop** + +```ts +import { sql } from "./db"; +import { queue } from "./queue"; + +export async function runAgent(runId: string) { + const run = await sql`SELECT * FROM agent_runs WHERE id=${runId}`; + try { + const next = await llmPlan(run.state); + const toolOut = await callTool(next.tool, next.args, { + idempotencyKey: `${runId}:${run.step}`, + }); + await sql`UPDATE agent_runs SET state=${toolOut.state}, step=${run.step + 1} + WHERE id=${runId}`; + await queue.add("agent", { runId }, { jobId: `${runId}:${run.step + 1}` }); + } catch (err) { + await sql`UPDATE agent_runs SET retries=${run.retries + 1}, last_error=${String(err)} + WHERE id=${runId}`; + await queue.add("agent", { runId }, { delay: backoff(run.retries) }); + } +} +``` + +The code above "works" until it doesn't. You now own locking, exactly-once semantics, backoff, and recovery. Any bug that advances `step` at the wrong time corrupts the run. Any mismatch between the stored cursor and the tool history produces duplicated tool calls. + +## The shift + +Durable execution flips the control plane. Instead of persisting *state* and reconstructing control flow, you persist *control flow* and reconstruct state. + +Workflow DevKit records every side effect boundary as an event. When the workflow restarts, the runtime replays the workflow from the top in a deterministic sandbox and feeds it the same event stream. Completed steps return their recorded results. Pending steps suspend the workflow and get scheduled. The workflow code stays readable because it is still just async TypeScript. 
+ +**After: the same Agent as a workflow with steps** + +```ts +type AgentState = { messages: string[]; done: boolean }; + +async function llmPlan(state: AgentState) { + 'use step'; + return decideNextAction(state.messages); +} +async function callTool(name: string, args: unknown) { + 'use step'; + return tools[name](args); +} + +export async function agentLoop(initial: AgentState) { + 'use workflow'; + let state = initial; + while (state.done === false) { + const plan = await llmPlan(state); + state = await callTool(plan.tool, plan.args); + } + return state; +} +``` + +The pain disappears because you stopped simulating a runtime in tables. The workflow function is the state machine. The durable log is the source of truth. Retries stop being a cross-cutting concern you re-implement for every tool. + +## The vision + +Agents need four things that plain serverless does not provide: + +1. **State across tool calls.** The agent has to remember what already happened. +2. **Selective retries.** A transient failure should retry one tool call, not the entire run. +3. **Parallel execution.** Agents fan out: retrieval + enrichment + verification. +4. **Long waits.** Human-in-the-loop and external systems do not fit in a 10-60 second timeout. + +Workflow DevKit maps those directly onto existing JavaScript primitives: + +* Use local variables for state. The runtime reconstructs them by replay. +* Use `FatalError` and `RetryableError` inside steps to control retry and backoff. +* Use `Promise.all()` and `Promise.race()` in workflows for fanout and competition. +* Use `sleep()` for durable delays and hooks to pause until an external event arrives. + +That last pair matters for agents because "waiting" is normal. A workflow can suspend while it waits for a webhook, a human approval, or an upstream batch job. The runtime resumes the workflow when the event shows up, without you writing a scheduler. + +Retries are the other place teams burn weeks. 
The usual solution is a cron-driven state machine that retries failed calls and advances a `next_retry_at` timestamp. + +**Before: cron + state machine retry for flaky API calls** + +```ts +import { sql } from "./db"; + +export async function retryCron() { + const jobs = await sql`SELECT * FROM api_calls + WHERE status='retry' AND run_at < now() + LIMIT 100`; + for (const job of jobs.rows) { + const res = await fetch(job.url, { method: "POST", body: job.body }); + const status = res.status < 500 ? "done" : "retry"; + await sql`UPDATE api_calls SET status=${status}, attempts=${job.attempts + 1}, + run_at=${nextRunAt(job.attempts)} WHERE id=${job.id}`; + } +} +``` + +That code turns "retry an HTTP call" into an operational subsystem. The database becomes a task scheduler. The cron job becomes a reliability layer. + +**After: RetryableError inside a step** + +```ts +import { FatalError, RetryableError } from "workflow"; + +async function postInvoice(id: string) { + 'use step'; + const origin = process.env.INVOICE_API_ORIGIN ?? ""; + const res = await fetch(`${origin}/invoices/${id}`, { method: "POST" }); + if (res.status >= 500) throw new RetryableError("invoice API 5xx", { retryAfter: "30s" }); + if (res.ok === false) throw new FatalError(`invoice API ${res.status}`); + return res.json(); +} + +export async function invoiceAgent(id: string) { + 'use workflow'; + return await postInvoice(id); +} +``` + +The step throws a structured error. The runtime persists that failure, schedules a retry with backoff, and replays the workflow without re-running completed work. + +## Next steps + +Treat "Agent" as a workflow boundary, not a request handler. Keep the workflow deterministic and push I/O into steps. If a piece of code needs the network, the filesystem, or a timer, it belongs in a step. + +Start small. Pick one agent loop that currently writes state to a database and triggers itself via a queue. Move the loop into a workflow function. 
Wrap each tool call in a step function. Replace cron-based retry with `RetryableError` and durable `sleep()`. + +Run the workflow locally, then inspect the event log and step timeline. + +```bash +npx -y -p @workflow/cli wf inspect runs +``` diff --git a/.blog/how-deterministic-replay-works-for-ai-agents.md b/.blog/how-deterministic-replay-works-for-ai-agents.md new file mode 100644 index 0000000000..a8561a6925 --- /dev/null +++ b/.blog/how-deterministic-replay-works-for-ai-agents.md @@ -0,0 +1,161 @@ +# How Workflow DevKit executes Agents with deterministic replay + +## Problem + +An Agent that calls tools is a distributed system in a single function body. It crosses process boundaries every time it waits on the network, hits a timeout, or gets retried by the platform. Stateless retries re-run code, not intent. + +The usual mitigation is "checkpoint everything." After every tool call you write a cursor and a blob of state to durable storage. On restart you read the checkpoint and try to reconstruct what happened. This approach turns agent code into a database-backed interpreter. + +## Approach + +Workflow DevKit splits Agent code into two execution models: + +* **Workflow functions** (`'use workflow'`) run inside a sandboxed VM. They orchestrate control flow, hold state in local variables, and stay deterministic. +* **Step functions** (`'use step'`) run with full Node.js access. They perform side effects: network calls, SDKs, file I/O, crypto, and timers. + +The runtime persists every step boundary as an event in an append-only log. When the workflow runs again, it replays the workflow from the top, feeds it the same event stream, and returns recorded results for completed steps. Only missing or failed steps execute. + +That design targets the failure modes that break agents in production: cold starts mid-conversation, platform timeouts, partial success in parallel fanout, and flaky tool calls. 
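The replay contract described above can be sketched in a few lines of TypeScript. This is a toy model, not the actual engine: the `Map` stands in for the persisted event log, and the `step` runner stands in for how the runtime handles a `'use step'` call — recorded steps resolve from the log, unrecorded steps execute and append.

```typescript
// Illustrative sketch of deterministic replay. An append-only log maps a step's
// correlation id to its recorded result.
type EventLog = Map<string, unknown>;

// The workflow is re-run from the top on every invocation. Steps consult the log:
// a recorded result is returned without re-executing the side effect.
function makeStepRunner(log: EventLog, executed: string[]) {
  let seq = 0;
  return async function step<T>(name: string, fn: () => Promise<T>): Promise<T> {
    const id = `${name}:${seq++}`; // correlation id: stable across replays
    if (log.has(id)) return log.get(id) as T; // replay: return recorded result
    const result = await fn(); // first execution: run the side effect
    log.set(id, result); // persist the completion event
    executed.push(id);
    return result;
  };
}

async function agentTurn(step: <T>(name: string, fn: () => Promise<T>) => Promise<T>) {
  const a = await step("plan", async () => "call-search");
  const b = await step("tool", async () => `${a}:done`);
  return b;
}

// First invocation executes both steps; a second invocation over the same log
// replays them without executing anything.
export async function demo() {
  const log: EventLog = new Map();
  const run1: string[] = [];
  const out1 = await agentTurn(makeStepRunner(log, run1));
  const run2: string[] = [];
  const out2 = await agentTurn(makeStepRunner(log, run2)); // restart: replay only
  return { out1, out2, run1, run2 };
}
```

On the second invocation both steps resolve from the log, so the run records zero new executions while producing the same return value.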
+ +## Implementation details + +### Build-time split: workflow bundle vs step bundle + +A workflow file contains both orchestrator code and side-effecting code. Workflow DevKit's build pipeline uses an SWC transform to recognize the `'use workflow'` / `'use step'` directives and split them into separate bundles. + +That split is what makes the runtime model crisp: orchestrators run in a deterministic VM, and steps run in normal Node.js. You still write a single file. + +### Determinism in the workflow VM + +The workflow VM runs under constraints that make replay reliable: + +* `Math.random()` is seeded per workflow run. +* `Date.now()` is fixed and advanced based on event timestamps during replay. +* `crypto.getRandomValues()` and `crypto.randomUUID()` are deterministic. +* `process.env` is copied and frozen. +* Timer APIs (`setTimeout`, `setInterval`, `setImmediate`) throw. Use durable `sleep()` instead. +* Global `fetch` is blocked in workflows. Put network I/O in steps. + +This matters for agents because non-determinism breaks replay. If the orchestrator reads "now" or random data to decide which tool to call, it must see the same values on every replay. + +### Event log + suspension + +A workflow run consumes an ordered event stream. When the workflow hits an awaited step, it looks for events with the step's correlation id: + +* `step_created` confirms the step exists. +* `step_started`, `step_retrying`, `step_completed`, `step_failed` drive resolution. +* `wait_created` / `wait_completed` back durable `sleep()`. +* `hook_created` and hook completion events back external resumes. + +When an awaited step has no matching event yet, the workflow throws a `WorkflowSuspension`. The suspension carries a queue of pending invocations (steps, waits, hooks). The runtime handler persists the missing `*_created` events and enqueues step executions with an idempotency key equal to the correlation id. + +The workflow stops at that point. 
Step workers run, append completion or retry events, and re-enqueue the workflow. On the next replay, the workflow re-runs the same code and picks up exactly where it left off. + +### Built-in retries at the step boundary + +Step execution owns retries. A step can fail in three ways: + +* Throw `FatalError` to fail the step and bubble the error to the workflow. +* Throw `RetryableError` to retry with an explicit `retryAfter`. +* Throw any other error to retry with the default policy, up to `maxRetries` (default is 3). + +Retries do not re-run completed steps. The event log preserves the successful work and the orchestrator replays it. + +## Code patterns + +### Crash recovery without checkpoints + +**Before: manual checkpoint writes and cursor recovery** + +```ts +import { sql } from "./db"; + +export async function agentHandler(runId: string) { + const run = await sql`SELECT cursor, state FROM agent_runs WHERE id=${runId}`; + let { cursor, state } = run.rows[0]; + while (cursor < state.plan.length) { + const out = await tools[state.plan[cursor]](state); + cursor += 1; + state = { ...state, out }; + await sql`UPDATE agent_runs SET cursor=${cursor}, state=${state} WHERE id=${runId}`; + } + return state; +} +``` + +This is a checkpointed interpreter. Every loop iteration writes to storage so the next invocation can reconstruct progress. + +**After: deterministic replay, no explicit checkpoints** + +```ts +async function runTool(name: string, input: unknown) { + 'use step'; + return tools[name](input); +} + +export async function agentRun(plan: { name: string }[], initial: unknown) { + 'use workflow'; + let state = initial; + for (const action of plan) state = await runTool(action.name, state); + return state; +} +``` + +The workflow stores state in local variables. The runtime reconstructs those variables on replay by feeding recorded step results back into the same loop. 
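That reconstruction can be made concrete with a small sketch (illustrative only — the `Map`, step ids, and counters are invented for the demo, not runtime APIs): a run that dies between two steps resumes by replaying the first step's recorded result, so each side effect executes exactly once.

```typescript
// Toy model of crash recovery without explicit checkpoints. The Map stands in for
// the durable event log; `calls` counts actual side-effect executions.
const recoveryLog = new Map<string, unknown>();
const calls = { fetchUser: 0, chargeUser: 0 };

async function recoveryStep<T>(id: string, fn: () => Promise<T>): Promise<T> {
  if (recoveryLog.has(id)) return recoveryLog.get(id) as T; // recorded: skip execution
  const out = await fn();
  recoveryLog.set(id, out); // persist the completion event
  return out;
}

async function run(crashBetweenSteps: boolean): Promise<number> {
  // Local state is rebuilt by replaying the same code against the log.
  const user = await recoveryStep("fetchUser:0", async () => { calls.fetchUser += 1; return 7; });
  if (crashBetweenSteps) throw new Error("simulated cold start");
  return recoveryStep("chargeUser:1", async () => { calls.chargeUser += 1; return user * 2; });
}

export async function recoveryDemo() {
  try { await run(true); } catch { /* the process died mid-run; the log survived */ }
  const result = await run(false); // replay step 1 from the log, execute step 2
  return { result, calls };
}
```

The second invocation re-runs the whole function body, but only the second step actually touches the outside world.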
+ +### Parallel fanout without bespoke orchestration + +Agents fan out to keep latency bounded: search + fetch + summarize in parallel. The hard part is partial success. One branch can succeed while another fails, and a stateless retry re-executes both unless you persist per-branch outputs. + +**Before: custom fanout bookkeeping to avoid redoing work** + +```ts +import { sql } from "./db"; + +export async function fanout(runId: string) { + await sql`UPDATE runs SET status='running' WHERE id=${runId}`; + const [a, b] = await Promise.allSettled([callA(), callB()]); + if (a.status === "fulfilled") await sql`UPDATE runs SET a=${a.value} WHERE id=${runId}`; + if (b.status === "fulfilled") await sql`UPDATE runs SET b=${b.value} WHERE id=${runId}`; + if (a.status === "rejected" || b.status === "rejected") throw new Error("retry later"); + return { a: a.value, b: b.value }; +} +``` + +You persist intermediate results because the platform does not. + +**After: Promise.all over durable steps** + +```ts +async function fetchA() { + 'use step'; + return callA(); +} +async function fetchB() { + 'use step'; + return callB(); +} + +export async function fanoutWorkflow() { + 'use workflow'; + const [a, b] = await Promise.all([fetchA(), fetchB()]); + return { a, b }; +} +``` + +Each step has its own event history and retry policy. If `fetchB()` fails and retries, `fetchA()` replays from its `step_completed` event without re-executing. + +## Results + +Workflow DevKit moves agent reliability into the runtime instead of your app code: + +* Cold starts and timeouts resume from the event log, not from ad hoc checkpoints. +* Tool-call retries are selective. Completed steps return recorded results. +* Parallel fanout uses ordinary `Promise.all()` with independent step retries. +* Long waits become first-class via durable `sleep()` and hook-based resume. + +The operational surface area shrinks. You stop maintaining a queue protocol, a scheduler, and a state machine schema per agent. 
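The partial-success behavior in the fanout section can be sketched with the same Map-as-event-log stand-in (a toy; `fetchA`/`fetchB` and the failure flag are invented for the demo): the branch that failed re-executes on the retry pass, while the branch that succeeded replays from its recorded result.

```typescript
// Toy model of partial success in parallel fanout. Branch B fails once; branch A
// succeeds on the first pass and is never re-executed.
const fanoutLog = new Map<string, unknown>();
const executions = { a: 0, b: 0 };
let bFailsOnce = true;

async function fanoutStep<T>(id: string, fn: () => Promise<T>): Promise<T> {
  if (fanoutLog.has(id)) return fanoutLog.get(id) as T; // replay recorded result
  const out = await fn();
  fanoutLog.set(id, out);
  return out;
}

const fetchA = () =>
  fanoutStep("fetchA", async () => { executions.a += 1; return "A"; });
const fetchB = () =>
  fanoutStep("fetchB", async () => {
    executions.b += 1;
    if (bFailsOnce) { bFailsOnce = false; throw new Error("transient"); }
    return "B";
  });

export async function fanoutDemo() {
  try { await Promise.all([fetchA(), fetchB()]); } catch { /* B failed; A's result is logged */ }
  const [a, b] = await Promise.all([fetchA(), fetchB()]); // retry pass
  return { a, b, executions };
}
```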
+ +```bash +npx -y -p @workflow/cli wf inspect runs --limit 10 +``` diff --git a/.blog/stateless-retries-multiply-agent-side-effects.md b/.blog/stateless-retries-multiply-agent-side-effects.md new file mode 100644 index 0000000000..97ff7a1d21 --- /dev/null +++ b/.blog/stateless-retries-multiply-agent-side-effects.md @@ -0,0 +1,134 @@ +# Workflow DevKit keeps Agents' tool-call volume linear under retries + +## Headline finding + +Stateless retries turn an Agent's tool calls into repeated work. As the number of tool calls per run grows, the expected number of executed calls grows faster than linearly because a single failure forces a full replay of the prefix. + +Workflow DevKit changes the unit of retry. The workflow replays deterministically, but completed steps return recorded results. A transient failure retries one step, not the entire Agent turn. + +## Methodology + +Model an Agent run as `N` sequential tool calls. Each call fails transiently with probability `p` and succeeds with probability `q = 1 - p`. + +Compare two retry strategies: + +* **Stateless retry:** a failure restarts the whole run from tool call 1. +* **Durable step retry:** a failure retries only the failed call; prior successful calls do not re-execute. + +This isolates the retry surface area. It does not assume anything about the LLM or tools beyond an independent per-call failure rate. + +## Data + +With stateless retry, the run completes only after it achieves `N` consecutive successful calls. The expected number of executed calls is: + +`E_stateless = (1 - q^N) / (p * q^N)` + +With durable step retry, each call is a geometric retry until success, so: + +`E_durable = N / q` + +Concrete numbers: + +* `p = 0.02`, `N = 40`: stateless `62.2` calls vs durable `40.8` calls (1.52x). +* `p = 0.05`, `N = 20`: stateless `35.8` calls vs durable `21.1` calls (1.70x). +* `p = 0.10`, `N = 40`: stateless `666.5` calls vs durable `44.4` calls (15.0x). 
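The concrete numbers above follow directly from the two formulas; a short script reproduces them (only the rounding is mine):

```typescript
// Numeric check of the two expectations: p = per-call transient failure
// probability, N = sequential tool calls, q = 1 - p.
function eStateless(p: number, n: number): number {
  const q = 1 - p;
  // Expected Bernoulli trials until N consecutive successes, restarting on failure.
  return (1 - Math.pow(q, n)) / (p * Math.pow(q, n));
}

function eDurable(p: number, n: number): number {
  // Each call retries independently until success: N geometric variables.
  return n / (1 - p);
}

export const table = [
  { p: 0.02, n: 40 },
  { p: 0.05, n: 20 },
  { p: 0.1, n: 40 },
].map(({ p, n }) => ({
  p,
  n,
  stateless: eStateless(p, n),
  durable: eDurable(p, n),
  ratio: eStateless(p, n) / eDurable(p, n),
}));
```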
+ +The ratio compounds because stateless retry forces the run to finish the entire chain without a single transient failure. Durable steps turn that into independent retries per call. + +## Core insight + +In agent workloads, the expensive part is not the control flow. It is the tool boundary: API calls, database writes, emails, payments, rate-limited endpoints. Stateless retry replays those boundaries unless the application builds its own ledger of what already executed. + +That ledger is the same thing a durable runtime provides: an event log keyed by stable correlation ids. Workflow DevKit already emits a correlation id per step and records its lifecycle (`created`, `started`, `retrying`, `completed`, `failed`). Replay rehydrates the workflow and returns step results without re-executing successful calls. + +## Practical takeaway + +Use durable steps for every side-effecting tool call. Keep the workflow function deterministic and let the runtime handle replay and selective retry. If a tool supports idempotency keys, derive the key from the step correlation id instead of inventing your own scheme. 
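The expected-call model above can also be validated by simulation rather than algebra. A seeded Monte Carlo run (illustrative — the mulberry32-style PRNG, seed, and trial count are arbitrary choices) lands on the same expectations:

```typescript
// Simulate both retry strategies and compare mean executed-call counts.
function mulberry32(seed: number) {
  let a = seed >>> 0;
  return function (): number {
    a = (a + 0x6d2b79f5) >>> 0;
    let t = a;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t = (t + Math.imul(t ^ (t >>> 7), t | 61)) ^ t;
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

// Stateless: any failure restarts the whole N-call chain from call 1.
function statelessCalls(p: number, n: number, rand: () => number): number {
  let calls = 0;
  let i = 0;
  while (i < n) {
    calls += 1;
    i = rand() < p ? 0 : i + 1; // a failure resets the completed prefix
  }
  return calls;
}

// Durable: a failure retries only the failed call.
function durableCalls(p: number, n: number, rand: () => number): number {
  let calls = 0;
  for (let i = 0; i < n; i += 1) {
    do calls += 1; while (rand() < p);
  }
  return calls;
}

export function meanCalls(p: number, n: number, trials: number, seed: number) {
  const rand = mulberry32(seed);
  let s = 0;
  let d = 0;
  for (let t = 0; t < trials; t += 1) {
    s += statelessCalls(p, n, rand);
    d += durableCalls(p, n, rand);
  }
  return { stateless: s / trials, durable: d / trials };
}
```

For `p = 0.05`, `N = 20` the simulated means converge on the closed-form values as the trial count grows.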
+ +### Stateless retry duplicates work + +**Before: retrying an Agent turn replays the full prefix** + +```ts +export async function agentTurn(input: Input) { + for (let attempt = 1; attempt <= 5; attempt += 1) { + try { + const a = await toolA(input); + const b = await toolB(a); + const c = await toolC(b); + return { a, b, c }; + } catch (err) { + if (attempt === 5) throw err; + await sleepMs(1000 * attempt); + } + } + throw new Error("unreachable"); +} +``` + +**After: durable steps replay successful calls and retry only the failed one** + +```ts +import { RetryableError } from "workflow"; + +async function toolA(input: Input) { 'use step'; return callA(input); } +async function toolB(a: A) { 'use step'; return callB(a); } +async function toolC(b: B) { + 'use step'; + const res = await callC(b); + if (res.transient === true) throw new RetryableError("toolC transient", { retryAfter: "2s" }); + return res; +} + +export async function agentTurn(input: Input) { + 'use workflow'; + const a = await toolA(input); + const b = await toolB(a); + return await toolC(b); +} +``` + +### Stop managing idempotency keys by hand + +**Before: generating and persisting idempotency keys across retries** + +```ts +import { sql } from "./db"; +import { randomUUID } from "crypto"; + +export async function purchase(runId: string, userId: string) { + const row = await sql`SELECT charge_key, email_key FROM runs WHERE id=${runId}`; + const chargeKey = row.charge_key ?? randomUUID(); + const emailKey = row.email_key ?? 
randomUUID(); + await sql`UPDATE runs SET charge_key=${chargeKey}, email_key=${emailKey} WHERE id=${runId}`; + await stripe.charges.create({ amount: 499, customer: userId }, { idempotencyKey: chargeKey }); + await sendReceiptEmail(userId, { idempotencyKey: emailKey }); +} +``` + +**After: use the step correlation id as the idempotency key** + +```ts +import { getStepMetadata } from "workflow"; + +async function chargeCard(userId: string, amount: number) { + 'use step'; + const { stepId } = getStepMetadata(); + return stripe.charges.create({ amount, customer: userId }, { idempotencyKey: stepId }); +} +async function sendReceipt(userId: string) { + 'use step'; + const { stepId } = getStepMetadata(); + await mailer.sendReceipt({ userId }, { idempotencyKey: stepId }); +} + +export async function purchase(userId: string) { + 'use workflow'; + await chargeCard(userId, 499); + await sendReceipt(userId); +} +``` + +```bash +npx -y -p @workflow/cli wf inspect runs +``` diff --git a/docs/content/docs/meta.json b/docs/content/docs/meta.json index 218fd4fa51..afcbce4849 100644 --- a/docs/content/docs/meta.json +++ b/docs/content/docs/meta.json @@ -7,6 +7,7 @@ "how-it-works", "observability", "ai", + "recipes", "deploying", "errors", "api-reference" diff --git a/docs/content/docs/recipes/index.mdx b/docs/content/docs/recipes/index.mdx new file mode 100644 index 0000000000..8cfeff4f03 --- /dev/null +++ b/docs/content/docs/recipes/index.mdx @@ -0,0 +1,17 @@ +--- +title: Recipes +description: Real-world workflow patterns for common integration scenarios. +type: overview +summary: Production-ready patterns for webhooks, event processing, and service integrations. +related: + - /docs/foundations/hooks + - /docs/foundations/common-patterns +--- + +These recipes demonstrate real-world patterns for integrating Workflow DevKit with external services. 
Each recipe builds on the [Hooks & Webhooks](/docs/foundations/hooks) and [Common Patterns](/docs/foundations/common-patterns) foundations to solve specific integration challenges. + +<Cards> + <Card title="Webhook Integrations" href="/docs/recipes/webhook-integrations"> + Receive and process webhooks from Slack, GitHub, and other external services. + </Card> +</Cards> diff --git a/docs/content/docs/recipes/meta.json b/docs/content/docs/recipes/meta.json new file mode 100644 index 0000000000..1b4ad5174f --- /dev/null +++ b/docs/content/docs/recipes/meta.json @@ -0,0 +1,4 @@ +{ + "title": "Recipes", + "pages": ["webhook-integrations"] +} diff --git a/docs/content/docs/recipes/webhook-integrations.mdx b/docs/content/docs/recipes/webhook-integrations.mdx new file mode 100644 index 0000000000..abfdef85b8 --- /dev/null +++ b/docs/content/docs/recipes/webhook-integrations.mdx @@ -0,0 +1,368 @@ +--- +title: Webhook Integrations +description: Receive and process webhooks from external services like Slack, GitHub, and third-party APIs using durable workflows. +type: integration +summary: Build durable webhook processors that survive failures and scale automatically. +prerequisites: + - /docs/foundations/hooks +related: + - /docs/foundations/common-patterns + - /docs/api-reference/workflow/create-webhook + - /docs/api-reference/workflow/create-hook +--- + +Webhooks are how external services communicate events to your application. Workflow DevKit turns webhook handlers into durable processors that survive failures, replay deterministically, and scale without additional infrastructure. + +This guide covers common webhook integration patterns using [`createWebhook()`](/docs/api-reference/workflow/create-webhook) and [`createHook()`](/docs/api-reference/workflow/create-hook). If you are new to these primitives, start with the [Hooks & Webhooks](/docs/foundations/hooks) foundation guide first. + +## Slack Event Processing + +A common pattern is building a workflow that receives Slack events via webhook and processes each message as a durable step.
Use a custom token based on the channel ID so your Slack webhook handler can route events to the correct workflow instance. + +```typescript title="workflows/slack-events.ts" lineNumbers +import { createWebhook, type RequestWithResponse } from "workflow"; + +interface SlackEvent { + type: string; + user: string; + text: string; + channel: string; + ts: string; +} + +async function acknowledgeSlack(request: RequestWithResponse) { + "use step"; + await request.respondWith( + new Response(JSON.stringify({ ok: true }), { + headers: { "Content-Type": "application/json" }, + }) + ); +} + +async function processSlackEvent(event: SlackEvent) { // [!code highlight] + "use step"; // [!code highlight] + + if (event.type === "message") { + // Call your internal APIs, update a database, trigger notifications + console.log(`[${event.channel}] ${event.user}: ${event.text}`); + } +} // [!code highlight] + +export async function slackEventProcessor(channelId: string) { + "use workflow"; + + const webhook = createWebhook({ // [!code highlight] + token: `slack_events:${channelId}`, // [!code highlight] + respondWith: "manual", // [!code highlight] + }); // [!code highlight] + + for await (const request of webhook) { // [!code highlight] + const body = await request.json(); + + // Acknowledge immediately so Slack does not retry + await acknowledgeSlack(request); + + // Process each event as a durable step + await processSlackEvent(body.event); + + if (body.event?.text === "stop") { + break; + } + } +} +``` + +Key points: +- The custom `token` lets your Slack API route reconstruct the webhook URL for any channel +- `respondWith: "manual"` allows you to acknowledge the request before processing +- Each event is processed in a step, so failures retry without losing the event + +## GitHub Webhook Handler + +GitHub sends webhooks for pushes, pull requests, issues, and other repository events. 
Build a workflow that receives these events, verifies the signature, and routes to the appropriate handler. + +```typescript title="workflows/github-webhook.ts" lineNumbers +import { createWebhook, type RequestWithResponse } from "workflow"; + +interface GitHubEvent { + action?: string; + repository: { full_name: string }; + sender: { login: string }; + [key: string]: unknown; +} + +async function verifyAndParse( + request: RequestWithResponse, + secret: string +): Promise<{ eventType: string; payload: GitHubEvent }> { + "use step"; // [!code highlight] + + const signature = request.headers.get("x-hub-signature-256") ?? ""; + const body = await request.text(); + + // Verify HMAC signature + const encoder = new TextEncoder(); + const key = await crypto.subtle.importKey( + "raw", + encoder.encode(secret), + { name: "HMAC", hash: "SHA-256" }, + false, + ["sign"] + ); + const sig = await crypto.subtle.sign("HMAC", key, encoder.encode(body)); + const expected = "sha256=" + Array.from(new Uint8Array(sig)) + .map((b) => b.toString(16).padStart(2, "0")) + .join(""); + + if (signature !== expected) { + throw new Error("Invalid GitHub webhook signature"); + } + + await request.respondWith(new Response("OK", { status: 200 })); + + const eventType = request.headers.get("x-github-event") ?? 
"unknown"; + return { eventType, payload: JSON.parse(body) }; +} + +async function handlePush(payload: GitHubEvent) { + "use step"; + console.log(`Push to ${payload.repository.full_name} by ${payload.sender.login}`); + // Trigger builds, run tests, update dashboards +} + +async function handlePullRequest(payload: GitHubEvent) { + "use step"; + console.log(`PR ${payload.action} on ${payload.repository.full_name}`); + // Run checks, post comments, update project boards +} + +async function handleIssue(payload: GitHubEvent) { + "use step"; + console.log(`Issue ${payload.action} on ${payload.repository.full_name}`); + // Triage, assign, notify team +} + +export async function githubWebhookHandler(repoName: string) { // [!code highlight] + "use workflow"; // [!code highlight] + + const webhook = createWebhook({ + token: `github:${repoName}`, + respondWith: "manual", + }); + + for await (const request of webhook) { + const { eventType, payload } = await verifyAndParse( + request, + process.env.GITHUB_WEBHOOK_SECRET! + ); + + switch (eventType) { // [!code highlight] + case "push": // [!code highlight] + await handlePush(payload); // [!code highlight] + break; // [!code highlight] + case "pull_request": // [!code highlight] + await handlePullRequest(payload); // [!code highlight] + break; // [!code highlight] + case "issues": // [!code highlight] + await handleIssue(payload); // [!code highlight] + break; // [!code highlight] + } // [!code highlight] + } +} // [!code highlight] +``` + + +Signature verification runs as a step so it has full access to Node.js crypto APIs. Workflow functions run in a [sandboxed VM](/docs/foundations/workflows-and-steps) without direct access to these modules. + + +## Webhook Forwarding + +Some integrations require forwarding a single webhook to multiple downstream systems. Use `Promise.all` to fan out in parallel, with each forwarding call as an independent retryable step. 
+ +```typescript title="workflows/webhook-forwarder.ts" lineNumbers +import { createWebhook, type RequestWithResponse } from "workflow"; + +async function acknowledgeRequest(request: RequestWithResponse) { + "use step"; + await request.respondWith( + Response.json({ status: "forwarding" }, { status: 202 }) + ); +} + +async function forwardToEndpoint( // [!code highlight] + url: string, // [!code highlight] + payload: string, // [!code highlight] + headers: Record<string, string> // [!code highlight] +): Promise<{ url: string; status: number }> { // [!code highlight] + "use step"; // [!code highlight] + + const response = await fetch(url, { + method: "POST", + body: payload, + headers: { "Content-Type": "application/json", ...headers }, + }); + + if (!response.ok) { + // Throwing here triggers automatic retry + throw new Error(`Forward to ${url} failed: ${response.status}`); + } + + return { url, status: response.status }; +} // [!code highlight] + +export async function webhookForwarder(endpoints: string[]) { + "use workflow"; + + const webhook = createWebhook({ respondWith: "manual" }); + + for await (const request of webhook) { + const payload = await request.text(); + await acknowledgeRequest(request); + + // Fan out to all endpoints in parallel + const results = await Promise.all( // [!code highlight] + endpoints.map((url) => forwardToEndpoint(url, payload, {})) // [!code highlight] + ); // [!code highlight] + + console.log("Forwarded to", results.length, "endpoints"); + } +} +``` + +Because each `forwardToEndpoint` call is a separate step, a failure at one endpoint retries independently without affecting the others. The workflow is durable across all of them. + +## Scheduled Task Processing + +For recurring tasks, combine [`sleep()`](/docs/api-reference/workflow/sleep) with step execution to build a durable cron-like processor. Unlike traditional cron jobs, the workflow maintains state between iterations and survives restarts.
+ +```typescript title="workflows/scheduled-sync.ts" lineNumbers +import { sleep } from "workflow"; + +async function fetchLatestData(): Promise<{ count: number; updatedAt: string }> { + "use step"; + + const response = await fetch("https://api.example.com/data"); + const data = await response.json(); + return data as { count: number; updatedAt: string }; +} + +async function syncToDatabase(data: { count: number; updatedAt: string }) { + "use step"; + console.log("Syncing", data.count, "records from", data.updatedAt); + // Write to your database +} + +export async function scheduledSync(intervalMinutes: number, maxRuns: number) { + "use workflow"; + + for (let i = 0; i < maxRuns; i++) { // [!code highlight] + const data = await fetchLatestData(); + await syncToDatabase(data); + + console.log(`Run ${i + 1}/${maxRuns} complete`); + + if (i < maxRuns - 1) { + await sleep(`${intervalMinutes} minutes`); // [!code highlight] + } + } + + console.log("Scheduled sync complete"); +} +``` + + +`sleep()` is durable - if the workflow restarts during a sleep, it resumes when the original sleep duration expires rather than starting over. See [Common Patterns](/docs/foundations/common-patterns) for more on combining `sleep()` with `Promise.race` for timeouts. + + +## Webhook Response Patterns + +Workflow DevKit supports three response modes for webhooks. Choose the one that fits your integration requirements. + +### Default (202 Accepted) + +When no `respondWith` option is set, the webhook automatically returns `202 Accepted` to the caller. This is the simplest option for services that only need delivery confirmation. 
+
+```typescript title="workflows/simple-receiver.ts" lineNumbers
+import { createWebhook } from "workflow";
+
+export async function simpleReceiver() {
+  "use workflow";
+
+  // Caller receives 202 Accepted automatically
+  const webhook = createWebhook(); // [!code highlight]
+
+  const request = await webhook;
+  const data = await request.json();
+  await processEvent(data);
+}
+
+declare function processEvent(data: unknown): Promise<void>; // @setup
+```
+
+### Static Response
+
+Provide a fixed `Response` object that is returned for every request. Use this when the caller expects a specific response format.
+
+```typescript title="workflows/static-response.ts" lineNumbers
+import { createWebhook } from "workflow";
+
+export async function staticResponseWebhook() {
+  "use workflow";
+
+  const webhook = createWebhook({
+    respondWith: Response.json({ received: true, status: "processing" }), // [!code highlight]
+  });
+
+  const request = await webhook;
+  const data = await request.json();
+  await processEvent(data);
+}
+
+declare function processEvent(data: unknown): Promise<void>; // @setup
+```
+
+### Dynamic Response (Manual Mode)
+
+Set `respondWith: "manual"` to control the response from a step function. This is required when the response depends on request content.
+
+```typescript title="workflows/dynamic-response.ts" lineNumbers
+import { createWebhook, type RequestWithResponse } from "workflow";
+
+async function respond(request: RequestWithResponse, body: Record<string, unknown>, status: number) {
+  "use step";
+  await request.respondWith( // [!code highlight]
+    Response.json(body, { status }) // [!code highlight]
+  ); // [!code highlight]
+}
+
+export async function dynamicResponseWebhook() {
+  "use workflow";
+
+  const webhook = createWebhook({ respondWith: "manual" }); // [!code highlight]
+
+  const request = await webhook;
+  const data = await request.json();
+
+  if (!data.type) {
+    await respond(request, { error: "Missing type field" }, 400);
+    return;
+  }
+
+  await respond(request, { accepted: true, type: data.type }, 200);
+  await processEvent(data);
+}
+
+declare function processEvent(data: unknown): Promise<void>; // @setup
+```
+
+
+`respondWith()` must be called from a step function. See the [Hooks & Webhooks guide](/docs/foundations/hooks) for details on this requirement.
+
+
+## Related Documentation
+
+- [Hooks & Webhooks](/docs/foundations/hooks) - Core hook and webhook primitives
+- [Common Patterns](/docs/foundations/common-patterns) - Sequential, parallel, and timeout patterns
+- [`createWebhook()` API Reference](/docs/api-reference/workflow/create-webhook) - Full webhook API
+- [`createHook()` API Reference](/docs/api-reference/workflow/create-hook) - Low-level hook API
+- [Errors and Retries](/docs/foundations/errors-and-retries) - Retry semantics for step functions
diff --git a/packages/next/README.md b/packages/next/README.md
index bb8604d934..dbd1e8ea1f 100644
--- a/packages/next/README.md
+++ b/packages/next/README.md
@@ -1,3 +1,199 @@
 # @workflow/next
 
-Next.js plugin for [Workflow DevKit](https://useworkflow.dev).
+Next.js integration for Workflow DevKit.
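For orientation, this is roughly the shape of code the integration transforms. A hedged sketch: the file path and function names are illustrative, not part of the package's API.

```typescript
// app/workflows/welcome.ts (illustrative path and names)
async function sendWelcomeEmail(to: string): Promise<{ to: string; sent: boolean }> {
  "use step"; // side effects live in steps; a failed step is retried
  return { to, sent: true };
}

export async function welcomeFlow(email: string): Promise<boolean> {
  "use workflow"; // durable orchestration boundary
  const receipt = await sendWelcomeEmail(email);
  return receipt.sent;
}
```

At runtime such a function is started via the runtime's `start()` entry point and advanced through the generated `.well-known/workflow/v1/*` routes described below.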
+
+## Install
+
+```bash
+npm install workflow next
+# or
+pnpm add workflow next
+# or
+yarn add workflow next
+# or
+bun add workflow next
+```
+
+`next` is a peer dependency. `workflow` includes this package as `workflow/next`.
+
+## Usage
+
+Wrap your Next config with `withWorkflow()`.
+
+```ts
+import type { NextConfig } from 'next';
+import { withWorkflow } from '@workflow/next';
+
+const nextConfig: NextConfig = {
+  // your Next.js config
+};
+
+export default withWorkflow(nextConfig);
+```
+
+### Type signature
+
+```ts
+import type { NextConfig } from 'next';
+
+export declare function withWorkflow(
+  nextConfigOrFn:
+    | NextConfig
+    | ((
+        phase: string,
+        ctx: { defaultConfig: NextConfig }
+      ) => Promise<NextConfig>),
+  {
+    workflows,
+  }?: {
+    workflows?: {
+      local?: {
+        port?: number;
+        dataDir?: string;
+      };
+    };
+  }
+): (
+  phase: string,
+  ctx: { defaultConfig: NextConfig }
+) => Promise<NextConfig>;
+```
+
+### Example: object config
+
+```ts
+import type { NextConfig } from 'next';
+import { withWorkflow } from '@workflow/next';
+
+const nextConfig: NextConfig = {
+  reactStrictMode: true,
+};
+
+export default withWorkflow(nextConfig, {
+  workflows: {
+    local: {
+      port: 3152,
+    },
+  },
+});
+```
+
+### Example: async config function
+
+```ts
+import type { NextConfig } from 'next';
+import { withWorkflow } from '@workflow/next';
+
+export default withWorkflow(async (phase, { defaultConfig }) => {
+  const nextConfig: NextConfig = {
+    ...defaultConfig,
+    reactStrictMode: true,
+  };
+
+  if (phase === 'phase-production-build') {
+    nextConfig.productionBrowserSourceMaps = true;
+  }
+
+  return nextConfig;
+});
+```
+
+## What `withWorkflow()` does
+
+When you wrap your config, `withWorkflow()`:
+
+1. Sets runtime defaults for local and Vercel worlds.
+2. Registers the Workflow loader in both Turbopack and webpack.
+3. Builds generated workflow routes in `.well-known/workflow/v1/*`.
+4. Watches source files in development and incrementally rebuilds bundles.
+5. 
Avoids duplicate builder runs per process using `WORKFLOW_NEXT_PRIVATE_BUILT`. + +## Environment variables + +| Variable | Used by | Behavior | +| --- | --- | --- | +| `WORKFLOW_TARGET_WORLD` | `withWorkflow()` + runtime world selection | If not set: defaults to `local` when not on Vercel, and `vercel` when `VERCEL_DEPLOYMENT_ID` is present. | +| `WORKFLOW_LOCAL_DATA_DIR` | Local world runtime | Set to `.next/workflow-data` by `withWorkflow()` when defaulting to local world. You can override it explicitly in your environment. | +| `PORT` | Next dev/build process | Set from `workflows.local.port` when running outside Vercel. | +| `WORKFLOW_NEXT_PRIVATE_BUILT` | `withWorkflow()` internals | Internal guard to ensure builder setup runs once per main process. | +| `WORKFLOW_PUBLIC_MANIFEST` | Builder/public output | When set to `1`, copies `manifest.json` to `public/.well-known/workflow/v1/manifest.json` so Next serves it publicly. | +| `WATCHPACK_WATCHER_LIMIT` | Watch mode on macOS | Set to `20` during dev watch mode on Darwin to mitigate slow watcher teardown behavior. | + +## Package exports + +| Export path | Description | +| --- | --- | +| `@workflow/next` | Main Next integration export. Provides `withWorkflow()`. | +| `@workflow/next/loader` | Loader that applies Workflow client-mode transforms for `"use workflow"` and `"use step"`. | +| `@workflow/next/runtime` | Re-export of `@workflow/core/dist/runtime` for runtime compatibility. | + +If you install the umbrella `workflow` package, these are available from `workflow/next` and related subpaths. + +## Generated `.well-known/workflow/v1/*` files + +`@workflow/next` generates these files under your app directory (`app/` or `src/app/`): + +| File | Purpose | Public route | +| --- | --- | --- | +| `.well-known/workflow/v1/flow/route.js` | Workflow orchestration handler bundle. | `POST /.well-known/workflow/v1/flow` | +| `.well-known/workflow/v1/step/route.js` | Step execution handler bundle. 
| `POST /.well-known/workflow/v1/step` | +| `.well-known/workflow/v1/webhook/[token]/route.js` | Webhook delivery handler bundle. | `POST /.well-known/workflow/v1/webhook/:token` | +| `.well-known/workflow/v1/manifest.json` | Workflow/step/class manifest (with graph metadata). | Not public unless `WORKFLOW_PUBLIC_MANIFEST=1` | +| `.well-known/workflow/v1/config.json` | Production function trigger config for Next build output. | Internal build artifact | +| `.well-known/workflow/v1/.gitignore` | Prevents committing generated artifacts. | N/A | + +If your app uses `pages/` only, the builder creates a sibling `app/` (or `src/app/`) directory for generated routes. + +## How generated files work at runtime + +1. Your app calls `start()` with a transformed workflow function. +2. Runtime posts to `/.well-known/workflow/v1/flow` to advance orchestration. +3. Steps execute through `/.well-known/workflow/v1/step`. +4. Webhook resumptions arrive through `/.well-known/workflow/v1/webhook/:token`. +5. Manifest metadata is used by tooling and can be exposed for observability. + +## Serving the manifest publicly + +To expose the manifest over HTTP, set: + +```bash +WORKFLOW_PUBLIC_MANIFEST=1 +``` + +On build, `@workflow/next` copies: + +- From: `app/.well-known/workflow/v1/manifest.json` (or `src/app/...`) +- To: `public/.well-known/workflow/v1/manifest.json` + +Next.js then serves it at: + +- `/.well-known/workflow/v1/manifest.json` + +## Troubleshooting + +### `'start' received an invalid workflow function` + +- Ensure your workflow function has `"use workflow"`. +- Ensure step functions use `"use step"` where required. +- Ensure `next.config.*` is wrapped with `withWorkflow()`. + +### Workflow routes return 404 + +- Confirm one of these exists: `app/`, `src/app/`, `pages/`, or `src/pages/`. +- Confirm generated files exist under `.well-known/workflow/v1/*`. +- If using a Next proxy handler, exclude `/.well-known/workflow/` paths. 
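
One way to implement that exclusion is a small path predicate in your proxy or middleware. A hedged sketch: the `isWorkflowRoute` helper is hypothetical, not part of `@workflow/next`; the prefix matches the generated routes listed above.

```typescript
// Hypothetical helper: let Workflow DevKit's generated routes bypass proxy logic.
const WORKFLOW_ROUTE_PREFIX = "/.well-known/workflow/";

export function isWorkflowRoute(pathname: string): boolean {
  return pathname.startsWith(WORKFLOW_ROUTE_PREFIX);
}

// In middleware.ts (illustrative):
//
//   export function middleware(request: NextRequest) {
//     if (isWorkflowRoute(request.nextUrl.pathname)) {
//       return NextResponse.next(); // pass /.well-known/workflow/* through untouched
//     }
//     // ...your proxy/rewrite logic...
//   }
```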
+ +### Manifest route is missing + +- Set `WORKFLOW_PUBLIC_MANIFEST=1` before running/building. +- Rebuild so `manifest.json` is copied into `public/.well-known/workflow/v1/`. + +### Next.js 16.1+ build error + +If you see: + +```text +Error: Cannot find module 'next/dist/lib/server-external-packages.json' +``` + +Upgrade to `workflow@4.0.1-beta.26` or newer. diff --git a/packages/swc-plugin-workflow/spec.md b/packages/swc-plugin-workflow/spec.md index a697ad9b8e..dddeb78866 100644 --- a/packages/swc-plugin-workflow/spec.md +++ b/packages/swc-plugin-workflow/spec.md @@ -576,7 +576,10 @@ registerSerializationClass("class//./input//Point", Point); ## Static Methods -Static class methods can be marked with directives. Instance methods are **not supported**. +Static class methods can be marked with directives. Instance methods are supported for `"use step"` (with custom serialization), but `"use workflow"` is only supported on static methods. + +- `"use step"`: supported on **static** and **instance** methods (instance methods require custom serialization). +- `"use workflow"`: supported on **static** methods only (instance methods are rejected). ### Static Step Method @@ -795,6 +798,8 @@ Files containing classes with custom serialization are automatically discovered This allows serialization classes to be defined in separate files (such as Next.js API routes or utility modules) and still be registered in the serialization system when the application is built. +> **Compatibility note:** For auto-discovery of serialization-only files, prefer importing `WORKFLOW_SERIALIZE` / `WORKFLOW_DESERIALIZE` from `@workflow/serde` consistently. If you import these symbols from `@vercel/workflow` in a file that contains only serialization classes (no `"use step"`/`"use workflow"`), the file may not match the discovery heuristic. 
If you must use `@vercel/workflow`, either use `Symbol.for("workflow-serialize")` / `Symbol.for("workflow-deserialize")` directly or ensure the file also contains a workflow directive so it is transformed.
+
 ### Cross-Context Class Registration
 
 Classes with custom serialization are automatically included in **all bundle contexts** (step, workflow, client) to ensure they can be properly serialized and deserialized when crossing execution boundaries:
@@ -850,6 +855,9 @@ The plugin emits errors for invalid usage:
 |-------|-------------|
 | Non-async function | Functions with `"use step"` or `"use workflow"` must be async |
 | Instance methods with `"use workflow"` | Only static methods can have `"use workflow"` (not instance methods) |
+| Forbidden `this` in step function | Step functions cannot reference `this` (they are hoisted and executed outside instance context) |
+| Forbidden `arguments` in step function | Step functions cannot reference `arguments`; use explicit or rest parameters instead |
+| Forbidden `super` in step function | Step functions cannot use `super`; move that logic outside the step boundary |
 | Misplaced directive | Directive must be at top of file or start of function body |
 | Conflicting directives | Cannot have both `"use step"` and `"use workflow"` at module level |
 | Invalid exports | Module-level directive files can only export async functions |
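
The `arguments` restriction above is usually a mechanical rewrite. A hedged sketch (`sum` is a hypothetical step, not part of the plugin's test suite):

```typescript
// Rejected by the plugin: `arguments` inside a step function.
//
//   async function sum() {
//     "use step";
//     return Array.from(arguments).reduce((a, b) => a + b, 0); // ERROR: Forbidden `arguments`
//   }

// Accepted: declare explicit rest parameters instead.
async function sum(...nums: number[]): Promise<number> {
  "use step";
  return nums.reduce((a, b) => a + b, 0);
}
```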