From bcf818c2c7fc3f6650b2a9ad925bcbc0530e6ebb Mon Sep 17 00:00:00 2001 From: Karthik Kalyan <105607645+karthikscale3@users.noreply.github.com> Date: Thu, 16 Apr 2026 05:53:29 -0700 Subject: [PATCH 1/5] disable minification for web (#1768) --- .changeset/bright-discovery-talk.md | 5 +++++ packages/web/vite.config.ts | 7 +++++++ 2 files changed, 12 insertions(+) create mode 100644 .changeset/bright-discovery-talk.md diff --git a/.changeset/bright-discovery-talk.md b/.changeset/bright-discovery-talk.md new file mode 100644 index 0000000000..34d567ec74 --- /dev/null +++ b/.changeset/bright-discovery-talk.md @@ -0,0 +1,5 @@ +--- +"@workflow/web": patch +--- + +Disable Vite minification so the published build contains readable code, reducing false-positive obfuscation flags from supply chain security scanners (Socket). diff --git a/packages/web/vite.config.ts b/packages/web/vite.config.ts index b44148336f..781f13dc78 100644 --- a/packages/web/vite.config.ts +++ b/packages/web/vite.config.ts @@ -10,6 +10,13 @@ export default defineConfig(({ command, isSsrBuild }) => ({ isSsrBuild && !process.env.VERCEL ? { input: './server/app.ts' } : undefined, + // Disable minification so the published npm package contains readable + // code. Without this, Vite's esbuild minifier produces single-line + // mega-bundles that supply-chain security scanners (e.g. Socket) flag + // as "obfuscated code". The app is a self-hosted observability tool + // where the unminified size difference is negligible — gzip/brotli at + // the serving layer compresses the wire payload regardless. + minify: false, }, // Bundle all dependencies into the server build so that @workflow/web // can be installed and run without needing any of the UI dependencies From f41fb74d38fd5405470449edd9f18f6a86abaec7 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 16 Apr 2026 08:51:48 -0700 Subject: [PATCH 2/5] docs: remove experimental labeling from workflow ai api refs (#1774) --- .../content/docs/api-reference/workflow-ai/durable-agent.mdx | 4 ---- docs/content/docs/api-reference/workflow-ai/index.mdx | 5 ----- .../api-reference/workflow-ai/workflow-chat-transport.mdx | 4 ---- 3 files changed, 13 deletions(-) diff --git a/docs/content/docs/api-reference/workflow-ai/durable-agent.mdx b/docs/content/docs/api-reference/workflow-ai/durable-agent.mdx index fe5954d5d0..edd95a8985 100644 --- a/docs/content/docs/api-reference/workflow-ai/durable-agent.mdx +++ b/docs/content/docs/api-reference/workflow-ai/durable-agent.mdx @@ -9,10 +9,6 @@ related: - /docs/ai/defining-tools --- - -The `@workflow/ai` package is currently in active development and should be considered experimental. - - The `DurableAgent` class enables you to create AI-powered agents that can maintain state across workflow steps, call tools, and gracefully handle interruptions and resumptions. Tool calls can be implemented as workflow steps for automatic retries, or as regular workflow-level logic utilizing core library features such as [`sleep()`](/docs/api-reference/workflow/sleep) and [Hooks](/docs/foundations/hooks). diff --git a/docs/content/docs/api-reference/workflow-ai/index.mdx b/docs/content/docs/api-reference/workflow-ai/index.mdx index 606fa508db..b532a1455f 100644 --- a/docs/content/docs/api-reference/workflow-ai/index.mdx +++ b/docs/content/docs/api-reference/workflow-ai/index.mdx @@ -1,6 +1,5 @@ --- title: "@workflow/ai" -icon: FlaskConical description: Helpers for building AI-powered workflows with the AI SDK. type: overview summary: Explore helpers for integrating AI SDK to build durable AI-powered workflows. @@ -8,10 +7,6 @@ related: - /docs/ai --- - -The `@workflow/ai` package is currently in active development and should be considered experimental. - - Helpers for integrating AI SDK for building AI-powered workflows. ## Classes diff --git a/docs/content/docs/api-reference/workflow-ai/workflow-chat-transport.mdx b/docs/content/docs/api-reference/workflow-ai/workflow-chat-transport.mdx index bde3bd1d58..f89583e5ec 100644 --- a/docs/content/docs/api-reference/workflow-ai/workflow-chat-transport.mdx +++ b/docs/content/docs/api-reference/workflow-ai/workflow-chat-transport.mdx @@ -9,10 +9,6 @@ related: - /docs/ai/resumable-streams --- - -The `@workflow/ai` package is currently in active development and should be considered experimental. - - A chat transport implementation for the AI SDK that provides reliable message streaming with automatic reconnection to interrupted streams. This transport is a drop-in replacement for the default AI SDK transport, enabling seamless recovery from network issues, page refreshes, or Vercel Function timeouts. From 40fe57fa75b43131ecabe36e91c11bdd05559027 Mon Sep 17 00:00:00 2001 From: Karthik Kalyan <105607645+karthikscale3@users.noreply.github.com> Date: Thu, 16 Apr 2026 08:52:55 -0700 Subject: [PATCH 3/5] fix sitemap (#1773) --- docs/next.config.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docs/next.config.ts b/docs/next.config.ts index 8333357460..07e1a9fb2d 100644 --- a/docs/next.config.ts +++ b/docs/next.config.ts @@ -23,10 +23,6 @@ const config: NextConfig = { return { beforeFiles: [ - { - source: '/sitemap.xml', - destination: 'https://crawled-sitemap.vercel.sh/useworkflow.dev-.xml', - }, { source: '/docs/:path*', destination: '/llms.mdx/:path*', From 340c0856813b23e9be966a2022933d6040a3b062 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 16 Apr 2026 09:52:44 -0700 Subject: [PATCH 4/5] [world-vercel] Use stream close control frame to decide whether to reconnect to stream (#1742) --- .changeset/clean-lights-boil.md | 5 + .../src/streamer-reconnect.test.ts | 307 ++++++++++++++++++ packages/world-vercel/src/streamer.test.ts | 199 +++++++++++- packages/world-vercel/src/streamer.ts | 204 +++++++++++- 4 files changed, 696 insertions(+), 19 deletions(-) create mode 100644 .changeset/clean-lights-boil.md create mode 100644 packages/world-vercel/src/streamer-reconnect.test.ts diff --git a/.changeset/clean-lights-boil.md b/.changeset/clean-lights-boil.md new file mode 100644 index 0000000000..29999e18c0 --- /dev/null +++ b/.changeset/clean-lights-boil.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Use custom stream close control frame to decide whether to reconnect to stream diff --git a/packages/world-vercel/src/streamer-reconnect.test.ts b/packages/world-vercel/src/streamer-reconnect.test.ts new file mode 100644 index 0000000000..cf6b6236a4 --- /dev/null +++ b/packages/world-vercel/src/streamer-reconnect.test.ts @@ -0,0 +1,307 @@ +import { + afterEach, + beforeEach, + describe, + expect, + it, + vi, + type Mock, +} from 'vitest'; +import { + parseStreamControlFrame, + STREAM_CONTROL_FRAME_SIZE, +} from './streamer.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Build a control frame matching the workflow-server wire format. */ +function buildControlFrame(done: boolean, nextIndex: number): Uint8Array { + const frame = new Uint8Array(STREAM_CONTROL_FRAME_SIZE); + frame[4] = done ? 1 : 0; + new DataView(frame.buffer).setUint32(5, nextIndex, false); + frame.set(new Uint8Array([0x57, 0x46, 0x43, 0x54]), 9); // "WFCT" + return frame; +} + +/** Create a ReadableStream that emits the given byte chunks in order. */ +function chunkedStream(chunks: Uint8Array[]): ReadableStream { + let i = 0; + return new ReadableStream({ + pull(controller) { + if (i < chunks.length) { + controller.enqueue(chunks[i++]); + } else { + controller.close(); + } + }, + }); +} + +/** Collect every byte from a ReadableStream into one Uint8Array. */ +async function drain(stream: ReadableStream): Promise { + const reader = stream.getReader(); + const parts: Uint8Array[] = []; + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + parts.push(value); + } + const len = parts.reduce((s, p) => s + p.length, 0); + const out = new Uint8Array(len); + let off = 0; + for (const p of parts) { + out.set(p, off); + off += p.length; + } + return out; +} + +// --------------------------------------------------------------------------- +// Mock setup — isolate streamer from real auth / HTTP +// --------------------------------------------------------------------------- + +vi.mock('@vercel/oidc', () => ({ + getVercelOidcToken: () => Promise.resolve('test-token'), +})); + +describe('streams.get reconnection (integration)', () => { + let fetchMock: Mock; + const originalFetch = globalThis.fetch; + + beforeEach(() => { + fetchMock = vi.fn(); + globalThis.fetch = fetchMock as unknown as typeof fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); + }); + + /** Import createStreamer lazily so the mock is in place. */ + async function getStreamer() { + const { createStreamer } = await import('./streamer.js'); + return createStreamer({ token: 'test-token' }); + } + + /** Build a mock Response whose body is a ReadableStream of byte chunks. */ + function streamResponse(...chunks: Uint8Array[]): Response { + return new Response(chunkedStream(chunks), { + status: 200, + headers: { 'Content-Type': 'application/octet-stream' }, + }); + } + + it('returns data unchanged when server sends done=true control frame', async () => { + const data = new TextEncoder().encode('hello'); + const control = buildControlFrame(true, 5); + + fetchMock.mockResolvedValueOnce(streamResponse(data, control)); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + expect(result).toEqual(data); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('reconnects when server sends done=false and resumes from nextIndex', async () => { + const chunk1 = new TextEncoder().encode('aaa'); + const chunk2 = new TextEncoder().encode('bbb'); + const timeout = buildControlFrame(false, 3); // timeout at chunk 3 + const done = buildControlFrame(true, 6); // done on second connection + + // First connection: returns chunk1 + timeout control frame + fetchMock.mockResolvedValueOnce(streamResponse(chunk1, timeout)); + // Second connection: returns chunk2 + done control frame + fetchMock.mockResolvedValueOnce(streamResponse(chunk2, done)); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + // Should have received both chunks' data + const expected = new Uint8Array([...chunk1, ...chunk2]); + expect(result).toEqual(expected); + + // Should have made two fetch calls + expect(fetchMock).toHaveBeenCalledTimes(2); + + // Second call should have startIndex=3 (from control frame) + const secondUrl = fetchMock.mock.calls[1][0] as URL; + expect(secondUrl.toString()).toContain('startIndex=3'); + }); + + it('handles multiple consecutive reconnections', async () => { + const chunks = [ + new TextEncoder().encode('a'), + new TextEncoder().encode('b'), + new TextEncoder().encode('c'), + ]; + + fetchMock + .mockResolvedValueOnce( + streamResponse(chunks[0], buildControlFrame(false, 10)) + ) + .mockResolvedValueOnce( + streamResponse(chunks[1], buildControlFrame(false, 20)) + ) + .mockResolvedValueOnce( + streamResponse(chunks[2], buildControlFrame(true, 30)) + ); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + const expected = new Uint8Array([...chunks[0], ...chunks[1], ...chunks[2]]); + expect(result).toEqual(expected); + expect(fetchMock).toHaveBeenCalledTimes(3); + + // Verify startIndex progression + const urls = fetchMock.mock.calls.map((c: unknown[]) => + (c[0] as URL).toString() + ); + expect(urls[0]).toContain('startIndex=0'); + expect(urls[1]).toContain('startIndex=10'); + expect(urls[2]).toContain('startIndex=20'); + }); + + it('passes startIndex from caller on initial connection', async () => { + const data = new Uint8Array([42]); + fetchMock.mockResolvedValueOnce( + streamResponse(data, buildControlFrame(true, 8)) + ); + + const streamer = await getStreamer(); + await drain(await streamer.streams.get('run_test', 'strm_test', 5)); + + const url = (fetchMock.mock.calls[0][0] as URL).toString(); + expect(url).toContain('startIndex=5'); + }); + + it('handles control frame split across two read chunks', async () => { + const data = new TextEncoder().encode('hello'); + const control = buildControlFrame(true, 1); + + // Split the control frame in the middle (5 bytes + 8 bytes) + const controlPart1 = control.slice(0, 5); + const controlPart2 = control.slice(5); + + fetchMock.mockResolvedValueOnce( + streamResponse(data, controlPart1, controlPart2) + ); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + expect(result).toEqual(data); + }); + + it('handles data + control frame coalesced into one chunk', async () => { + const data = new TextEncoder().encode('xyz'); + const control = buildControlFrame(true, 3); + const combined = new Uint8Array(data.length + control.length); + combined.set(data, 0); + combined.set(control, data.length); + + fetchMock.mockResolvedValueOnce(streamResponse(combined)); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + expect(result).toEqual(data); + }); + + it('works with no data — only control frame (empty stream, done)', async () => { + const control = buildControlFrame(true, 0); + fetchMock.mockResolvedValueOnce(streamResponse(control)); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + expect(result.length).toBe(0); + }); + + it('works with no data — only control frame (immediate timeout, reconnects)', async () => { + const timeout = buildControlFrame(false, 0); + const data = new TextEncoder().encode('after-reconnect'); + const done = buildControlFrame(true, 5); + + fetchMock + .mockResolvedValueOnce(streamResponse(timeout)) + .mockResolvedValueOnce(streamResponse(data, done)); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + expect(result).toEqual(data); + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('falls through when no control frame is present (backward compat)', async () => { + const data = new TextEncoder().encode('legacy server'); + + // Old server: no control frame, just data + fetchMock.mockResolvedValueOnce(streamResponse(data)); + + const streamer = await getStreamer(); + const result = await drain( + await streamer.streams.get('run_test', 'strm_test') + ); + + expect(result).toEqual(data); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('propagates network error to consumer without retrying', async () => { + const data = new TextEncoder().encode('partial'); + + // Create a stream that errors mid-read + let callCount = 0; + const errorStream = new ReadableStream({ + pull(controller) { + if (callCount === 0) { + callCount++; + controller.enqueue(data); + } else { + controller.error(new Error('connection reset')); + } + }, + }); + + fetchMock.mockResolvedValueOnce(new Response(errorStream, { status: 200 })); + + const streamer = await getStreamer(); + const stream = await streamer.streams.get('run_test', 'strm_test'); + + // The error should propagate to the consumer rather than silently closing + await expect(drain(stream)).rejects.toThrow('connection reset'); + + expect(fetchMock).toHaveBeenCalledTimes(1); + // Should NOT have attempted a second fetch (no reconnection on error) + }); + + it('throws on non-200 response', async () => { + fetchMock.mockResolvedValueOnce(new Response('not found', { status: 404 })); + + const streamer = await getStreamer(); + await expect( + streamer.streams.get('run_test', 'strm_missing') + ).rejects.toThrow('Failed to fetch stream: 404'); + }); +}); diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index 67f950a143..4666dbfbd2 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -1,5 +1,10 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; -import { encodeMultiChunks, MAX_CHUNKS_PER_REQUEST } from './streamer.js'; +import { + encodeMultiChunks, + MAX_CHUNKS_PER_REQUEST, + parseStreamControlFrame, + STREAM_CONTROL_FRAME_SIZE, +} from './streamer.js'; describe('encodeMultiChunks', () => { /** @@ -169,6 +174,194 @@ describe('encodeMultiChunks', () => { }); }); +/** + * Build a control frame matching the workflow-server format. + */ +function buildControlFrame(done: boolean, nextIndex: number): Uint8Array { + const frame = new Uint8Array(STREAM_CONTROL_FRAME_SIZE); + // Bytes 0-3: zero-frame marker (already 0x00) + frame[4] = done ? 1 : 0; + new DataView(frame.buffer).setUint32(5, nextIndex, false); + // Magic footer "WFCT" + frame.set(new Uint8Array([0x57, 0x46, 0x43, 0x54]), 9); + return frame; +} + +describe('parseStreamControlFrame', () => { + it('parses a valid done=true control frame', () => { + const frame = buildControlFrame(true, 42); + const result = parseStreamControlFrame(frame); + expect(result).toEqual({ + done: true, + nextIndex: 42, + totalLength: STREAM_CONTROL_FRAME_SIZE, + }); + }); + + it('parses a valid done=false (timeout) control frame', () => { + const frame = buildControlFrame(false, 100); + const result = parseStreamControlFrame(frame); + expect(result).toEqual({ + done: false, + nextIndex: 100, + totalLength: STREAM_CONTROL_FRAME_SIZE, + }); + }); + + it('parses control frame appended after data bytes', () => { + const data = new Uint8Array([1, 2, 3, 4, 5]); + const frame = buildControlFrame(false, 7); + const combined = new Uint8Array(data.length + frame.length); + combined.set(data, 0); + combined.set(frame, data.length); + + const result = parseStreamControlFrame(combined); + expect(result).toEqual({ + done: false, + nextIndex: 7, + totalLength: STREAM_CONTROL_FRAME_SIZE, + }); + }); + + it('returns null for buffer shorter than control frame size', () => { + expect(parseStreamControlFrame(new Uint8Array(12))).toBeNull(); + expect(parseStreamControlFrame(new Uint8Array(0))).toBeNull(); + }); + + it('returns null when magic footer does not match', () => { + const frame = buildControlFrame(true, 0); + frame[12] = 0xff; // corrupt magic footer + expect(parseStreamControlFrame(frame)).toBeNull(); + }); + + it('returns null when zero-frame marker is not all zeros', () => { + const frame = buildControlFrame(true, 0); + frame[0] = 1; // corrupt zero-frame marker + expect(parseStreamControlFrame(frame)).toBeNull(); + }); + + it('handles nextIndex=0', () => { + const frame = buildControlFrame(false, 0); + const result = parseStreamControlFrame(frame); + expect(result).toEqual({ + done: false, + nextIndex: 0, + totalLength: STREAM_CONTROL_FRAME_SIZE, + }); + }); + + it('handles large nextIndex values', () => { + const frame = buildControlFrame(true, 0xffffffff); + const result = parseStreamControlFrame(frame); + expect(result?.nextIndex).toBe(0xffffffff); + }); +}); + +describe('streams.get reconnection', () => { + /** + * Helper to create a ReadableStream from chunks, optionally appending + * a control frame to the last chunk or as a separate chunk. + */ + function makeServerStream( + dataChunks: Uint8Array[], + controlFrame?: Uint8Array + ): ReadableStream { + const chunks = [...dataChunks]; + if (controlFrame) { + chunks.push(controlFrame); + } + let i = 0; + return new ReadableStream({ + pull(controller) { + if (i < chunks.length) { + controller.enqueue(chunks[i++]); + } else { + controller.close(); + } + }, + }); + } + + /** + * Collect all bytes from a ReadableStream into a single Uint8Array. + */ + async function collectStream( + stream: ReadableStream + ): Promise { + const reader = stream.getReader(); + const parts: Uint8Array[] = []; + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + parts.push(value); + } + const totalLength = parts.reduce((sum, p) => sum + p.length, 0); + const result = new Uint8Array(totalLength); + let offset = 0; + for (const part of parts) { + result.set(part, offset); + offset += part.length; + } + return result; + } + + it('strips control frame and returns only data when done=true', async () => { + const data = new TextEncoder().encode('hello world'); + const control = buildControlFrame(true, 99); + + const allBytes = await collectStream(makeServerStream([data], control)); + + // The raw stream contains data + control + expect(allBytes.length).toBe(data.length + control.length); + + // parseStreamControlFrame should find the control frame at the tail + const parsed = parseStreamControlFrame(allBytes); + expect(parsed).toEqual({ + done: true, + nextIndex: 99, + totalLength: STREAM_CONTROL_FRAME_SIZE, + }); + + // Data portion should match + const dataPortion = allBytes.subarray( + 0, + allBytes.length - parsed!.totalLength + ); + expect(dataPortion).toEqual(data); + }); + + it('control frame embedded in same chunk as data is correctly parsed', async () => { + const data = new Uint8Array([10, 20, 30]); + const control = buildControlFrame(false, 5); + + // Combine into a single chunk (simulates TCP coalescing) + const combined = new Uint8Array(data.length + control.length); + combined.set(data, 0); + combined.set(control, data.length); + + const parsed = parseStreamControlFrame(combined); + expect(parsed).not.toBeNull(); + expect(parsed!.done).toBe(false); + expect(parsed!.nextIndex).toBe(5); + + const dataPortion = combined.subarray( + 0, + combined.length - parsed!.totalLength + ); + expect(dataPortion).toEqual(data); + }); + + it('no false positive on data that happens to end with zero bytes', async () => { + // Create data ending with zeros but no valid magic footer + const data = new Uint8Array(20); + data.fill(0); + data[19] = 0x42; // not "WFCT" + + const parsed = parseStreamControlFrame(data); + expect(parsed).toBeNull(); + }); +}); + // vi.mock is hoisted by vitest, so it cannot be truly scoped to a // describe block. Keeping it here (next to the tests that need it) // makes the intent clear. The encodeMultiChunks tests above are pure @@ -202,7 +395,7 @@ describe('streams.get', () => { expect(fetchSpy).toHaveBeenCalledTimes(1); const url = new URL(fetchSpy.mock.calls[0][0] as string); - expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream'); + expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream'); }); it('passes startIndex as a query parameter', async () => { @@ -216,7 +409,7 @@ describe('streams.get', () => { await streamer.streams.get('run-123', 'my-stream', 5); const url = new URL(fetchSpy.mock.calls[0][0] as string); - expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream'); + expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream'); expect(url.searchParams.get('startIndex')).toBe('5'); }); }); diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 2021762a31..259f441d4f 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -23,9 +23,81 @@ export const MAX_CHUNKS_PER_REQUEST = 1000; // (partial writes, long-lived reads), and duplex streams are incompatible // with undici's experimental H2 support. -function getStreamUrl(name: string, runId: string, httpConfig: HttpConfig) { +/** + * Stream control frame constants, mirroring workflow-server's format. + * + * Control frame (13 bytes): + * [0-3] Zero-frame marker (0x00 0x00 0x00 0x00) + * [4] Flags — bit 0: done (1 = complete, 0 = timeout/reconnect) + * [5-8] nextIndex — big-endian uint32, chunk index to resume from + * [9-12] Magic footer — "WFCT" (0x57 0x46 0x43 0x54) + */ +export const STREAM_CONTROL_FRAME_SIZE = 13; +const STREAM_CONTROL_MAGIC = new Uint8Array([0x57, 0x46, 0x43, 0x54]); + +export interface StreamControlFrame { + done: boolean; + nextIndex: number; +} + +/** + * Try to parse a stream control frame from the tail of a buffer. + * Returns the parsed frame and the byte length of the control data, + * or null if no valid control frame is present. + */ +export function parseStreamControlFrame( + buffer: Uint8Array +): (StreamControlFrame & { totalLength: number }) | null { + if (buffer.length < STREAM_CONTROL_FRAME_SIZE) return null; + + const offset = buffer.length - STREAM_CONTROL_FRAME_SIZE; + + // Check zero-frame marker (bytes 0-3 must be 0x00) + if ( + buffer[offset] !== 0 || + buffer[offset + 1] !== 0 || + buffer[offset + 2] !== 0 || + buffer[offset + 3] !== 0 + ) { + return null; + } + + // Check magic footer at bytes 9-12 + if ( + buffer[offset + 9] !== STREAM_CONTROL_MAGIC[0] || + buffer[offset + 10] !== STREAM_CONTROL_MAGIC[1] || + buffer[offset + 11] !== STREAM_CONTROL_MAGIC[2] || + buffer[offset + 12] !== STREAM_CONTROL_MAGIC[3] + ) { + return null; + } + + const flags = buffer[offset + 4]; + const view = new DataView(buffer.buffer, buffer.byteOffset + offset + 5, 4); + const nextIndex = view.getUint32(0, false); + + return { + done: (flags & 1) === 1, + nextIndex, + totalLength: STREAM_CONTROL_FRAME_SIZE, + }; +} + +function concatUint8Arrays(a: Uint8Array, b: Uint8Array): Uint8Array { + const result = new Uint8Array(a.length + b.length); + result.set(a, 0); + result.set(b, a.length); + return result; +} + +function getStreamUrl( + name: string, + runId: string, + httpConfig: HttpConfig, + version = 'v2' +) { return new URL( - `${httpConfig.baseUrl}/v2/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}` + `${httpConfig.baseUrl}/${version}/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}` ); } @@ -182,21 +254,121 @@ export function createStreamer(config?: APIConfig): Streamer { }, async get(runId: string, name: string, startIndex?: number) { - const httpConfig = await getHttpConfig(config); - const url = getStreamUrl(name, runId, httpConfig); - if (typeof startIndex === 'number') { - url.searchParams.set('startIndex', String(startIndex)); - } - const response = await fetch(url, { - headers: httpConfig.headers, + let currentStartIndex = startIndex ?? 0; + + // Cap reconnections to prevent infinite loops if the server + // never completes the stream. 50 reconnects at 2-min server + // timeout ≈ 100 minutes of streaming, which is generous. + const MAX_RECONNECTS = 50; + let reconnectCount = 0; + + const connect = async (): Promise< + ReadableStreamDefaultReader + > => { + const httpConfig = await getHttpConfig(config); + const url = getStreamUrl(name, runId, httpConfig, 'v3'); + url.searchParams.set('startIndex', String(currentStartIndex)); + const response = await fetch(url, { + headers: httpConfig.headers, + }); + if (!response.ok) { + throw new Error(`Failed to fetch stream: ${response.status}`); + } + if (!response.body) { + throw new Error('No response body for stream'); + } + return (response.body as ReadableStream).getReader(); + }; + + let reader = await connect(); + + // Hold back the last STREAM_CONTROL_FRAME_SIZE bytes at all times + // so we can detect the control frame when the stream closes. + let tailBuffer = new Uint8Array(0); + + return new ReadableStream({ + pull: async (controller) => { + for (;;) { + let result: { done: boolean; value?: Uint8Array }; + try { + result = await reader.read(); + } catch (err) { + // Network error — not a clean close. Forward any buffered + // data and propagate the error so consumers know the stream + // was truncated. + if (tailBuffer.length > 0) { + controller.enqueue(tailBuffer); + tailBuffer = new Uint8Array(0); + } + controller.error(err); + return; + } + + if (!result.done) { + // Append new data to tail buffer, forward everything except + // the last STREAM_CONTROL_FRAME_SIZE bytes. + const combined = concatUint8Arrays(tailBuffer, result.value!); + const holdBack = Math.min( + STREAM_CONTROL_FRAME_SIZE, + combined.length + ); + if (combined.length > holdBack) { + controller.enqueue(combined.subarray(0, -holdBack)); + tailBuffer = combined.slice(-holdBack); + return; + } + // Everything fits in the holdback buffer — nothing to enqueue + // yet. Keep reading so we don't rely on the ReadableStream + // re-invoking pull when no chunk was enqueued. + tailBuffer = new Uint8Array(combined); + continue; + } + + // Stream closed — check tail for control frame. + const control = parseStreamControlFrame(tailBuffer); + + if (control) { + // Forward any data bytes that preceded the control frame. + const dataLen = tailBuffer.length - control.totalLength; + if (dataLen > 0) { + controller.enqueue(tailBuffer.subarray(0, dataLen)); + } + tailBuffer = new Uint8Array(0); + + if (control.done) { + controller.close(); + return; + } + + // Timeout — reconnect from the next chunk index. + reconnectCount++; + if (reconnectCount > MAX_RECONNECTS) { + controller.error( + new Error( + `Stream exceeded maximum reconnection attempts (${MAX_RECONNECTS})` + ) + ); + return; + } + currentStartIndex = control.nextIndex; + reader = await connect(); + continue; + } + + // No control frame (older server or connection error). + // Forward remaining bytes and close. + if (tailBuffer.length > 0) { + controller.enqueue(tailBuffer); + tailBuffer = new Uint8Array(0); + } + controller.close(); + return; + } + }, + cancel: async () => { + await reader.cancel(); + }, }); - if (!response.ok) { - throw new Error(`Failed to fetch stream: ${response.status}`); - } - if (!response.body) { - throw new Error('No response body for stream'); - } - return response.body as ReadableStream; }, async getChunks( From f102ed925d9b6199b37126f2494c93cf61bdfdb1 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 16 Apr 2026 09:58:10 -0700 Subject: [PATCH 5/5] Add Python SDK beta documentation page (#1775) * feat: add Python SVG icon and related documentation Slack-Thread: https://vercel.slack.com/archives/C0AG7R4PU22/p1776354109663229?thread_ts=1776354109.663229&cid=C0AG7R4PU22 Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * feat: reintroduce Python SDK card in getting-started guide Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * feat: add beta badge to Python SDK documentation sidebar item Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * feat: enhance sidebar item with badge and layout update Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * feat: refactor sidebar item badges to use array and function for mapping Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * feat: add Python getting started guide card in beta Added a new card for the Python getting started guide with a beta badge. Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * feat: simplify description of Workflow Python SDK Updated the description for clarity and conciseness. Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * feat: enhance Python SDK documentation and examples Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> * fix: update Python SDK documentation link for language parameter Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> --------- Co-authored-by: v0 Co-authored-by: Pranay Prakash <1797812+pranaygp@users.noreply.github.com> --- .../[lang]/(home)/components/frameworks.tsx | 106 +++++++---- docs/components/geistdocs/sidebar.tsx | 42 +++-- docs/content/docs/getting-started/index.mdx | 9 +- docs/content/docs/getting-started/meta.json | 3 +- docs/content/docs/getting-started/python.mdx | 165 ++++++++++++++++++ 5 files changed, 278 insertions(+), 47 deletions(-) create mode 100644 docs/content/docs/getting-started/python.mdx diff --git a/docs/app/[lang]/(home)/components/frameworks.tsx b/docs/app/[lang]/(home)/components/frameworks.tsx index 0f0b3ffbc6..46bf385348 100644 --- a/docs/app/[lang]/(home)/components/frameworks.tsx +++ b/docs/app/[lang]/(home)/components/frameworks.tsx @@ -1,11 +1,11 @@ -"use client"; +'use client'; -import { track } from "@vercel/analytics"; -import Link from "next/link"; -import type { ComponentProps } from "react"; -import { toast } from "sonner"; +import { track } from '@vercel/analytics'; +import Link from 'next/link'; +import type { ComponentProps } from 'react'; +import { toast } from 'sonner'; -export const Express = (props: ComponentProps<"svg">) => ( +export const Express = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const Fastify = (props: ComponentProps<"svg">) => ( +export const Fastify = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const AstroDark = (props: ComponentProps<"svg">) => ( +export const AstroDark = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const AstroLight = (props: ComponentProps<"svg">) => ( +export const AstroLight = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const AstroGray = (props: ComponentProps<"svg">) => ( +export const AstroGray = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const TanStack = (props: ComponentProps<"svg">) => ( +export const TanStack = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const TanStackGray = (props: ComponentProps<"svg">) => ( +export const TanStackGray = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const Vite = (props: ComponentProps<"svg">) => ( +export const Vite = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const Nitro = (props: ComponentProps<"svg">) => ( +export const Nitro = (props: ComponentProps<'svg'>) => ( ) => ( /> ) => ( ); -export const SvelteKit = (props: ComponentProps<"svg">) => ( +export const SvelteKit = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const SvelteKitGray = (props: ComponentProps<"svg">) => ( +export const SvelteKitGray = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const Nuxt = (props: ComponentProps<"svg">) => ( +export const Nuxt = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const NuxtGray = (props: ComponentProps<"svg">) => ( +export const NuxtGray = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const Hono = (props: ComponentProps<"svg">) => ( +export const Hono = (props: ComponentProps<'svg'>) => ( Hono ) => ( ); -export const HonoGray = (props: ComponentProps<"svg">) => ( +export const HonoGray = (props: ComponentProps<'svg'>) => ( Hono ) => ( ); -export const Bun = (props: ComponentProps<"svg">) => ( +export const Bun = (props: ComponentProps<'svg'>) => ( ) => ( id="Top" d="M35.12,5.53A16.41,16.41,0,0,1,29.49,18c-.28.25-.06.73.3.59,3.37-1.31,7.92-5.23,6-13.14C35.71,5,35.12,5.12,35.12,5.53Zm2.27,0A16.24,16.24,0,0,1,39,19c-.12.35.31.65.55.36C41.74,16.56,43.65,11,37.93,5,37.64,4.74,37.19,5.14,37.39,5.49Zm2.76-.17A16.42,16.42,0,0,1,47,17.12a.33.33,0,0,0,.65.11c.92-3.49.4-9.44-7.17-12.53C40.08,4.54,39.82,5.08,40.15,5.32ZM21.69,15.76a16.94,16.94,0,0,0,10.47-9c.18-.36.75-.22.66.18-1.73,8-7.52,9.67-11.12,9.45C21.32,16.4,21.33,15.87,21.69,15.76Z" fill="#ccbea7" - style={{ fillRule: "evenodd" }} + style={{ fillRule: 'evenodd' }} /> ) => ( ); -export const BunGray = (props: ComponentProps<"svg">) => ( +export const BunGray = (props: ComponentProps<'svg'>) => ( ) => ( id="Top" d="M35.12,5.53A16.41,16.41,0,0,1,29.49,18c-.28.25-.06.73.3.59,3.37-1.31,7.92-5.23,6-13.14C35.71,5,35.12,5.12,35.12,5.53Zm2.27,0A16.24,16.24,0,0,1,39,19c-.12.35.31.65.55.36C41.74,16.56,43.65,11,37.93,5,37.64,4.74,37.19,5.14,37.39,5.49Zm2.76-.17A16.42,16.42,0,0,1,47,17.12a.33.33,0,0,0,.65.11c.92-3.49.4-9.44-7.17-12.53C40.08,4.54,39.82,5.08,40.15,5.32ZM21.69,15.76a16.94,16.94,0,0,0,10.47-9c.18-.36.75-.22.66.18-1.73,8-7.52,9.67-11.12,9.45C21.32,16.4,21.33,15.87,21.69,15.76Z" fill="var(--color-background)" - style={{ fillRule: "evenodd" }} + style={{ fillRule: 'evenodd' }} /> ) => ( ); -export const Nest = (props: ComponentProps<"svg">) => ( +export const Nest = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const NestGray = (props: ComponentProps<"svg">) => ( +export const NestGray = (props: ComponentProps<'svg'>) => ( ) => ( ); -export const Next = (props: ComponentProps<"svg">) => ( +export const Python = (props: ComponentProps<'svg'>) => ( + + Python + + + + + + + + + + + + + +); + +export const Next = (props: ComponentProps<'svg'>) => ( Next.js @@ -700,10 +736,10 @@ export const Next = (props: ComponentProps<"svg">) => ( export const Frameworks = () => { const handleRequest = () => { - track("Framework requested", { framework: "tanstack" }); - toast.success("Request received", { + track('Framework requested', { framework: 'tanstack' }); + toast.success('Request received', { description: - "Thanks for expressing interest in TanStack. We will be adding support for it soon.", + 'Thanks for expressing interest in TanStack. We will be adding support for it soon.', }); }; diff --git a/docs/components/geistdocs/sidebar.tsx b/docs/components/geistdocs/sidebar.tsx index 60c9b8bb8b..aa1f78b86b 100644 --- a/docs/components/geistdocs/sidebar.tsx +++ b/docs/components/geistdocs/sidebar.tsx @@ -20,9 +20,19 @@ import { SheetHeader, SheetTitle, } from '@/components/ui/sheet'; +import { Badge } from '@/components/ui/badge'; import { useSidebarContext } from '@/hooks/geistdocs/use-sidebar'; import { SearchButton } from './search'; +// Map of URL suffixes to badges shown inline next to the sidebar item name. +const SIDEBAR_ITEM_BADGES: Array<{ suffix: string; label: string }> = [ + { suffix: '/docs/getting-started/python', label: 'Beta' }, +]; + +function getSidebarBadge(url: string): string | undefined { + return SIDEBAR_ITEM_BADGES.find((b) => url.endsWith(b.suffix))?.label; +} + export const Sidebar = () => { const { root } = useTreeContext(); const { isOpen, setIsOpen } = useSidebarContext(); @@ -109,16 +119,28 @@ export const Folder: SidebarPageTreeComponents['Folder'] = ({ ); }; -export const Item: SidebarPageTreeComponents['Item'] = ({ item }) => ( - - {item.name} - -); +export const Item: SidebarPageTreeComponents['Item'] = ({ item }) => { + const badgeLabel = getSidebarBadge(item.url); + + return ( + + {item.name} + {badgeLabel ? ( + + {badgeLabel} + + ) : null} + + ); +}; export const Separator: SidebarPageTreeComponents['Separator'] = ({ item }) => ( diff --git a/docs/content/docs/getting-started/index.mdx b/docs/content/docs/getting-started/index.mdx index 4d2f578582..4b51b62d76 100644 --- a/docs/content/docs/getting-started/index.mdx +++ b/docs/content/docs/getting-started/index.mdx @@ -8,7 +8,7 @@ related: - /docs/foundations/workflows-and-steps --- -import { Next, Nitro, SvelteKit, Nuxt, Hono, Bun, AstroDark, AstroLight, TanStack, Vite, Express, Nest, Fastify } from "@/app/[lang]/(home)/components/frameworks"; +import { Next, Nitro, SvelteKit, Nuxt, Hono, Bun, AstroDark, AstroLight, TanStack, Vite, Express, Nest, Fastify, Python } from "@/app/[lang]/(home)/components/frameworks"; @@ -63,6 +63,13 @@ import { Next, Nitro, SvelteKit, Nuxt, Hono, Bun, AstroDark, AstroLight, TanStac SvelteKit + +
+ + Python + Beta +
+
diff --git a/docs/content/docs/getting-started/meta.json b/docs/content/docs/getting-started/meta.json index d0e189dd73..60e0131621 100644 --- a/docs/content/docs/getting-started/meta.json +++ b/docs/content/docs/getting-started/meta.json @@ -9,7 +9,8 @@ "nitro", "nuxt", "sveltekit", - "vite" + "vite", + "python" ], "defaultOpen": true } diff --git a/docs/content/docs/getting-started/python.mdx b/docs/content/docs/getting-started/python.mdx new file mode 100644 index 0000000000..40c715b0a3 --- /dev/null +++ b/docs/content/docs/getting-started/python.mdx @@ -0,0 +1,165 @@ +--- +title: Python +description: Build durable workflows and AI agents in Python with the Vercel SDK. +type: guide +summary: Set up the Workflow Python SDK in your Python application. +prerequisites: + - /docs/getting-started +related: + - /docs/foundations + - /docs/foundations/workflows-and-steps +--- + + +The Python SDK is currently in **beta**. APIs and behavior may change. For the latest documentation and updates, see the [official Vercel Workflow Python documentation](https://vercel.com/docs/workflow/python?language=py). + + +You can build durable workflows in Python using the [`vercel` Python SDK](https://pypi.org/project/vercel/). Your workflow code can pause, resume, and maintain state, just like the JavaScript and TypeScript Workflow SDK. + +## Getting Started + +Install the `vercel` package: + +```bash filename="Terminal" +pip install vercel +``` + +Configure `experimentalServices` in your `vercel.json`: + +```json filename="vercel.json" +{ + "experimentalServices": { + "ai_content_workflow": { + "type": "worker", + "entrypoint": "app/workflows/ai_content_workflow.py", + "topics": ["__wkf_*"] + } + } +} +``` + +## Workflows + +A workflow is a stateful function that coordinates multi-step logic over time. Create a `Workflows` instance and use the `@wf.workflow` decorator to mark a function as durable: + +```python filename="app/workflow.py" {3} +from vercel import workflow + +wf = workflow.Workflows() +``` + +```python filename="app/workflows/ai_content_workflow.py" {3} +from app.workflow import wf + +@wf.workflow +async def ai_content_workflow(*, topic: str): + draft = await generate_draft(topic=topic) + summary = await summarize_draft(draft=draft) + + return { + "draft": draft, + "summary": summary, + } +``` + +Under the hood, the workflow compiles into a route that orchestrates execution. All inputs and outputs are recorded in an event log. If a deploy or crash happens, the system replays execution deterministically from where it stopped. + +## Steps + +A step is a stateless function that runs a unit of durable work inside a workflow. Use `@wf.step` to mark a function as a step: + +```python filename="app/steps/generate_draft.py" {4,8} +import random +from app.workflow import wf + +@wf.step +async def generate_draft(*, topic: str): + return await ai_generate(prompt=f"Write a blog post about {topic}") + +@wf.step +async def summarize_draft(*, draft: str): + summary = await ai_summarize(text=draft) + + # Simulate a transient error. The step automatically retries. + if random.random() < 0.3: + raise Exception("Transient AI provider error") + + return summary +``` + +Each step compiles into an isolated route. While the step executes, the workflow suspends without consuming resources. When the step completes, the workflow resumes automatically where it left off. + +## Sleep + +Sleep pauses a workflow for a specified duration without consuming compute resources: + +```python filename="app/workflows/ai_refine.py" {7} +from vercel import workflow + +@wf.workflow +async def ai_refine_workflow(*, draft_id: str): + draft = await fetch_draft(draft_id) + + await workflow.sleep("7 days") # Wait 7 days to gather more signals. + + refined = await refine_draft(draft) + + return { + "draft_id": draft_id, + "refined": refined, + } +``` + +The sleep call pauses the workflow and consumes no resources. The workflow resumes automatically when the time expires. + +## Hooks + +A hook lets a workflow wait for external events such as user actions, webhooks, or third-party API responses. + +Define a hook model with Pydantic and `workflow.BaseHook`: + +```python filename="app/workflows/approval.py" {3,14} +from vercel import workflow + +class Approval(BaseModel, workflow.BaseHook): + """Human approval for AI-generated drafts""" + + decision: Literal["approved", "changes"] + notes: str | None = None + +@wf.workflow +async def ai_approval_workflow(*, topic: str): + draft = await generate_draft(topic=topic) + + # Wait for human approval events + async for event in Approval.wait(token="draft-123"): + if event.decision == "approved": + await publish_draft(draft) + break + + revised = await refine_draft(draft, event.notes) + await publish_draft(revised) +``` + +Resume the workflow when data arrives: + +```python filename="app/api/resume.py" {5} +@app.post("/api/resume") +async def resume(approval: Approval): + """Resume the workflow when an approval is received""" + + await approval.resume("draft-123") + return {"ok": True} +``` + +When a hook receives data, the workflow resumes automatically. You don't need polling, message queues, or manual state management. + +## Learn More + +For comprehensive documentation, examples, and the latest updates, visit the [official Vercel Workflow Python documentation](https://vercel.com/docs/workflow/python). + +## Next Steps + +- Learn more about the [Foundations](/docs/foundations). +- Check [Errors](/docs/errors) if you encounter issues. +- Explore the [API Reference](/docs/api-reference).