diff --git a/docs/content/docs/getting-started/meta.json b/docs/content/docs/getting-started/meta.json
index d0e189dd73..60e0131621 100644
--- a/docs/content/docs/getting-started/meta.json
+++ b/docs/content/docs/getting-started/meta.json
@@ -9,7 +9,8 @@
"nitro",
"nuxt",
"sveltekit",
- "vite"
+ "vite",
+ "python"
],
"defaultOpen": true
}
diff --git a/docs/content/docs/getting-started/python.mdx b/docs/content/docs/getting-started/python.mdx
new file mode 100644
index 0000000000..40c715b0a3
--- /dev/null
+++ b/docs/content/docs/getting-started/python.mdx
@@ -0,0 +1,165 @@
+---
+title: Python
+description: Build durable workflows and AI agents in Python with the Vercel SDK.
+type: guide
+summary: Set up the Workflow Python SDK in your Python application.
+prerequisites:
+ - /docs/getting-started
+related:
+ - /docs/foundations
+ - /docs/foundations/workflows-and-steps
+---
+
+
+The Python SDK is currently in **beta**. APIs and behavior may change. For the latest documentation and updates, see the [official Vercel Workflow Python documentation](https://vercel.com/docs/workflow/python?language=py).
+
+
+You can build durable workflows in Python using the [`vercel` Python SDK](https://pypi.org/project/vercel/). Your workflow code can pause, resume, and maintain state, just like the JavaScript and TypeScript Workflow SDK.
+
+## Getting Started
+
+Install the `vercel` package:
+
+```bash filename="Terminal"
+pip install vercel
+```
+
+Configure `experimentalServices` in your `vercel.json`:
+
+```json filename="vercel.json"
+{
+ "experimentalServices": {
+ "ai_content_workflow": {
+ "type": "worker",
+ "entrypoint": "app/workflows/ai_content_workflow.py",
+ "topics": ["__wkf_*"]
+ }
+ }
+}
+```
+
+## Workflows
+
+A workflow is a stateful function that coordinates multi-step logic over time. Create a `Workflows` instance and use the `@wf.workflow` decorator to mark a function as durable:
+
+```python filename="app/workflow.py" {3}
+from vercel import workflow
+
+wf = workflow.Workflows()
+```
+
+```python filename="app/workflows/ai_content_workflow.py" {3}
+from app.workflow import wf
+
+@wf.workflow
+async def ai_content_workflow(*, topic: str):
+ draft = await generate_draft(topic=topic)
+ summary = await summarize_draft(draft=draft)
+
+ return {
+ "draft": draft,
+ "summary": summary,
+ }
+```
+
+Under the hood, the workflow compiles into a route that orchestrates execution. All inputs and outputs are recorded in an event log. If a deploy or crash happens, the system replays execution deterministically from where it stopped.
+
+## Steps
+
+A step is a stateless function that runs a unit of durable work inside a workflow. Use `@wf.step` to mark a function as a step:
+
+```python filename="app/steps/generate_draft.py" {4,8}
+import random
+from app.workflow import wf
+
+@wf.step
+async def generate_draft(*, topic: str):
+ return await ai_generate(prompt=f"Write a blog post about {topic}")
+
+@wf.step
+async def summarize_draft(*, draft: str):
+ summary = await ai_summarize(text=draft)
+
+ # Simulate a transient error. The step automatically retries.
+ if random.random() < 0.3:
+ raise Exception("Transient AI provider error")
+
+ return summary
+```
+
+Each step compiles into an isolated route. While the step executes, the workflow suspends without consuming resources. When the step completes, the workflow resumes automatically where it left off.
+
+## Sleep
+
+Sleep pauses a workflow for a specified duration without consuming compute resources:
+
+```python filename="app/workflows/ai_refine.py" {7}
+from vercel import workflow
+
+@wf.workflow
+async def ai_refine_workflow(*, draft_id: str):
+ draft = await fetch_draft(draft_id)
+
+ await workflow.sleep("7 days") # Wait 7 days to gather more signals.
+
+ refined = await refine_draft(draft)
+
+ return {
+ "draft_id": draft_id,
+ "refined": refined,
+ }
+```
+
+While sleeping, the workflow is suspended and consumes no resources. It resumes automatically when the sleep duration elapses.
+
+## Hooks
+
+A hook lets a workflow wait for external events such as user actions, webhooks, or third-party API responses.
+
+Define a hook model with Pydantic and `workflow.BaseHook`:
+
+```python filename="app/workflows/approval.py" {3,14}
+from vercel import workflow
+
+class Approval(BaseModel, workflow.BaseHook):
+ """Human approval for AI-generated drafts"""
+
+ decision: Literal["approved", "changes"]
+ notes: str | None = None
+
+@wf.workflow
+async def ai_approval_workflow(*, topic: str):
+ draft = await generate_draft(topic=topic)
+
+ # Wait for human approval events
+ async for event in Approval.wait(token="draft-123"):
+ if event.decision == "approved":
+ await publish_draft(draft)
+ break
+
+ revised = await refine_draft(draft, event.notes)
+ await publish_draft(revised)
+```
+
+Resume the workflow when data arrives:
+
+```python filename="app/api/resume.py" {5}
+@app.post("/api/resume")
+async def resume(approval: Approval):
+ """Resume the workflow when an approval is received"""
+
+ await approval.resume("draft-123")
+ return {"ok": True}
+```
+
+When a hook receives data, the workflow resumes automatically. You don't need polling, message queues, or manual state management.
+
+## Learn More
+
+For comprehensive documentation, examples, and the latest updates, visit the [official Vercel Workflow Python documentation](https://vercel.com/docs/workflow/python).
+
+## Next Steps
+
+- Learn more about the [Foundations](/docs/foundations).
+- Check [Errors](/docs/errors) if you encounter issues.
+- Explore the [API Reference](/docs/api-reference).
diff --git a/docs/next.config.ts b/docs/next.config.ts
index 8333357460..07e1a9fb2d 100644
--- a/docs/next.config.ts
+++ b/docs/next.config.ts
@@ -23,10 +23,6 @@ const config: NextConfig = {
return {
beforeFiles: [
- {
- source: '/sitemap.xml',
- destination: 'https://crawled-sitemap.vercel.sh/useworkflow.dev-.xml',
- },
{
source: '/docs/:path*',
destination: '/llms.mdx/:path*',
diff --git a/packages/web/vite.config.ts b/packages/web/vite.config.ts
index b44148336f..781f13dc78 100644
--- a/packages/web/vite.config.ts
+++ b/packages/web/vite.config.ts
@@ -10,6 +10,13 @@ export default defineConfig(({ command, isSsrBuild }) => ({
isSsrBuild && !process.env.VERCEL
? { input: './server/app.ts' }
: undefined,
+ // Disable minification so the published npm package contains readable
+ // code. Without this, Vite's esbuild minifier produces single-line
+ // mega-bundles that supply-chain security scanners (e.g. Socket) flag
+ // as "obfuscated code". The app is a self-hosted observability tool
+ // where the unminified size difference is negligible — gzip/brotli at
+ // the serving layer compresses the wire payload regardless.
+ minify: false,
},
// Bundle all dependencies into the server build so that @workflow/web
// can be installed and run without needing any of the UI dependencies
diff --git a/packages/world-vercel/src/streamer-reconnect.test.ts b/packages/world-vercel/src/streamer-reconnect.test.ts
new file mode 100644
index 0000000000..cf6b6236a4
--- /dev/null
+++ b/packages/world-vercel/src/streamer-reconnect.test.ts
@@ -0,0 +1,307 @@
+import {
+ afterEach,
+ beforeEach,
+ describe,
+ expect,
+ it,
+ vi,
+ type Mock,
+} from 'vitest';
+import {
+ parseStreamControlFrame,
+ STREAM_CONTROL_FRAME_SIZE,
+} from './streamer.js';
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+/** Build a control frame in the workflow-server wire format: bytes 0-3 zero, byte 4 flags, bytes 5-8 nextIndex, bytes 9-12 "WFCT". */
+function buildControlFrame(done: boolean, nextIndex: number): Uint8Array {
+ const frame = new Uint8Array(STREAM_CONTROL_FRAME_SIZE); // zero-filled, so bytes 0-3 already hold the zero marker
+ frame[4] = done ? 1 : 0; // flags byte: 1 = done, 0 = not done
+ new DataView(frame.buffer).setUint32(5, nextIndex, false); // big-endian uint32 at bytes 5-8
+ frame.set(new Uint8Array([0x57, 0x46, 0x43, 0x54]), 9); // magic footer "WFCT"
+ return frame;
+}
+
+/** Create a ReadableStream that emits one of the given byte chunks per pull, in order, then closes. */
+function chunkedStream(chunks: Uint8Array[]): ReadableStream {
+ let i = 0; // index of the next chunk to emit
+ return new ReadableStream({
+ pull(controller) {
+ if (i < chunks.length) {
+ controller.enqueue(chunks[i++]);
+ } else {
+ controller.close(); // all chunks emitted
+ }
+ },
+ });
+}
+
+/** Read a ReadableStream until it reports done, then concatenate every chunk into one Uint8Array. Rejects if a read rejects. */
+async function drain(stream: ReadableStream): Promise {
+ const reader = stream.getReader();
+ const parts: Uint8Array[] = [];
+ for (;;) {
+ const { done, value } = await reader.read();
+ if (done) break;
+ parts.push(value);
+ }
+ const len = parts.reduce((s, p) => s + p.length, 0); // total byte count across all chunks
+ const out = new Uint8Array(len);
+ let off = 0; // write position in `out`
+ for (const p of parts) {
+ out.set(p, off);
+ off += p.length;
+ }
+ return out;
+}
+
+// ---------------------------------------------------------------------------
+// Mock setup — isolate streamer from real auth / HTTP
+// ---------------------------------------------------------------------------
+
+vi.mock('@vercel/oidc', () => ({
+ getVercelOidcToken: () => Promise.resolve('test-token'),
+}));
+
+describe('streams.get reconnection (integration)', () => {
+ let fetchMock: Mock;
+ const originalFetch = globalThis.fetch;
+
+ beforeEach(() => {
+ fetchMock = vi.fn();
+ globalThis.fetch = fetchMock as unknown as typeof fetch;
+ });
+
+ afterEach(() => {
+ globalThis.fetch = originalFetch;
+ vi.restoreAllMocks();
+ });
+
+ /** Import createStreamer lazily (inside the test, after beforeEach has replaced globalThis.fetch) and build a streamer with a fixed token. */
+ async function getStreamer() {
+ const { createStreamer } = await import('./streamer.js');
+ return createStreamer({ token: 'test-token' });
+ }
+
+ /** Build a mock 200 Response whose body is a ReadableStream emitting the given byte chunks in order. */
+ function streamResponse(...chunks: Uint8Array[]): Response {
+ return new Response(chunkedStream(chunks), {
+ status: 200,
+ headers: { 'Content-Type': 'application/octet-stream' },
+ });
+ }
+
+ it('returns data unchanged when server sends done=true control frame', async () => {
+ const data = new TextEncoder().encode('hello');
+ const control = buildControlFrame(true, 5);
+
+ fetchMock.mockResolvedValueOnce(streamResponse(data, control));
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ expect(result).toEqual(data);
+ expect(fetchMock).toHaveBeenCalledTimes(1);
+ });
+
+ it('reconnects when server sends done=false and resumes from nextIndex', async () => {
+ const chunk1 = new TextEncoder().encode('aaa');
+ const chunk2 = new TextEncoder().encode('bbb');
+ const timeout = buildControlFrame(false, 3); // timeout at chunk 3
+ const done = buildControlFrame(true, 6); // done on second connection
+
+ // First connection: returns chunk1 + timeout control frame
+ fetchMock.mockResolvedValueOnce(streamResponse(chunk1, timeout));
+ // Second connection: returns chunk2 + done control frame
+ fetchMock.mockResolvedValueOnce(streamResponse(chunk2, done));
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ // Should have received both chunks' data
+ const expected = new Uint8Array([...chunk1, ...chunk2]);
+ expect(result).toEqual(expected);
+
+ // Should have made two fetch calls
+ expect(fetchMock).toHaveBeenCalledTimes(2);
+
+ // Second call should have startIndex=3 (from control frame)
+ const secondUrl = fetchMock.mock.calls[1][0] as URL;
+ expect(secondUrl.toString()).toContain('startIndex=3');
+ });
+
+ it('handles multiple consecutive reconnections', async () => {
+ const chunks = [
+ new TextEncoder().encode('a'),
+ new TextEncoder().encode('b'),
+ new TextEncoder().encode('c'),
+ ];
+
+ fetchMock
+ .mockResolvedValueOnce(
+ streamResponse(chunks[0], buildControlFrame(false, 10))
+ )
+ .mockResolvedValueOnce(
+ streamResponse(chunks[1], buildControlFrame(false, 20))
+ )
+ .mockResolvedValueOnce(
+ streamResponse(chunks[2], buildControlFrame(true, 30))
+ );
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ const expected = new Uint8Array([...chunks[0], ...chunks[1], ...chunks[2]]);
+ expect(result).toEqual(expected);
+ expect(fetchMock).toHaveBeenCalledTimes(3);
+
+ // Verify startIndex progression
+ const urls = fetchMock.mock.calls.map((c: unknown[]) =>
+ (c[0] as URL).toString()
+ );
+ expect(urls[0]).toContain('startIndex=0');
+ expect(urls[1]).toContain('startIndex=10');
+ expect(urls[2]).toContain('startIndex=20');
+ });
+
+ it('passes startIndex from caller on initial connection', async () => {
+ const data = new Uint8Array([42]);
+ fetchMock.mockResolvedValueOnce(
+ streamResponse(data, buildControlFrame(true, 8))
+ );
+
+ const streamer = await getStreamer();
+ await drain(await streamer.streams.get('run_test', 'strm_test', 5));
+
+ const url = (fetchMock.mock.calls[0][0] as URL).toString();
+ expect(url).toContain('startIndex=5');
+ });
+
+ it('handles control frame split across two read chunks', async () => {
+ const data = new TextEncoder().encode('hello');
+ const control = buildControlFrame(true, 1);
+
+ // Split the control frame in the middle (5 bytes + 8 bytes)
+ const controlPart1 = control.slice(0, 5);
+ const controlPart2 = control.slice(5);
+
+ fetchMock.mockResolvedValueOnce(
+ streamResponse(data, controlPart1, controlPart2)
+ );
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ expect(result).toEqual(data);
+ });
+
+ it('handles data + control frame coalesced into one chunk', async () => {
+ const data = new TextEncoder().encode('xyz');
+ const control = buildControlFrame(true, 3);
+ const combined = new Uint8Array(data.length + control.length);
+ combined.set(data, 0);
+ combined.set(control, data.length);
+
+ fetchMock.mockResolvedValueOnce(streamResponse(combined));
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ expect(result).toEqual(data);
+ });
+
+ it('works with no data — only control frame (empty stream, done)', async () => {
+ const control = buildControlFrame(true, 0);
+ fetchMock.mockResolvedValueOnce(streamResponse(control));
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ expect(result.length).toBe(0);
+ });
+
+ it('works with no data — only control frame (immediate timeout, reconnects)', async () => {
+ const timeout = buildControlFrame(false, 0);
+ const data = new TextEncoder().encode('after-reconnect');
+ const done = buildControlFrame(true, 5);
+
+ fetchMock
+ .mockResolvedValueOnce(streamResponse(timeout))
+ .mockResolvedValueOnce(streamResponse(data, done));
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ expect(result).toEqual(data);
+ expect(fetchMock).toHaveBeenCalledTimes(2);
+ });
+
+ it('falls through when no control frame is present (backward compat)', async () => {
+ const data = new TextEncoder().encode('legacy server');
+
+ // Old server: no control frame, just data
+ fetchMock.mockResolvedValueOnce(streamResponse(data));
+
+ const streamer = await getStreamer();
+ const result = await drain(
+ await streamer.streams.get('run_test', 'strm_test')
+ );
+
+ expect(result).toEqual(data);
+ expect(fetchMock).toHaveBeenCalledTimes(1);
+ });
+
+ it('propagates network error to consumer without retrying', async () => {
+ const data = new TextEncoder().encode('partial');
+
+ // Create a stream that errors mid-read
+ let callCount = 0;
+ const errorStream = new ReadableStream({
+ pull(controller) {
+ if (callCount === 0) {
+ callCount++;
+ controller.enqueue(data);
+ } else {
+ controller.error(new Error('connection reset'));
+ }
+ },
+ });
+
+ fetchMock.mockResolvedValueOnce(new Response(errorStream, { status: 200 }));
+
+ const streamer = await getStreamer();
+ const stream = await streamer.streams.get('run_test', 'strm_test');
+
+ // The error should propagate to the consumer rather than silently closing
+ await expect(drain(stream)).rejects.toThrow('connection reset');
+
+ expect(fetchMock).toHaveBeenCalledTimes(1);
+ // Should NOT have attempted a second fetch (no reconnection on error)
+ });
+
+ it('throws on non-200 response', async () => {
+ fetchMock.mockResolvedValueOnce(new Response('not found', { status: 404 }));
+
+ const streamer = await getStreamer();
+ await expect(
+ streamer.streams.get('run_test', 'strm_missing')
+ ).rejects.toThrow('Failed to fetch stream: 404');
+ });
+});
diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts
index 67f950a143..4666dbfbd2 100644
--- a/packages/world-vercel/src/streamer.test.ts
+++ b/packages/world-vercel/src/streamer.test.ts
@@ -1,5 +1,10 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
-import { encodeMultiChunks, MAX_CHUNKS_PER_REQUEST } from './streamer.js';
+import {
+ encodeMultiChunks,
+ MAX_CHUNKS_PER_REQUEST,
+ parseStreamControlFrame,
+ STREAM_CONTROL_FRAME_SIZE,
+} from './streamer.js';
describe('encodeMultiChunks', () => {
/**
@@ -169,6 +174,194 @@ describe('encodeMultiChunks', () => {
});
});
+/**
+ * Build a STREAM_CONTROL_FRAME_SIZE-byte control frame (zero marker, flags, nextIndex, "WFCT") matching the workflow-server format.
+ */
+function buildControlFrame(done: boolean, nextIndex: number): Uint8Array {
+ const frame = new Uint8Array(STREAM_CONTROL_FRAME_SIZE);
+ // Bytes 0-3: zero-frame marker (already 0x00)
+ frame[4] = done ? 1 : 0; // byte 4: flags, 1 = done
+ new DataView(frame.buffer).setUint32(5, nextIndex, false); // bytes 5-8: big-endian uint32
+ // Bytes 9-12: magic footer "WFCT"
+ frame.set(new Uint8Array([0x57, 0x46, 0x43, 0x54]), 9);
+ return frame;
+}
+
+describe('parseStreamControlFrame', () => {
+ it('parses a valid done=true control frame', () => {
+ const frame = buildControlFrame(true, 42);
+ const result = parseStreamControlFrame(frame);
+ expect(result).toEqual({
+ done: true,
+ nextIndex: 42,
+ totalLength: STREAM_CONTROL_FRAME_SIZE,
+ });
+ });
+
+ it('parses a valid done=false (timeout) control frame', () => {
+ const frame = buildControlFrame(false, 100);
+ const result = parseStreamControlFrame(frame);
+ expect(result).toEqual({
+ done: false,
+ nextIndex: 100,
+ totalLength: STREAM_CONTROL_FRAME_SIZE,
+ });
+ });
+
+ it('parses control frame appended after data bytes', () => {
+ const data = new Uint8Array([1, 2, 3, 4, 5]);
+ const frame = buildControlFrame(false, 7);
+ const combined = new Uint8Array(data.length + frame.length);
+ combined.set(data, 0);
+ combined.set(frame, data.length);
+
+ const result = parseStreamControlFrame(combined);
+ expect(result).toEqual({
+ done: false,
+ nextIndex: 7,
+ totalLength: STREAM_CONTROL_FRAME_SIZE,
+ });
+ });
+
+ it('returns null for buffer shorter than control frame size', () => {
+ expect(parseStreamControlFrame(new Uint8Array(12))).toBeNull();
+ expect(parseStreamControlFrame(new Uint8Array(0))).toBeNull();
+ });
+
+ it('returns null when magic footer does not match', () => {
+ const frame = buildControlFrame(true, 0);
+ frame[12] = 0xff; // corrupt magic footer
+ expect(parseStreamControlFrame(frame)).toBeNull();
+ });
+
+ it('returns null when zero-frame marker is not all zeros', () => {
+ const frame = buildControlFrame(true, 0);
+ frame[0] = 1; // corrupt zero-frame marker
+ expect(parseStreamControlFrame(frame)).toBeNull();
+ });
+
+ it('handles nextIndex=0', () => {
+ const frame = buildControlFrame(false, 0);
+ const result = parseStreamControlFrame(frame);
+ expect(result).toEqual({
+ done: false,
+ nextIndex: 0,
+ totalLength: STREAM_CONTROL_FRAME_SIZE,
+ });
+ });
+
+ it('handles large nextIndex values', () => {
+ const frame = buildControlFrame(true, 0xffffffff);
+ const result = parseStreamControlFrame(frame);
+ expect(result?.nextIndex).toBe(0xffffffff);
+ });
+});
+
+describe('streams.get reconnection', () => {
+ /**
+ * Helper to create a ReadableStream that emits each data chunk in order,
+ * followed by the control frame (when given) as its own separate final chunk.
+ */
+ function makeServerStream(
+ dataChunks: Uint8Array[],
+ controlFrame?: Uint8Array
+ ): ReadableStream {
+ const chunks = [...dataChunks]; // copy so the caller's array is not mutated
+ if (controlFrame) {
+ chunks.push(controlFrame);
+ }
+ let i = 0; // index of the next chunk to emit
+ return new ReadableStream({
+ pull(controller) {
+ if (i < chunks.length) {
+ controller.enqueue(chunks[i++]);
+ } else {
+ controller.close();
+ }
+ },
+ });
+ }
+
+ /**
+ * Read a ReadableStream until done and concatenate all chunks into a single Uint8Array.
+ */
+ async function collectStream(
+ stream: ReadableStream
+ ): Promise {
+ const reader = stream.getReader();
+ const parts: Uint8Array[] = [];
+ for (;;) {
+ const { done, value } = await reader.read();
+ if (done) break;
+ parts.push(value);
+ }
+ const totalLength = parts.reduce((sum, p) => sum + p.length, 0);
+ const result = new Uint8Array(totalLength);
+ let offset = 0; // write position in `result`
+ for (const part of parts) {
+ result.set(part, offset);
+ offset += part.length;
+ }
+ return result;
+ }
+
+ it('strips control frame and returns only data when done=true', async () => {
+ const data = new TextEncoder().encode('hello world');
+ const control = buildControlFrame(true, 99);
+
+ const allBytes = await collectStream(makeServerStream([data], control));
+
+ // The raw stream contains data + control
+ expect(allBytes.length).toBe(data.length + control.length);
+
+ // parseStreamControlFrame should find the control frame at the tail
+ const parsed = parseStreamControlFrame(allBytes);
+ expect(parsed).toEqual({
+ done: true,
+ nextIndex: 99,
+ totalLength: STREAM_CONTROL_FRAME_SIZE,
+ });
+
+ // Data portion should match
+ const dataPortion = allBytes.subarray(
+ 0,
+ allBytes.length - parsed!.totalLength
+ );
+ expect(dataPortion).toEqual(data);
+ });
+
+ it('control frame embedded in same chunk as data is correctly parsed', async () => {
+ const data = new Uint8Array([10, 20, 30]);
+ const control = buildControlFrame(false, 5);
+
+ // Combine into a single chunk (simulates TCP coalescing)
+ const combined = new Uint8Array(data.length + control.length);
+ combined.set(data, 0);
+ combined.set(control, data.length);
+
+ const parsed = parseStreamControlFrame(combined);
+ expect(parsed).not.toBeNull();
+ expect(parsed!.done).toBe(false);
+ expect(parsed!.nextIndex).toBe(5);
+
+ const dataPortion = combined.subarray(
+ 0,
+ combined.length - parsed!.totalLength
+ );
+ expect(dataPortion).toEqual(data);
+ });
+
+ it('no false positive on data that happens to end with zero bytes', async () => {
+ // Create data ending with zeros but no valid magic footer
+ const data = new Uint8Array(20);
+ data.fill(0);
+ data[19] = 0x42; // not "WFCT"
+
+ const parsed = parseStreamControlFrame(data);
+ expect(parsed).toBeNull();
+ });
+});
+
// vi.mock is hoisted by vitest, so it cannot be truly scoped to a
// describe block. Keeping it here (next to the tests that need it)
// makes the intent clear. The encodeMultiChunks tests above are pure
@@ -202,7 +395,7 @@ describe('streams.get', () => {
expect(fetchSpy).toHaveBeenCalledTimes(1);
const url = new URL(fetchSpy.mock.calls[0][0] as string);
- expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream');
+ expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream');
});
it('passes startIndex as a query parameter', async () => {
@@ -216,7 +409,7 @@ describe('streams.get', () => {
await streamer.streams.get('run-123', 'my-stream', 5);
const url = new URL(fetchSpy.mock.calls[0][0] as string);
- expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream');
+ expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream');
expect(url.searchParams.get('startIndex')).toBe('5');
});
});
diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts
index 2021762a31..259f441d4f 100644
--- a/packages/world-vercel/src/streamer.ts
+++ b/packages/world-vercel/src/streamer.ts
@@ -23,9 +23,81 @@ export const MAX_CHUNKS_PER_REQUEST = 1000;
// (partial writes, long-lived reads), and duplex streams are incompatible
// with undici's experimental H2 support.
-function getStreamUrl(name: string, runId: string, httpConfig: HttpConfig) {
+/**
+ * Stream control frame constants, mirroring workflow-server's format.
+ *
+ * Control frame (13 bytes):
+ * [0-3] Zero-frame marker (0x00 0x00 0x00 0x00)
+ * [4] Flags — bit 0: done (1 = complete, 0 = timeout/reconnect)
+ * [5-8] nextIndex — big-endian uint32, chunk index to resume from
+ * [9-12] Magic footer — "WFCT" (0x57 0x46 0x43 0x54)
+ */
+export const STREAM_CONTROL_FRAME_SIZE = 13;
+const STREAM_CONTROL_MAGIC = new Uint8Array([0x57, 0x46, 0x43, 0x54]);
+
+export interface StreamControlFrame {
+ done: boolean;
+ nextIndex: number;
+}
+
+/**
+ * Try to parse a stream control frame from the last STREAM_CONTROL_FRAME_SIZE bytes of a buffer.
+ * Returns `done`, `nextIndex` and `totalLength` (always STREAM_CONTROL_FRAME_SIZE), or null when the
+ * buffer is too short or the zero-frame marker / "WFCT" magic footer does not match.
+ */
+export function parseStreamControlFrame(
+ buffer: Uint8Array
+): (StreamControlFrame & { totalLength: number }) | null {
+ if (buffer.length < STREAM_CONTROL_FRAME_SIZE) return null;
+
+ const offset = buffer.length - STREAM_CONTROL_FRAME_SIZE; // start of the candidate frame within `buffer`
+
+ // Check zero-frame marker (bytes 0-3 must be 0x00)
+ if (
+ buffer[offset] !== 0 ||
+ buffer[offset + 1] !== 0 ||
+ buffer[offset + 2] !== 0 ||
+ buffer[offset + 3] !== 0
+ ) {
+ return null;
+ }
+
+ // Check magic footer at bytes 9-12
+ if (
+ buffer[offset + 9] !== STREAM_CONTROL_MAGIC[0] ||
+ buffer[offset + 10] !== STREAM_CONTROL_MAGIC[1] ||
+ buffer[offset + 11] !== STREAM_CONTROL_MAGIC[2] ||
+ buffer[offset + 12] !== STREAM_CONTROL_MAGIC[3]
+ ) {
+ return null;
+ }
+
+ const flags = buffer[offset + 4];
+ const view = new DataView(buffer.buffer, buffer.byteOffset + offset + 5, 4); // byteOffset keeps this correct when `buffer` is a view into a larger ArrayBuffer
+ const nextIndex = view.getUint32(0, false); // big-endian
+
+ return {
+ done: (flags & 1) === 1, // bit 0 of the flags byte
+ nextIndex,
+ totalLength: STREAM_CONTROL_FRAME_SIZE,
+ };
+}
+
+function concatUint8Arrays(a: Uint8Array, b: Uint8Array): Uint8Array { // returns a newly allocated array holding `a` followed by `b`
+ const result = new Uint8Array(a.length + b.length);
+ result.set(a, 0);
+ result.set(b, a.length);
+ return result;
+}
+
+function getStreamUrl(
+ name: string,
+ runId: string,
+ httpConfig: HttpConfig,
+ version = 'v2'
+) {
return new URL(
- `${httpConfig.baseUrl}/v2/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}`
+ `${httpConfig.baseUrl}/${version}/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}`
);
}
@@ -182,21 +254,121 @@ export function createStreamer(config?: APIConfig): Streamer {
},
async get(runId: string, name: string, startIndex?: number) {
- const httpConfig = await getHttpConfig(config);
- const url = getStreamUrl(name, runId, httpConfig);
- if (typeof startIndex === 'number') {
- url.searchParams.set('startIndex', String(startIndex));
- }
- const response = await fetch(url, {
- headers: httpConfig.headers,
+ let currentStartIndex = startIndex ?? 0;
+
+ // Cap reconnections to prevent infinite loops if the server
+ // never completes the stream. 50 reconnects at 2-min server
+ // timeout ≈ 100 minutes of streaming, which is generous.
+ const MAX_RECONNECTS = 50;
+ let reconnectCount = 0;
+
+ const connect = async (): Promise<
+ ReadableStreamDefaultReader
+ > => {
+ const httpConfig = await getHttpConfig(config);
+ const url = getStreamUrl(name, runId, httpConfig, 'v3');
+ url.searchParams.set('startIndex', String(currentStartIndex));
+ const response = await fetch(url, {
+ headers: httpConfig.headers,
+ });
+ if (!response.ok) {
+ throw new Error(`Failed to fetch stream: ${response.status}`);
+ }
+ if (!response.body) {
+ throw new Error('No response body for stream');
+ }
+ return (response.body as ReadableStream).getReader();
+ };
+
+ let reader = await connect();
+
+ // Hold back the last STREAM_CONTROL_FRAME_SIZE bytes at all times
+ // so we can detect the control frame when the stream closes.
+ let tailBuffer = new Uint8Array(0);
+
+ return new ReadableStream({
+ pull: async (controller) => {
+ for (;;) {
+ let result: { done: boolean; value?: Uint8Array };
+ try {
+ result = await reader.read();
+ } catch (err) {
+ // Network error — not a clean close. Forward any buffered
+ // data and propagate the error so consumers know the stream
+ // was truncated.
+ if (tailBuffer.length > 0) {
+ controller.enqueue(tailBuffer);
+ tailBuffer = new Uint8Array(0);
+ }
+ controller.error(err);
+ return;
+ }
+
+ if (!result.done) {
+ // Append new data to tail buffer, forward everything except
+ // the last STREAM_CONTROL_FRAME_SIZE bytes.
+ const combined = concatUint8Arrays(tailBuffer, result.value!);
+ const holdBack = Math.min(
+ STREAM_CONTROL_FRAME_SIZE,
+ combined.length
+ );
+ if (combined.length > holdBack) {
+ controller.enqueue(combined.subarray(0, -holdBack));
+ tailBuffer = combined.slice(-holdBack);
+ return;
+ }
+ // Everything fits in the holdback buffer — nothing to enqueue
+ // yet. Keep reading so we don't rely on the ReadableStream
+ // re-invoking pull when no chunk was enqueued.
+ tailBuffer = new Uint8Array(combined);
+ continue;
+ }
+
+ // Stream closed — check tail for control frame.
+ const control = parseStreamControlFrame(tailBuffer);
+
+ if (control) {
+ // Forward any data bytes that preceded the control frame.
+ const dataLen = tailBuffer.length - control.totalLength;
+ if (dataLen > 0) {
+ controller.enqueue(tailBuffer.subarray(0, dataLen));
+ }
+ tailBuffer = new Uint8Array(0);
+
+ if (control.done) {
+ controller.close();
+ return;
+ }
+
+ // Timeout — reconnect from the next chunk index.
+ reconnectCount++;
+ if (reconnectCount > MAX_RECONNECTS) {
+ controller.error(
+ new Error(
+ `Stream exceeded maximum reconnection attempts (${MAX_RECONNECTS})`
+ )
+ );
+ return;
+ }
+ currentStartIndex = control.nextIndex;
+ reader = await connect();
+ continue;
+ }
+
+ // No control frame (older server or connection error).
+ // Forward remaining bytes and close.
+ if (tailBuffer.length > 0) {
+ controller.enqueue(tailBuffer);
+ tailBuffer = new Uint8Array(0);
+ }
+ controller.close();
+ return;
+ }
+ },
+ cancel: async () => {
+ await reader.cancel();
+ },
});
- if (!response.ok) {
- throw new Error(`Failed to fetch stream: ${response.status}`);
- }
- if (!response.body) {
- throw new Error('No response body for stream');
- }
- return response.body as ReadableStream;
},
async getChunks(