Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion devframe/docs/.vitepress/sidebar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export default function devframeSidebar(prefix = ''): DefaultTheme.SidebarItem[]
text: 'Error Reference',
link: `${prefix}/errors/`,
collapsed: true,
items: Array.from({ length: 32 }, (_, i) => {
items: Array.from({ length: 36 }, (_, i) => {
const code = `DF${String(i + 1).padStart(4, '0')}`
return { text: code, link: `${prefix}/errors/${code}` }
}),
Expand Down
50 changes: 50 additions & 0 deletions devframe/docs/errors/DF0033.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
outline: deep
---

# DF0033: Generator with Agent Exposure

> Package: `devframe`

## Message

> Generator RPC function "`{name}`" has `agent` set, which is not supported in the current release.

## Cause

`type: 'generator'` RPC functions stream their yields through the existing streaming-channel transport, which currently uses `structured-clone-es` regardless of the source function's serialization preferences. Agent (MCP) exposure requires strict JSON serialization for tool inputs and outputs, so generator functions cannot be exposed as agent tools yet.

Streaming-MCP support is planned as a follow-up.

## Example

```ts
defineRpcFunction({
name: 'plugin:tail-logs',
type: 'generator',
yields: v.string(),
agent: { description: 'Tail server logs' }, // ← rejected
async* handler() {
yield 'log line 1'
},
})
```

## Fix

Remove the `agent` field, or change the function type to `'query'` / `'static'` if it can return a single value:

```ts
defineRpcFunction({
name: 'plugin:tail-logs',
type: 'generator',
yields: v.string(),
async* handler() {
yield 'log line 1'
},
})
```

## Source

`packages/devframe/src/rpc/validation.ts`
51 changes: 51 additions & 0 deletions devframe/docs/errors/DF0034.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
outline: deep
---

# DF0034: Generator Handler Not Async-Iterable

> Package: `devframe`

## Message

> Generator RPC function "`{name}`" handler did not return an `AsyncIterable`.

## Cause

A `type: 'generator'` RPC function expects its handler to return an `AsyncIterable<Y>` — typically by being declared as `async function*`. The framework drives the iterator with `for await`, so it must implement `[Symbol.asyncIterator]`.

This error is raised at runtime on the first call, not at registration, because TypeScript cannot statically distinguish `async function*` from `async function` in all cases (e.g. when the handler is supplied via `setup()`).

## Example

```ts
// ❌ Wrong — async function (not generator) returning a string
defineRpcFunction({
name: 'plugin:tokens',
type: 'generator',
yields: v.string(),
handler: async () => 'hello',
})
```

## Fix

Declare the handler as `async function*` and `yield` instead of `return`:

```ts
defineRpcFunction({
name: 'plugin:tokens',
type: 'generator',
yields: v.string(),
async* handler() {
yield 'hello'
yield 'world'
},
})
```

If the handler comes from `setup()`, ensure `setup()` returns `{ handler: async function*() { ... } }`.

## Source

`packages/devframe/src/node/rpc-generators.ts`
53 changes: 53 additions & 0 deletions devframe/docs/errors/DF0035.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---
outline: deep
---

# DF0035: Generator Missing Yields Schema

> Package: `devframe`

## Message

> Generator RPC function "`{name}`" declares `args` but no `yields` schema — yielded value type cannot be inferred.

## Cause

When a `type: 'generator'` RPC function declares an `args` schema, the framework expects a matching `yields` schema for end-to-end type inference. Without it, `rpc.call(name, ...)` resolves to `Promise<StreamReader<unknown>>` on the client, defeating the purpose of having validated arguments.

This mirrors the requirement that `query`-typed functions with `args` also declare `returns`.

## Example

```ts
// ❌ Wrong — args present but no yields schema
defineRpcFunction({
name: 'plugin:search',
type: 'generator',
args: [v.object({ query: v.string() })],
async* handler({ query }) {
yield query
},
})
```

## Fix

Add a `yields` schema describing the type of each yielded value:

```ts
defineRpcFunction({
name: 'plugin:search',
type: 'generator',
args: [v.object({ query: v.string() })],
yields: v.string(),
async* handler({ query }) {
yield query
},
})
```

If you don't need argument validation either, drop both schemas — the call type falls back to `() => Promise<StreamReader<unknown>>`.

## Source

`packages/devframe/src/rpc/validation.ts`
49 changes: 49 additions & 0 deletions devframe/docs/errors/DF0036.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
outline: deep
---

# DF0036: Generator Has Inapplicable Option

> Package: `devframe`

## Message

> Generator RPC function "`{name}`" sets `{option}`, which has no effect on generator-typed functions.

## Cause

Generators are streaming primitives — they yield a sequence of values over time rather than returning a single cacheable result. Options that only make sense for request-response RPC types are rejected at registration:

- `cacheable` — caching a streaming result would replay a stale `streamId` pointing to a closed stream.
- `jsonSerializable: true` — generator yields flow through the existing streaming-channel transport, which uses `structured-clone-es` regardless. The flag is misleading on a generator.
- `dump` and `snapshot` — already handled by [`DF0027`](./DF0027) and [`DF0028`](./DF0028).

## Example

```ts
// ❌ Wrong — `cacheable` has no effect on a generator
defineRpcFunction({
name: 'plugin:tokens',
type: 'generator',
cacheable: true,
yields: v.string(),
async* handler() { yield 'hi' },
})
```

## Fix

Remove the option, or change the function type to one that supports it:

```ts
defineRpcFunction({
name: 'plugin:tokens',
type: 'generator',
yields: v.string(),
async* handler() { yield 'hi' },
})
```

## Source

`packages/devframe/src/rpc/validation.ts`
4 changes: 4 additions & 0 deletions devframe/docs/errors/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ Emitted by `devframe` — framework-neutral host / shared-state / auth surface.
| [DF0030](./DF0030) | error | Unknown Stream ID | — |
| [DF0031](./DF0031) | error | Write to Closed Stream | — |
| [DF0032](./DF0032) | error | Streaming Channel Already Registered | — |
| [DF0033](./DF0033) | error | Generator with Agent Exposure | — |
| [DF0034](./DF0034) | error | Generator Handler Not Async-Iterable | — |
| [DF0035](./DF0035) | error | Generator Missing Yields Schema | — |
| [DF0036](./DF0036) | error | Generator Has Inapplicable Option | — |
95 changes: 92 additions & 3 deletions devframe/docs/guide/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ outline: deep

Devframe's streaming-channel API provides server→client push for chunk-style data — chat deltas, log lines, build progress, anything you'd otherwise express as a sequence of fire-and-forget events. It builds on the same WebSocket transport as the rest of the RPC layer, but adds the conventions every chunked feed needs: stream IDs, cooperative cancellation, replay on reconnect, and first-class **Web Streams** interop.

For the common case of "I have an `async function*` and I want to ship its yields to the client", reach for [Async Generator RPC](#async-generator-rpc) below — it auto-allocates the sink, wires cancellation, and the client receives a ready-to-iterate `StreamReader<Y>` from `rpc.call(...)`. Drop down to the lower-level channel API in this guide when you need fan-out, late-replay across multiple subscribers, or client-to-server uploads.

## Overview

```mermaid
Expand Down Expand Up @@ -233,8 +235,95 @@ If you need authoritative state rather than every intermediate value, prefer [sh
| Replay on reconnect | Fire-and-forget signaling | Diff-based sync between clients |
| Client-to-server uploads (files, mic frames) | | |

## Async Generator RPC

The streaming-channel API is the foundation; the most common shape — "stream the yields of an `async function*` to the caller" — has a higher-level wrapper that hides the channel altogether.

### Defining a generator

Set `type: 'generator'` on `defineRpcFunction` and write the handler as `async function*`:

```ts
import { defineRpcFunction } from 'devframe'
import { getCurrentRpcStream } from 'devframe/node'
import * as v from 'valibot'

defineRpcFunction({
name: 'my-devtool:chat',
type: 'generator',
args: [v.object({ prompt: v.string() })],
yields: v.string(),
async* handler({ prompt }) {
const { signal } = getCurrentRpcStream()!
for (const token of fakeTokens(prompt)) {
if (signal.aborted)
return
yield token
}
},
})
```

`getCurrentRpcStream()` is `AsyncLocalStorage`-backed (mirrors `getCurrentRpcSession()`); inside the generator body it returns `{ signal, streamId, session }`. Outside, it returns `undefined`. Poll `signal.aborted` and exit cooperatively when the consumer cancels.

### Calling from the client

```ts
import { connectDevtool } from 'devframe/client'

const rpc = await connectDevtool()
const reader = await rpc.call('my-devtool:chat', { prompt: 'Hi' })

for await (const token of reader)
appendToken(token)

// or pipe out as a Web Stream:
await reader.readable.pipeTo(downloadWritable)

reader.cancel() // sends cancel upstream; server's `signal` flips
```

`rpc.call` resolves to `Promise<StreamReader<Y>>` for generator-typed functions. The reader is the same shape as `rpc.streaming.subscribe()` — it's both `AsyncIterable<Y>` and exposes `.readable: ReadableStream<Y>`. Pick one surface per reader (they share an internal queue).

### Per-function options

| Field | Default | Effect |
|-------|---------|--------|
| `yields` | — | Valibot schema for each yielded value. Optional; required if `args` is set ([`DF0035`](../errors/DF0035)). Drives client-side type inference of `StreamReader<Y>`. |
| `replayWindow` | `256` | Per-stream ring-buffer size. Floored to `1` — the client subscribe lands a few ms after the wrapper allocates the sink, so early yields must be replayable. |
| `closedStreamRetention` | `30_000` | Milliseconds the stream is held open for late subscribers after the producer closes. Mirrors the channel-level option. |

### Local invocation

Server-side callers can iterate a generator without paying for the streaming round-trip:

```ts
import { invokeLocalGenerator } from 'devframe/node'

for await (const token of await invokeLocalGenerator<string>(rpc, 'my-devtool:chat', { prompt: 'Hi' })) {
// process tokens directly — no transport, no sink allocation
}
```

`invokeLocalGenerator` returns the bare `AsyncIterable<Y>` from the user's handler. `getCurrentRpcStream()` returns `undefined` in this path (no stream, no signal).

### What you can't put on a generator

- `agent: { ... }` — streaming-MCP exposure is deferred. ([`DF0033`](../errors/DF0033))
- `cacheable: true` — caching a streaming response would replay a stale `streamId`. ([`DF0036`](../errors/DF0036))
- `jsonSerializable: true` — chunks always travel via `structured-clone` regardless. ([`DF0036`](../errors/DF0036))
- `dump`, `snapshot` — streaming results don't have a "snapshot" semantics. ([`DF0027`](../errors/DF0027), [`DF0028`](../errors/DF0028))

### When to reach for the lower-level channel API

Generator RPC covers the 90% case. Drop down to `ctx.rpc.streaming.create(...)` directly when you need:

- **Fan-out** — multiple subscribers seeing the same stream from a single producer. Generator RPC allocates a fresh stream per call.
- **Long-running side-channel streams** — terminal output, file watches, anything that lives beyond a single RPC call.
- **Client-to-server uploads** — `channel.openInbound()` with a paired action handler to allocate the id (see [Client-to-Server Uploads](#client-to-server-uploads)).

## Reference

- API surface: `RpcStreamingHost`, `RpcStreamingChannel<T>`, `StreamSink<T>`, `StreamReader<T>` in `devframe/types`.
- Working example: [`devframe/examples/devframe-streaming-chat`](https://github.com/vitejs/devtools/tree/main/devframe/examples/devframe-streaming-chat).
- Errors: [`DF0029`](../errors/DF0029) (overflow), [`DF0030`](../errors/DF0030) (unknown stream id), [`DF0031`](../errors/DF0031) (write to closed stream), [`DF0032`](../errors/DF0032) (channel name collision).
- API surface: `RpcStreamingHost`, `RpcStreamingChannel<T>`, `StreamSink<T>`, `StreamReader<T>` in `devframe/types`. Generator RPC: `getCurrentRpcStream`, `invokeLocalGenerator` in `devframe/node`.
- Working example: [`devframe/examples/devframe-streaming-chat`](https://github.com/vitejs/devtools/tree/main/devframe/examples/devframe-streaming-chat) — `:send` uses the channel API directly, `:tokenize` uses generator RPC.
- Errors: [`DF0029`](../errors/DF0029) (overflow), [`DF0030`](../errors/DF0030) (unknown stream id), [`DF0031`](../errors/DF0031) (write to closed stream), [`DF0032`](../errors/DF0032) (channel name collision), [`DF0033`](../errors/DF0033) (generator + agent), [`DF0034`](../errors/DF0034) (handler not async-iterable), [`DF0035`](../errors/DF0035) (generator missing yields), [`DF0036`](../errors/DF0036) (inapplicable generator option).
29 changes: 29 additions & 0 deletions devframe/examples/devframe-streaming-chat/src/devtool.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { fileURLToPath } from 'node:url'
import { defineRpcFunction } from 'devframe'
import { getCurrentRpcStream } from 'devframe/node'
import { defineDevtool } from 'devframe/types'
import { nanoid } from 'devframe/utils/nanoid'
import * as v from 'valibot'
Expand Down Expand Up @@ -213,6 +214,34 @@ export default defineDevtool({
},
}))

// ─── Generator RPC demo ──────────────────────────────────────────────
// The action above (`:send`) wires shared-state plumbing and streaming
// by hand. The generator below shows the lower-friction alternative
// for token feeds that don't need history side-effects: declare a
// handler as `async function*` and the framework auto-allocates a
// sink, drains yields onto it, and gives callers back a
// `StreamReader<string>`. `getCurrentRpcStream()` exposes the abort
// signal for cooperative cancellation.
ctx.rpc.register(defineRpcFunction({
name: 'devframe-streaming-chat:tokenize',
type: 'generator',
args: [v.object({
prompt: v.string(),
intervalMs: v.optional(v.number(), 35),
})],
yields: v.string(),
replayWindow: 1024,
async* handler({ prompt, intervalMs = 35 }) {
const ctx = getCurrentRpcStream()
for (const token of fakeTokens(prompt)) {
if (ctx?.signal.aborted)
return
yield token
await new Promise(r => setTimeout(r, intervalMs))
}
},
}))

ctx.views.hostStatic(BASE_PATH, distDir)
ctx.docks.register({
id: 'devframe-streaming-chat',
Expand Down
1 change: 1 addition & 0 deletions devframe/packages/devframe/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export * from './context'
export * from './docks'
export * from './rpc'
export { getDevToolsRpcClient as connectDevtool } from './rpc'
export * from './rpc-generators'
export * from './rpc-streaming'
Loading
Loading