From 2fcc484a2b83a4c9bd5784acd1f3d707656f32f7 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Sat, 23 May 2026 12:00:31 +0100 Subject: [PATCH 1/5] fix(webapp): route OrganizationDataStoresRegistry writes through the writer prisma (#3722) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Bug The `OrganizationDataStoresRegistry` singleton in `apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts` was constructed with `$replica`. That client was then used by both the polling read path *and* by `addDataStore` / `updateDataStore` / `deleteDataStore` (and their backing `SecretStore.setSecret` upserts). The write methods route through the read replica, which Postgres rejects with **error code 25006**: ``` Invalid prisma.secretStore.upsert() invocation: ConnectorError(ConnectorError { user_facing_error: None, kind: QueryError(PostgresError { code: "25006", message: "cannot execute INSERT in a read-only transaction", ... }), transient: false }) ``` User-visible symptom: the admin `/admin/data-stores` "Add data store" form returns a 400 with this error wrapped, so no `OrganizationDataStore` row can ever be created via the UI. The read path (`loadFromDatabase` polling + `SecretStore.getSecret`) is unaffected because `findMany` + secret read are read-only. ## Fix Change the registry constructor to take both a writer and a replica: ```ts constructor(writer: PrismaClient, replica: PrismaClient | PrismaReplicaClient) ``` - `loadFromDatabase()` keeps using `_replica` (and its `SecretStore.getSecret` calls) — these are background cache-fillers, not on user-latency-sensitive paths. - `addDataStore` / `updateDataStore` / `deleteDataStore` (and their `SecretStore.setSecret` / `deleteSecret` calls) now use `_writer`. `organizationDataStoresRegistryInstance.server.ts` passes `(prisma, $replica)` from `~/db.server`. Test sites that constructed with `(prisma)` now pass `(prisma, prisma)` — the testcontainer exposes a single client, so the writer/replica split collapses to one connection. ## Files - `apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts` — constructor + read/write split - `apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts` — pass `prisma` alongside `$replica` - `apps/webapp/test/organizationDataStoresRegistry.test.ts` — 14 call sites bumped - `apps/webapp/test/clickhouseFactory.test.ts` — 5 call sites bumped ## Test plan - [x] Existing `organizationDataStoresRegistry.test.ts` + `clickhouseFactory.test.ts` still pass (constructor sites updated; behavior unchanged for tests). - [ ] After deploy to test cloud, retry `/admin/data-stores` "Add data store" form for the HIPAA org — should now succeed and the row should appear. - [ ] Verify the registry's polling reload picks up the new row within `ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS` (60s default) and the factory starts routing to the org-scoped instance. --- .../organizationDataStoresRegistry.server.ts | 35 +++++++++++++------ ...zationDataStoresRegistryInstance.server.ts | 4 +-- apps/webapp/test/clickhouseFactory.test.ts | 10 +++--- .../organizationDataStoresRegistry.test.ts | 26 +++++++------- 4 files changed, 44 insertions(+), 31 deletions(-) diff --git a/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts b/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts index dff741cab0f..07a4f1d3592 100644 --- a/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts +++ b/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts @@ -7,7 +7,19 @@ import { getSecretStore } from "../secrets/secretStore.server"; import { ClickhouseConnectionSchema } from "../clickhouse/clickhouseSecretSchemas.server"; export class OrganizationDataStoresRegistry { - private _prisma: PrismaClient | PrismaReplicaClient; + /** + * Writer client — used by every method that mutates state + * (`addDataStore` / `updateDataStore` / `deleteDataStore` and their backing + * SecretStore writes). Must be the primary connection; replica-targeted + * writes are rejected by Postgres with code 25006 (read-only transaction). + */ + private _writer: PrismaClient; + /** + * Read client used by the polling `loadFromDatabase()` (and its + * `SecretStore.getSecret` lookups). Can be a replica — these are + * cache-fillers, not on hot user-facing paths. + */ + private _replica: PrismaClient | PrismaReplicaClient; /** Keyed by `${organizationId}:${kind}` */ private _lookup: Map = new Map(); private _loaded = false; @@ -21,8 +33,9 @@ export class OrganizationDataStoresRegistry { */ readonly isReady: Promise; - constructor(prisma: PrismaClient | PrismaReplicaClient) { - this._prisma = prisma; + constructor(writer: PrismaClient, replica: PrismaClient | PrismaReplicaClient) { + this._writer = writer; + this._replica = replica; this.isReady = new Promise((resolve) => { this._readyResolve = resolve; }); @@ -37,10 +50,10 @@ export class OrganizationDataStoresRegistry { // same `${orgId}:${kind}` appears in multiple rows. The registry must never // throw on overlap — failing the load would break every customer, not just the // misconfigured orgs — so we keep the first entry and log an error instead. - const rows = await this._prisma.organizationDataStore.findMany({ + const rows = await this._replica.organizationDataStore.findMany({ orderBy: { key: "asc" }, }); - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._replica }); const lookup = new Map(); /** Tracks which row's `key` already owns each `${orgId}:${kind}` so we can log conflicts. */ @@ -125,10 +138,10 @@ export class OrganizationDataStoresRegistry { }) { const secretKey = this.#secretKey(key, kind); - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer }); await secretStore.setSecret(secretKey, config); - return this._prisma.organizationDataStore.create({ + return this._writer.organizationDataStore.create({ data: { key, organizationIds, @@ -153,11 +166,11 @@ export class OrganizationDataStoresRegistry { const secretKey = this.#secretKey(key, kind); if (config) { - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer }); await secretStore.setSecret(secretKey, config); } - return this._prisma.organizationDataStore.update({ + return this._writer.organizationDataStore.update({ where: { key, }, @@ -170,12 +183,12 @@ export class OrganizationDataStoresRegistry { async deleteDataStore({ key, kind }: { key: string; kind: DataStoreKind }) { const secretKey = this.#secretKey(key, kind); - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer }); await secretStore.deleteSecret(secretKey).catch(() => { // Secret may not exist — proceed with deletion }); - await this._prisma.organizationDataStore.delete({ where: { key } }); + await this._writer.organizationDataStore.delete({ where: { key } }); } /** diff --git a/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts b/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts index 24ec572c5d5..239683965ea 100644 --- a/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts +++ b/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts @@ -1,5 +1,5 @@ import pRetry from "p-retry"; -import { $replica } from "~/db.server"; +import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; import { signalsEmitter } from "~/services/signals.server"; @@ -7,7 +7,7 @@ import { singleton } from "~/utils/singleton"; import { OrganizationDataStoresRegistry } from "./organizationDataStoresRegistry.server"; export const organizationDataStoresRegistry = singleton("organizationDataStoresRegistry", () => { - const registry = new OrganizationDataStoresRegistry($replica); + const registry = new OrganizationDataStoresRegistry(prisma, $replica); // Runs as soon as this singleton is created (first import of this module). The // registry’s `isReady` promise resolves when this eventually succeeds. diff --git a/apps/webapp/test/clickhouseFactory.test.ts b/apps/webapp/test/clickhouseFactory.test.ts index 501462ab677..7f19ea1f218 100644 --- a/apps/webapp/test/clickhouseFactory.test.ts +++ b/apps/webapp/test/clickhouseFactory.test.ts @@ -16,7 +16,7 @@ describe("ClickHouse Factory", () => { postgresTest( "returns default client when org has no data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); const factory = new ClickhouseFactory(registry); @@ -28,7 +28,7 @@ describe("ClickHouse Factory", () => { postgresTest( "returns org-specific client when a data store is configured", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "factory-store", @@ -52,7 +52,7 @@ describe("ClickHouse Factory", () => { postgresTest( "two orgs sharing the same data store get the same cached client", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "shared-factory-store", @@ -75,7 +75,7 @@ describe("ClickHouse Factory", () => { postgresTest( "two data stores with different URLs produce different clients", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "store-a", @@ -104,7 +104,7 @@ describe("ClickHouse Factory", () => { postgresTest( "after reload with a deleted store, org falls back to default", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "removable-store", diff --git a/apps/webapp/test/organizationDataStoresRegistry.test.ts b/apps/webapp/test/organizationDataStoresRegistry.test.ts index f8ff9dde41f..9d94e0447fe 100644 --- a/apps/webapp/test/organizationDataStoresRegistry.test.ts +++ b/apps/webapp/test/organizationDataStoresRegistry.test.ts @@ -13,19 +13,19 @@ const TEST_URL_2 = "https://default:password@clickhouse2.example.com:8443"; describe("OrganizationDataStoresRegistry", () => { postgresTest("isLoaded is false before loadFromDatabase", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); expect(registry.isLoaded).toBe(false); expect(registry.get("any-org", "CLICKHOUSE")).toBeNull(); }); postgresTest("isLoaded is true after loadFromDatabase", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); expect(registry.isLoaded).toBe(true); }); postgresTest("isReady resolves after loadFromDatabase", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); let resolved = false; registry.isReady.then(() => { resolved = true; @@ -36,13 +36,13 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("get returns null when no data stores exist", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); expect(registry.get("org-1", "CLICKHOUSE")).toBeNull(); }); postgresTest("addDataStore creates a row and stores the secret", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "test-store", @@ -63,7 +63,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("loadFromDatabase resolves secrets and makes orgs available via get", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "hipaa-store", @@ -81,7 +81,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("get returns null for orgs not in any data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "partial-store", @@ -97,7 +97,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("multiple orgs can share the same data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "shared-store", @@ -120,7 +120,7 @@ describe("OrganizationDataStoresRegistry", () => { postgresTest( "when an org appears in multiple data stores, first row by id asc wins", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); const sharedOrg = "org-dup-overlap"; await registry.addDataStore({ @@ -151,7 +151,7 @@ describe("OrganizationDataStoresRegistry", () => { ); postgresTest("updateDataStore updates organizationIds and rotates the secret", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "update-store", @@ -176,7 +176,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("reload picks up changes made after initial load", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); expect(registry.get("org-reload", "CLICKHOUSE")).toBeNull(); @@ -194,7 +194,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("deleteDataStore removes the row and its secret", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "delete-store", @@ -210,7 +210,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("after delete and reload, org no longer has a data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "ephemeral-store", From c0365d36fbcdba932f497eec8dc3752979aa32be Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 23 May 2026 16:08:10 +0100 Subject: [PATCH 2/5] fix(webapp): precise S2 record cap + CORS 413 on session append (#3720) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Two improvements to session `.in/append`: - Oversize-body 413 responses now carry CORS headers, so browser fetches see a readable status instead of an opaque `TypeError: Failed to fetch`. App-side retry-on-disconnect loops no longer spin forever on a permanently-rejected payload. - The per-record cap is now computed precisely against S2's actual ceiling instead of the conservative 512 KiB floor. Legitimate ~600-900 KiB tool outputs (search results, file content) now succeed; pathological all-quote content that would double under JSON escape still rejects cleanly. ## Design S2 enforces a per-record metered size of `8 + 2*H + Σ(header name + value) + body ≤ 1048576` bytes. With no record headers (our case), the budget reduces to `body ≤ 1048568`. Verified empirically against cloud S2 — append succeeds at metered=1048576 and 422s at 1048577 with `record must have metered size less than 1 MiB`. The old `MAX_APPEND_BODY_BYTES = 512 KiB` was derived by assuming worst-case JSON escape doubling (every byte becomes `\"` or `\\`), giving `(1 MiB - overhead) / 2`. Safe, but rejects ~half the legitimate input space. The new flow: 1. Pre-cap the HTTP body at 1 MiB (DoS guard against reading arbitrary garbage before we can compute the wrap). 2. After reading, `S2RealtimeStreams.#appendPartByName` computes `Buffer.byteLength(JSON.stringify({data: part, id: partId}), "utf8") + 8` and throws `S2RecordTooLargeError` (a `ServiceValidationError` with status 413) if it would exceed S2's ceiling. The route's existing error branch maps the throw to a 413 with a descriptive message. The 413 CORS fix is a single-line change in `apiBuilder.server.ts` — `wrapResponse` was being skipped on the body-too-large branch; every other error branch wraps; the 413 was the exception. ## Test plan - Empirically verified against cloud S2 with a boundary scan across `[1048568, 1048569, ..., 1048576]` and across H ∈ {0, 1×5 hdr bytes, 1×14 hdr bytes} — the formula matches exactly - Browser-side fetch on a 700 KiB POST now resolves with a readable `status: 413` (no `TypeError: Failed to fetch`) - A 900 KiB ASCII tool output now passes (would have 413'd at 512 KiB pre-fix) --- .server-changes/realtime-append-cap.md | 6 +++ ...ealtime.v1.sessions.$session.$io.append.ts | 18 ++++++--- ...ltime.v1.streams.$runId.input.$streamId.ts | 30 ++++++++++++--- ...ealtime.v1.sessions.$session.$io.append.ts | 10 +++-- .../realtime/s2realtimeStreams.server.ts | 38 ++++++++++++++++++- .../routeBuilders/apiBuilder.server.ts | 6 ++- 6 files changed, 90 insertions(+), 18 deletions(-) create mode 100644 .server-changes/realtime-append-cap.md diff --git a/.server-changes/realtime-append-cap.md b/.server-changes/realtime-append-cap.md new file mode 100644 index 00000000000..cfbd3b7fa1a --- /dev/null +++ b/.server-changes/realtime-append-cap.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Session `.in/append` returns readable 413s on oversize bodies (was failing browser fetches as opaque `TypeError: Failed to fetch`) and now rejects only records that would actually exceed S2's per-record ceiling, instead of guessing at a conservative pre-encoding cap. diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index 792b6480d8d..dbdb9f47d52 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -27,12 +27,18 @@ const ParamsSchema = z.object({ // POST: server-side append of a single record to a session channel. Mirrors // the existing /realtime/v1/streams/:runId/:target/:streamId/append route, // scoped to a Session primitive. -// S2 enforces a 1 MiB per-record limit (metered as -// `8 + 2*H + Σ(header name+value) + body`). We cap the raw HTTP body at -// 512 KiB so the JSON wrapper (`{"data":"...","id":"..."}`), string -// escaping, and any future per-record header additions all stay comfortably -// below S2's ceiling. See https://s2.dev/docs/limits. -const MAX_APPEND_BODY_BYTES = 1024 * 512; +// +// The HTTP body cap here is just a DoS pre-guard — set generously at +// 1 MiB so we don't buffer arbitrarily large inputs before we can +// compute the wrapped size. The actual S2 per-record limit (verified +// empirically against cloud S2) is enforced precisely inside +// `S2RealtimeStreams.#appendPartByName` — it throws +// `S2RecordTooLargeError` (a `ServiceValidationError` with status +// 413) when the metered record size would exceed S2's 1 MiB ceiling +// after JSON wrapping. That lets legitimate bodies up to ~1023 KiB +// raw through (ASCII or low-escape content) while still rejecting +// pathological all-quote content that would double on wrap. +const MAX_APPEND_BODY_BYTES = 1024 * 1024; const { action, loader } = createActionApiRoute( { diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts index 335116043d1..a404e6a76ae 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts @@ -1,4 +1,5 @@ import { json } from "@remix-run/server-runtime"; +import { tryCatch } from "@trigger.dev/core/utils"; import { z } from "zod"; import { $replica } from "~/db.server"; import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; @@ -13,6 +14,7 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { engine } from "~/v3/runEngine.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; const ParamsSchema = z.object({ runId: z.string(), @@ -77,13 +79,29 @@ const { action } = createActionApiRoute( const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`; const record = JSON.stringify(body.data.data); - // Append the record to the per-stream S2 stream (auto-creates on first write) - await realtimeStream.appendPart( - record, - recordId, - run.friendlyId, - `$trigger.input:${params.streamId}` + // Append the record to the per-stream S2 stream (auto-creates on + // first write). `appendPart` can throw `S2RecordTooLargeError` (a + // `ServiceValidationError` with status 413) when the wrapped + // record exceeds S2's per-record ceiling — surface that as 413 + // rather than letting it propagate to the apiBuilder catch-all + // as a generic 500. + const [appendError] = await tryCatch( + realtimeStream.appendPart( + record, + recordId, + run.friendlyId, + `$trigger.input:${params.streamId}` + ) ); + if (appendError) { + if (appendError instanceof ServiceValidationError) { + return json( + { ok: false, error: appendError.message }, + { status: appendError.status ?? 422 } + ); + } + throw appendError; + } // Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none) // Get first, complete, then delete — so the mapping survives if completeWaitpoint throws diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts index 04d6ec2d8e2..77f03f4ce5c 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts @@ -24,10 +24,12 @@ const ParamsSchema = z.object({ io: z.enum(["out", "in"]), }); -// S2 record body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append -// route — keep it well under S2's 1 MiB per-record limit so JSON wrapping, -// string escaping, and any future per-record headers stay safe. -const MAX_APPEND_BODY_BYTES = 1024 * 512; +// HTTP body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append +// route — DoS pre-guard only. The actual S2 per-record limit is +// enforced precisely by `S2RealtimeStreams.#appendPartByName` +// (throws `S2RecordTooLargeError` with status 413 when the metered +// record size would exceed S2's 1 MiB ceiling after JSON wrapping). +const MAX_APPEND_BODY_BYTES = 1024 * 1024; // POST: Append a single record to a Session channel from the dashboard // playground. Mirrors the public `POST /realtime/v1/sessions/:session/:io/append` diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 07061071446..b91f47c6b3c 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -4,6 +4,36 @@ import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions } import { Logger, LogLevel } from "@trigger.dev/core/logger"; import { headerValue } from "@trigger.dev/core/v3"; import { randomUUID } from "node:crypto"; +import { ServiceValidationError } from "~/v3/services/common.server"; + +// S2's per-record metered-size limit. Verified empirically against +// cloud S2: append succeeds at metered=1048576 and 422s at 1048577 +// with `"record must have metered size less than 1 MiB"` (the "less +// than" wording is slightly off — the boundary is inclusive). +// +// Metered size formula: +// metered = 8 (record overhead) + 2*H + Σ(header name + value) + body +// where `body` is the unescaped record body length in UTF-8 bytes and +// `H` is the number of S2 record headers. +// +// We attach no record headers (H=0), so the budget reduces to: +// 8 + body ≤ 1048576 → body ≤ 1048568 +export const S2_MAX_METERED_BYTES = 1024 * 1024; // 1 MiB +export const S2_RECORD_BASE_OVERHEAD_BYTES = 8; + +/** + * Thrown when a record's metered size would exceed S2's hard per-record + * limit. Caught by the route handler and surfaced as 413. + */ +export class S2RecordTooLargeError extends ServiceValidationError { + constructor(public readonly meteredBytes: number) { + super( + `Record metered size ${meteredBytes} bytes exceeds the S2 per-record limit of ${S2_MAX_METERED_BYTES} bytes. Reduce tool-output size or split into smaller parts.`, + 413 + ); + this.name = "S2RecordTooLargeError"; + } +} export type S2RealtimeStreamsOptions = { // S2 @@ -181,8 +211,14 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { async #appendPartByName(part: string, partId: string, s2Stream: string): Promise { this.logger.debug(`S2 appending to stream`, { part, stream: s2Stream }); + const recordBody = JSON.stringify({ data: part, id: partId }); + const meteredBytes = Buffer.byteLength(recordBody, "utf8") + S2_RECORD_BASE_OVERHEAD_BYTES; + if (meteredBytes > S2_MAX_METERED_BYTES) { + throw new S2RecordTooLargeError(meteredBytes); + } + const result = await this.s2Append(s2Stream, { - records: [{ body: JSON.stringify({ data: part, id: partId }) }], + records: [{ body: recordBody }], }); this.logger.debug(`S2 append result`, { result }); diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts index a66de63e113..0c661a7e684 100644 --- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts +++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts @@ -777,7 +777,11 @@ export function createActionApiRoute< const contentLength = request.headers.get("content-length"); if (!contentLength || parseInt(contentLength) > maxContentLength) { - return json({ error: "Request body too large" }, { status: 413 }); + return await wrapResponse( + request, + json({ error: "Request body too large" }, { status: 413 }), + corsStrategy !== "none" + ); } } From 75679c7518e7900721853ba0f2f1840ed2b74670 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 23 May 2026 17:09:39 +0100 Subject: [PATCH 3/5] fix(sdk): chat HITL continuations no longer break the next LLM call (#3719) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Multi-step reasoning agents with HITL tools (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) failed on `chat.addToolOutput(...)` continuations — either the wire payload blew the `.in/append` cap (reasoning blobs + tool inputs routinely > 512 KiB), or app-side slimming workarounds got overwritten server-side and the next LLM call landed a tool call with no `arguments`. Both modes are fixed. ## Design The per-turn merge in `chat.agent` now overlays only the tool-part state advances (`output-available` / `output-error` / `approval-responded` / `output-denied`) from the wire copy onto the hydrated/snapshot chain. Previously it replaced the entire message, which dropped `input`, reasoning, and text from the LLM's view whenever the wire was slim. In parallel, `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim the assistant message themselves on `submit-message` continuations: ship `{ id, role, parts: [] }`, everything else reconstructed server-side from `hydrateMessages` or the durable snapshot. Continuation payloads drop from 600 KiB – 1 MiB to ~1 KiB. `references/ai-chat` `aiChatHydrated.hydrateMessages` now upserts by id instead of pushing. With slim continuations, a blind push duplicates the assistant id in the returned chain — the merge updates the first match, the slim duplicate goes straight to `toModelMessages` with no `input`, and the LLM 4xx's. This is the canonical pattern customers should mirror in their own hydrate implementations. ## Test plan - 11 new tests (slim helper unit + slim+merge integration for HITL, approval, default no-hydrate branch) - Full SDK suite: 239 tests pass across 19 files - End-to-end sweep against `references/ai-chat`: 19 customer-side smoke tests green; HITL wire bodies confirmed at ~1 KiB (was 600 KiB+); no provider 4xx errors across OpenAI Responses or Anthropic --- .changeset/chat-slim-wire-merge.md | 31 + packages/trigger-sdk/src/v3/ai-shared.ts | 148 +++++ packages/trigger-sdk/src/v3/ai.ts | 204 ++++++- packages/trigger-sdk/src/v3/chat-client.ts | 11 +- packages/trigger-sdk/src/v3/chat.ts | 12 +- .../trigger-sdk/test/mockChatAgent.test.ts | 572 +++++++++++++++++- packages/trigger-sdk/test/wire-shape.test.ts | 292 +++++++++ references/ai-chat/src/trigger/chat.ts | 41 +- 8 files changed, 1256 insertions(+), 55 deletions(-) create mode 100644 .changeset/chat-slim-wire-merge.md diff --git a/.changeset/chat-slim-wire-merge.md b/.changeset/chat-slim-wire-merge.md new file mode 100644 index 00000000000..19ea48a8cdd --- /dev/null +++ b/.changeset/chat-slim-wire-merge.md @@ -0,0 +1,31 @@ +--- +"@trigger.dev/sdk": patch +--- + +Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together: + +- The per-turn merge now overlays the wire copy's tool-part state advancement onto the agent's existing chain — `state` + the matching resolution field (`output` / `errorText` / `approval`) come from the wire, everything else (text, reasoning, tool `input`, provider metadata) stays whatever the snapshot or `hydrateMessages` returned. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow). +- `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. The wire payload is just `{ id, role, parts: [] }` for `submit-message` continuations; everything else passes through. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops. + +Note: `onValidateMessages` receives the slim wire on HITL turns. If you call `validateUIMessages` from `ai` against the full `messages` array it will reject the slim assistant; filter to user messages (or skip on HITL turns) — see the updated docstring on `onValidateMessages` for the recommended pattern. + +For `hydrateMessages` hooks that persist the chain, this release also adds a small helper to the `@trigger.dev/sdk/ai` surface: + +```ts +import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; + +chat.agent({ + hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { + const record = await db.chat.findUnique({ where: { id: chatId } }); + const stored = record?.messages ?? []; + if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { + await db.chat.update({ where: { id: chatId }, data: { messages: stored } }); + } + return stored; + }, +}); +``` + +It pushes fresh user messages by id, no-ops on HITL continuations (the incoming shares an id with the existing assistant — the runtime overlays the new tool-state advance), and skips on non-`submit-message` triggers. Returns `true` if it mutated `stored` so the caller knows whether to persist. + +Net effect: `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer blows the cap and no longer corrupts the LLM input. diff --git a/packages/trigger-sdk/src/v3/ai-shared.ts b/packages/trigger-sdk/src/v3/ai-shared.ts index 9370ea4a164..35b61910563 100644 --- a/packages/trigger-sdk/src/v3/ai-shared.ts +++ b/packages/trigger-sdk/src/v3/ai-shared.ts @@ -198,3 +198,151 @@ export type InferChatUIMessage = TTask extends Task< > ? TUIM : UIMessage; + +/** + * Upsert an incoming wire message into the customer's DB-backed chain + * inside a `hydrateMessages` hook. Returns `true` iff the chain was + * mutated (the caller should persist). + * + * Handles the three cases that matter: + * + * - **Non-submit-message trigger** (`regenerate-message` / `action`, + * or `submit-message` with no incoming): no-op. Returns `false`. + * - **Incoming id already in `stored`** (HITL `addToolOutput` / + * `addToolApproveResponse` continuation — the wire carries the + * existing assistant's id with a slim resolution payload): no-op. + * The runtime's per-turn merge overlays the new tool-state advance + * onto the existing entry; pushing again would duplicate the row + * in the chain you return, and the duplicate slim copy would hit + * `toModelMessages` with no `input`. Returns `false`. + * - **Incoming id not in `stored`** (typically a fresh user message + * on a new turn): push. Returns `true`. + * + * Mutates `stored` in place. The caller persists `stored`, not the + * return value. + * + * @example + * ```ts + * import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; + * + * chat.agent({ + * hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { + * const record = await db.chat.findUnique({ where: { id: chatId } }); + * const stored = record?.messages ?? []; + * if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { + * await db.chat.update({ where: { id: chatId }, data: { messages: stored } }); + * } + * return stored; + * }, + * }); + * ``` + */ +export function upsertIncomingMessage( + stored: TMsg[], + event: { + trigger: "submit-message" | "regenerate-message" | "action"; + incomingMessages: TMsg[]; + } +): boolean { + if (event.trigger !== "submit-message") return false; + if (event.incomingMessages.length === 0) return false; + const newMsg = event.incomingMessages[event.incomingMessages.length - 1]; + if (!newMsg) return false; + if (newMsg.id) { + const existingIdx = stored.findIndex((m) => m.id === newMsg.id); + if (existingIdx !== -1) return false; + } + stored.push(newMsg); + return true; +} + +/** + * Tool-part states that the client advances and ships back over the wire. + * Covers HITL `addToolOutput` (output-available / output-error) and the + * approval flow (approval-responded / output-denied). `input-streaming` / + * `input-available` / `approval-requested` are server-emitted only — if + * we see them on the wire we treat them as no-ops and skip the slim/merge. + */ +function isWireAdvanceableToolState( + state: unknown +): state is "output-available" | "output-error" | "approval-responded" | "output-denied" { + return ( + state === "output-available" || + state === "output-error" || + state === "approval-responded" || + state === "output-denied" + ); +} + +/** Whether a tool-UI part is a static (`tool-${name}`) or dynamic tool. */ +function isToolPartType(type: unknown): boolean { + return typeof type === "string" && (type.startsWith("tool-") || type === "dynamic-tool"); +} + +/** + * Slim an outgoing assistant message before it ships on `submit-message`. + * + * When the client calls `addToolOutput(...)` to resolve a HITL tool (or + * `addToolApproveResponse(...)` to approve/deny one), the AI SDK turns + * it into a `submit-message` whose `messages.at(-1)` is the existing + * assistant message with the new state stitched onto a single tool + * part. On a reasoning-heavy multi-step turn, that full assistant + * message can be 600 KB – 1 MB (encrypted reasoning blobs, reasoning + * text, full tool `input` JSON, prior tool outputs) — well over the + * `.in/append` cap. + * + * The agent runtime only consumes the wire-advanced fields of those + * tool parts (state + output / errorText / approval). Everything else + * (text, reasoning, tool `input`) is rebuilt server-side from the + * durable snapshot or `hydrateMessages`. So we drop everything but + * the advanced tool parts here, and reduce those to just the fields + * the server overlays. + * + * The slim only fires when the assistant message carries at least one + * wire-advanceable tool part. Plain assistant resends (no resolved / + * approval-responded tool) and non-assistant messages pass through + * untouched. + * + * Pairs with the per-turn merge on the agent side + * (`mergeIncomingIntoHydrated` in `ai.ts`). + */ +export function slimSubmitMessageForWire( + message: TMsg +): TMsg { + if (!message) return message; + if (message.role !== "assistant") return message; + const parts = (message.parts ?? []) as any[]; + const advancedToolParts = parts.filter( + (p) => + p && + typeof p === "object" && + isToolPartType(p.type) && + isWireAdvanceableToolState(p.state) + ); + if (advancedToolParts.length === 0) return message; + const slimParts = advancedToolParts.map((p: any) => { + const base: Record = { + type: p.type, + toolCallId: p.toolCallId, + state: p.state, + }; + if (p.type === "dynamic-tool" && typeof p.toolName === "string") { + base.toolName = p.toolName; + } + if (p.state === "output-available") { + base.output = p.output; + if (p.approval !== undefined) base.approval = p.approval; + } else if (p.state === "output-error") { + if (p.errorText !== undefined) base.errorText = p.errorText; + if (p.approval !== undefined) base.approval = p.approval; + } else if (p.state === "approval-responded" || p.state === "output-denied") { + if (p.approval !== undefined) base.approval = p.approval; + } + return base; + }); + return { + id: message.id, + role: message.role, + parts: slimParts, + } as unknown as TMsg; +} diff --git a/packages/trigger-sdk/src/v3/ai.ts b/packages/trigger-sdk/src/v3/ai.ts index d5c176a4a56..d1a6a226023 100644 --- a/packages/trigger-sdk/src/v3/ai.ts +++ b/packages/trigger-sdk/src/v3/ai.ts @@ -2128,6 +2128,110 @@ function extractNewToolResultsFromHistory( return out; } +/** + * Per-turn merge of an incoming wire `UIMessage` onto the matching entry + * a `hydrateMessages` hook (or the default accumulator) provides. Used + * to fold tool-state advances from the client into the agent's + * authoritative chain without trusting the wire copy for fields the + * LLM consumes. + * + * `hydrated` is treated as the source of truth for everything outside + * tool-state advancement: text, reasoning blobs, provider metadata, + * and tool `input` all stay as hydrated had them. We only overlay + * tool parts whose incoming state is wire-advanced — `output-available` + * / `output-error` (HITL `addToolOutput`) or `approval-responded` / + * `output-denied` (approval flow) — and only the corresponding + * resolution fields (`output` / `errorText` / `approval`). Hydrated + * `input` and everything else stay put. + * + * Without this, a slim wire copy (which `TriggerChatTransport` / + * `AgentChat.sendRaw` ship by default on HITL continuations) would + * clobber the hydrated assistant — the next LLM call would receive a + * tool call with no `input` and 4xx. + * + * @internal + */ +function mergeIncomingIntoHydrated( + hydrated: TMsg, + incoming: UIMessage +): TMsg { + const incomingAdvancedByCallId = new Map(); + for (const part of (incoming.parts ?? []) as any[]) { + if (!isToolUIPart(part)) continue; + const toolCallId = part.toolCallId; + if (typeof toolCallId !== "string" || toolCallId.length === 0) continue; + if (!isWireAdvanceableToolState(part.state)) continue; + incomingAdvancedByCallId.set(toolCallId, part); + } + + if (incomingAdvancedByCallId.size === 0) return hydrated; + + let mutated = false; + const hydratedParts = (hydrated.parts ?? []) as any[]; + const mergedParts = hydratedParts.map((part) => { + if (!isToolUIPart(part)) return part; + const toolCallId = part.toolCallId; + if (typeof toolCallId !== "string" || toolCallId.length === 0) return part; + const incomingPart = incomingAdvancedByCallId.get(toolCallId); + if (!incomingPart) return part; + // Terminal hydrated states (`output-available`, `output-error`, + // `output-denied`) are authoritative — never regressed by a stale + // wire arrival (replay, retry, out-of-order). `output-denied` + // matters here because the wire's `approval-responded` could + // otherwise overwrite a hydrated denial back to a non-terminal + // state. + if (isResolvedToolState(part.state) || part.state === "output-denied") { + return part; + } + // Same state on both sides — no progression to apply. + if (part.state === incomingPart.state) return part; + mutated = true; + if (incomingPart.state === "output-available") { + return { + ...part, + state: incomingPart.state, + output: incomingPart.output, + ...(incomingPart.approval !== undefined ? { approval: incomingPart.approval } : {}), + }; + } + if (incomingPart.state === "output-error") { + return { + ...part, + state: incomingPart.state, + errorText: incomingPart.errorText, + ...(incomingPart.approval !== undefined ? { approval: incomingPart.approval } : {}), + }; + } + // approval-responded / output-denied — overlay state + approval. + return { + ...part, + state: incomingPart.state, + ...(incomingPart.approval !== undefined ? { approval: incomingPart.approval } : {}), + }; + }); + + if (!mutated) return hydrated; + return { ...hydrated, parts: mergedParts }; +} + +/** + * Mirror of `slimSubmitMessageForWire`'s predicate. Kept here so the + * agent runtime doesn't have to import from `ai-shared.ts` for a + * one-liner. See that file for the full state-machine docs. + * + * @internal + */ +function isWireAdvanceableToolState( + state: unknown +): state is "output-available" | "output-error" | "approval-responded" | "output-denied" { + return ( + state === "output-available" || + state === "output-error" || + state === "approval-responded" || + state === "output-denied" + ); +} + /** * Imperative API for reading and modifying the accumulated message history. * @@ -2467,7 +2571,7 @@ export type PendingMessagesOptions = { // React hooks (`@trigger.dev/sdk/chat/react`) can import it without // dragging `ai.ts` into the browser graph. Re-exported here so // `@trigger.dev/sdk/ai` consumers still see it. -export { PENDING_MESSAGE_INJECTED_TYPE } from "./ai-shared.js"; +export { PENDING_MESSAGE_INJECTED_TYPE, upsertIncomingMessage } from "./ai-shared.js"; import { PENDING_MESSAGE_INJECTED_TYPE } from "./ai-shared.js"; /** @internal */ @@ -3876,7 +3980,14 @@ export type HydrateMessagesEvent = { - /** The incoming UI messages for this turn (after cleanup of aborted tool parts). */ + /** + * The incoming UI messages for this turn (after cleanup of aborted tool parts). + * + * For HITL continuations the assistant entry is slim — `state` + `output` / + * `errorText` / `approval` only, no `input` or other parts. Don't pass the + * full `messages` array to `validateUIMessages` from `ai`; filter to user + * messages (or your own subset) first. + */ messages: TUIM[]; /** The unique identifier for the chat session. */ chatId: string; @@ -4372,8 +4483,13 @@ export type ChatAgentOptions< * * Return the validated messages array. Throw to abort the turn with an error. * - * This is the right place to call the AI SDK's `validateUIMessages` to catch - * malformed messages from storage or untrusted input before they reach the model. + * This is the right place to call the AI SDK's `validateUIMessages` on fresh + * user input. For HITL continuations (`addToolOutput` / + * `addToolApproveResponse`), the wire carries a slim assistant message — only + * the resolved tool parts, with `state` + `output` / `errorText` / `approval` + * and no `input`. `validateUIMessages` against the AI SDK schema rejects + * that shape, so filter to user messages (or skip validation entirely) on + * those turns. * * @example * ```ts @@ -4382,7 +4498,11 @@ export type ChatAgentOptions< * chat.agent({ * id: "my-chat", * onValidateMessages: async ({ messages }) => { - * return validateUIMessages({ messages, tools: chatTools }); + * const userMessages = messages.filter((m) => m.role === "user"); + * if (userMessages.length > 0) { + * await validateUIMessages({ messages: userMessages, tools: chatTools }); + * } + * return messages; * }, * run: async ({ messages }) => { * return streamText({ model, messages, tools: chatTools }); @@ -6038,6 +6158,20 @@ function chatAgent< } if (hydrateMessages) { + // Snapshot the ids the accumulator knew BEFORE this + // turn ran — used below to decide whether an + // incoming wire message is genuinely new or just a + // state advance on an existing entry. We can't use + // the post-`hydrateMessages` array for this because + // the canonical hook pattern pushes the incoming + // user message into the persisted chain and + // returns it. + const previouslyKnownMessageIds = new Set( + accumulatedUIMessages + .map((m) => m.id) + .filter((id): id is string => typeof id === "string") + ); + // Backend hydration: load the full message history from the user's // backend, replacing the built-in accumulator entirely. With slim // wire, `incomingMessages` is consistently 0-or-1-length — what @@ -6071,15 +6205,23 @@ function chatAgent< } ); - // Auto-merge tool approval updates: if any incoming wire message - // has an ID that matches a hydrated message, replace it. This makes - // tool approvals work transparently with backend hydration. + // Per-turn merge of incoming wire messages onto the hydrated + // chain. Hydrated stays authoritative for text, reasoning + // blobs, provider metadata, and tool `input`; we only + // overlay tool-part state/output/errorText for tool calls + // the wire copy has just resolved. Apps that slim the wire + // copy to fit the .in/append cap (or drop fields they + // re-source from their own DB) get the hydrated copy + // through unchanged. const merged = [...hydrated] as TUIMessage[]; for (const incoming of cleanedUIMessages) { if (!incoming.id) continue; const idx = merged.findIndex((m) => m.id === incoming.id); if (idx !== -1) { - merged[idx] = incoming as TUIMessage; + merged[idx] = mergeIncomingIntoHydrated( + merged[idx]!, + incoming + ) as TUIMessage; } } @@ -6087,15 +6229,32 @@ function chatAgent< accumulatedMessages = await toModelMessages(merged); locals.set(chatCurrentUIMessagesKey, accumulatedUIMessages); - // Track new messages for onTurnComplete.newUIMessages + // Track new messages for onTurnComplete.newUIMessages. + // Only push for genuinely new ids — HITL continuations + // whose incoming wire id matches an existing entry are + // state advances on an old message, not new messages. + // We compare against `previouslyKnownMessageIds` + // captured BEFORE hydration, not against `hydrated`: + // the canonical hydrate pattern pushes the incoming + // user message into the persisted chain and returns + // it, so the new id IS in `hydrated`, which would + // wrongly drop every fresh user turn from + // `newUIMessages`. The non-hydrate branch below has + // the same "push only on append" semantic via its + // own append-vs-replace path. if ( currentWirePayload.trigger === "submit-message" && cleanedUIMessages.length > 0 ) { const lastUI = cleanedUIMessages[cleanedUIMessages.length - 1]!; - turnNewUIMessages.push(lastUI); - const lastModel = (await toModelMessages([lastUI]))[0]; - if (lastModel) turnNewModelMessages.push(lastModel); + const matchedExisting = + lastUI.id !== undefined && + previouslyKnownMessageIds.has(lastUI.id); + if (!matchedExisting) { + turnNewUIMessages.push(lastUI); + const lastModel = (await toModelMessages([lastUI]))[0]; + if (lastModel) turnNewModelMessages.push(lastModel); + } } } else { // Default delta-merge accumulation. @@ -6121,15 +6280,17 @@ function chatAgent< } else if (cleanedUIMessages.length > 0) { // Submit-message (and the special-cased // handover-prepare → submit-message rewrite earlier in - // this scope): append-or-replace-by-id for the single - // delta message. + // this scope): merge-or-append for the single delta + // message. // // Tool approval responses arrive as a single assistant // message whose id collides with the existing assistant - // in the accumulator — we replace by id. The fallback - // for HITL `addToolOutput` continuations where AI SDK - // regenerates the id (TRI-9137) still applies via - // `rewriteIncomingIdViaToolCallMap`. + // in the accumulator — we merge the resolved tool-part + // resolutions onto the existing entry, keeping text, + // reasoning, and tool `input` from the prior snapshot. + // The fallback for HITL `addToolOutput` continuations + // where AI SDK regenerates the id (TRI-9137) still + // applies via `rewriteIncomingIdViaToolCallMap`. let replaced = false; for (const raw of cleanedUIMessages) { let incoming = raw; @@ -6146,7 +6307,10 @@ function chatAgent< } } if (idx !== -1) { - accumulatedUIMessages[idx] = incoming as TUIMessage; + accumulatedUIMessages[idx] = mergeIncomingIntoHydrated( + accumulatedUIMessages[idx]!, + incoming + ) as TUIMessage; replaced = true; } else { accumulatedUIMessages.push(incoming as TUIMessage); diff --git a/packages/trigger-sdk/src/v3/chat-client.ts b/packages/trigger-sdk/src/v3/chat-client.ts index 98380f1e8be..dc2b2c1f39b 100644 --- a/packages/trigger-sdk/src/v3/chat-client.ts +++ b/packages/trigger-sdk/src/v3/chat-client.ts @@ -26,6 +26,7 @@ import { TRIGGER_CONTROL_SUBTYPE, } from "@trigger.dev/core/v3"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; +import { slimSubmitMessageForWire } from "./ai-shared.js"; import { sessions } from "./sessions.js"; // ─── Type inference ──────────────────────────────────────────────── @@ -406,8 +407,12 @@ export class AgentChat { // Slim wire — at most ONE message per record. The agent rebuilds prior // history from its durable S3 snapshot + session.out replay at run - // boot. `regenerate-message` omits `message` (the agent slices its own - // history). See plan vivid-humming-bonbon. + // boot (or `hydrateMessages` if registered). + // + // For `submit-message`, assistant messages carrying resolved tool parts + // (HITL `addToolOutput` answers) are slimmed to just the resolution + // payload — reasoning blobs, text, and tool `input` come from the + // agent's authoritative chain. `regenerate-message` omits `message`. if (triggerType === "submit-message" && messages.length === 0) { throw new Error( "AgentChat.sendRaw: 'submit-message' trigger requires at least one message" @@ -415,7 +420,7 @@ export class AgentChat { } const lastIfSubmit = triggerType === "submit-message" - ? (messages.at(-1) as UIMessage | undefined) + ? slimSubmitMessageForWire(messages.at(-1) as UIMessage | undefined) : undefined; const payload: ChatTaskWirePayload = { ...(lastIfSubmit ? { message: lastIfSubmit } : {}), diff --git a/packages/trigger-sdk/src/v3/chat.ts b/packages/trigger-sdk/src/v3/chat.ts index aaa3871e34a..48519b122f3 100644 --- a/packages/trigger-sdk/src/v3/chat.ts +++ b/packages/trigger-sdk/src/v3/chat.ts @@ -33,6 +33,7 @@ import { } from "@trigger.dev/core/v3"; import { ChatTabCoordinator } from "./chat-tab-coordinator.js"; import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js"; +import { slimSubmitMessageForWire } from "./ai-shared.js"; const DEFAULT_BASE_URL = "https://api.trigger.dev"; const DEFAULT_STREAM_TIMEOUT_SECONDS = 120; @@ -572,10 +573,15 @@ export class TriggerChatTransport implements ChatTransport { // Slim wire — at most ONE message per record. The agent rebuilds prior // history from its durable S3 snapshot + session.out replay at run boot - // (or `hydrateMessages`, if registered). See plan vivid-humming-bonbon. + // (or `hydrateMessages`, if registered). // // - "submit-message": ship the latest message (new user message OR a // tool-approval-responded assistant message). Throw if absent. + // Assistant messages with already-resolved tool parts are slimmed + // to just their resolution payload — reasoning blobs, prior text, + // and tool `input` stay on the agent side (rebuilt from snapshot + // or `hydrateMessages`). Keeps continuation payloads under the + // `.in/append` cap on reasoning-heavy turns. // - "regenerate-message": omit `message`; the agent slices its own // history (drops the trailing assistant) and re-runs. if (trigger === "submit-message" && messages.length === 0) { @@ -585,7 +591,9 @@ export class TriggerChatTransport implements ChatTransport { } const wirePayload: ChatTaskWirePayload = { ...((body as Record) ?? {}), - ...(trigger === "submit-message" ? { message: messages.at(-1) } : {}), + ...(trigger === "submit-message" + ? { message: slimSubmitMessageForWire(messages.at(-1)) } + : {}), chatId, trigger, messageId, diff --git a/packages/trigger-sdk/test/mockChatAgent.test.ts b/packages/trigger-sdk/test/mockChatAgent.test.ts index c0b2b6eaa3a..3832e64b848 100644 --- a/packages/trigger-sdk/test/mockChatAgent.test.ts +++ b/packages/trigger-sdk/test/mockChatAgent.test.ts @@ -5,9 +5,10 @@ import { mockChatAgent } from "../src/v3/test/index.js"; import { describe, expect, it, vi } from "vitest"; import { chat } from "../src/v3/ai.js"; import { locals } from "@trigger.dev/core/v3"; -import { simulateReadableStream, streamText } from "ai"; +import { simulateReadableStream, streamText, tool, validateUIMessages } from "ai"; import { MockLanguageModelV3 } from "ai/test"; import type { LanguageModelV3StreamPart } from "@ai-sdk/provider"; +import { z } from "zod"; // ── Helpers ──────────────────────────────────────────────────────────── @@ -178,6 +179,64 @@ describe("mockChatAgent", () => { } }); + it("hydrate path: fresh user message lands in onTurnComplete.newUIMessages", async () => { + // The dedup that suppresses HITL-continuation pushes to + // `turnNewUIMessages` must compare against the PRE-hydration + // accumulator, not the post-hydration chain. A `hydrateMessages` + // hook that pushes the incoming user message into its persisted + // chain (the canonical pattern documented in + // /ai-chat/lifecycle-hooks#hydratemessages and the + // `upsertIncomingMessage` helper) would otherwise see the new + // user message in the returned `hydrated` array for every fresh + // turn, causing the dedup to wrongly fire and drop the user + // message from `newUIMessages` / `newMessages`. + const stored: any[] = []; + + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("hi") }), + }); + + let capturedNewUIMessages: any[] | undefined; + const agent = chat.agent({ + id: "mockChatAgent.hydrate-newui-fresh-user", + hydrateMessages: async ({ trigger, incomingMessages }) => { + // Canonical pattern: push the incoming user message into the + // persisted chain and return the chain. Mirrors what + // `upsertIncomingMessage` does. + if (trigger === "submit-message" && incomingMessages.length > 0) { + const newMsg = incomingMessages[incomingMessages.length - 1]!; + const exists = newMsg.id + ? stored.some((m) => m.id === newMsg.id) + : false; + if (!exists) stored.push(newMsg); + } + return [...stored]; + }, + onTurnComplete: async ({ newUIMessages }) => { + capturedNewUIMessages = newUIMessages; + }, + run: async ({ messages, signal }) => { + return streamText({ model, messages, abortSignal: signal }); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "test-hydrate-newui-fresh-user" }); + try { + await harness.sendMessage(userMessage("hello", "u-fresh")); + await new Promise((r) => setTimeout(r, 50)); + + // `newUIMessages` for a fresh user turn must contain BOTH the + // user message and the assistant response. The bug surfaces as + // length=1 (assistant only — user dropped by the wrong dedup). + expect(capturedNewUIMessages).toBeDefined(); + const roles = capturedNewUIMessages!.map((m: any) => m.role); + expect(roles).toEqual(["user", "assistant"]); + expect(capturedNewUIMessages![0]!.id).toBe("u-fresh"); + } finally { + await harness.close(); + } + }); + it("merges HITL tool answer onto head assistant when AI SDK regenerates the id", async () => { // Regression for TRI-9137: customers (Arena AI) report that the AI SDK // intermittently mints a fresh id on `addToolOutput` resume, breaking @@ -185,8 +244,6 @@ describe("mockChatAgent", () => { // an assistant with tool parts lands in the accumulator and uses that // map as a fallback in the merge so a fresh-id incoming still attaches // to the right head. - const { z } = await import("zod"); - const { tool } = await import("ai"); const askUserTool = tool({ description: "Ask the user a question.", @@ -306,6 +363,503 @@ describe("mockChatAgent", () => { } }); + it("merges a slim wire copy onto a hydrated message — keeps hydrated `input`, overlays wire `output`", async () => { + // HITL continuations on reasoning-heavy turns ship a slim assistant + // message (resolved tool parts only, no `input`, no reasoning, no + // text) so the payload fits the `.in/append` cap. `hydrateMessages` + // returns the full DB-backed copy. The per-turn merge has to keep + // the hydrated `input` and overlay only the wire's `state` + + // `output` — otherwise the next LLM call ships a tool call with no + // `arguments` and the provider 400s. + const searchTool = tool({ + description: "Search.", + inputSchema: z.object({ query: z.string() }), + }); + + const TC = "tc_slim_merge"; + const HEAD_ID = "a-head-slim"; + + // The customer's DB-backed copy. Tool part has the original `input` + // intact and is sitting in `input-available` (HITL waiting). + const dbAssistant = { + id: HEAD_ID, + role: "assistant" as const, + parts: [ + { + type: "tool-search" as const, + toolCallId: TC, + state: "input-available" as const, + input: { query: "trigger.dev pricing" }, + }, + ], + }; + + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("ok") }), + }); + + let mergedToolPart: any; + const agent = chat.agent({ + id: "mockChatAgent.slim-wire-merge", + hydrateMessages: async () => [dbAssistant as any], + onTurnComplete: async ({ uiMessages }) => { + const head = uiMessages.find((m: any) => m.id === HEAD_ID); + mergedToolPart = (head?.parts ?? []).find( + (p: any) => p?.toolCallId === TC + ); + }, + run: async ({ messages, signal }) => { + return streamText({ + model, + messages, + tools: { search: searchTool }, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "test-slim-wire" }); + try { + // Slim wire — only the resolved tool part, no `input`. + const slim = { + id: HEAD_ID, + role: "assistant" as const, + parts: [ + { + type: "tool-search" as const, + toolCallId: TC, + state: "output-available" as const, + output: { hits: 7 }, + }, + ], + }; + await harness.sendMessage(slim as any); + await new Promise((r) => setTimeout(r, 50)); + + expect(mergedToolPart?.state).toBe("output-available"); + expect(mergedToolPart?.output).toEqual({ hits: 7 }); + expect(mergedToolPart?.input).toEqual({ query: "trigger.dev pricing" }); + } finally { + await harness.close(); + } + }); + + it("merges a slim approval-responded wire onto a hydrated approval-requested message", async () => { + // Approval-flow counterpart to the HITL slim-merge test. The wire + // copy carries `state: "approval-responded"` + `approval: { id, + // approved }`. Hydrated has the same tool part in + // `approval-requested`. Merge has to overlay state + approval onto + // hydrated while keeping `input` intact so the agent can resume. + const deleteTool = tool({ + description: "Delete.", + inputSchema: z.object({ resource: z.string() }), + }); + + const TC = "tc_approval_merge"; + const HEAD_ID = "a-head-approval"; + + const dbAssistant = { + id: HEAD_ID, + role: "assistant" as const, + parts: [ + { + type: "tool-delete" as const, + toolCallId: TC, + state: "approval-requested" as const, + input: { resource: "/critical/data" }, + approval: { id: "appr_1" }, + }, + ], + }; + + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("ok") }), + }); + + let mergedToolPart: any; + const agent = chat.agent({ + id: "mockChatAgent.approval-slim-merge", + hydrateMessages: async () => [dbAssistant as any], + onTurnComplete: async ({ uiMessages }) => { + const head = uiMessages.find((m: any) => m.id === HEAD_ID); + mergedToolPart = (head?.parts ?? []).find( + (p: any) => p?.toolCallId === TC + ); + }, + run: async ({ messages, signal }) => { + return streamText({ + model, + messages, + tools: { delete: deleteTool }, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "test-approval-slim" }); + try { + // Slim wire — only the approval-responded tool part, no `input`. + const slim = { + id: HEAD_ID, + role: "assistant" as const, + parts: [ + { + type: "tool-delete" as const, + toolCallId: TC, + state: "approval-responded" as const, + approval: { id: "appr_1", approved: true }, + }, + ], + }; + await harness.sendMessage(slim as any); + await new Promise((r) => setTimeout(r, 50)); + + expect(mergedToolPart?.state).toBe("approval-responded"); + expect(mergedToolPart?.approval).toEqual({ id: "appr_1", approved: true }); + expect(mergedToolPart?.input).toEqual({ resource: "/critical/data" }); + } finally { + await harness.close(); + } + }); + + it("does not downgrade a hydrated `output-denied` when a stale `approval-responded` arrives", async () => { + // Terminal hydrated states (`output-available` / `output-error` / + // `output-denied`) are authoritative. A wire copy that re-ships a + // prior `approval-responded` (replay, retry, out-of-order arrival) + // must NOT regress the terminal denial back to a pre-resolution + // state — the agent would then re-run the tool. + const deleteTool = tool({ + description: "Delete.", + inputSchema: z.object({ resource: z.string() }), + }); + + const TC = "tc_denied_no_regress"; + const HEAD_ID = "a-head-denied"; + + const dbAssistant = { + id: HEAD_ID, + role: "assistant" as const, + parts: [ + { + type: "tool-delete" as const, + toolCallId: TC, + state: "output-denied" as const, + input: { resource: "/critical/data" }, + approval: { id: "appr_1", approved: false, reason: "no" }, + }, + ], + }; + + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("ok") }), + }); + + let mergedToolPart: any; + const agent = chat.agent({ + id: "mockChatAgent.denied-no-regress", + hydrateMessages: async () => [dbAssistant as any], + onTurnComplete: async ({ uiMessages }) => { + const head = uiMessages.find((m: any) => m.id === HEAD_ID); + mergedToolPart = (head?.parts ?? []).find( + (p: any) => p?.toolCallId === TC + ); + }, + run: async ({ messages, signal }) => { + return streamText({ + model, + messages, + tools: { delete: deleteTool }, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "test-denied-no-regress" }); + try { + // Stale wire arrival — `approval-responded` for a tool that + // hydrated already shows as `output-denied`. + const stale = { + id: HEAD_ID, + role: "assistant" as const, + parts: [ + { + type: "tool-delete" as const, + toolCallId: TC, + state: "approval-responded" as const, + approval: { id: "appr_1", approved: true }, + }, + ], + }; + await harness.sendMessage(stale as any); + await new Promise((r) => setTimeout(r, 50)); + + // The hydrated terminal state survives the merge. + expect(mergedToolPart?.state).toBe("output-denied"); + expect(mergedToolPart?.approval).toEqual({ + id: "appr_1", + approved: false, + reason: "no", + }); + expect(mergedToolPart?.input).toEqual({ resource: "/critical/data" }); + } finally { + await harness.close(); + } + }); + + it("onValidateMessages sees the slim wire on HITL continuations — fresh-user filter still works", async () => { + // Slim wire is the wire shape on HITL `addToolOutput` continuations. + // Existing customers calling `validateUIMessages` from `ai` on the + // whole `messages` array would throw because the AI SDK schema + // requires `input` on resolved tool parts. The recommended pattern + // is to filter to user messages (or skip on HITL turns). + // + // This test pins both behaviors: (1) the slim assistant does arrive + // in the validate hook on the HITL turn, (2) a fresh-user-only + // filter still works (the user message turn is unaffected). + const askUser = tool({ + description: "Ask the user.", + inputSchema: z.object({ q: z.string() }), + }); + const TC = "tc_validate_slim"; + + let callIdx = 0; + const model = new MockLanguageModelV3({ + doStream: async () => ({ + stream: + callIdx++ === 0 + ? simulateReadableStream({ + chunks: [ + { type: "tool-input-start", id: TC, toolName: "askUser" }, + { + type: "tool-input-delta", + id: TC, + delta: JSON.stringify({ q: "what color?" }), + }, + { type: "tool-input-end", id: TC }, + { + type: "tool-call", + toolCallId: TC, + toolName: "askUser", + input: JSON.stringify({ q: "what color?" }), + }, + { + type: "finish", + finishReason: { unified: "tool-calls", raw: "tool_calls" }, + usage: { + inputTokens: { + total: 5, + noCache: 5, + cacheRead: undefined, + cacheWrite: undefined, + }, + outputTokens: { total: 5, text: 0, reasoning: undefined }, + }, + }, + ] as LanguageModelV3StreamPart[], + }) + : textStream("got it"), + }), + }); + + const validateCalls: any[] = []; + const agent = chat.agent({ + id: "mockChatAgent.validate-slim", + onValidateMessages: async ({ messages, trigger }) => { + validateCalls.push({ + trigger, + messages: messages.map((m: any) => ({ + id: m.id, + role: m.role, + partCount: m.parts?.length, + })), + }); + // Recommended pattern: validate only user messages, since HITL + // continuations carry slim assistants the AI SDK schema rejects. + const userMessages = messages.filter( + (m: any) => m.role === "user" + ); + if (userMessages.length > 0) { + await validateUIMessages({ + messages: userMessages, + // `validateUIMessages` is stricter than `streamText` about + // tool input/output variance — cast to satisfy the wider + // `Tool` it expects. + tools: { askUser } as any, + }); + } + return messages; + }, + run: async ({ messages, signal }) => { + return streamText({ + model, + messages, + tools: { askUser }, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "test-validate-slim" }); + try { + // Turn 1: user message. Validator sees a user message; validate passes. + await harness.sendMessage(userMessage("hi")); + await new Promise((r) => setTimeout(r, 50)); + + expect(validateCalls).toHaveLength(1); + expect(validateCalls[0].messages[0].role).toBe("user"); + + // Turn 2: slim wire HITL continuation. Validator sees a slim + // assistant with no `input`. The fresh-user-filter skips it, so + // validateUIMessages isn't called against the slim shape. + const turn1Assistant = (await import("vitest")).expect.any(Object); + void turn1Assistant; // appease unused-binding lints + + // Build a slim assistant that mirrors what the transport would ship. + const slim = { + id: "slim-id-doesnt-matter-for-validate", + role: "assistant" as const, + parts: [ + { + type: "tool-askUser" as const, + toolCallId: TC, + state: "output-available" as const, + output: { color: "blue" }, + }, + ], + }; + await harness.sendMessage(slim as any); + await new Promise((r) => setTimeout(r, 50)); + + expect(validateCalls).toHaveLength(2); + expect(validateCalls[1].trigger).toBe("submit-message"); + // The slim assistant arrived in validate — no user messages. + expect(validateCalls[1].messages[0].role).toBe("assistant"); + } finally { + await harness.close(); + } + }); + + it("merges a slim wire copy in the default (no-hydrate) branch — keeps snapshot `input`", async () => { + // Mirror of the hydrated slim-merge test, but exercising the + // default accumulator branch (no `hydrateMessages` registered). + // The agent's own turn-1 output seeds the accumulator with the + // full assistant + tool `input`; a slim turn-2 wire copy has to + // merge onto that without clobbering the snapshot's `input`. + const askUser = tool({ + description: "Ask the user.", + inputSchema: z.object({ q: z.string() }), + }); + const TC = "tc_default_slim"; + + let callIdx = 0; + const model = new MockLanguageModelV3({ + doStream: async () => ({ + stream: + callIdx++ === 0 + ? simulateReadableStream({ + chunks: [ + { type: "tool-input-start", id: TC, toolName: "askUser" }, + { + type: "tool-input-delta", + id: TC, + delta: JSON.stringify({ q: "what color?" }), + }, + { type: "tool-input-end", id: TC }, + { + type: "tool-call", + toolCallId: TC, + toolName: "askUser", + input: JSON.stringify({ q: "what color?" }), + }, + { + type: "finish", + finishReason: { unified: "tool-calls", raw: "tool_calls" }, + usage: { + inputTokens: { + total: 5, + noCache: 5, + cacheRead: undefined, + cacheWrite: undefined, + }, + outputTokens: { total: 5, text: 0, reasoning: undefined }, + }, + }, + ] as LanguageModelV3StreamPart[], + }) + : textStream("got it"), + }), + }); + + const turnsSeen: { uiMessages: any[] }[] = []; + const agent = chat.agent({ + id: "mockChatAgent.default-slim-merge", + onTurnComplete: async ({ uiMessages }) => { + turnsSeen.push({ + uiMessages: uiMessages.map((m: any) => ({ + id: m.id, + role: m.role, + parts: (m.parts ?? []).map((p: any) => ({ + type: p.type, + toolCallId: p.toolCallId, + state: p.state, + hasInput: p.input !== undefined, + output: p.output, + })), + })), + }); + }, + run: async ({ messages, signal }) => { + return streamText({ + model, + messages, + tools: { askUser }, + abortSignal: signal, + }); + }, + }); + + const harness = mockChatAgent(agent, { chatId: "test-default-slim" }); + try { + // Turn 1: user message → agent emits tool call (state=input-available, has input). + await harness.sendMessage(userMessage("hi")); + await new Promise((r) => setTimeout(r, 50)); + + const turn1Assistant = turnsSeen.at(-1)?.uiMessages.find( + (m: any) => m.role === "assistant" + ); + expect(turn1Assistant).toBeTruthy(); + const HEAD_ID = turn1Assistant!.id; + + // Turn 2: slim wire — only the resolved tool part with output. No input. + const slim = { + id: HEAD_ID, + role: "assistant" as const, + parts: [ + { + type: "tool-askUser" as const, + toolCallId: TC, + state: "output-available" as const, + output: { color: "blue" }, + }, + ], + }; + await harness.sendMessage(slim as any); + await new Promise((r) => setTimeout(r, 50)); + + const turn2 = turnsSeen.at(-1); + const head = turn2!.uiMessages.find((m: any) => m.id === HEAD_ID); + const toolPart = (head?.parts ?? []).find( + (p: any) => p?.toolCallId === TC + ); + expect(toolPart?.state).toBe("output-available"); + expect(toolPart?.output).toEqual({ color: "blue" }); + // Snapshot's `input` survived the merge. + expect(toolPart?.hasInput).toBe(true); + } finally { + await harness.close(); + } + }); + it("routes custom actions through actionSchema + onAction", async () => { const model = new MockLanguageModelV3({ doStream: async () => ({ stream: textStream("ok") }), @@ -313,7 +867,6 @@ describe("mockChatAgent", () => { const onActionSpy = vi.fn(); - const { z } = await import("zod"); const agent = chat.agent({ id: "mockChatAgent.actions", actionSchema: z.object({ @@ -352,7 +905,6 @@ describe("mockChatAgent", () => { }, }); - const { z } = await import("zod"); const agent = chat.agent({ id: "mockChatAgent.actions.void", actionSchema: z.object({ type: z.literal("undo") }), @@ -420,7 +972,6 @@ describe("mockChatAgent", () => { doStream: async () => ({ stream: textStream("normal-response") }), }); - const { z } = await import("zod"); const agent = chat.agent({ id: "mockChatAgent.actions.stream", actionSchema: z.object({ type: z.literal("regenerate") }), @@ -469,7 +1020,6 @@ describe("mockChatAgent", () => { }, }); - const { z } = await import("zod"); const agent = chat.agent({ id: "mockChatAgent.actions.no-handler", actionSchema: z.object({ type: z.literal("undo") }), @@ -660,8 +1210,6 @@ describe("mockChatAgent", () => { } it("getPendingToolCalls returns input-available parts on the leaf assistant", async () => { - const { z } = await import("zod"); - const { tool } = await import("ai"); const askUser = tool({ description: "Ask the user.", inputSchema: z.object({ q: z.string() }), @@ -723,8 +1271,6 @@ describe("mockChatAgent", () => { }); it("getResolvedToolCalls walks all messages after a HITL answer lands", async () => { - const { z } = await import("zod"); - const { tool } = await import("ai"); const askUser = tool({ description: "Ask the user.", inputSchema: z.object({ q: z.string() }), @@ -904,8 +1450,6 @@ describe("mockChatAgent", () => { it("extractNewToolResults dedups against a real-stream-built chain", async () => { // Build the chain through real model streams (no chat.history.set seed) // and assert extractNewToolResults compares against the post-merge state. - const { z } = await import("zod"); - const { tool } = await import("ai"); const askUser = tool({ description: "Ask the user.", inputSchema: z.object({ q: z.string() }), @@ -990,8 +1534,6 @@ describe("mockChatAgent", () => { // assistant via the toolCallId map. Here we send an answer in // `output-error` state and verify (a) getResolvedToolCalls reports // it, and (b) extractNewToolResults emits it with errorText set. - const { z } = await import("zod"); - const { tool } = await import("ai"); const search = tool({ description: "Search.", inputSchema: z.object({ q: z.string() }), diff --git a/packages/trigger-sdk/test/wire-shape.test.ts b/packages/trigger-sdk/test/wire-shape.test.ts index fd24fe00bba..6214f3ca01e 100644 --- a/packages/trigger-sdk/test/wire-shape.test.ts +++ b/packages/trigger-sdk/test/wire-shape.test.ts @@ -11,6 +11,7 @@ import "../src/v3/test/index.js"; import type { UIMessage } from "ai"; import { describe, expect, expectTypeOf, it } from "vitest"; import type { ChatInputChunk, ChatTaskWirePayload } from "../src/v3/ai-shared.js"; +import { slimSubmitMessageForWire, upsertIncomingMessage } from "../src/v3/ai-shared.js"; describe("ChatTaskWirePayload (slim wire shape)", () => { it("encodes and decodes a submit-message payload through JSON", () => { @@ -157,6 +158,297 @@ describe("ChatTaskWirePayload (slim wire shape)", () => { }); }); +describe("upsertIncomingMessage", () => { + const userMsg = (id: string, text: string): UIMessage => ({ + id, + role: "user", + parts: [{ type: "text", text }], + }); + + it("pushes a fresh user message and returns true", () => { + const stored: UIMessage[] = [userMsg("u-1", "first")]; + const mutated = upsertIncomingMessage(stored, { + trigger: "submit-message", + incomingMessages: [userMsg("u-2", "second")], + }); + expect(mutated).toBe(true); + expect(stored).toHaveLength(2); + expect(stored[1]!.id).toBe("u-2"); + }); + + it("no-ops when incoming id is already in stored (HITL continuation)", () => { + const head = { + id: "asst-1", + role: "assistant" as const, + parts: [{ type: "tool-search", toolCallId: "tc-1", state: "input-available", input: {} } as never], + }; + const stored: UIMessage[] = [userMsg("u-1", "hi"), head]; + const slim = { + id: "asst-1", + role: "assistant" as const, + parts: [{ type: "tool-search", toolCallId: "tc-1", state: "output-available", output: {} } as never], + }; + const mutated = upsertIncomingMessage(stored, { + trigger: "submit-message", + incomingMessages: [slim], + }); + expect(mutated).toBe(false); + expect(stored).toHaveLength(2); + // The original head is untouched — the runtime's per-turn merge + // overlays the resolution; the customer's stored array is just + // the pre-merge snapshot. + expect(stored[1]).toBe(head); + }); + + it("no-ops on regenerate-message trigger", () => { + const stored: UIMessage[] = [userMsg("u-1", "hi")]; + const mutated = upsertIncomingMessage(stored, { + trigger: "regenerate-message", + incomingMessages: [userMsg("u-2", "ignored")], + }); + expect(mutated).toBe(false); + expect(stored).toHaveLength(1); + }); + + it("no-ops on action trigger", () => { + const stored: UIMessage[] = [userMsg("u-1", "hi")]; + const mutated = upsertIncomingMessage(stored, { + trigger: "action", + incomingMessages: [], + }); + expect(mutated).toBe(false); + expect(stored).toHaveLength(1); + }); + + it("no-ops on empty incomingMessages", () => { + const stored: UIMessage[] = [userMsg("u-1", "hi")]; + const mutated = upsertIncomingMessage(stored, { + trigger: "submit-message", + incomingMessages: [], + }); + expect(mutated).toBe(false); + expect(stored).toHaveLength(1); + }); + + it("only inspects the last incoming message (slim wire ships at most one)", () => { + const stored: UIMessage[] = [userMsg("u-1", "hi")]; + const mutated = upsertIncomingMessage(stored, { + trigger: "submit-message", + incomingMessages: [userMsg("ignored", "ignored"), userMsg("u-3", "new")], + }); + expect(mutated).toBe(true); + expect(stored).toHaveLength(2); + expect(stored[1]!.id).toBe("u-3"); + }); + + it("pushes when newMsg has no id (no dedup possible)", () => { + const stored: UIMessage[] = [userMsg("u-1", "hi")]; + const incoming = { role: "user", parts: [{ type: "text", text: "no id" }] } as unknown as UIMessage; + const mutated = upsertIncomingMessage(stored, { + trigger: "submit-message", + incomingMessages: [incoming], + }); + expect(mutated).toBe(true); + expect(stored).toHaveLength(2); + }); + + it("accepts the full hydrateMessages event without re-packaging", () => { + // Customers can pass the destructured event directly — the helper + // only reads `trigger` + `incomingMessages` but ignores any other + // fields the event happens to carry. + const stored: UIMessage[] = []; + const event = { + chatId: "chat-1", + turn: 0, + trigger: "submit-message" as const, + incomingMessages: [userMsg("u-1", "hi")], + previousMessages: [], + continuation: false, + }; + const mutated = upsertIncomingMessage(stored, event); + expect(mutated).toBe(true); + expect(stored).toHaveLength(1); + }); +}); + +describe("slimSubmitMessageForWire", () => { + it("passes user messages through unchanged", () => { + const userMsg: UIMessage = { + id: "u-1", + role: "user", + parts: [{ type: "text", text: "hello" }], + }; + expect(slimSubmitMessageForWire(userMsg)).toBe(userMsg); + }); + + it("passes assistant messages with no resolved tool parts through unchanged", () => { + const assistantMsg: UIMessage = { + id: "a-1", + role: "assistant", + parts: [ + { type: "text", text: "thinking..." }, + { + type: "tool-search", + toolCallId: "tc-1", + state: "input-available", + input: { q: "x" }, + } as never, + ], + }; + expect(slimSubmitMessageForWire(assistantMsg)).toBe(assistantMsg); + }); + + it("slims output-available HITL continuation to {type, toolCallId, state, output}", () => { + const assistantMsg: UIMessage = { + id: "a-1", + role: "assistant", + parts: [ + { type: "text", text: "let me search" }, + { type: "reasoning", text: "long reasoning blob..." } as never, + { + type: "tool-search", + toolCallId: "tc-1", + state: "output-available", + input: { q: "very long query".repeat(1000) }, + output: { hits: 7 }, + } as never, + ], + }; + const slim = slimSubmitMessageForWire(assistantMsg); + expect(slim).toEqual({ + id: "a-1", + role: "assistant", + parts: [ + { + type: "tool-search", + toolCallId: "tc-1", + state: "output-available", + output: { hits: 7 }, + }, + ], + }); + // The slim drops `input` (server has it via hydrate/snapshot) — the + // wire is much smaller than the original. + expect(JSON.stringify(slim).length).toBeLessThan(JSON.stringify(assistantMsg).length / 50); + }); + + it("slims output-error to {type, toolCallId, state, errorText}", () => { + const assistantMsg: UIMessage = { + id: "a-1", + role: "assistant", + parts: [ + { + type: "tool-search", + toolCallId: "tc-1", + state: "output-error", + input: { q: "x" }, + errorText: "boom", + } as never, + ], + }; + expect(slimSubmitMessageForWire(assistantMsg)).toEqual({ + id: "a-1", + role: "assistant", + parts: [ + { + type: "tool-search", + toolCallId: "tc-1", + state: "output-error", + errorText: "boom", + }, + ], + }); + }); + + it("slims approval-responded to {type, toolCallId, state, approval}", () => { + const assistantMsg: UIMessage = { + id: "a-1", + role: "assistant", + parts: [ + { + type: "tool-delete", + toolCallId: "tc-1", + state: "approval-responded", + input: { path: "/critical" }, + approval: { id: "appr_1", approved: true, reason: "looks fine" }, + } as never, + ], + }; + expect(slimSubmitMessageForWire(assistantMsg)).toEqual({ + id: "a-1", + role: "assistant", + parts: [ + { + type: "tool-delete", + toolCallId: "tc-1", + state: "approval-responded", + approval: { id: "appr_1", approved: true, reason: "looks fine" }, + }, + ], + }); + }); + + it("slims dynamic-tool parts and preserves toolName", () => { + const assistantMsg: UIMessage = { + id: "a-1", + role: "assistant", + parts: [ + { + type: "dynamic-tool", + toolName: "dyn-search", + toolCallId: "tc-1", + state: "output-available", + input: { q: "x" }, + output: { hits: 1 }, + } as never, + ], + }; + expect(slimSubmitMessageForWire(assistantMsg)).toEqual({ + id: "a-1", + role: "assistant", + parts: [ + { + type: "dynamic-tool", + toolName: "dyn-search", + toolCallId: "tc-1", + state: "output-available", + output: { hits: 1 }, + }, + ], + }); + }); + + it("only slims the advanced tool parts when an assistant has mixed states", () => { + const assistantMsg: UIMessage = { + id: "a-1", + role: "assistant", + parts: [ + { type: "text", text: "thinking" }, + { + type: "tool-search", + toolCallId: "tc-resolved", + state: "output-available", + input: { q: "x" }, + output: { hits: 1 }, + } as never, + { + type: "tool-askUser", + toolCallId: "tc-still-pending", + state: "input-available", + input: { q: "ok?" }, + } as never, + ], + }; + const slim = slimSubmitMessageForWire(assistantMsg); + expect(slim?.parts).toHaveLength(1); + expect((slim?.parts?.[0] as any).toolCallId).toBe("tc-resolved"); + }); + + it("handles undefined input", () => { + expect(slimSubmitMessageForWire(undefined)).toBeUndefined(); + }); +}); + describe("ChatTaskWirePayload (compile-time shape)", () => { it("does NOT have a `messages` array field (slim wire removed it)", () => { // If a future edit reintroduces `messages: TMessage[]`, this assertion diff --git a/references/ai-chat/src/trigger/chat.ts b/references/ai-chat/src/trigger/chat.ts index 6033e9e3689..3f2e5900480 100644 --- a/references/ai-chat/src/trigger/chat.ts +++ b/references/ai-chat/src/trigger/chat.ts @@ -1,4 +1,4 @@ -import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; +import { chat, upsertIncomingMessage, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; import { logger, prompts, skills } from "@trigger.dev/sdk"; import { @@ -232,15 +232,26 @@ export const aiChat = chat turn, count: messages.length, }); - // Cast: `chatTools` has executes (output types are real), but - // `ChatUiMessage` is derived from the schema-only set in - // `chat-tools-schemas.ts` so its tools have `output: never`. - // `validateUIMessages` only reads `inputSchema` at runtime, so - // the type narrowing is safely sidestepped. - return validateUIMessages({ - messages, - tools: chatTools as unknown as Parameters[0]["tools"], - }); + // HITL continuations (`addToolOutput` / `addToolApproveResponse`) + // ship a slim assistant on the wire — `state` + `output` / + // `errorText` / `approval` only, no `input` or other parts. + // `validateUIMessages` rejects that shape (the AI SDK schema + // requires `input` on resolved tool parts), so filter to user + // messages first. The agent's per-turn merge restores the + // hydrated entry's `input` before `toModelMessages`. + const userMessages = messages.filter((m) => m.role === "user"); + if (userMessages.length > 0) { + await validateUIMessages({ + messages: userMessages, + // Cast: `chatTools` has executes (output types are real), but + // `ChatUiMessage` is derived from the schema-only set in + // `chat-tools-schemas.ts` so its tools have `output: never`. + // `validateUIMessages` only reads `inputSchema` at runtime, so + // the type narrowing is safely sidestepped. + tools: chatTools as unknown as Parameters[0]["tools"], + }); + } + return messages; }, // #endregion @@ -887,15 +898,15 @@ export const aiChatHydrated = chat // Load message history from the database on every turn. // The frontend's accumulated messages are ignored — the DB is the - // single source of truth. New user messages arrive in `incomingMessages` - // and are appended + persisted before returning. + // single source of truth. `upsertIncomingMessage` handles HITL + // continuations (slim wire sharing an id with the existing + // assistant — no-op so the runtime overlays the new state) and + // fresh user messages (push + persist). hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { const record = await prisma.chat.findUnique({ where: { id: chatId } }); const stored = (record?.messages as unknown as UIMessage[]) ?? []; - if (trigger === "submit-message" && incomingMessages.length > 0) { - const newMsg = incomingMessages[incomingMessages.length - 1]!; - stored.push(newMsg); + if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { await prisma.chat.update({ where: { id: chatId }, data: { messages: stored as unknown as ChatMessagesForWrite }, From d34014ddccf9e806f5bfe5702d9306710c4bfe93 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 23 May 2026 17:15:33 +0100 Subject: [PATCH 4/5] chore: release v4.5.0-rc.2 (#3702) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary 3 improvements, 1 bug fix. ## Improvements - The per-turn merge now overlays the wire copy's tool-part state advancement onto the agent's existing chain — `state` + the matching resolution field (`output` / `errorText` / `approval`) come from the wire, everything else (text, reasoning, tool `input`, provider metadata) stays whatever the snapshot or `hydrateMessages` returned. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow). - `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. The wire payload is just `{ id, role, parts: [] }` for `submit-message` continuations; everything else passes through. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops. - Add `TriggerClient` for running multiple SDK clients side-by-side, each with its own auth, preview branch, and baseURL. Useful when a single process needs to trigger tasks or read runs across multiple projects, environments, or preview branches without mutating shared global state. ([#3683](https://github.com/triggerdotdev/trigger.dev/pull/3683)) ## Bug fixes - Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together: ([#3719](https://github.com/triggerdotdev/trigger.dev/pull/3719))
Raw changeset output ⚠️⚠️⚠️⚠️⚠️⚠️ `main` is currently in **pre mode** so this branch has prereleases rather than normal releases. If you want to exit prereleases, run `changeset pre exit` on `main`. ⚠️⚠️⚠️⚠️⚠️⚠️ # Releases ## @trigger.dev/build@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/core@4.5.0-rc.2` ## trigger.dev@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/build@4.5.0-rc.2` - `@trigger.dev/core@4.5.0-rc.2` - `@trigger.dev/schema-to-json@4.5.0-rc.2` ## @trigger.dev/plugins@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/core@4.5.0-rc.2` ## @trigger.dev/python@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/sdk@4.5.0-rc.2` - `@trigger.dev/build@4.5.0-rc.2` - `@trigger.dev/core@4.5.0-rc.2` ## @trigger.dev/react-hooks@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/core@4.5.0-rc.2` ## @trigger.dev/redis-worker@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/core@4.5.0-rc.2` ## @trigger.dev/rsc@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/core@4.5.0-rc.2` ## @trigger.dev/schema-to-json@4.5.0-rc.2 ### Patch Changes - Updated dependencies: - `@trigger.dev/core@4.5.0-rc.2` ## @trigger.dev/sdk@4.5.0-rc.2 ### Patch Changes - Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together: ([#3719](https://github.com/triggerdotdev/trigger.dev/pull/3719)) - The per-turn merge now overlays the wire copy's tool-part state advancement onto the agent's existing chain — `state` + the matching resolution field (`output` / `errorText` / `approval`) come from the wire, everything else (text, reasoning, tool `input`, provider metadata) stays whatever the snapshot or `hydrateMessages` returned. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow). - `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. The wire payload is just `{ id, role, parts: [] }` for `submit-message` continuations; everything else passes through. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops. Note: `onValidateMessages` receives the slim wire on HITL turns. If you call `validateUIMessages` from `ai` against the full `messages` array it will reject the slim assistant; filter to user messages (or skip on HITL turns) — see the updated docstring on `onValidateMessages` for the recommended pattern. For `hydrateMessages` hooks that persist the chain, this release also adds a small helper to the `@trigger.dev/sdk/ai` surface: ```ts import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; chat.agent({ hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { const record = await db.chat.findUnique({ where: { id: chatId } }); const stored = record?.messages ?? []; if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { await db.chat.update({ where: { id: chatId }, data: { messages: stored } }); } return stored; }, }); ``` It pushes fresh user messages by id, no-ops on HITL continuations (the incoming shares an id with the existing assistant — the runtime overlays the new tool-state advance), and skips on non-`submit-message` triggers. Returns `true` if it mutated `stored` so the caller knows whether to persist. Net effect: `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer blows the cap and no longer corrupts the LLM input. - Add `TriggerClient` for running multiple SDK clients side-by-side, each with its own auth, preview branch, and baseURL. Useful when a single process needs to trigger tasks or read runs across multiple projects, environments, or preview branches without mutating shared global state. ([#3683](https://github.com/triggerdotdev/trigger.dev/pull/3683)) ```ts import { TriggerClient } from "@trigger.dev/sdk"; const prod = new TriggerClient({ accessToken: process.env.TRIGGER_PROD_KEY }); const preview = new TriggerClient({ accessToken: process.env.TRIGGER_PREVIEW_KEY, previewBranch: "signup-flow", }); await prod.tasks.trigger("send-email", payload); await preview.runs.list({ status: ["COMPLETED"] }); ``` - Updated dependencies: - `@trigger.dev/core@4.5.0-rc.2` ## @trigger.dev/core@4.5.0-rc.2
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .changeset/pre.json | 4 +- .../configurable-http-keepalive-timeout.md | 6 --- .../drop-taskrun-scheduleid-createdat-idx.md | 6 --- .../organization-scoped-clickhouse.md | 6 --- .../pending-version-clickhouse-lookup.md | 6 --- .server-changes/realtime-append-cap.md | 6 --- .server-changes/sentry-tenant-attribution.md | 6 --- .../supervisor-checkpoint-type-compat.md | 6 --- hosting/k8s/helm/Chart.yaml | 4 +- packages/build/CHANGELOG.md | 7 +++ packages/build/package.json | 4 +- packages/cli-v3/CHANGELOG.md | 9 ++++ packages/cli-v3/package.json | 8 +-- packages/core/CHANGELOG.md | 2 + packages/core/package.json | 2 +- packages/plugins/CHANGELOG.md | 7 +++ packages/plugins/package.json | 2 +- packages/python/CHANGELOG.md | 9 ++++ packages/python/package.json | 12 ++--- packages/react-hooks/CHANGELOG.md | 7 +++ packages/react-hooks/package.json | 4 +- packages/redis-worker/CHANGELOG.md | 7 +++ packages/redis-worker/package.json | 4 +- packages/rsc/CHANGELOG.md | 7 +++ packages/rsc/package.json | 6 +-- packages/schema-to-json/CHANGELOG.md | 7 +++ packages/schema-to-json/package.json | 2 +- packages/trigger-sdk/CHANGELOG.md | 50 +++++++++++++++++++ packages/trigger-sdk/package.json | 4 +- pnpm-lock.yaml | 32 ++++++------ 30 files changed, 157 insertions(+), 85 deletions(-) delete mode 100644 .server-changes/configurable-http-keepalive-timeout.md delete mode 100644 .server-changes/drop-taskrun-scheduleid-createdat-idx.md delete mode 100644 .server-changes/organization-scoped-clickhouse.md delete mode 100644 .server-changes/pending-version-clickhouse-lookup.md delete mode 100644 .server-changes/realtime-append-cap.md delete mode 100644 .server-changes/sentry-tenant-attribution.md delete mode 100644 .server-changes/supervisor-checkpoint-type-compat.md diff --git a/.changeset/pre.json b/.changeset/pre.json index e4a34aff561..26efee12b15 100644 --- a/.changeset/pre.json +++ b/.changeset/pre.json @@ -28,6 +28,7 @@ "chat-agent", "chat-history-read-primitives", "chat-session-attributes", + "chat-slim-wire-merge", "chat-start-session-action-typed-client-data", "cli-deploy-skip-rewrite-timestamp", "locals-key-dual-package-fix", @@ -39,6 +40,7 @@ "resource-catalog-runtime-registration", "retry-sigsegv", "runs-list-region-filter", - "sessions-primitive" + "sessions-primitive", + "trigger-client" ] } diff --git a/.server-changes/configurable-http-keepalive-timeout.md b/.server-changes/configurable-http-keepalive-timeout.md deleted file mode 100644 index 982ace7fdc6..00000000000 --- a/.server-changes/configurable-http-keepalive-timeout.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: improvement ---- - -Make the Express server's `keepAliveTimeout` configurable via `HTTP_KEEPALIVE_TIMEOUT_MS` (defaults to the previous hardcoded 65000 ms). diff --git a/.server-changes/drop-taskrun-scheduleid-createdat-idx.md b/.server-changes/drop-taskrun-scheduleid-createdat-idx.md deleted file mode 100644 index cace0a4eb28..00000000000 --- a/.server-changes/drop-taskrun-scheduleid-createdat-idx.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: improvement ---- - -Reduce primary database write load on `TaskRun` by dropping an unused composite index on `(scheduleId, createdAt)`. The schedule list view reads from ClickHouse, so this Postgres index served no Prisma query while still being maintained on every `TaskRun` INSERT/UPDATE. diff --git a/.server-changes/organization-scoped-clickhouse.md b/.server-changes/organization-scoped-clickhouse.md deleted file mode 100644 index 874b9dc6026..00000000000 --- a/.server-changes/organization-scoped-clickhouse.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: feature ---- - -Organization-scoped ClickHouse routing enables customers with HIPAA and other data security requirements to use dedicated database instances diff --git a/.server-changes/pending-version-clickhouse-lookup.md b/.server-changes/pending-version-clickhouse-lookup.md deleted file mode 100644 index af46540528f..00000000000 --- a/.server-changes/pending-version-clickhouse-lookup.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: improvement ---- - -PendingVersionSystem now discovers PENDING_VERSION run ids via ClickHouse and re-validates each by primary key in Postgres, reducing read load on the TaskRun status index. Uses a dedicated `RUN_ENGINE_CLICKHOUSE_*` client so it doesn't contend with the main analytics pool. diff --git a/.server-changes/realtime-append-cap.md b/.server-changes/realtime-append-cap.md deleted file mode 100644 index cfbd3b7fa1a..00000000000 --- a/.server-changes/realtime-append-cap.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: fix ---- - -Session `.in/append` returns readable 413s on oversize bodies (was failing browser fetches as opaque `TypeError: Failed to fetch`) and now rejects only records that would actually exceed S2's per-record ceiling, instead of guessing at a conservative pre-encoding cap. diff --git a/.server-changes/sentry-tenant-attribution.md b/.server-changes/sentry-tenant-attribution.md deleted file mode 100644 index 2aaf43483ab..00000000000 --- a/.server-changes/sentry-tenant-attribution.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: feature ---- - -Stamp Sentry events with the signed-in user so "Users Impacted" counts individual humans, and enrich events with org / project / environment tags when that context is available (dashboard URLs, authenticated API requests). diff --git a/.server-changes/supervisor-checkpoint-type-compat.md b/.server-changes/supervisor-checkpoint-type-compat.md deleted file mode 100644 index dd26a3e6b04..00000000000 --- a/.server-changes/supervisor-checkpoint-type-compat.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: supervisor -type: fix ---- - -Keep older workloads working when checkpoints are produced by the compute path diff --git a/hosting/k8s/helm/Chart.yaml b/hosting/k8s/helm/Chart.yaml index 40c535f6854..70d883125c1 100644 --- a/hosting/k8s/helm/Chart.yaml +++ b/hosting/k8s/helm/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: trigger description: The official Trigger.dev Helm chart type: application -version: 4.5.0-rc.1 -appVersion: v4.5.0-rc.1 +version: 4.5.0-rc.2 +appVersion: v4.5.0-rc.2 home: https://trigger.dev sources: - https://github.com/triggerdotdev/trigger.dev diff --git a/packages/build/CHANGELOG.md b/packages/build/CHANGELOG.md index c6b4ee220e2..7115bb4520a 100644 --- a/packages/build/CHANGELOG.md +++ b/packages/build/CHANGELOG.md @@ -1,5 +1,12 @@ # @trigger.dev/build +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/build/package.json b/packages/build/package.json index 5fa1932e8a3..95435efa57f 100644 --- a/packages/build/package.json +++ b/packages/build/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/build", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "trigger.dev build extensions", "license": "MIT", "publishConfig": { @@ -78,7 +78,7 @@ }, "dependencies": { "@prisma/config": "^6.10.0", - "@trigger.dev/core": "workspace:4.5.0-rc.1", + "@trigger.dev/core": "workspace:4.5.0-rc.2", "mlly": "^1.7.1", "pkg-types": "^1.1.3", "resolve": "^1.22.8", diff --git a/packages/cli-v3/CHANGELOG.md b/packages/cli-v3/CHANGELOG.md index 8a4c454e4b8..d42b4f5a700 100644 --- a/packages/cli-v3/CHANGELOG.md +++ b/packages/cli-v3/CHANGELOG.md @@ -1,5 +1,14 @@ # trigger.dev +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/build@4.5.0-rc.2` + - `@trigger.dev/core@4.5.0-rc.2` + - `@trigger.dev/schema-to-json@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/cli-v3/package.json b/packages/cli-v3/package.json index 0ba8907d8e2..8b90095849e 100644 --- a/packages/cli-v3/package.json +++ b/packages/cli-v3/package.json @@ -1,6 +1,6 @@ { "name": "trigger.dev", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "A Command-Line Interface for Trigger.dev projects", "type": "module", "license": "MIT", @@ -95,9 +95,9 @@ "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "@s2-dev/streamstore": "^0.22.5", - "@trigger.dev/build": "workspace:4.5.0-rc.1", - "@trigger.dev/core": "workspace:4.5.0-rc.1", - "@trigger.dev/schema-to-json": "workspace:4.5.0-rc.1", + "@trigger.dev/build": "workspace:4.5.0-rc.2", + "@trigger.dev/core": "workspace:4.5.0-rc.2", + "@trigger.dev/schema-to-json": "workspace:4.5.0-rc.2", "ansi-escapes": "^7.0.0", "braces": "^3.0.3", "c12": "^1.11.1", diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index 33ff1b005cf..b948880da7c 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -1,5 +1,7 @@ # internal-platform +## 4.5.0-rc.2 + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/core/package.json b/packages/core/package.json index 59e76b5d8c6..d35a21e1ab5 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/core", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "Core code used across the Trigger.dev SDK and platform", "license": "MIT", "publishConfig": { diff --git a/packages/plugins/CHANGELOG.md b/packages/plugins/CHANGELOG.md index 7b8cb1ceb5d..913dbcea055 100644 --- a/packages/plugins/CHANGELOG.md +++ b/packages/plugins/CHANGELOG.md @@ -1,5 +1,12 @@ # @trigger.dev/plugins +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/plugins/package.json b/packages/plugins/package.json index 7ae7348869a..516bed10d59 100644 --- a/packages/plugins/package.json +++ b/packages/plugins/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/plugins", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "Plugin contracts and interfaces for Trigger.dev", "license": "MIT", "publishConfig": { diff --git a/packages/python/CHANGELOG.md b/packages/python/CHANGELOG.md index bdee0d00245..1179323c328 100644 --- a/packages/python/CHANGELOG.md +++ b/packages/python/CHANGELOG.md @@ -1,5 +1,14 @@ # @trigger.dev/python +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/sdk@4.5.0-rc.2` + - `@trigger.dev/build@4.5.0-rc.2` + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/python/package.json b/packages/python/package.json index 9e262b6bec9..a99249e825f 100644 --- a/packages/python/package.json +++ b/packages/python/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/python", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "Python runtime and build extension for Trigger.dev", "license": "MIT", "publishConfig": { @@ -45,7 +45,7 @@ "check-exports": "attw --pack ." }, "dependencies": { - "@trigger.dev/core": "workspace:4.5.0-rc.1", + "@trigger.dev/core": "workspace:4.5.0-rc.2", "tinyexec": "^0.3.2" }, "devDependencies": { @@ -56,12 +56,12 @@ "tsx": "4.17.0", "esbuild": "^0.23.0", "@arethetypeswrong/cli": "^0.15.4", - "@trigger.dev/build": "workspace:4.5.0-rc.1", - "@trigger.dev/sdk": "workspace:4.5.0-rc.1" + "@trigger.dev/build": "workspace:4.5.0-rc.2", + "@trigger.dev/sdk": "workspace:4.5.0-rc.2" }, "peerDependencies": { - "@trigger.dev/sdk": "workspace:^4.5.0-rc.1", - "@trigger.dev/build": "workspace:^4.5.0-rc.1" + "@trigger.dev/sdk": "workspace:^4.5.0-rc.2", + "@trigger.dev/build": "workspace:^4.5.0-rc.2" }, "engines": { "node": ">=18.20.0" diff --git a/packages/react-hooks/CHANGELOG.md b/packages/react-hooks/CHANGELOG.md index 6f8b063552f..93d6c07cf44 100644 --- a/packages/react-hooks/CHANGELOG.md +++ b/packages/react-hooks/CHANGELOG.md @@ -1,5 +1,12 @@ # @trigger.dev/react-hooks +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/react-hooks/package.json b/packages/react-hooks/package.json index 6a32e4521d0..79089c88d27 100644 --- a/packages/react-hooks/package.json +++ b/packages/react-hooks/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/react-hooks", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "trigger.dev react hooks", "license": "MIT", "publishConfig": { @@ -37,7 +37,7 @@ "check-exports": "attw --pack ." }, "dependencies": { - "@trigger.dev/core": "workspace:^4.5.0-rc.1", + "@trigger.dev/core": "workspace:^4.5.0-rc.2", "swr": "^2.2.5" }, "devDependencies": { diff --git a/packages/redis-worker/CHANGELOG.md b/packages/redis-worker/CHANGELOG.md index cb6957b5ea2..ba91f4b7b70 100644 --- a/packages/redis-worker/CHANGELOG.md +++ b/packages/redis-worker/CHANGELOG.md @@ -1,5 +1,12 @@ # @trigger.dev/redis-worker +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/redis-worker/package.json b/packages/redis-worker/package.json index 318b25a43da..0f025b7fefd 100644 --- a/packages/redis-worker/package.json +++ b/packages/redis-worker/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/redis-worker", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "Redis worker for trigger.dev", "license": "MIT", "publishConfig": { @@ -23,7 +23,7 @@ "test": "vitest --sequence.concurrent=false --no-file-parallelism" }, "dependencies": { - "@trigger.dev/core": "workspace:4.5.0-rc.1", + "@trigger.dev/core": "workspace:4.5.0-rc.2", "lodash.omit": "^4.5.0", "nanoid": "^5.0.7", "p-limit": "^6.2.0", diff --git a/packages/rsc/CHANGELOG.md b/packages/rsc/CHANGELOG.md index f076634753e..2fc48ef862c 100644 --- a/packages/rsc/CHANGELOG.md +++ b/packages/rsc/CHANGELOG.md @@ -1,5 +1,12 @@ # @trigger.dev/rsc +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/rsc/package.json b/packages/rsc/package.json index 6dadddedc8e..ead6e390e35 100644 --- a/packages/rsc/package.json +++ b/packages/rsc/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/rsc", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "trigger.dev rsc", "license": "MIT", "publishConfig": { @@ -37,14 +37,14 @@ "check-exports": "attw --pack ." }, "dependencies": { - "@trigger.dev/core": "workspace:^4.5.0-rc.1", + "@trigger.dev/core": "workspace:^4.5.0-rc.2", "mlly": "^1.7.1", "react": "19.0.0-rc.1", "react-dom": "19.0.0-rc.1" }, "devDependencies": { "@arethetypeswrong/cli": "^0.15.4", - "@trigger.dev/build": "workspace:^4.5.0-rc.1", + "@trigger.dev/build": "workspace:^4.5.0-rc.2", "@types/node": "^20.14.14", "@types/react": "*", "@types/react-dom": "*", diff --git a/packages/schema-to-json/CHANGELOG.md b/packages/schema-to-json/CHANGELOG.md index 4a8f1e33751..368ff1a8f62 100644 --- a/packages/schema-to-json/CHANGELOG.md +++ b/packages/schema-to-json/CHANGELOG.md @@ -1,5 +1,12 @@ # @trigger.dev/schema-to-json +## 4.5.0-rc.2 + +### Patch Changes + +- Updated dependencies: + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/schema-to-json/package.json b/packages/schema-to-json/package.json index 59b336a8699..b5cc0f3b309 100644 --- a/packages/schema-to-json/package.json +++ b/packages/schema-to-json/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/schema-to-json", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "Convert various schema validation libraries to JSON Schema", "license": "MIT", "publishConfig": { diff --git a/packages/trigger-sdk/CHANGELOG.md b/packages/trigger-sdk/CHANGELOG.md index fcafefd145b..781de7dcb45 100644 --- a/packages/trigger-sdk/CHANGELOG.md +++ b/packages/trigger-sdk/CHANGELOG.md @@ -1,5 +1,55 @@ # @trigger.dev/sdk +## 4.5.0-rc.2 + +### Patch Changes + +- Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together: ([#3719](https://github.com/triggerdotdev/trigger.dev/pull/3719)) + + - The per-turn merge now overlays the wire copy's tool-part state advancement onto the agent's existing chain — `state` + the matching resolution field (`output` / `errorText` / `approval`) come from the wire, everything else (text, reasoning, tool `input`, provider metadata) stays whatever the snapshot or `hydrateMessages` returned. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow). + - `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. The wire payload is just `{ id, role, parts: [] }` for `submit-message` continuations; everything else passes through. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops. + + Note: `onValidateMessages` receives the slim wire on HITL turns. If you call `validateUIMessages` from `ai` against the full `messages` array it will reject the slim assistant; filter to user messages (or skip on HITL turns) — see the updated docstring on `onValidateMessages` for the recommended pattern. + + For `hydrateMessages` hooks that persist the chain, this release also adds a small helper to the `@trigger.dev/sdk/ai` surface: + + ```ts + import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; + + chat.agent({ + hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { + const record = await db.chat.findUnique({ where: { id: chatId } }); + const stored = record?.messages ?? []; + if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { + await db.chat.update({ where: { id: chatId }, data: { messages: stored } }); + } + return stored; + }, + }); + ``` + + It pushes fresh user messages by id, no-ops on HITL continuations (the incoming shares an id with the existing assistant — the runtime overlays the new tool-state advance), and skips on non-`submit-message` triggers. Returns `true` if it mutated `stored` so the caller knows whether to persist. + + Net effect: `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer blows the cap and no longer corrupts the LLM input. + +- Add `TriggerClient` for running multiple SDK clients side-by-side, each with its own auth, preview branch, and baseURL. Useful when a single process needs to trigger tasks or read runs across multiple projects, environments, or preview branches without mutating shared global state. ([#3683](https://github.com/triggerdotdev/trigger.dev/pull/3683)) + + ```ts + import { TriggerClient } from "@trigger.dev/sdk"; + + const prod = new TriggerClient({ accessToken: process.env.TRIGGER_PROD_KEY }); + const preview = new TriggerClient({ + accessToken: process.env.TRIGGER_PREVIEW_KEY, + previewBranch: "signup-flow", + }); + + await prod.tasks.trigger("send-email", payload); + await preview.runs.list({ status: ["COMPLETED"] }); + ``` + +- Updated dependencies: + - `@trigger.dev/core@4.5.0-rc.2` + ## 4.5.0-rc.1 ### Patch Changes diff --git a/packages/trigger-sdk/package.json b/packages/trigger-sdk/package.json index ab57b2b7a7c..c47c0140d4e 100644 --- a/packages/trigger-sdk/package.json +++ b/packages/trigger-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@trigger.dev/sdk", - "version": "4.5.0-rc.1", + "version": "4.5.0-rc.2", "description": "trigger.dev Node.JS SDK", "license": "MIT", "publishConfig": { @@ -73,7 +73,7 @@ "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", - "@trigger.dev/core": "workspace:4.5.0-rc.1", + "@trigger.dev/core": "workspace:4.5.0-rc.2", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c742ab1bfc4..58a5f3f8012 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1472,7 +1472,7 @@ importers: specifier: ^6.10.0 version: 6.19.0(magicast@0.3.5) '@trigger.dev/core': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../core mlly: specifier: ^1.7.1 @@ -1548,13 +1548,13 @@ importers: specifier: ^0.22.5 version: 0.22.5(supports-color@10.0.0) '@trigger.dev/build': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../build '@trigger.dev/core': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../core '@trigger.dev/schema-to-json': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../schema-to-json ansi-escapes: specifier: ^7.0.0 @@ -1941,7 +1941,7 @@ importers: packages/python: dependencies: '@trigger.dev/core': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../core tinyexec: specifier: ^0.3.2 @@ -1951,10 +1951,10 @@ importers: specifier: ^0.15.4 version: 0.15.4 '@trigger.dev/build': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../build '@trigger.dev/sdk': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../trigger-sdk '@types/node': specifier: 20.14.14 @@ -1978,7 +1978,7 @@ importers: packages/react-hooks: dependencies: '@trigger.dev/core': - specifier: workspace:^4.5.0-rc.1 + specifier: workspace:^4.5.0-rc.2 version: link:../core react: specifier: ^18.0 || ^19.0 || ^19.0.0-rc @@ -2012,7 +2012,7 @@ importers: packages/redis-worker: dependencies: '@trigger.dev/core': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../core cron-parser: specifier: ^4.9.0 @@ -2061,7 +2061,7 @@ importers: packages/rsc: dependencies: '@trigger.dev/core': - specifier: workspace:^4.5.0-rc.1 + specifier: workspace:^4.5.0-rc.2 version: link:../core mlly: specifier: ^1.7.1 @@ -2077,7 +2077,7 @@ importers: specifier: ^0.15.4 version: 0.15.4 '@trigger.dev/build': - specifier: workspace:^4.5.0-rc.1 + specifier: workspace:^4.5.0-rc.2 version: link:../build '@types/node': specifier: 20.14.14 @@ -2153,7 +2153,7 @@ importers: specifier: 1.36.0 version: 1.36.0 '@trigger.dev/core': - specifier: workspace:4.5.0-rc.1 + specifier: workspace:4.5.0-rc.2 version: link:../core chalk: specifier: ^5.2.0 @@ -23334,7 +23334,7 @@ snapshots: '@epic-web/test-server@0.1.0(bufferutil@4.0.9)': dependencies: - '@hono/node-server': 1.12.2(hono@4.5.11) + '@hono/node-server': 1.12.2(hono@4.12.15) '@hono/node-ws': 1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9) '@open-draft/deferred-promise': 2.2.0 '@types/ws': 8.5.12 @@ -24013,9 +24013,9 @@ snapshots: dependencies: react: 18.2.0 - '@hono/node-server@1.12.2(hono@4.5.11)': + '@hono/node-server@1.12.2(hono@4.12.15)': dependencies: - hono: 4.5.11 + hono: 4.12.15 '@hono/node-server@1.19.11(hono@4.12.15)': dependencies: @@ -24031,7 +24031,7 @@ snapshots: '@hono/node-ws@1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9)': dependencies: - '@hono/node-server': 1.12.2(hono@4.5.11) + '@hono/node-server': 1.12.2(hono@4.12.15) ws: 8.18.3(bufferutil@4.0.9) transitivePeerDependencies: - bufferutil From 9f64bf404b90dba01d584e7abd8da615e14d2348 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 23 May 2026 17:27:15 +0100 Subject: [PATCH 5/5] docs(ai-chat): slim-wire HITL continuations + field-level merge contract (#3721) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Updates the AI chat docs to match the slim-wire + field-level merge behavior shipped in #3719 and the precise `.in/append` cap + CORS-readable 413 shipped in #3720. No behavior changes here — code is correct in `main`; the docs were lagging on three patterns customers copy out of the page. ## What changed - **`hydrateMessages` examples upsert by id** (in `lifecycle-hooks.mdx`, `patterns/database-persistence.mdx`, and `patterns/persistence-and-replay.mdx`). The previous `stored.push(newMsg)` pattern duplicated the assistant id on HITL continuations and caused the LLM to receive a tool call with no `arguments`. The new examples include the rationale inline. - **`onValidateMessages` example filters to user messages** (`lifecycle-hooks.mdx`). The previous example called `validateUIMessages({ messages, tools })` directly, which now throws on HITL slim wires (the AI SDK schema requires `input` on resolved tool parts). New example shows the filter pattern, with a Warning callout explaining why. - **Merge contract description updated** (`lifecycle-hooks.mdx`). The old wording said incoming messages are "auto-merged" / "replaced"; the new description explains the actual field-level overlay (state advances only). - **Approval-responded wire example slimmed** (`client-protocol.mdx`). Shows the minimum shape the agent reads — `state` + `approval` (or `output` / `errorText` for HITL). Notes that the built-in transports ship this slim shape by default and that fuller shapes are still accepted. - **`/in/append` 413 row and FAQ updated** (`client-protocol.mdx`, `patterns/trusted-edge-signals.mdx`). Reflects the new precise S2 cap and the CORS-readable 413. - **New changelog entry** at the top of `changelog.mdx` covering all of the above. The historical `## 512 KiB ceiling removed` entry further down the changelog is left as-is (it's a snapshot of the prior transition), and the v4.5 upgrade-guide section is skipped — the merge contract is backwards compatible. ## Test plan - Mintlify dev preview renders cleanly with no broken anchors - Linked references resolve (`/ai-chat/lifecycle-hooks#hydratemessages`, `/ai-chat/lifecycle-hooks#onvalidatemessages`, `/ai-chat/patterns/database-persistence#alternative-hydratemessages`, `/ai-chat/client-protocol#step-3-send-messages-stops-and-actions`, `/ai-chat/patterns/large-payloads`) --- docs/ai-chat/changelog.mdx | 48 +++++++++++++++++++ docs/ai-chat/client-protocol.mdx | 16 ++++--- docs/ai-chat/lifecycle-hooks.mdx | 23 ++++++--- .../ai-chat/patterns/database-persistence.mdx | 9 +++- .../patterns/persistence-and-replay.mdx | 7 +-- .../ai-chat/patterns/trusted-edge-signals.mdx | 2 +- 6 files changed, 86 insertions(+), 19 deletions(-) diff --git a/docs/ai-chat/changelog.mdx b/docs/ai-chat/changelog.mdx index a972ca368ac..08a1832ff60 100644 --- a/docs/ai-chat/changelog.mdx +++ b/docs/ai-chat/changelog.mdx @@ -4,6 +4,54 @@ sidebarTitle: "Changelog" description: "Pre-release updates for AI chat agents." --- + + +## HITL continuations — slim wire by default + field-level merge + +`chat.addToolOutput(...)` and `chat.addToolApproveResponse(...)` continuations on reasoning-heavy agent loops used to fail two ways: either the wire body crossed the `/in/append` cap (encrypted reasoning blobs + tool input routinely > 512 KiB), or apps that slimmed the wire as a workaround landed a tool call with no `arguments` on the next LLM step (the per-turn merge replaced the hydrated message wholesale instead of overlaying only the new tool-state advance). Both modes are fixed. + +The transport (`TriggerChatTransport.sendMessages`, `AgentChat.sendRaw`) now slims the assistant message itself on `submit-message` turns whose assistant carries resolved or approval-responded tool parts. The wire shape ships as `{ id, role: "assistant", parts: [] }` — `state` plus `output` / `errorText` / `approval`, depending on the new state. Everything else (reasoning blobs, prior text, tool `input`, provider metadata) is reconstructed server-side from `hydrateMessages` or the durable snapshot. Continuation payloads typically drop from 600 KiB – 1 MiB to ~1 KiB. + +The per-turn merge now overlays only the tool-part state advances (`output-available` / `output-error` / `approval-responded` / `output-denied`) from the wire copy onto the matching hydrated entry. Hydrated `input`, text, reasoning, and provider metadata stay put. The agent still accepts a fuller `UIMessage` on the wire (the merge only reads the resolved fields), so custom transports that ship more don't break — they just waste bytes. + +### `hydrateMessages` upsert-by-id + +If your `hydrateMessages` hook persists the incoming message, **upsert by id** — don't unconditionally push. HITL continuations ship the existing assistant's id with a slim payload; a blind `stored.push(newMsg)` duplicates the row in the chain you return, the merge updates the first match, and the slim duplicate hits `toModelMessages` with no `input`. + +A new `upsertIncomingMessage` helper is exported from `@trigger.dev/sdk/ai` to handle this for the common case: + +```ts +import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; + +chat.agent({ + hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { + const record = await db.chat.findUnique({ where: { id: chatId } }); + const stored = record?.messages ?? []; + if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { + await db.chat.update({ where: { id: chatId }, data: { messages: stored } }); + } + return stored; + }, +}); +``` + +The helper pushes fresh user messages, no-ops on HITL continuations (so the runtime can overlay the new tool-state advance), and skips on non-`submit-message` triggers. Returns `true` if it mutated `stored`. The examples in [lifecycle hooks](/ai-chat/lifecycle-hooks#hydratemessages), [Database persistence](/ai-chat/patterns/database-persistence#alternative-hydratemessages), and [Persistence and replay](/ai-chat/patterns/persistence-and-replay) have all been updated. Custom hydrate logic (branching, rollback, etc.) can still write the upsert by hand — the helper is a convenience for the common shape. + +### `onValidateMessages` slim wire caveat + +The slim wire is what arrives in `onValidateMessages` on HITL turns. `validateUIMessages` from `ai` rejects the slim shape (the AI SDK schema requires `input` on resolved tool parts), so filter to user messages first (or skip validation entirely on those turns). See the updated example in [lifecycle hooks](/ai-chat/lifecycle-hooks#onvalidatemessages). + +### `/in/append` 413 + precise cap + +In parallel: + +- The 413 response now carries CORS headers, so browser fetches can read the status instead of failing as opaque `TypeError: Failed to fetch`. App-side retry-on-disconnect loops no longer spin forever on a permanently-rejected payload. +- The per-record cap is now computed precisely against S2's actual ceiling instead of the conservative 512 KiB floor. Legitimate ~600 – 900 KiB tool outputs (search results, file content) now succeed; pathological all-quote content that would double under JSON escape still rejects cleanly with a clear error. + +See the updated [413 row in the client protocol](/ai-chat/client-protocol#step-3-send-messages-stops-and-actions). + + + ## v4.5.0-rc.1 — two bug fixes diff --git a/docs/ai-chat/client-protocol.mdx b/docs/ai-chat/client-protocol.mdx index 548f428339e..0a94327b78c 100644 --- a/docs/ai-chat/client-protocol.mdx +++ b/docs/ai-chat/client-protocol.mdx @@ -692,7 +692,7 @@ The body is a JSON-serialized [`ChatInputChunk`](#chatinputchunk) — a tagged u | `401` | Missing or invalid `Authorization` header. | | `403` | Token doesn't carry `write:sessions:{externalId}`. | | `409` | The session is closed — `{ "ok": false, "error": "Cannot append to a closed session" }`. | -| `413` | Body exceeds 512 KiB. A normal `kind: "message"` payload is a few KB; if you hit this you're shipping more than one message per record. | +| `413` | Body exceeds 1 MiB **or** the wrapped record would exceed S2's ~1 MiB per-record metered ceiling. A normal `kind: "message"` payload is a few KB; if you hit this you're shipping more than one message per record or pushing a single tool output that's itself oversized. Carries CORS headers so browser fetches can read the status. | | `500` | Transient backend failure on the durable stream. Safe to retry — appends are idempotent on `(externalId, X-Part-Id)` if you set the optional `X-Part-Id` request header (the built-in clients set it from a UUID). | @@ -851,7 +851,7 @@ The agent trims trailing assistant messages from its accumulator and re-streams ### Tool approval responses -When a tool requires approval (`needsApproval: true`), the agent streams the tool call with an `approval-requested` state and completes the turn. After the user approves or denies, send the **updated assistant message** (with `approval-responded` tool parts) back as a `kind: "message"` chunk — singular, not the full chain: +When a tool requires approval (`needsApproval: true`), the agent streams the tool call with an `approval-requested` state and completes the turn. After the user approves or denies, send the **updated assistant message** back as a `kind: "message"` chunk — singular, not the full chain. The minimum shape the agent reads is just the resolved tool parts: ```json { @@ -861,12 +861,10 @@ When a tool requires approval (`needsApproval: true`), the agent streams the too "id": "asst-msg-1", "role": "assistant", "parts": [ - { "type": "text", "text": "I'll send that email for you." }, { "type": "tool-sendEmail", "toolCallId": "call-1", "state": "approval-responded", - "input": { "to": "user@example.com", "subject": "Hello" }, "approval": { "id": "approval-1", "approved": true } } ] @@ -878,7 +876,11 @@ When a tool requires approval (`needsApproval: true`), the agent streams the too } ``` -The agent matches the incoming message by `id` against the rebuilt accumulator. If a match is found, it **replaces** the existing message instead of appending. +The agent matches the incoming message by `id` against the rebuilt accumulator (or hydrated chain) and **overlays the tool-state advance** onto the matching entry — `state` plus `output` / `errorText` / `approval`, depending on the new state. Hydrated `input`, text, reasoning, and provider metadata stay put. This is what makes the slim shape above sufficient: the agent rebuilds everything else from the snapshot or from your `hydrateMessages` hook. + +The same shape applies to HITL `addToolOutput` answers — substitute `state: "output-available"` and `output: ` for the approval pair above. Single-tool HITL `addToolOutput` continuation payloads are typically ~1 KiB on the wire. + +The built-in transports (`TriggerChatTransport`, `AgentChat`) ship the slim shape by default on `submit-message` continuations. Custom transports can ship a fuller `UIMessage` — the agent still only reads the resolved tool-part fields — but the slim shape is the most efficient and avoids brushing the per-record cap on reasoning-heavy turns. The message `id` must match the one the agent assigned during streaming. `TriggerChatTransport` keeps IDs in sync automatically. Custom transports should use the `messageId` from the stream's `start` chunk. @@ -938,7 +940,7 @@ To bridge that gap, the head-start route handler ships **full UIMessage history* Two reasons this exception is safe: -1. **The route handler runs against the customer's own HTTP endpoint**, not `/realtime/v1/sessions/{id}/in/append`. The 512 KiB body cap on the realtime route doesn't apply. +1. **The route handler runs against the customer's own HTTP endpoint**, not `/realtime/v1/sessions/{id}/in/append`. The per-record cap on the realtime route doesn't apply. 2. **`headStartMessages` is only honored on `trigger: "handover-prepare"`**. The runtime ignores the field on every other trigger — the one-message-per-record rule still holds for normal turns. After turn 1 completes, the snapshot is written and turn 2+ run as a normal single-message-per-record chat. @@ -1067,7 +1069,7 @@ No. `seq_num` is monotonic across the entire session — turn 1 might emit seq 0 -512 KiB. A typical `kind: "message"` is a few KB. If you're brushing the cap you're shipping more than one message per record, which the protocol forbids. The headStart path (`trigger: "handover-prepare"`) sends through the customer's own HTTP route handler, not `.in/append`, so the cap doesn't apply there. +The HTTP body is capped at 1 MiB as a DoS guard. The actual ceiling is at the storage layer: each `.in/append` becomes a single S2 record, metered as `8 + body_bytes_after_JSON_wrap`, capped at 1 MiB. So the practical limit on the raw HTTP body sits around ~1023 KiB for content with low JSON-escape overhead (ASCII, base64) and ~512 KiB for content that escapes heavily (all quotes / backslashes). A typical `kind: "message"` is a few KiB. If you're brushing the cap you're either shipping a single tool output that's itself oversized — see [Large payloads](/ai-chat/patterns/large-payloads) — or you're shipping more than one message per record, which the protocol forbids. The 413 response carries CORS headers so browser fetches can read the status. The headStart path (`trigger: "handover-prepare"`) sends through the customer's own HTTP route handler, not `.in/append`, so the cap doesn't apply there. ## See also diff --git a/docs/ai-chat/lifecycle-hooks.mdx b/docs/ai-chat/lifecycle-hooks.mdx index c6ea62cbc81..f1e9ce93614 100644 --- a/docs/ai-chat/lifecycle-hooks.mdx +++ b/docs/ai-chat/lifecycle-hooks.mdx @@ -242,7 +242,11 @@ import { validateUIMessages } from "ai"; export const myChat = chat.agent({ id: "my-chat", onValidateMessages: async ({ messages }) => { - return validateUIMessages({ messages, tools: chatTools }); + const userMessages = messages.filter((m) => m.role === "user"); + if (userMessages.length > 0) { + await validateUIMessages({ messages: userMessages, tools: chatTools }); + } + return messages; }, run: async ({ messages, signal }) => { return streamText({ model: anthropic("claude-sonnet-4-5"), messages, tools: chatTools, abortSignal: signal }); @@ -250,6 +254,10 @@ export const myChat = chat.agent({ }); ``` + + On HITL continuations (`addToolOutput` / `addToolApproveResponse`) the assistant entry in `messages` is **slim** — `state` + `output` / `errorText` / `approval` only, no `input` or other parts. `validateUIMessages` against the AI SDK schema rejects that shape (the schema requires `input` on resolved tool parts), so filter to user messages first (or skip validation entirely on those turns). The example above does the filter. + + `onValidateMessages` fires **before** `onTurnStart` and message accumulation. If you need to validate messages loaded from a database, do the loading in `onChatStart` or `onPreload` and let `onValidateMessages` validate the full incoming set each turn. @@ -272,16 +280,15 @@ Use this when the backend should be the source of truth for message history: abu | `previousRunId` | `string \| undefined` | The previous run ID (if continuation) | ```ts +import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; + export const myChat = chat.agent({ id: "my-chat", hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { const record = await db.chat.findUnique({ where: { id: chatId } }); const stored = record?.messages ?? []; - // Append the new user message and persist - if (trigger === "submit-message" && incomingMessages.length > 0) { - const newMsg = incomingMessages[incomingMessages.length - 1]!; - stored.push(newMsg); + if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { await db.chat.update({ where: { id: chatId }, data: { messages: stored }, @@ -296,9 +303,13 @@ export const myChat = chat.agent({ }); ``` +`upsertIncomingMessage` (exported from `@trigger.dev/sdk/ai`) handles the three cases that matter — fresh user messages get pushed, HITL continuations (`addToolOutput` / `addToolApproveResponse`) no-op because the incoming wire shares the existing assistant's id and the runtime overlays the new tool-state advance onto that entry, and non-`submit-message` triggers (`regenerate-message` / `action`) skip persistence. It returns `true` when it mutated `stored`, so the caller knows whether to persist. + +If you need branching, rollback, or other custom hydrate logic, you can still write the upsert by hand — `upsertIncomingMessage` is a convenience for the common case, not the only supported shape. + **Lifecycle position:** `onValidateMessages` → **`hydrateMessages`** → `onChatStart` (chat's first message only) → `onTurnStart` → `run()` -After the hook returns, any incoming wire message whose ID matches a hydrated message is auto-merged. This makes [tool approvals](/ai-chat/frontend#tool-approvals) work transparently with hydration. +After the hook returns, the runtime overlays the wire's tool-state advances (`output-available` / `output-error` / `approval-responded` / `output-denied`) onto matching hydrated entries by id. Everything else on the hydrated entry — text, reasoning, tool `input`, providerMetadata — stays put. This makes [tool approvals](/ai-chat/frontend#tool-approvals) and HITL `addToolOutput` continuations work transparently: ship a slim resolution on the wire, the agent merges the new state onto your DB-backed copy. `hydrateMessages` also fires for [action](/ai-chat/actions) turns (`trigger: "action"`) with empty `incomingMessages`. This lets the action handler work with the latest DB state. diff --git a/docs/ai-chat/patterns/database-persistence.mdx b/docs/ai-chat/patterns/database-persistence.mdx index 5ee32f8a6bd..0bfc447f31b 100644 --- a/docs/ai-chat/patterns/database-persistence.mdx +++ b/docs/ai-chat/patterns/database-persistence.mdx @@ -178,14 +178,19 @@ For apps that need the backend to be the single source of truth for message hist With hydration, the hook loads messages from your database on every turn. The frontend's messages are ignored (except for the new user message, which arrives in `incomingMessages`): ```ts +import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; + export const myChat = chat.agent({ id: "my-chat", hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { const record = await db.chat.findUnique({ where: { id: chatId } }); const stored = record?.messages ?? []; - if (trigger === "submit-message" && incomingMessages.length > 0) { - stored.push(incomingMessages[incomingMessages.length - 1]!); + // `upsertIncomingMessage` pushes a fresh user message and no-ops + // on HITL continuations (the runtime overlays the new tool-state + // advance onto the existing entry). See lifecycle hooks for the + // full pattern: /ai-chat/lifecycle-hooks#hydratemessages + if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { await db.chat.update({ where: { id: chatId }, data: { messages: stored } }); } diff --git a/docs/ai-chat/patterns/persistence-and-replay.mdx b/docs/ai-chat/patterns/persistence-and-replay.mdx index f1008dda260..4e1bdf4084e 100644 --- a/docs/ai-chat/patterns/persistence-and-replay.mdx +++ b/docs/ai-chat/patterns/persistence-and-replay.mdx @@ -131,7 +131,7 @@ If `onAction` mutates `chat.history.*` and then the run crashes before the next When the customer registers a [`hydrateMessages`](/ai-chat/lifecycle-hooks#hydratemessages) hook, the runtime trusts the hook to be the source of truth for history. Snapshot read and replay are **skipped entirely** at boot. The hook fires per turn, returns the canonical chain from the customer's database, and the accumulator is set to whatever the hook returned. ```ts -import { chat } from "@trigger.dev/sdk/ai"; +import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai"; import { db } from "@/lib/db"; export const myChat = chat.agent({ @@ -139,8 +139,9 @@ export const myChat = chat.agent({ hydrateMessages: async ({ chatId, trigger, incomingMessages }) => { const stored = (await db.chat.findUnique({ where: { id: chatId } }))?.messages ?? []; - if (trigger === "submit-message" && incomingMessages.length > 0) { - stored.push(incomingMessages[0]!); + // See lifecycle-hooks for the full upsert pattern + rationale: + // /ai-chat/lifecycle-hooks#hydratemessages + if (upsertIncomingMessage(stored, { trigger, incomingMessages })) { await db.chat.update({ where: { id: chatId }, data: { messages: stored } }); } diff --git a/docs/ai-chat/patterns/trusted-edge-signals.mdx b/docs/ai-chat/patterns/trusted-edge-signals.mdx index 1dd5f97d3f5..181a3895474 100644 --- a/docs/ai-chat/patterns/trusted-edge-signals.mdx +++ b/docs/ai-chat/patterns/trusted-edge-signals.mdx @@ -115,7 +115,7 @@ The body is a JSON-serialized `ChatInputChunk`. The proxy parses it, checks `kin } ``` -Both bodies stay well under the [512 KiB cap on `/in/append`](/ai-chat/client-protocol#step-3-send-messages-stops-and-actions) — a typical trust object is ~200 bytes. +Both bodies stay well under the [per-record cap on `/in/append`](/ai-chat/client-protocol#step-3-send-messages-stops-and-actions) — a typical trust object is ~200 bytes. Other paths — `.out` SSE, `/api/v1/auth/jwt/claims`, anything else — pass through the proxy untouched. The SSE stream in particular must not be buffered; preserve the response body as-is.