Commit c202f3a

ericallam and claude committed
feat(chat.agent): delta-only wire and history reconstruction at run boot
The wire used to ship the full UIMessage array on every send. Long chats with tool results crossed the 512 KiB body cap on /realtime/v1/sessions/{id}/in/append around turn 10-30 and stalled. Each .in/append now carries at most one new message. The agent rebuilds prior history at run boot from a JSON snapshot in object storage plus a wait=0 replay of the session.out tail. Snapshot writes are awaited after every onTurnComplete so they survive idle suspend; reads happen only at boot. Registering hydrateMessages short-circuits both paths.

SDK:
- ChatTaskWirePayload drops `messages`, adds `message` + `headStartMessages`
- run boot reconstructs the accumulator from snapshot + replay
- per-turn handling does delta merge, not full-history reset
- chat.headStart bridges the first-ever turn via headStartMessages
- onValidateMessages skipped when no incoming message
- transports, mid-stream pendingMessages, head-start route handler updated

Server:
- new /realtime/v1/sessions/:session/:io/records endpoint (wait=0 drain)
- ensureRunForSession sets continuation: true and previousRunId on cross-run continuations so the boot gate triggers snapshot read
- /api/v1/packets/ and /api/v2/packets/ exempt from customer rate limits
- s2Append retries transient failures with exponential backoff and surfaces undici cause codes in the diagnostic log

References + tests:
- 4 new SDK unit tests, 2 new webapp integration tests
- mockChatAgent harness updated to the new wire
- new reference tools: getCurrentTime, searchHackerNews, createGithubIssue
- chat.tsx bridge exposes regenerate and error for smoke-test capture
- chat-client-burst-test orchestrator for concurrent-send stress

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3ffc73f commit c202f3a

25 files changed

Lines changed: 3456 additions & 198 deletions
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+---
+"@trigger.dev/sdk": patch
+"@trigger.dev/core": patch
+---
+
+`chat.agent` wire is now delta-only — clients ship at most one new message per `.in/append` instead of the full `UIMessage[]` history. The agent rebuilds prior history at run boot from a JSON snapshot in object storage plus a `wait=0` replay of the `session.out` tail. Long chats stop hitting the 512 KiB body cap on `/realtime/v1/sessions/{id}/in/append`. Snapshot writes happen after every `onTurnComplete`, awaited so they survive idle suspend; reads happen only at run boot. Registering a `hydrateMessages` hook short-circuits both the snapshot read/write and the replay — the customer is the source of truth for history.
+
+Custom transports that constructed `ChatTaskWirePayload` directly need to drop the `messages: UIMessage[]` field and use `message?: UIMessage` (singular). Built-in transports (`TriggerChatTransport`, `AgentChat`) handle the change below the customer-facing surface — most apps need no changes. Configure object-store env vars (`OBJECT_STORE_*`) on your webapp deployment if you haven't already; without an object store and without `hydrateMessages`, conversations don't survive run boundaries.
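For custom transports, the migration described in the changeset can be sketched roughly as follows. This is an illustration only: the `UIMessage`, `LegacyWirePayload`, and `DeltaWirePayload` types below are simplified local stand-ins, `chatId` is a hypothetical field, and the id-based merge rule in `mergeDelta` is an assumption about what a "delta merge" could look like, not the SDK's actual implementation.

```typescript
// Hypothetical, simplified stand-ins. The real UIMessage and
// ChatTaskWirePayload types live in the SDK and carry more fields.
type UIMessage = { id: string; role: "user" | "assistant" | "system"; text: string };

// Old wire shape: the full history on every send.
type LegacyWirePayload = { chatId: string; messages: UIMessage[] };

// New wire shape per the changeset: at most one new message per append.
type DeltaWirePayload = { chatId: string; message?: UIMessage };

// Convert a legacy payload by keeping only the newest message.
function toDeltaPayload(legacy: LegacyWirePayload): DeltaWirePayload {
  const last = legacy.messages[legacy.messages.length - 1];
  return last ? { chatId: legacy.chatId, message: last } : { chatId: legacy.chatId };
}

// Assumed merge rule: replace a message that has the same id, otherwise append.
// Returns a new array and leaves the input untouched.
function mergeDelta(history: UIMessage[], incoming?: UIMessage): UIMessage[] {
  if (!incoming) return history;
  const index = history.findIndex((m) => m.id === incoming.id);
  if (index === -1) return [...history, incoming];
  const next = history.slice();
  next[index] = incoming;
  return next;
}
```

In this sketch the receiving side would hold the rebuilt history (snapshot plus replay) and call `mergeDelta` once per turn, so the request body stays small regardless of chat length.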
Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+import { json } from "@remix-run/server-runtime";
+import { z } from "zod";
+import { $replica } from "~/db.server";
+import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
+import {
+  canonicalSessionAddressingKey,
+  isSessionFriendlyIdForm,
+  resolveSessionByIdOrExternalId,
+} from "~/services/realtime/sessions.server";
+import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
+import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
+
+const ParamsSchema = z.object({
+  session: z.string(),
+  io: z.enum(["out", "in"]),
+});
+
+const SearchSchema = z.object({
+  // S2 sequence number — same cursor format as the SSE Last-Event-ID
+  // (the SSE `id:` field on session-channel events is the seq_num,
+  // stringified). Records returned have `seqNum > afterEventId`.
+  afterEventId: z.string().regex(/^\d+$/).optional(),
+});
+
+// GET: non-SSE, `wait=0` drain of a session channel. Returns a JSON body
+// `{ records: StreamRecord[] }` with whatever records exist after
+// `afterEventId` (or from the head if absent) and closes immediately.
+//
+// Used by the SDK's `replaySessionOutTail` at run boot — the SSE long-poll
+// path costs ~1s per fresh chat (the timeout duration) regardless of stream
+// content, which is unacceptable on the first-message TTFC budget. This
+// route gives the agent a cheap "what's there right now" peek instead.
+//
+// Same row-optional addressing as the SSE GET route in `…$io.ts`: we
+// resolve via `resolveSessionByIdOrExternalId` and only 404 for opaque
+// `session_*` friendlyIds (which must reference a real row). External-id
+// form falls through with `row: null` so the boot path doesn't 404 on a
+// fresh chat that hasn't written its first chunk yet.
+const loader = createLoaderApiRoute(
+  {
+    params: ParamsSchema,
+    searchParams: SearchSchema,
+    allowJWT: true,
+    corsStrategy: "all",
+    findResource: async (params, auth) => {
+      const row = await resolveSessionByIdOrExternalId(
+        $replica,
+        auth.environment.id,
+        params.session
+      );
+      if (!row && isSessionFriendlyIdForm(params.session)) {
+        return undefined;
+      }
+      return {
+        row,
+        addressingKey: canonicalSessionAddressingKey(row, params.session),
+      };
+    },
+    authorization: {
+      action: "read",
+      resource: ({ row, addressingKey }) => {
+        const ids = new Set<string>([addressingKey]);
+        if (row) {
+          ids.add(row.friendlyId);
+          if (row.externalId) ids.add(row.externalId);
+        }
+        return { sessions: [...ids] };
+      },
+      superScopes: ["read:sessions", "read:all", "admin"],
+    },
+  },
+  async ({ params, authentication, resource, searchParams }) => {
+    const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
+      session: resource.row,
+      organization: resource.row ? null : authentication.environment.organization,
+    });
+
+    if (!(realtimeStream instanceof S2RealtimeStreams)) {
+      return new Response("Session channels require the S2 realtime backend", {
+        status: 501,
+      });
+    }
+
+    const afterSeqNum =
+      searchParams.afterEventId !== undefined ? Number(searchParams.afterEventId) : undefined;
+
+    const records = await realtimeStream.readSessionStreamRecords(
+      resource.addressingKey,
+      params.io,
+      afterSeqNum
+    );
+
+    return json({ records });
+  }
+);
+
+export { loader };
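From the caller's side, a request to this route might be assembled as in the sketch below. The path comes from the commit message (`/realtime/v1/sessions/:session/:io/records`) and the cursor check mirrors `SearchSchema`; the helper name `buildRecordsUrl` and the example host are hypothetical, and this is not how the SDK's `replaySessionOutTail` is necessarily written.

```typescript
type SessionIo = "in" | "out";

// Mirrors the route's SearchSchema: the cursor is a string of digits.
const CURSOR_PATTERN = /^\d+$/;

// Hypothetical helper: build the URL for a one-shot drain of a session channel.
function buildRecordsUrl(
  baseUrl: string,
  session: string,
  io: SessionIo,
  afterEventId?: string
): string {
  if (afterEventId !== undefined && !CURSOR_PATTERN.test(afterEventId)) {
    throw new Error(`afterEventId must be a string of digits, got "${afterEventId}"`);
  }
  // Trim trailing slashes so the joined path has exactly one separator.
  const origin = baseUrl.replace(/\/+$/, "");
  const path = `/realtime/v1/sessions/${encodeURIComponent(session)}/${io}/records`;
  // Omitting the cursor asks for records from the head of the stream.
  const query = afterEventId !== undefined ? `?afterEventId=${afterEventId}` : "";
  return `${origin}${path}${query}`;
}
```

The resulting URL would be fetched with a single GET. The route sets `allowJWT: true`, so a JWT with a matching session read scope is presumably accepted, though the exact header format is not shown in this diff. The response body is `{ records }`, and per the route comment each returned record has `seqNum > afterEventId`.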

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 7 additions & 0 deletions
@@ -63,6 +63,13 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
     /^\/api\/v1\/runs\/[^\/]+\/attempts$/, // /api/v1/runs/$runFriendlyId/attempts
     /^\/api\/v1\/waitpoints\/tokens\/[^\/]+\/callback\/[^\/]+$/, // /api/v1/waitpoints/tokens/$waitpointFriendlyId/callback/$hash
     /^\/api\/v\d+\/deployments/, // /api/v{1,2,3,n}/deployments/*
+    // Internal SDK plumbing — packets are presigned-URL handshakes for
+    // payload uploads (v2 PUT) and downloads (v1 GET), authenticated via
+    // run-scoped JWT, called once per task/turn boundary by the runtime.
+    // Same shape as `/api/v1/runs/$runFriendlyId/attempts` above; not a
+    // customer-facing surface so customer rate limits shouldn't apply.
+    /^\/api\/v1\/packets\//,
+    /^\/api\/v2\/packets\//,
   ],
   log: {
     rejections: env.API_RATE_LIMIT_REJECTION_LOGS_ENABLED === "1",
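To see which paths the two new patterns cover, here is a small sketch. The regular expressions are copied from the hunk above; the `isPacketExempt` helper and the assumption that the middleware tests each pattern against the request pathname are illustrative, since the matching code itself is not part of this diff.

```typescript
// The two patterns added by this commit, copied verbatim from the allowlist.
const packetExemptions: RegExp[] = [/^\/api\/v1\/packets\//, /^\/api\/v2\/packets\//];

// Hypothetical helper: true when a pathname matches either packet pattern.
function isPacketExempt(pathname: string): boolean {
  return packetExemptions.some((pattern) => pattern.test(pathname));
}

// Both patterns are anchored at the start and require a trailing slash after
// "packets", so only v1 and v2 paths with a sub-path match.
const samples = [
  "/api/v1/packets/abc",     // matches the v1 pattern
  "/api/v2/packets/abc/def", // matches the v2 pattern
  "/api/v3/packets/abc",     // no pattern for v3
  "/api/v1/packets",         // no trailing slash, so no match
];
const exemptSamples = samples.filter(isPacketExempt);
```

Unlike the `/api/v\d+/deployments` entry above them, the packet patterns name each version explicitly, so a future `/api/v3/packets/` route would need its own entry.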

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 100 additions & 7 deletions
@@ -441,8 +441,16 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
 
   // ---------- Internals: S2 REST ----------
   private async s2Append(stream: string, body: S2AppendInput): Promise<S2AppendAck> {
-    // POST /v1/streams/{stream}/records (JSON)
-    const res = await fetch(`${this.baseUrl}/streams/${encodeURIComponent(stream)}/records`, {
+    // POST /v1/streams/{stream}/records (JSON).
+    //
+    // Retries transient failures (network errors and 5xx) up to 3 times with
+    // exponential backoff. Undici's "fetch failed" errors observed locally
+    // are pre-connection (DNS/TCP) so the request never reaches S2, making
+    // retry safe — the alternative is a 500 surfacing to the SDK transport,
+    // which then retries the whole `/in/append` round-trip and pollutes
+    // logs. 4xx are not retried (genuine client errors).
+    const url = `${this.baseUrl}/streams/${encodeURIComponent(stream)}/records`;
+    const init: RequestInit = {
       method: "POST",
       headers: {
         Authorization: `Bearer ${this.token}`,
@@ -451,12 +459,60 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
         "S2-Basin": this.basin,
       },
       body: JSON.stringify(body),
-    });
-    if (!res.ok) {
-      const text = await res.text().catch(() => "");
-      throw new Error(`S2 append failed: ${res.status} ${res.statusText} ${text}`);
+    };
+
+    const maxAttempts = 3;
+    const backoffsMs = [100, 250, 600];
+    let lastError: unknown;
+
+    for (let attempt = 0; attempt < maxAttempts; attempt++) {
+      // The `try` only wraps `fetch` — once we have a Response we handle status
+      // outside the catch, so a 4xx throw can't be swallowed and retried.
+      let res: Response | undefined;
+      try {
+        res = await fetch(url, init);
+      } catch (err) {
+        lastError = err;
+      }
+
+      if (res) {
+        if (res.ok) {
+          return (await res.json()) as S2AppendAck;
+        }
+        const text = await res.text().catch(() => "");
+        const httpError = new Error(
+          `S2 append failed: ${res.status} ${res.statusText} ${text}`
+        );
+        if (res.status >= 400 && res.status < 500) {
+          // 4xx — caller-side problem (auth, malformed body, closed stream).
+          // Retrying won't help.
+          throw httpError;
+        }
+        // 5xx — retryable.
+        lastError = httpError;
+      }
+
+      const isLastAttempt = attempt === maxAttempts - 1;
+      const diagnostics = describeFetchError(lastError);
+      if (isLastAttempt) {
+        this.logger.error("S2 append failed after retries", {
+          stream,
+          attempts: maxAttempts,
+          ...diagnostics,
+        });
+        break;
+      }
+
+      this.logger.warn("S2 append transient failure, retrying", {
+        stream,
+        attempt: attempt + 1,
+        nextDelayMs: backoffsMs[attempt],
+        ...diagnostics,
+      });
+      await new Promise((resolve) => setTimeout(resolve, backoffsMs[attempt]));
     }
-    return (await res.json()) as S2AppendAck;
+
+    throw lastError instanceof Error ? lastError : new Error(String(lastError));
   }
 
   private async getS2AccessToken(id: string): Promise<string> {
@@ -560,3 +616,40 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
     return Number.isFinite(n) && n >= 0 ? n + 1 : undefined;
   }
 }
+
+// Pulls the underlying network error out of undici's generic "fetch failed".
+// undici sets `error.cause` to either a SystemError-shaped object with `code`
+// (e.g. `ECONNRESET`, `UND_ERR_SOCKET`, `ETIMEDOUT`), `errno`, and `syscall`,
+// or — for happy-eyeballs / multi-address connect attempts — an
+// `AggregateError` whose `errors[]` each carry their own code. Surfacing
+// those tells us whether failures are pre-connection (DNS / TCP), mid-stream
+// socket resets, or genuine S2 server errors.
+function describeFetchError(err: unknown): Record<string, unknown> {
+  if (!(err instanceof Error)) {
+    return { error: String(err) };
+  }
+  const out: Record<string, unknown> = {
+    error: err.message,
+    name: err.name,
+  };
+  const cause = (err as { cause?: unknown }).cause;
+  if (cause && typeof cause === "object") {
+    const c = cause as Record<string, unknown>;
+    if (typeof c.code === "string") out.causeCode = c.code;
+    if (typeof c.errno === "number" || typeof c.errno === "string") out.causeErrno = c.errno;
+    if (typeof c.syscall === "string") out.causeSyscall = c.syscall;
+    if (typeof c.message === "string") out.causeMessage = c.message;
+    if (Array.isArray(c.errors)) {
+      out.causeErrors = c.errors
+        .filter((e: unknown): e is Error => e instanceof Error)
+        .map((e) => ({
+          message: e.message,
+          code: (e as { code?: unknown }).code,
+          syscall: (e as { syscall?: unknown }).syscall,
+          address: (e as { address?: unknown }).address,
+          port: (e as { port?: unknown }).port,
+        }));
+    }
+  }
+  return out;
+}
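The retry policy in `s2Append` (retry network errors and 5xx, fail fast on 4xx) can be isolated into a small, testable shape. The sketch below is a generic restatement under assumptions, not the committed code: `classifyStatus`, `AttemptResult`, and `retryTransient` are hypothetical names, and the sleep function is injectable so the loop can be exercised without real delays. In the committed loop the last attempt breaks before sleeping, so with `maxAttempts = 3` only the first two entries of `backoffsMs` are ever used; the sketch makes that relationship explicit by deriving the attempt count from the backoff list.

```typescript
type Outcome = "ok" | "fatal" | "transient";

// Same split as the diff: 2xx succeeds, 4xx is a caller-side error that is
// not retried, and anything else (5xx in practice) is treated as retryable.
function classifyStatus(status: number): Outcome {
  if (status >= 200 && status < 300) return "ok";
  if (status >= 400 && status < 500) return "fatal";
  return "transient";
}

type AttemptResult<T> =
  | { kind: "ok"; value: T }
  | { kind: "fatal"; error: Error }
  | { kind: "transient"; error: Error };

// Hypothetical helper: runs attemptOnce up to backoffsMs.length + 1 times,
// sleeping between attempts and never after the final one.
async function retryTransient<T>(
  attemptOnce: () => Promise<AttemptResult<T>>,
  backoffsMs: number[],
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  const maxAttempts = backoffsMs.length + 1;
  let lastError: Error = new Error("no attempts made");
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const result = await attemptOnce();
    if (result.kind === "ok") return result.value;
    if (result.kind === "fatal") throw result.error; // 4xx: retrying will not help
    lastError = result.error;
    if (attempt < maxAttempts - 1) await sleep(backoffsMs[attempt] ?? 0);
  }
  throw lastError;
}
```

With `backoffsMs = [100, 250]` this performs three attempts, matching the `maxAttempts = 3` in the diff. A caller would wrap `fetch` in `attemptOnce`, mapping a thrown network error to `transient` and a response through `classifyStatus`.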
