Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .changeset/chat-slim-wire-merge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
"@trigger.dev/sdk": patch
---

Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together:

- The per-turn merge now overlays the wire copy's tool-part state advancement onto the agent's existing chain — `state` + the matching resolution field (`output` / `errorText` / `approval`) come from the wire, everything else (text, reasoning, tool `input`, provider metadata) stays whatever the snapshot or `hydrateMessages` returned. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow).
- `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. The wire payload is just `{ id, role, parts: [<state + resolution field>] }` for `submit-message` continuations; everything else passes through. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops.

Note: `onValidateMessages` receives the slim wire on HITL turns. If you call `validateUIMessages` from `ai` against the full `messages` array it will reject the slim assistant; filter to user messages (or skip on HITL turns) — see the updated docstring on `onValidateMessages` for the recommended pattern.

For `hydrateMessages` hooks that persist the chain, this release also adds a small helper to the `@trigger.dev/sdk/ai` surface:

```ts
import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai";

chat.agent({
hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
const record = await db.chat.findUnique({ where: { id: chatId } });
const stored = record?.messages ?? [];
if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
await db.chat.update({ where: { id: chatId }, data: { messages: stored } });
}
return stored;
},
});
```

It pushes fresh user messages by id, no-ops on HITL continuations (the incoming shares an id with the existing assistant — the runtime overlays the new tool-state advance), and skips on non-`submit-message` triggers. Returns `true` if it mutated `stored` so the caller knows whether to persist.

Net effect: `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer blows the cap and no longer corrupts the LLM input.
4 changes: 3 additions & 1 deletion .changeset/pre.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"chat-agent",
"chat-history-read-primitives",
"chat-session-attributes",
"chat-slim-wire-merge",
"chat-start-session-action-typed-client-data",
"cli-deploy-skip-rewrite-timestamp",
"locals-key-dual-package-fix",
Expand All @@ -39,6 +40,7 @@
"resource-catalog-runtime-registration",
"retry-sigsegv",
"runs-list-region-filter",
"sessions-primitive"
"sessions-primitive",
"trigger-client"
]
}
6 changes: 0 additions & 6 deletions .server-changes/configurable-http-keepalive-timeout.md

This file was deleted.

6 changes: 0 additions & 6 deletions .server-changes/drop-taskrun-scheduleid-createdat-idx.md

This file was deleted.

6 changes: 0 additions & 6 deletions .server-changes/organization-scoped-clickhouse.md

This file was deleted.

6 changes: 0 additions & 6 deletions .server-changes/pending-version-clickhouse-lookup.md

This file was deleted.

6 changes: 0 additions & 6 deletions .server-changes/sentry-tenant-attribution.md

This file was deleted.

6 changes: 0 additions & 6 deletions .server-changes/supervisor-checkpoint-type-compat.md

This file was deleted.

