Skip to content

Commit 01770cf

Browse files
committed
version upgrades
1 parent 0496388 commit 01770cf

File tree

5 files changed

+549
-3
lines changed

5 files changed

+549
-3
lines changed

docs/ai-chat/backend.mdx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,10 @@ The `reason` field tells you why messages are being prepared:
791791
| `"compaction-rebuild"` | Rebuilding from a previous compaction summary |
792792
| `"compaction-result"` | Fresh compaction just produced these messages |
793793

794+
### Version upgrades
795+
796+
Chat agent runs are pinned to the worker version they started on. When you deploy a new version, suspended runs resume on the old code. Call `chat.requestUpgrade()` in `onTurnStart` to skip `run()` and exit immediately — the transport re-triggers the same message on the latest version. See the [Version Upgrades pattern](/ai-chat/patterns/version-upgrades) for the full guide.
797+
794798
### Runtime configuration
795799

796800
#### chat.setTurnTimeout()

docs/ai-chat/client-protocol.mdx

Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
---
2+
title: "Client Protocol"
3+
sidebarTitle: "Client Protocol"
4+
description: "The wire protocol for building custom chat transports — how clients communicate with chat agents over input streams and SSE."
5+
---
6+
7+
This page documents the protocol that chat clients use to communicate with `chat.agent()` tasks. Use this if you're building a custom transport (e.g., for a Slack bot, CLI tool, or native app) instead of using the built-in `TriggerChatTransport` or `AgentChat`.
8+
9+
<Note>
10+
Most users don't need this. Use [`TriggerChatTransport`](/ai-chat/frontend) for browser apps or [`AgentChat`](/ai-chat/server-chat) for server-side code. This page is for building your own from scratch.
11+
</Note>
12+
13+
## Overview
14+
15+
The protocol has three parts:
16+
17+
1. **Trigger** — start a new run by calling the task trigger API
18+
2. **Input streams** — send messages and signals to a running agent
19+
3. **Output stream** — subscribe to the agent's response via SSE
20+
21+
```mermaid
22+
sequenceDiagram
23+
participant Client
24+
participant API as Trigger.dev API
25+
participant Agent as Chat Agent Run
26+
27+
Client->>API: POST /api/v1/tasks/{taskId}/trigger (first message)
28+
API-->>Client: { id: runId, publicAccessToken }
29+
Client->>API: GET /realtime/v1/streams/{runId}/chat (SSE subscribe)
30+
Agent-->>Client: UIMessageChunk stream...
31+
Agent-->>Client: { type: "trigger:turn-complete" }
32+
Client->>API: POST /realtime/v1/streams/{runId}/input/chat-messages (next message)
33+
Agent-->>Client: UIMessageChunk stream...
34+
Agent-->>Client: { type: "trigger:turn-complete" }
35+
```
36+
37+
## Step 1: Trigger the first run
38+
39+
Start a conversation by triggering the agent task. The payload follows the `ChatTaskWirePayload` shape:
40+
41+
```bash
42+
POST /api/v1/tasks/{taskId}/trigger
43+
Authorization: Bearer <secret-key-or-jwt>
44+
Content-Type: application/json
45+
46+
{
47+
"payload": {
48+
"messages": [
49+
{
50+
"id": "msg-1",
51+
"role": "user",
52+
"parts": [{ "type": "text", "text": "Hello!" }]
53+
}
54+
],
55+
"chatId": "conversation-123",
56+
"trigger": "submit-message",
57+
"metadata": { "userId": "user-456" }
58+
},
59+
"options": {
60+
"tags": ["chat:conversation-123"]
61+
}
62+
}
63+
```
64+
65+
The response body contains the `runId`:
66+
67+
```json
68+
{
69+
"id": "run_abc123"
70+
}
71+
```
72+
73+
The **response headers** contain the public access token (a JWT scoped to this run):
74+
75+
The `x-trigger-jwt` header contains a JWT with `read:runs:{runId}` and `write:inputStreams:{runId}` scopes. Use this for all stream operations.
76+
77+
Store the `runId` and the `x-trigger-jwt` value — you need both for input streams and SSE.
78+
79+
<Note>
80+
The built-in SDK clients (`TriggerChatTransport`, `AgentChat`) extract the JWT from the response header automatically. If you're using the `ApiClient` from `@trigger.dev/core/v3`, `triggerTask()` returns `{ id, publicAccessToken }` with the header already extracted.
81+
</Note>
82+
83+
### Preloading (optional)
84+
85+
To preload an agent before the first message, trigger with `"trigger": "preload"` and an empty `messages` array:
86+
87+
```json
88+
{
89+
"payload": {
90+
"messages": [],
91+
"chatId": "conversation-123",
92+
"trigger": "preload",
93+
"metadata": { "userId": "user-456" }
94+
}
95+
}
96+
```
97+
98+
The agent starts, runs `onPreload`, and waits for the first real message via the input stream.
99+
100+
## Step 2: Subscribe to the output stream
101+
102+
Subscribe to the agent's response via SSE:
103+
104+
```
105+
GET /realtime/v1/streams/{runId}/chat
106+
Authorization: Bearer <publicAccessToken>
107+
Accept: text/event-stream
108+
```
109+
110+
### Stream format (S2)
111+
112+
The output stream uses [S2](https://s2.dev) (a durable streaming service) under the hood. SSE events arrive as **batches** — each event has `event: batch` and a `data` field containing an array of records:
113+
114+
```json
115+
event: batch
116+
data: {
117+
"records": [
118+
{
119+
"body": "{\"data\": {\"type\": \"text-delta\", \"delta\": \"Hello\"}, \"id\": \"abc123\"}",
120+
"seq_num": 1,
121+
"timestamp": 1712150400000
122+
},
123+
{
124+
"body": "{\"data\": {\"type\": \"text-delta\", \"delta\": \" world\"}, \"id\": \"def456\"}",
125+
"seq_num": 2,
126+
"timestamp": 1712150400001
127+
}
128+
]
129+
}
130+
```
131+
132+
Each record's `body` is a JSON string containing `{ data, id }`. The `data` field is the actual `UIMessageChunk`. The `seq_num` is used for stream resumption.
133+
134+
**Recommended:** Use `SSEStreamSubscription` from `@trigger.dev/core/v3` to handle parsing automatically — it takes care of batch decoding, deduplication, and resume tracking:
135+
136+
```ts
137+
import { SSEStreamSubscription } from "@trigger.dev/core/v3";
138+
139+
const subscription = new SSEStreamSubscription(
140+
`${baseUrl}/realtime/v1/streams/${runId}/chat`,
141+
{
142+
headers: { Authorization: `Bearer ${publicAccessToken}` },
143+
timeoutInSeconds: 120,
144+
}
145+
);
146+
147+
const stream = await subscription.subscribe();
148+
const reader = stream.getReader();
149+
150+
while (true) {
151+
const { done, value } = await reader.read();
152+
if (done) break;
153+
154+
// value is { id: string, chunk: UIMessageChunk, timestamp: number }
155+
const chunk = value.chunk;
156+
157+
if (chunk.type === "trigger:turn-complete") break;
158+
if (chunk.type === "text-delta") process.stdout.write(chunk.delta);
159+
}
160+
```
161+
162+
If you prefer to parse the S2 protocol yourself, see the [S2 documentation](https://s2.dev/docs) for the full SSE batch protocol reference.
163+
164+
### Chunk types
165+
166+
Each chunk's `data` field is a `UIMessageChunk` from the [AI SDK](https://ai-sdk.dev/docs/ai-sdk-ui/ui-message-stream). The stream contains standard AI SDK chunk types (`text-delta`, `reasoning-delta`, `tool-input-available`, `tool-output-available`, `error`, etc.) plus two Trigger.dev-specific control chunks.
167+
168+
See the [AI SDK UIMessageStream documentation](https://ai-sdk.dev/docs/ai-sdk-ui/ui-message-stream) for the full list of chunk types and their shapes.
169+
170+
### `trigger:turn-complete`
171+
172+
Signals that the agent's turn is finished — stop reading and wait for user input.
173+
174+
```json
175+
{
176+
"type": "trigger:turn-complete",
177+
"publicAccessToken": "eyJ..."
178+
}
179+
```
180+
181+
| Field | Type | Description |
182+
| --- | --- | --- |
183+
| `type` | `"trigger:turn-complete"` | Always this string |
184+
| `publicAccessToken` | `string` (optional) | A refreshed JWT for this run. If present, replace your stored token with this one — the previous token may be close to expiry. |
185+
186+
When you receive this chunk:
187+
1. Update `publicAccessToken` if one is included
188+
2. Close the stream reader
189+
3. Wait for the next user message before subscribing again
190+
191+
### `trigger:upgrade-required`
192+
193+
Signals that the agent cannot handle this message on its current version and the client should retry on a new run. This is emitted when the agent calls [`chat.requestUpgrade()`](/ai-chat/patterns/version-upgrades) before processing the turn.
194+
195+
```json
196+
{
197+
"type": "trigger:upgrade-required"
198+
}
199+
```
200+
201+
When you receive this chunk:
202+
1. Close the stream reader
203+
2. Clear the current session
204+
3. Immediately trigger a **new run** with the full message history and `continuation: true` (same as [Step 4: Handle continuations](#step-4-handle-continuations))
205+
4. Subscribe to the new run's stream and pipe it through to the consumer
206+
207+
The user's message is **not lost** — it gets replayed on the new version. The built-in clients (`TriggerChatTransport`, `AgentChat`) handle this transparently. The consumer sees a seamless response from the upgraded agent.
208+
209+
### Resuming a stream
210+
211+
If the SSE connection drops, reconnect with the `Last-Event-ID` header set to the last `seq_num` you received:
212+
213+
```
214+
GET /realtime/v1/streams/{runId}/chat
215+
Authorization: Bearer <publicAccessToken>
216+
Last-Event-ID: 42
217+
```
218+
219+
`SSEStreamSubscription` tracks this automatically via its `lastEventId` option.
220+
221+
## Step 3: Send subsequent messages
222+
223+
After the first turn, send messages via the run's input stream instead of triggering a new run:
224+
225+
```bash
226+
POST /realtime/v1/streams/{runId}/input/chat-messages
227+
Authorization: Bearer <publicAccessToken>
228+
Content-Type: application/json
229+
230+
{
231+
"data": {
232+
"messages": [
233+
{
234+
"id": "msg-2",
235+
"role": "user",
236+
"parts": [{ "type": "text", "text": "Tell me more" }]
237+
}
238+
],
239+
"chatId": "conversation-123",
240+
"trigger": "submit-message",
241+
"metadata": { "userId": "user-456" }
242+
}
243+
}
244+
```
245+
246+
Note the `{ "data": ... }` wrapper — the input stream API wraps the payload in a `data` field.
247+
248+
After sending, subscribe to the output stream again (same URL, same auth) to receive the response.
249+
250+
<Warning>
251+
On turn 2+, only send the **new** message(s) in the `messages` array — not the full history. The agent accumulates the conversation internally. On turn 1 (or after a continuation), send the **full** message history.
252+
</Warning>
253+
254+
## Pending and steering messages
255+
256+
You can send messages to the agent **while it's still streaming a response**. These are called pending messages — the agent receives them mid-turn and can inject them between tool-call steps.
257+
258+
Send a pending message to the same `chat-messages` input stream:
259+
260+
```bash
261+
POST /realtime/v1/streams/{runId}/input/chat-messages
262+
Authorization: Bearer <publicAccessToken>
263+
Content-Type: application/json
264+
265+
{
266+
"data": {
267+
"messages": [
268+
{
269+
"id": "msg-steering-1",
270+
"role": "user",
271+
"parts": [{ "type": "text", "text": "Actually, focus on the security issues first" }]
272+
}
273+
],
274+
"chatId": "conversation-123",
275+
"trigger": "submit-message",
276+
"metadata": { "userId": "user-456" }
277+
}
278+
}
279+
```
280+
281+
This is the same endpoint and format as a normal message. The difference is timing — the agent is already streaming. What happens to the message depends on the agent's `pendingMessages` configuration:
282+
283+
- **With `pendingMessages.shouldInject`**: The message is injected into the model's context at the next `prepareStep` boundary (between tool-call steps). The agent sees it and can adjust its behavior mid-response.
284+
- **Without `pendingMessages` config**: The message queues for the next turn. It becomes the `currentWirePayload` for the following turn, skipping the wait-for-message phase.
285+
286+
See [Pending Messages](/ai-chat/pending-messages) for how to configure the agent side.
287+
288+
<Note>
289+
Unlike a normal `sendMessage`, pending messages should **not** cancel the active stream subscription. Keep reading the current response stream — the agent incorporates the pending message into the same turn or queues it for the next one.
290+
</Note>
291+
292+
## Step 4: Handle continuations
293+
294+
A run can end for several reasons: idle timeout, max turns reached, `chat.requestUpgrade()`, or cancellation. When this happens, the input stream POST will fail (400 "Cannot send to input stream on a completed run").
295+
296+
When this error occurs, trigger a **new run** with the full message history and `continuation: true`:
297+
298+
```json
299+
{
300+
"payload": {
301+
"messages": [/* full UIMessage history */],
302+
"chatId": "conversation-123",
303+
"trigger": "submit-message",
304+
"metadata": { "userId": "user-456" },
305+
"continuation": true,
306+
"previousRunId": "run_abc123"
307+
}
308+
}
309+
```
310+
311+
The new run picks up the latest deployed version automatically. The agent's `onChatStart` hook receives `continuation: true` and `previousRunId` so it can distinguish from a brand new conversation.
312+
313+
<Tip>
314+
This is how [version upgrades](/ai-chat/patterns/version-upgrades) work transparently — the agent calls `chat.requestUpgrade()`, the run exits, and the client's next message triggers a continuation on the new version. No special handling needed beyond the standard continuation flow.
315+
</Tip>
316+
317+
## Stopping and closing
318+
319+
### Stop the current turn
320+
321+
Send a stop signal to interrupt the agent mid-response:
322+
323+
```bash
324+
POST /realtime/v1/streams/{runId}/input/chat-stop
325+
Authorization: Bearer <publicAccessToken>
326+
Content-Type: application/json
327+
328+
{
329+
"data": { "stop": true }
330+
}
331+
```
332+
333+
The agent's stop signal fires, `streamText` aborts, and a `trigger:turn-complete` chunk is emitted.
334+
335+
### Close the conversation
336+
337+
Send a close signal to end the conversation gracefully:
338+
339+
```bash
340+
POST /realtime/v1/streams/{runId}/input/chat-messages
341+
Authorization: Bearer <publicAccessToken>
342+
Content-Type: application/json
343+
344+
{
345+
"data": {
346+
"messages": [],
347+
"chatId": "conversation-123",
348+
"trigger": "close"
349+
}
350+
}
351+
```
352+
353+
The agent exits its loop and the run completes. If you skip this, the agent closes on its own when the idle/turn timeout expires.
354+
355+
## Session state
356+
357+
A client needs to track per-conversation:
358+
359+
| Field | Description |
360+
| --- | --- |
361+
| `chatId` | Stable conversation ID (survives continuations) |
362+
| `runId` | Current run ID (changes on continuation) |
363+
| `publicAccessToken` | JWT for stream auth (refreshed on each turn-complete) |
364+
| `lastEventId` | Last SSE event ID (for stream resumption) |
365+
366+
On continuation, `runId` and `publicAccessToken` change. `chatId` stays the same.
367+
368+
## Authentication
369+
370+
| Operation | Auth |
371+
| --- | --- |
372+
| Trigger task | Secret API key or scoped JWT with `write:tasks` |
373+
| Input stream POST | JWT with `write:inputStreams` scope for the run |
374+
| Output stream GET | JWT with `read:runs` scope for the run |
375+
376+
The `publicAccessToken` returned from the trigger response has both `read:runs` and `write:inputStreams` scopes for the run. Use it for all stream operations.
377+
378+
## See also
379+
380+
- [`TriggerChatTransport`](/ai-chat/frontend) — built-in frontend transport (implements this protocol)
381+
- [`AgentChat`](/ai-chat/server-chat) — built-in server-side client (implements this protocol)
382+
- [Backend lifecycle](/ai-chat/backend#lifecycle-hooks) — what the agent does on each event
383+
- [Version upgrades](/ai-chat/patterns/version-upgrades) — how `chat.requestUpgrade()` uses continuations

0 commit comments

Comments
 (0)