Skip to content

Commit cc60a3e

Browse files
antfuclaude
andcommitted
feat(devframe): async generator RPC functions
Add `type: 'generator'` for `defineRpcFunction` so handlers declared as `async function*` stream their yields to the caller without manual channel scaffolding. The framework substitutes the user's definition with an internal action wrapper that allocates a sink on a hidden `devframe:rpc:generators` channel and returns a stream-id envelope; the client wrapper unwraps the envelope and `streaming.subscribe()`s automatically, so `await rpc.call(name, args)` resolves to a ready-to-iterate `StreamReader<Y>`. Cancellation flows through `getCurrentRpcStream()` — an AsyncLocalStorage helper that mirrors `getCurrentRpcSession()` and exposes the sink's `signal`, `streamId`, and originating session inside the handler body. Server-side callers can iterate without paying for the streaming round-trip via `invokeLocalGenerator(rpc, name, ...args)`. Per-stream `replayWindow` defaults to 256 (floored to 1) to win the client-subscribe-vs-first-yield race; this required threading the existing `RpcStreamingChannel.start()` helper to accept a per-stream override on top of the channel default. Generator definitions reject `agent`, `cacheable`, `dump`, `snapshot`, and `jsonSerializable: true` at registration via four new diagnostics (DF0033–DF0036). DF0034 fires at runtime if the handler doesn't return an `AsyncIterable`. Also fixes a pre-existing replay bug in `node/rpc-streaming.ts`: when a producer closed with an error and a subscriber arrived during the `closedStreamRetention` window, the late subscriber received a clean close instead of the original error. The streaming record now captures the end payload and replays it on subscribe. Migrations & dogfooding: - Streaming guide gains an "Async Generator RPC" section ahead of the manual channel section, framed as the recommended path. - `devframe-streaming-chat` example adds a `:tokenize` generator alongside the existing `:send` action so readers can compare both approaches in one place. - `skills/devframe` and `skills/vite-devtools-kit` get matching guidance. - 12 new integration tests (`rpc-generators.test.ts`) covering happy path, cooperative cancel, throw mid-stream, throw before first yield, late-subscriber replay, concurrent isolation, `invokeLocalGenerator`, and all four validation diagnostics. tsnapi snapshots updated for the new public exports (`getCurrentRpcStream`, `invokeLocalGenerator`, `attachRpcGenerators`, `RpcGeneratorStreamContext`, `RpcGeneratorFunctionDefinition`). Pre-PR checklist (lint + 462 tests + typecheck + build) all green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4c2f6c9 commit cc60a3e

30 files changed

Lines changed: 1441 additions & 123 deletions

File tree

