---
outline: deep
---

# Streaming

DevTools Kit ships a first-class streaming-channel API for chunk-style data flowing in either direction between server and client — chat deltas, log lines, build progress, file uploads, mic / screen-share frames. It's the same primitive as DevFrame's [streaming guide](/devframe/guide/streaming), surfaced through the Kit's Vite plugin idioms.

Reach for streaming when you need:

- Token-by-token rendering with low latency (LLM deltas, terminal output).
- Per-call lifecycles with cooperative cancellation.
- Replay on reconnect — a panel reopened mid-stream picks up where it left off.
- Client-to-server uploads without inventing a multipart protocol.

For *snapshot* state that survives reconnect and syncs across panels, prefer [shared state](./shared-state) instead.

## Overview

```mermaid
sequenceDiagram
  participant Producer as Producer (server)
  participant Channel as ctx.rpc.streaming<br/>channel
  participant Browser as Subscriber (browser)

  Producer->>Channel: start({ id })
  Channel-->>Browser: chunk(seq=1, "...")
  Channel-->>Browser: chunk(seq=2, "...")
  Producer->>Channel: close()
  Channel-->>Browser: end()
```

A **channel** owns a wire namespace. Each call to `channel.start()` produces an individual **stream** keyed by an id (auto-generated unless you pass one). Subscribers join by `(channelName, id)`.

## Server-to-Client (the common case)

### Defining a Channel

In your `devtools.setup`, create the channel once. Channels are framework-neutral, so the same code works whether you ship via `@vitejs/devtools-kit` (Vite) or DevFrame's other adapters:

```ts
/// <reference types="@vitejs/devtools-kit" />
import type { Plugin } from 'vite'
import { defineRpcFunction } from '@vitejs/devtools-kit'
import * as v from 'valibot'

export default function chatPlugin(): Plugin {
  return {
    name: 'my-plugin',
    devtools: {
      async setup(ctx) {
        const channel = ctx.rpc.streaming.create<string>('my-plugin:chat', {
          replayWindow: 256,
        })

        ctx.rpc.register(defineRpcFunction({
          name: 'my-plugin:start-chat',
          type: 'action',
          jsonSerializable: true,
          args: [v.object({ prompt: v.string() })],
          returns: v.object({ streamId: v.string() }),
          handler: async ({ prompt }) => {
            const stream = channel.start()
            ;(async () => {
              for await (const token of fakeLLM(prompt, { signal: stream.signal })) {
                if (stream.signal.aborted)
                  break
                stream.write(token)
              }
              stream.close()
            })()
            return { streamId: stream.id }
          },
        }))
      },
    },
  }
}
```

The channel name follows the same `<plugin-id>:<name>` convention as RPC functions and shared-state keys.

### Producing — Three Surfaces, One Stream

The handle returned by `channel.start({ id? })` is both an imperative producer and a Web Streams `WritableStream<T>`:

```ts
const stream = channel.start({ id: 'optional-explicit-id' })

// Imperative — minimal, hand-rolled producers
stream.write(chunk)
stream.error(err) // terminal failure
stream.close() // terminal success
stream.signal // AbortSignal — flips when consumers cancel
stream.id // string — what clients subscribe to

// Web Streams — pipe any ReadableStream<T> in:
sourceReadable.pipeTo(stream.writable, { signal: stream.signal })

// Convenience — start + pipe in one call:
const stream2 = await channel.pipeFrom(sourceReadable)
```

Producers should poll `stream.signal.aborted` and exit cooperatively when it flips:

```ts
for (const token of source) {
  if (stream.signal.aborted)
    return
  stream.write(token)
}
stream.close()
```

#### Node.js Stream Interop

Web Streams are the canonical surface, but Node 17+ ships built-in converters that bridge to `node:stream`:

```ts
import { Readable, Writable } from 'node:stream'

// Pipe a Node Readable into the streaming channel
sourceNodeReadable.pipe(Writable.fromWeb(stream.writable))

// Pipe the channel out to a Node Writable
Readable.fromWeb(reader.readable).pipe(targetNodeWritable)
```

DevTools Kit doesn't wrap these — they're standard library, and the surface stays small.

### Consuming — `for await` or `pipeTo`

On the client, `subscribe()` returns a reader that is both an `AsyncIterable<T>` and exposes a `ReadableStream<T>`:

```ts
import { getDevToolsRpcClient } from '@vitejs/devtools-kit/client'

const rpc = await getDevToolsRpcClient()
const { streamId } = await rpc.call('my-plugin:start-chat', {
  prompt: 'Hello',
})

const reader = rpc.streaming.subscribe<string>('my-plugin:chat', streamId)

// Async iterable — the simplest consumer pattern
for await (const token of reader)
  appendToken(token)

// Or pipe to a DOM-side WritableStream
await reader.readable.pipeTo(downloadWritable)

reader.cancel() // sends cancel upstream; server stream.signal flips
```

Pick one surface per reader — they share a single internal queue, so concurrent draining will race.
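
If two consumers genuinely need the same stream, split the standard `ReadableStream` with `tee()` before draining it — a self-contained sketch, where `source` stands in for the `reader.readable` documented above:

```ts
// Stand-in for `reader.readable` — any ReadableStream<string>
// behaves the same way.
const source = new ReadableStream<string>({
  start(controller) {
    for (const token of ['Hel', 'lo', '!'])
      controller.enqueue(token)
    controller.close()
  },
})

// Two consumers on one stream: tee() the ReadableStream instead of
// mixing `for await` with `pipeTo` on the same reader.
const [forUi, forLog] = source.tee()

async function collect(stream: ReadableStream<string>): Promise<string> {
  let out = ''
  const r = stream.getReader()
  while (true) {
    const { value, done } = await r.read()
    if (done)
      break
    out += value
  }
  return out
}

// Both branches observe the full chunk sequence independently.
const [ui, log] = await Promise.all([collect(forUi), collect(forLog)])
```

Each branch of a `tee()` buffers independently, so a much slower branch grows memory — prefer a single consumer when one will do.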

## Client-to-Server Uploads

The same channel works in reverse for chunk-style uploads — file content, mic / screen-share frames, browser-side logs forwarded to disk, anything you'd otherwise hand-roll as `multipart` over HTTP. The pattern uses one normal RPC call to allocate the id, then dedicated streaming events for the chunks:

```ts
// Server — typically inside an action handler
ctx.rpc.register(defineRpcFunction({
  name: 'my-plugin:upload-file',
  type: 'action',
  args: [v.object({ name: v.string() })],
  returns: v.object({ uploadId: v.string() }),
  handler: async ({ name }) => {
    const reader = channel.openInbound()

    // Process chunks asynchronously — the action returns immediately
    // so the client can start uploading.
    ;(async () => {
      const file = createWriteStream(name)
      for await (const chunk of reader)
        file.write(chunk)
      file.close()
    })()

    return { uploadId: reader.id }
  },
}))
```

```ts
// Client
const { uploadId } = await rpc.call('my-plugin:upload-file', {
  name: 'capture.bin',
})
const upload = rpc.streaming.upload<Uint8Array>('my-plugin:files', uploadId)

// Imperative
upload.write(chunk1)
upload.write(chunk2)
upload.close()

// Or pipe a Web ReadableStream straight in:
fileReadable.pipeTo(upload.writable, { signal: upload.signal })
```

Lifecycle mirrors the outbound case:

- `upload.signal` aborts when the **server** calls `reader.cancel()` (the server cancellation broadcasts an `upload-cancel` to the uploading session).
- `upload.error(err)` propagates as a thrown error inside the server's `for await`.
- If the client disconnects mid-upload, the server's `for await` exits with an `UploadDisconnected` error so consumers can clean up.

Each `openInbound()` call allocates a fresh server-side id; the resulting reader is owned by exactly one uploading session — there's no fan-in, no shared subscribers, and no replay (the producer is the client, so reconnect means restart).
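
Both failure modes surface as a throw inside the server's `for await`, so one try/catch covers cleanup for both. A hedged sketch — the `sink` shape (`commit`/`discard`) is illustrative, not Kit API, and any `AsyncIterable<Uint8Array>` stands in for the reader from `openInbound()`:

```ts
// Defensive server-side consumption: client-sent errors
// (`upload.error(err)`) and mid-upload disconnects
// (`UploadDisconnected`) both land in the catch branch.
async function consumeUpload(
  chunks: AsyncIterable<Uint8Array>,
  sink: { write: (c: Uint8Array) => void, commit: () => void, discard: () => void },
): Promise<boolean> {
  try {
    for await (const chunk of chunks)
      sink.write(chunk)
    sink.commit() // client called upload.close() — keep the file
    return true
  }
  catch {
    sink.discard() // error or disconnect — drop the partial output
    return false
  }
}

// Demo with an upload that fails after one chunk
async function* faultyUpload(): AsyncGenerator<Uint8Array> {
  yield new Uint8Array([1, 2, 3])
  throw new Error('UploadDisconnected')
}

const writes: number[] = []
const ok = await consumeUpload(faultyUpload(), {
  write: c => writes.push(c.length),
  commit: () => {},
  discard: () => {},
})
```

The boolean return is only for the sketch — a real handler would surface the failure through its own error reporting.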

## Lifecycle and Cancellation

| Event | Server side | Client side |
|-------|-------------|-------------|
| Producer calls `stream.close()` / `stream.error(err)` | Broadcasts `end` to subscribers | `for await` resolves (success) or throws (error) |
| Consumer calls `reader.cancel()` | `stream.signal` aborts when the **last** subscriber cancels — handlers should poll and exit | Reader marks itself cancelled; `for await` ends without iterating |
| WS disconnects | When the **last** subscriber drops, server aborts `stream.signal` | Reader stays alive; resubscribes automatically when trust is re-established |
| Panel closes mid-stream | Reader cancel cascades upstream | — |

A stream with multiple subscribers stays alive until the last one cancels or disconnects. Producers should always make `stream.signal.aborted` part of their inner loop.
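
Since `stream.signal` is an `AbortSignal` (per the producer surface above), teardown can also be event-driven in addition to the polled check — useful for releasing upstream resources immediately. A sketch with the stream handle stubbed out:

```ts
// Stub standing in for the handle from `channel.start()` — only the
// documented `signal` property (a standard AbortSignal) is assumed.
const controller = new AbortController()
const stream = { signal: controller.signal }

let tornDown = false
stream.signal.addEventListener('abort', () => {
  // release upstream resources (sockets, child processes) right away,
  // instead of waiting for the loop to observe `signal.aborted`
  tornDown = true
}, { once: true })

controller.abort() // simulates the last subscriber cancelling
```

The listener complements the polled check rather than replacing it — the loop still needs `signal.aborted` to stop writing between chunks.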

## Replay on Reconnect

When the channel is created with `replayWindow: N`, the server keeps a rolling buffer of the last `N` chunks per stream. On (re)subscribe, the client passes the highest sequence number it has seen, and the server replays anything newer before resuming live.

```ts
ctx.rpc.streaming.create<string>('my-plugin:chat', {
  replayWindow: 256, // chunks to retain per stream id
  closedStreamRetention: 30_000, // ms to hold closed streams for late subscribers
})
```

`closedStreamRetention` defaults to 30 seconds when `replayWindow > 0` (so a panel re-opened a few seconds after a chat finishes still gets the full transcript), or 0 when replay is disabled. Set it explicitly to opt in or out.

## Backpressure

The client maintains a bounded queue per subscription (`highWaterMark`, default 256). When the consumer can't keep up, the **oldest** queued chunk is dropped and a [`DF0029`](/devframe/errors/DF0029) warning is logged. This is best-effort — proper transport-level backpressure isn't worth threading through the RPC layer for the streaming use cases that exist today.

```ts
const reader = rpc.streaming.subscribe('my-plugin:chat', id, {
  highWaterMark: 1024, // raise if you expect bursts the consumer can recover from
})
```

If you need authoritative state rather than every intermediate value, prefer [shared state](./shared-state) — it carries Immer patches with delivery guarantees, at the cost of being structured rather than streaming.

## Combining Streaming with Shared State

Token-level streaming and shared-state snapshots compose naturally for chat-style UIs:

- The conversation **log** lives in shared state (survives reloads, syncs across panels).
- Active responses use a **streaming** channel for low-latency token rendering.
- The action that starts a response appends a user message + assistant placeholder to shared state, kicks off the producer, and on producer close commits the joined content back to shared state.

```ts
const channel = ctx.rpc.streaming.create<string>('my-plugin:chat-tokens', {
  replayWindow: 1024,
})
const history = await ctx.rpc.sharedState.get('my-plugin:chat-history', {
  initialValue: { messages: [] as ChatMessage[] },
})

ctx.rpc.register(defineRpcFunction({
  name: 'my-plugin:send',
  type: 'action',
  args: [v.object({ prompt: v.string() })],
  returns: v.object({ streamId: v.string(), assistantId: v.string() }),
  handler: async ({ prompt }) => {
    const stream = channel.start()
    const assistantId = crypto.randomUUID()

    history.mutate((draft) => {
      draft.messages.push({
        id: crypto.randomUUID(),
        role: 'user',
        content: prompt,
      })
      draft.messages.push({
        id: assistantId,
        role: 'assistant',
        content: '',
        streamId: stream.id,
      })
    })

    let acc = ''
    ;(async () => {
      for await (const token of fakeLLM(prompt, { signal: stream.signal })) {
        if (stream.signal.aborted)
          break
        stream.write(token)
        acc += token
      }
      stream.close()
      // Commit final content; clients now read the message from
      // shared state and drop the live overlay.
      history.mutate((draft) => {
        const msg = draft.messages.find(m => m.id === assistantId)
        if (msg) {
          msg.content = acc
          msg.streamId = undefined
        }
      })
    })()

    return { streamId: stream.id, assistantId }
  },
}))
```
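
On the client side of this pattern, rendering reduces to overlaying live tokens on top of the shared-state snapshot. A hedged sketch of the accumulation step — `tokens` would be the reader from `rpc.streaming.subscribe('my-plugin:chat-tokens', msg.streamId)`, and `show` is whatever updates your UI:

```ts
// Accumulate streamed tokens onto a message's existing content,
// re-rendering after each one. Works with any AsyncIterable<string>,
// including the Kit's stream reader.
async function renderLiveMessage(
  tokens: AsyncIterable<string>,
  show: (text: string) => void,
  initial = '',
): Promise<string> {
  let live = initial
  for await (const token of tokens) {
    live += token
    show(live)
  }
  return live // stream ended — shared state now carries the final content
}

// Demo with a fake token source standing in for the stream reader
async function* fakeTokens(): AsyncGenerator<string> {
  yield* ['Str', 'eam', 'ing']
}

const frames: string[] = []
const final = await renderLiveMessage(fakeTokens(), t => frames.push(t))
```

Because the server commits the joined content to shared state on close (and clears `streamId`), a client that drops mid-stream degrades gracefully — the snapshot wins.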

A working version of this pattern lives in [`devframe/examples/devframe-streaming-chat`](https://github.com/vitejs/devtools/tree/main/devframe/examples/devframe-streaming-chat).

## When to Use Streaming vs Events vs Shared State

| Use streaming for | Use `event`-typed RPC for | Use shared state for |
|-------------------|---------------------------|----------------------|
| Token / chunk feeds (LLM deltas, build logs) | Notifications without payload (`refresh`, `clear`) | Long-lived UI state (selections, panel layout) |
| Per-call lifecycles with cancellation | Cross-cutting signals broadcast to all clients | Reactive snapshots that survive reconnect |
| Replay on reconnect | Fire-and-forget signaling | Diff-based sync between clients |
| Client-to-server uploads (files, mic frames) | | |

## Reference

- API surface: `RpcStreamingHost`, `RpcStreamingChannel<T>`, `StreamSink<T>`, `StreamReader<T>` — re-exported from `@vitejs/devtools-kit`.
- Working example: [`devframe/examples/devframe-streaming-chat`](https://github.com/vitejs/devtools/tree/main/devframe/examples/devframe-streaming-chat).
- Errors: [`DF0029`](/devframe/errors/DF0029) (overflow), [`DF0030`](/devframe/errors/DF0030) (unknown stream id), [`DF0031`](/devframe/errors/DF0031) (write to closed stream), [`DF0032`](/devframe/errors/DF0032) (channel name collision).