18 changes: 12 additions & 6 deletions apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ const ParamsSchema = z.object({
// POST: server-side append of a single record to a session channel. Mirrors
// the existing /realtime/v1/streams/:runId/:target/:streamId/append route,
// scoped to a Session primitive.
// S2 enforces a 1 MiB per-record limit (metered as
// `8 + 2*H + Σ(header name+value) + body`). We cap the raw HTTP body at
// 512 KiB so the JSON wrapper (`{"data":"...","id":"..."}`), string
// escaping, and any future per-record header additions all stay comfortably
// below S2's ceiling. See https://s2.dev/docs/limits.
const MAX_APPEND_BODY_BYTES = 1024 * 512;
//
// The HTTP body cap here is just a DoS pre-guard — set generously at
// 1 MiB so we don't buffer arbitrarily large inputs before we can
// compute the wrapped size. The actual S2 per-record limit (verified
// empirically against cloud S2) is enforced precisely inside
// `S2RealtimeStreams.#appendPartByName` — it throws
// `S2RecordTooLargeError` (a `ServiceValidationError` with status
// 413) when the metered record size would exceed S2's 1 MiB ceiling
// after JSON wrapping. That lets legitimate bodies up to ~1023 KiB
// raw through (ASCII or low-escape content) while still rejecting
// pathological all-quote content that would double on wrap.
const MAX_APPEND_BODY_BYTES = 1024 * 1024;

const { action, loader } = createActionApiRoute(
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { json } from "@remix-run/server-runtime";
import { tryCatch } from "@trigger.dev/core/utils";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
Expand All @@ -13,6 +14,7 @@ import {
} from "~/services/routeBuilders/apiBuilder.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { engine } from "~/v3/runEngine.server";
import { ServiceValidationError } from "~/v3/services/common.server";

const ParamsSchema = z.object({
runId: z.string(),
Expand Down Expand Up @@ -77,13 +79,29 @@ const { action } = createActionApiRoute(
const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`;
const record = JSON.stringify(body.data.data);

// Append the record to the per-stream S2 stream (auto-creates on first write)
await realtimeStream.appendPart(
record,
recordId,
run.friendlyId,
`$trigger.input:${params.streamId}`
// Append the record to the per-stream S2 stream (auto-creates on
// first write). `appendPart` can throw `S2RecordTooLargeError` (a
// `ServiceValidationError` with status 413) when the wrapped
// record exceeds S2's per-record ceiling — surface that as 413
// rather than letting it propagate to the apiBuilder catch-all
// as a generic 500.
const [appendError] = await tryCatch(
realtimeStream.appendPart(
record,
recordId,
run.friendlyId,
`$trigger.input:${params.streamId}`
)
);
if (appendError) {
if (appendError instanceof ServiceValidationError) {
return json(
{ ok: false, error: appendError.message },
{ status: appendError.status ?? 422 }
);
}
throw appendError;
}

// Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none)
// Get first, complete, then delete — so the mapping survives if completeWaitpoint throws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ const ParamsSchema = z.object({
io: z.enum(["out", "in"]),
});

// S2 record body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append
// route — keep it well under S2's 1 MiB per-record limit so JSON wrapping,
// string escaping, and any future per-record headers stay safe.
const MAX_APPEND_BODY_BYTES = 1024 * 512;
// HTTP body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append
// route — DoS pre-guard only. The actual S2 per-record limit is
// enforced precisely by `S2RealtimeStreams.#appendPartByName`
// (throws `S2RecordTooLargeError` with status 413 when the metered
// record size would exceed S2's 1 MiB ceiling after JSON wrapping).
const MAX_APPEND_BODY_BYTES = 1024 * 1024;

// POST: Append a single record to a Session channel from the dashboard
// playground. Mirrors the public `POST /realtime/v1/sessions/:session/:io/append`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,19 @@ import { getSecretStore } from "../secrets/secretStore.server";
import { ClickhouseConnectionSchema } from "../clickhouse/clickhouseSecretSchemas.server";

export class OrganizationDataStoresRegistry {
private _prisma: PrismaClient | PrismaReplicaClient;
/**
* Writer client — used by every method that mutates state
* (`addDataStore` / `updateDataStore` / `deleteDataStore` and their backing
* SecretStore writes). Must be the primary connection; replica-targeted
* writes are rejected by Postgres with code 25006 (read-only transaction).
*/
private _writer: PrismaClient;
/**
* Read client used by the polling `loadFromDatabase()` (and its
* `SecretStore.getSecret` lookups). Can be a replica — these are
* cache-fillers, not on hot user-facing paths.
*/
private _replica: PrismaClient | PrismaReplicaClient;
/** Keyed by `${organizationId}:${kind}` */
private _lookup: Map<string, ParsedDataStore> = new Map();
private _loaded = false;
Expand All @@ -21,8 +33,9 @@ export class OrganizationDataStoresRegistry {
*/
readonly isReady: Promise<void>;

constructor(prisma: PrismaClient | PrismaReplicaClient) {
this._prisma = prisma;
constructor(writer: PrismaClient, replica: PrismaClient | PrismaReplicaClient) {
this._writer = writer;
this._replica = replica;
this.isReady = new Promise<void>((resolve) => {
this._readyResolve = resolve;
});
Expand All @@ -37,10 +50,10 @@ export class OrganizationDataStoresRegistry {
// same `${orgId}:${kind}` appears in multiple rows. The registry must never
// throw on overlap — failing the load would break every customer, not just the
// misconfigured orgs — so we keep the first entry and log an error instead.
const rows = await this._prisma.organizationDataStore.findMany({
const rows = await this._replica.organizationDataStore.findMany({
orderBy: { key: "asc" },
});
const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._replica });

const lookup = new Map<string, ParsedDataStore>();
/** Tracks which row's `key` already owns each `${orgId}:${kind}` so we can log conflicts. */
Expand Down Expand Up @@ -125,10 +138,10 @@ export class OrganizationDataStoresRegistry {
}) {
const secretKey = this.#secretKey(key, kind);

const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer });
await secretStore.setSecret(secretKey, config);

return this._prisma.organizationDataStore.create({
return this._writer.organizationDataStore.create({
data: {
key,
organizationIds,
Expand All @@ -153,11 +166,11 @@ export class OrganizationDataStoresRegistry {
const secretKey = this.#secretKey(key, kind);

if (config) {
const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer });
await secretStore.setSecret(secretKey, config);
}

return this._prisma.organizationDataStore.update({
return this._writer.organizationDataStore.update({
where: {
key,
},
Expand All @@ -170,12 +183,12 @@ export class OrganizationDataStoresRegistry {

async deleteDataStore({ key, kind }: { key: string; kind: DataStoreKind }) {
const secretKey = this.#secretKey(key, kind);
const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer });
await secretStore.deleteSecret(secretKey).catch(() => {
// Secret may not exist — proceed with deletion
});

await this._prisma.organizationDataStore.delete({ where: { key } });
await this._writer.organizationDataStore.delete({ where: { key } });
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import pRetry from "p-retry";
import { $replica } from "~/db.server";
import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { signalsEmitter } from "~/services/signals.server";
import { singleton } from "~/utils/singleton";
import { OrganizationDataStoresRegistry } from "./organizationDataStoresRegistry.server";

export const organizationDataStoresRegistry = singleton("organizationDataStoresRegistry", () => {
const registry = new OrganizationDataStoresRegistry($replica);
const registry = new OrganizationDataStoresRegistry(prisma, $replica);

// Runs as soon as this singleton is created (first import of this module). The
// registry’s `isReady` promise resolves when this eventually succeeds.
Expand Down
38 changes: 37 additions & 1 deletion apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,36 @@ import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions }
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import { headerValue } from "@trigger.dev/core/v3";
import { randomUUID } from "node:crypto";
import { ServiceValidationError } from "~/v3/services/common.server";

// S2's per-record metered-size limit. Verified empirically against
// cloud S2: append succeeds at metered=1048576 and 422s at 1048577
// with `"record must have metered size less than 1 MiB"` (the "less
// than" wording is slightly off — the boundary is inclusive).
//
// Metered size formula:
// metered = 8 (record overhead) + 2*H + Σ(header name + value) + body
// where `body` is the unescaped record body length in UTF-8 bytes and
// `H` is the number of S2 record headers.
//
// We attach no record headers (H=0), so the budget reduces to:
// 8 + body ≤ 1048576 → body ≤ 1048568
export const S2_MAX_METERED_BYTES = 1024 * 1024; // 1 MiB
export const S2_RECORD_BASE_OVERHEAD_BYTES = 8;

/**
* Thrown when a record's metered size would exceed S2's hard per-record
* limit. Caught by the route handler and surfaced as 413.
*/
export class S2RecordTooLargeError extends ServiceValidationError {
constructor(public readonly meteredBytes: number) {
super(
`Record metered size ${meteredBytes} bytes exceeds the S2 per-record limit of ${S2_MAX_METERED_BYTES} bytes. Reduce tool-output size or split into smaller parts.`,
413
);
this.name = "S2RecordTooLargeError";
}
}

export type S2RealtimeStreamsOptions = {
// S2
Expand Down Expand Up @@ -181,8 +211,14 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
async #appendPartByName(part: string, partId: string, s2Stream: string): Promise<void> {
this.logger.debug(`S2 appending to stream`, { part, stream: s2Stream });

const recordBody = JSON.stringify({ data: part, id: partId });
const meteredBytes = Buffer.byteLength(recordBody, "utf8") + S2_RECORD_BASE_OVERHEAD_BYTES;
if (meteredBytes > S2_MAX_METERED_BYTES) {
throw new S2RecordTooLargeError(meteredBytes);
}

const result = await this.s2Append(s2Stream, {
records: [{ body: JSON.stringify({ data: part, id: partId }) }],
records: [{ body: recordBody }],
});

this.logger.debug(`S2 append result`, { result });
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,11 @@ export function createActionApiRoute<
const contentLength = request.headers.get("content-length");

if (!contentLength || parseInt(contentLength) > maxContentLength) {
return json({ error: "Request body too large" }, { status: 413 });
return await wrapResponse(
request,
json({ error: "Request body too large" }, { status: 413 }),
corsStrategy !== "none"
);
}
}

Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/test/clickhouseFactory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"returns default client when org has no data store",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
await registry.loadFromDatabase();

const factory = new ClickhouseFactory(registry);
Expand All @@ -28,7 +28,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"returns org-specific client when a data store is configured",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "factory-store",
Expand All @@ -52,7 +52,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"two orgs sharing the same data store get the same cached client",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "shared-factory-store",
Expand All @@ -75,7 +75,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"two data stores with different URLs produce different clients",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "store-a",
Expand Down Expand Up @@ -104,7 +104,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"after reload with a deleted store, org falls back to default",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "removable-store",
Expand Down
Loading
Loading