devframe/docs/.vitepress/sidebar.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export default function devframeSidebar(prefix = ''): DefaultTheme.SidebarItem[]
2727
text: 'Error Reference',
2828
link: `${prefix}/errors/`,
2929
collapsed: true,
30-
items: Array.from({ length: 32 }, (_, i) => {
30+
items: Array.from({ length: 36 }, (_, i) => {
3131
const code = `DF${String(i + 1).padStart(4, '0')}`
3232
return { text: code, link: `${prefix}/errors/${code}` }
3333
}),

devframe/docs/errors/DF0033.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0033: Generator with Agent Exposure
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Generator RPC function "`{name}`" has `agent` set, which is not supported in the current release.
12+
13+
## Cause
14+
15+
`type: 'generator'` RPC functions stream their yields through the existing streaming-channel transport, which currently uses `structured-clone-es` regardless of the source function's serialization preferences. Agent (MCP) exposure requires strict JSON serialization for tool inputs and outputs, so generator functions cannot be exposed as agent tools yet.
16+
17+
Streaming-MCP support is planned as a follow-up.
18+
19+
## Example
20+
21+
```ts
22+
defineRpcFunction({
23+
name: 'plugin:tail-logs',
24+
type: 'generator',
25+
yields: v.string(),
26+
agent: { description: 'Tail server logs' }, // ← rejected
27+
async* handler() {
28+
yield 'log line 1'
29+
},
30+
})
31+
```
32+
33+
## Fix
34+
35+
Remove the `agent` field, or change the function type to `'query'` / `'static'` if it can return a single value:
36+
37+
```ts
38+
defineRpcFunction({
39+
name: 'plugin:tail-logs',
40+
type: 'generator',
41+
yields: v.string(),
42+
async* handler() {
43+
yield 'log line 1'
44+
},
45+
})
46+
```
47+
48+
## Source
49+
50+
`packages/devframe/src/rpc/validation.ts`

devframe/docs/errors/DF0034.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0034: Generator Handler Not Async-Iterable
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Generator RPC function "`{name}`" handler did not return an `AsyncIterable`.
12+
13+
## Cause
14+
15+
A `type: 'generator'` RPC function expects its handler to return an `AsyncIterable<Y>` — typically by being declared as `async function*`. The framework drives the iterator with `for await`, so it must implement `[Symbol.asyncIterator]`.
16+
17+
This error is raised at runtime on the first call, not at registration, because TypeScript cannot statically distinguish `async function*` from `async function` in all cases (e.g. when the handler is supplied via `setup()`).
18+
19+
## Example
20+
21+
```ts
22+
// ❌ Wrong — async function (not generator) returning a string
23+
defineRpcFunction({
24+
name: 'plugin:tokens',
25+
type: 'generator',
26+
yields: v.string(),
27+
handler: async () => 'hello',
28+
})
29+
```
30+
31+
## Fix
32+
33+
Declare the handler as `async function*` and `yield` instead of `return`:
34+
35+
```ts
36+
defineRpcFunction({
37+
name: 'plugin:tokens',
38+
type: 'generator',
39+
yields: v.string(),
40+
async* handler() {
41+
yield 'hello'
42+
yield 'world'
43+
},
44+
})
45+
```
46+
47+
If the handler comes from `setup()`, ensure `setup()` returns `{ handler: async function*() { ... } }`.
48+
49+
## Source
50+
51+
`packages/devframe/src/node/rpc-generators.ts`

devframe/docs/errors/DF0035.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0035: Generator Missing Yields Schema
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Generator RPC function "`{name}`" declares `args` but no `yields` schema — yielded value type cannot be inferred.
12+
13+
## Cause
14+
15+
When a `type: 'generator'` RPC function declares an `args` schema, the framework expects a matching `yields` schema for end-to-end type inference. Without it, `rpc.call(name, ...)` resolves to `Promise<StreamReader<unknown>>` on the client, defeating the purpose of having validated arguments.
16+
17+
This mirrors the requirement that `query`-typed functions with `args` also declare `returns`.
18+
19+
## Example
20+
21+
```ts
22+
// ❌ Wrong — args present but no yields schema
23+
defineRpcFunction({
24+
name: 'plugin:search',
25+
type: 'generator',
26+
args: [v.object({ query: v.string() })],
27+
async* handler({ query }) {
28+
yield query
29+
},
30+
})
31+
```
32+
33+
## Fix
34+
35+
Add a `yields` schema describing the type of each yielded value:
36+
37+
```ts
38+
defineRpcFunction({
39+
name: 'plugin:search',
40+
type: 'generator',
41+
args: [v.object({ query: v.string() })],
42+
yields: v.string(),
43+
async* handler({ query }) {
44+
yield query
45+
},
46+
})
47+
```
48+
49+
If you don't need argument validation either, drop both schemas — the call type falls back to `() => Promise<StreamReader<unknown>>`.
50+
51+
## Source
52+
53+
`packages/devframe/src/rpc/validation.ts`

devframe/docs/errors/DF0036.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0036: Generator Has Inapplicable Option
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Generator RPC function "`{name}`" sets `{option}`, which has no effect on generator-typed functions.
12+
13+
## Cause
14+
15+
Generators are streaming primitives — they yield a sequence of values over time rather than returning a single cacheable result. Options that only make sense for request-response RPC types are rejected at registration:
16+
17+
- `cacheable` — caching a streaming result would replay a stale `streamId` pointing to a closed stream.
18+
- `jsonSerializable: true` — generator yields flow through the existing streaming-channel transport, which uses `structured-clone-es` regardless. The flag is misleading on a generator.
19+
- `dump` and `snapshot` — already handled by [`DF0027`](./DF0027) and [`DF0028`](./DF0028).
20+
21+
## Example
22+
23+
```ts
24+
// ❌ Wrong — `cacheable` has no effect on a generator
25+
defineRpcFunction({
26+
name: 'plugin:tokens',
27+
type: 'generator',
28+
cacheable: true,
29+
yields: v.string(),
30+
async* handler() { yield 'hi' },
31+
})
32+
```
33+
34+
## Fix
35+
36+
Remove the option, or change the function type to one that supports it:
37+
38+
```ts
39+
defineRpcFunction({
40+
name: 'plugin:tokens',
41+
type: 'generator',
42+
yields: v.string(),
43+
async* handler() { yield 'hi' },
44+
})
45+
```
46+
47+
## Source
48+
49+
`packages/devframe/src/rpc/validation.ts`

devframe/docs/errors/index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,7 @@ Emitted by `devframe` — framework-neutral host / shared-state / auth surface.
5050
| [DF0030](./DF0030) | error | Unknown Stream ID ||
5151
| [DF0031](./DF0031) | error | Write to Closed Stream ||
5252
| [DF0032](./DF0032) | error | Streaming Channel Already Registered ||
53+
| [DF0033](./DF0033) | error | Generator with Agent Exposure ||
54+
| [DF0034](./DF0034) | error | Generator Handler Not Async-Iterable ||
55+
| [DF0035](./DF0035) | error | Generator Missing Yields Schema ||
56+
| [DF0036](./DF0036) | error | Generator Has Inapplicable Option ||

devframe/docs/guide/streaming.md

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ outline: deep
66

77
Devframe's streaming-channel API provides server→client push for chunk-style data — chat deltas, log lines, build progress, anything you'd otherwise express as a sequence of fire-and-forget events. It builds on the same WebSocket transport as the rest of the RPC layer, but adds the conventions every chunked feed needs: stream IDs, cooperative cancellation, replay on reconnect, and first-class **Web Streams** interop.
88

9+
For the common case of "I have an `async function*` and I want to ship its yields to the client", reach for [Async Generator RPC](#async-generator-rpc) below — it auto-allocates the sink, wires cancellation, and the client receives a ready-to-iterate `StreamReader<Y>` from `rpc.call(...)`. Drop down to the lower-level channel API in this guide when you need fan-out, late-replay across multiple subscribers, or client-to-server uploads.
10+
911
## Overview
1012

1113
```mermaid
@@ -233,8 +235,95 @@ If you need authoritative state rather than every intermediate value, prefer [sh
233235
| Replay on reconnect | Fire-and-forget signaling | Diff-based sync between clients |
234236
| Client-to-server uploads (files, mic frames) | | |
235237

238+
## Async Generator RPC
239+
240+
The streaming-channel API is the foundation; the most common shape — "stream the yields of an `async function*` to the caller" — has a higher-level wrapper that hides the channel altogether.
241+
242+
### Defining a generator
243+
244+
Set `type: 'generator'` on `defineRpcFunction` and write the handler as `async function*`:
245+
246+
```ts
247+
import { defineRpcFunction } from 'devframe'
248+
import { getCurrentRpcStream } from 'devframe/node'
249+
import * as v from 'valibot'
250+
251+
defineRpcFunction({
252+
name: 'my-devtool:chat',
253+
type: 'generator',
254+
args: [v.object({ prompt: v.string() })],
255+
yields: v.string(),
256+
async* handler({ prompt }) {
257+
const { signal } = getCurrentRpcStream()!
258+
for (const token of fakeTokens(prompt)) {
259+
if (signal.aborted)
260+
return
261+
yield token
262+
}
263+
},
264+
})
265+
```
266+
267+
`getCurrentRpcStream()` is `AsyncLocalStorage`-backed (mirrors `getCurrentRpcSession()`); inside the generator body it returns `{ signal, streamId, session }`. Outside, it returns `undefined`. Poll `signal.aborted` and exit cooperatively when the consumer cancels.
268+
269+
### Calling from the client
270+
271+
```ts
272+
import { connectDevtool } from 'devframe/client'
273+
274+
const rpc = await connectDevtool()
275+
const reader = await rpc.call('my-devtool:chat', { prompt: 'Hi' })
276+
277+
for await (const token of reader)
278+
appendToken(token)
279+
280+
// or pipe out as a Web Stream:
281+
await reader.readable.pipeTo(downloadWritable)
282+
283+
reader.cancel() // sends cancel upstream; server's `signal` flips
284+
```
285+
286+
`rpc.call` resolves to `Promise<StreamReader<Y>>` for generator-typed functions. The reader is the same shape as `rpc.streaming.subscribe()` — it's both `AsyncIterable<Y>` and exposes `.readable: ReadableStream<Y>`. Pick one surface per reader (they share an internal queue).
287+
288+
### Per-function options
289+
290+
| Field | Default | Effect |
291+
|-------|---------|--------|
292+
| `yields` || Valibot schema for each yielded value. Optional; required if `args` is set ([`DF0035`](../errors/DF0035)). Drives client-side type inference of `StreamReader<Y>`. |
293+
| `replayWindow` | `256` | Per-stream ring-buffer size. Floored to `1` — the client subscribe lands a few ms after the wrapper allocates the sink, so early yields must be replayable. |
294+
| `closedStreamRetention` | `30_000` | Milliseconds the stream is held open for late subscribers after the producer closes. Mirrors the channel-level option. |
295+
296+
### Local invocation
297+
298+
Server-side callers can iterate a generator without paying for the streaming round-trip:
299+
300+
```ts
301+
import { invokeLocalGenerator } from 'devframe/node'
302+
303+
for await (const token of await invokeLocalGenerator<string>(rpc, 'my-devtool:chat', { prompt: 'Hi' })) {
304+
// process tokens directly — no transport, no sink allocation
305+
}
306+
```
307+
308+
`invokeLocalGenerator` returns the bare `AsyncIterable<Y>` from the user's handler. `getCurrentRpcStream()` returns `undefined` in this path (no stream, no signal).
309+
310+
### What you can't put on a generator
311+
312+
- `agent: { ... }` — streaming-MCP exposure is deferred. ([`DF0033`](../errors/DF0033))
313+
- `cacheable: true` — caching a streaming response would replay a stale `streamId`. ([`DF0036`](../errors/DF0036))
314+
- `jsonSerializable: true` — chunks always travel via `structured-clone` regardless. ([`DF0036`](../errors/DF0036))
315+
- `dump`, `snapshot` — streaming results don't have a "snapshot" semantics. ([`DF0027`](../errors/DF0027), [`DF0028`](../errors/DF0028))
316+
317+
### When to reach for the lower-level channel API
318+
319+
Generator RPC covers the 90% case. Drop down to `ctx.rpc.streaming.create(...)` directly when you need:
320+
321+
- **Fan-out** — multiple subscribers seeing the same stream from a single producer. Generator RPC allocates a fresh stream per call.
322+
- **Long-running side-channel streams** — terminal output, file watches, anything that lives beyond a single RPC call.
323+
- **Client-to-server uploads**`channel.openInbound()` with a paired action handler to allocate the id (see [Client-to-Server Uploads](#client-to-server-uploads)).
324+
236325
## Reference
237326

238-
- API surface: `RpcStreamingHost`, `RpcStreamingChannel<T>`, `StreamSink<T>`, `StreamReader<T>` in `devframe/types`.
239-
- Working example: [`devframe/examples/devframe-streaming-chat`](https://github.com/vitejs/devtools/tree/main/devframe/examples/devframe-streaming-chat).
240-
- Errors: [`DF0029`](../errors/DF0029) (overflow), [`DF0030`](../errors/DF0030) (unknown stream id), [`DF0031`](../errors/DF0031) (write to closed stream), [`DF0032`](../errors/DF0032) (channel name collision).
327+
- API surface: `RpcStreamingHost`, `RpcStreamingChannel<T>`, `StreamSink<T>`, `StreamReader<T>` in `devframe/types`. Generator RPC: `getCurrentRpcStream`, `invokeLocalGenerator` in `devframe/node`.
328+
- Working example: [`devframe/examples/devframe-streaming-chat`](https://github.com/vitejs/devtools/tree/main/devframe/examples/devframe-streaming-chat)`:send` uses the channel API directly, `:tokenize` uses generator RPC.
329+
- Errors: [`DF0029`](../errors/DF0029) (overflow), [`DF0030`](../errors/DF0030) (unknown stream id), [`DF0031`](../errors/DF0031) (write to closed stream), [`DF0032`](../errors/DF0032) (channel name collision), [`DF0033`](../errors/DF0033) (generator + agent), [`DF0034`](../errors/DF0034) (handler not async-iterable), [`DF0035`](../errors/DF0035) (generator missing yields), [`DF0036`](../errors/DF0036) (inapplicable generator option).

devframe/examples/devframe-streaming-chat/src/devtool.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { fileURLToPath } from 'node:url'
22
import { defineRpcFunction } from 'devframe'
3+
import { getCurrentRpcStream } from 'devframe/node'
34
import { defineDevtool } from 'devframe/types'
45
import { nanoid } from 'devframe/utils/nanoid'
56
import * as v from 'valibot'
@@ -213,6 +214,34 @@ export default defineDevtool({
213214
},
214215
}))
215216

217+
// ─── Generator RPC demo ──────────────────────────────────────────────
218+
// The action above (`:send`) wires shared-state plumbing and streaming
219+
// by hand. The generator below shows the lower-friction alternative
220+
// for token feeds that don't need history side-effects: declare a
221+
// handler as `async function*` and the framework auto-allocates a
222+
// sink, drains yields onto it, and gives callers back a
223+
// `StreamReader<string>`. `getCurrentRpcStream()` exposes the abort
224+
// signal for cooperative cancellation.
225+
ctx.rpc.register(defineRpcFunction({
226+
name: 'devframe-streaming-chat:tokenize',
227+
type: 'generator',
228+
args: [v.object({
229+
prompt: v.string(),
230+
intervalMs: v.optional(v.number(), 35),
231+
})],
232+
yields: v.string(),
233+
replayWindow: 1024,
234+
async* handler({ prompt, intervalMs = 35 }) {
235+
const ctx = getCurrentRpcStream()
236+
for (const token of fakeTokens(prompt)) {
237+
if (ctx?.signal.aborted)
238+
return
239+
yield token
240+
await new Promise(r => setTimeout(r, intervalMs))
241+
}
242+
},
243+
}))
244+
216245
ctx.views.hostStatic(BASE_PATH, distDir)
217246
ctx.docks.register({
218247
id: 'devframe-streaming-chat',

devframe/packages/devframe/src/client/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ export * from './context'
22
export * from './docks'
33
export * from './rpc'
44
export { getDevToolsRpcClient as connectDevtool } from './rpc'
5+
export * from './rpc-generators'
56
export * from './rpc-streaming'

0 commit comments

Comments
 (0)