Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/chat-agent-hardening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound.
6 changes: 6 additions & 0 deletions .server-changes/realtime-runs-subscription-scalability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag.
6 changes: 6 additions & 0 deletions .server-changes/session-route-hardening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids.
2 changes: 1 addition & 1 deletion apps/supervisor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"@kubernetes/client-node": "^1.0.0",
"@trigger.dev/core": "workspace:*",
"dockerode": "^4.0.6",
"ioredis": "^5.3.2",
"ioredis": "~5.6.0",
"p-limit": "^6.2.0",
"prom-client": "^15.1.0",
"socket.io": "4.7.4",
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
import { registerRunChangeNotifierHandlers } from "./services/realtime/runChangeNotifierHandlers.server";
// Touch the sessions replication singleton at entry so it boots deterministically
// on webapp startup. The singleton's initializer wires start (gated on
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
Expand Down Expand Up @@ -269,6 +270,9 @@ process.on("uncaughtException", (error, origin) => {

singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
// Attach the realtime run-changed publish delegations to the engine event bus.
// No-ops (registers nothing) unless REALTIME_BACKEND_NATIVE_ENABLED=1.
singleton("RunChangeNotifierHandlers", registerRunChangeNotifierHandlers);

// Wrapped in singleton() so Remix's dev-mode CJS reloads don't append
// duplicate copies of the processor — Sentry's processor list lives in
Expand Down
81 changes: 81 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,45 @@ const EnvironmentSchema = z
.int()
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds

// Master switch for the native realtime backend; off = Electric serves everything, publishes no-op.
REALTIME_BACKEND_NATIVE_ENABLED: z.string().default("0"),
// Live long-poll backstop hold (ms); matches Electric's ~20s cadence.
REALTIME_BACKEND_NATIVE_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(20_000),
// Jitter ratio on the live-poll hold (0.15 = ±15%) to avoid synchronized refetch herds.
REALTIME_BACKEND_NATIVE_LIVE_POLL_JITTER_RATIO: z.coerce.number().default(0.15),
// Hard cap on the tag-list snapshot size.
REALTIME_BACKEND_NATIVE_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
// TTL/size of the coalescing cache for the multi-run resolve+hydrate (same-filter feeds share one query).
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_TTL_MS: z.coerce.number().int().default(1_000),
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
// Size/TTL of the per-handle working-set cache used to diff multi-run live polls.
REALTIME_BACKEND_NATIVE_WORKING_SET_MAX_ENTRIES: z.coerce.number().int().default(10_000),
REALTIME_BACKEND_NATIVE_WORKING_SET_TTL_MS: z.coerce.number().int().default(300_000),
// Bucket (ms) the tag-list createdAt floor is quantized to so same-tag feeds share a cache entry; 0 disables.
REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
// Leading-edge throttle (ms) on per-env wake delivery; 0 wakes on every change.
REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(250),
// "1" shares per-connection replay cursors fleet-wide via Redis, so a load-balancer hop reads the connection's true inter-poll gap instead of cold-resolving.
REALTIME_BACKEND_NATIVE_SHARED_REPLAY_CURSORS: z.string().default("1"),
// "1" holds a multi-run live poll open on a non-matching wake instead of replying up-to-date.
REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY: z.string().default("1"),
// Max concurrent fresh ClickHouse resolves per instance (reconnect-stampede gate); 0 disables.
REALTIME_BACKEND_NATIVE_RESOLVE_ADMISSION_LIMIT: z.coerce.number().int().default(16),
// Replay window (ms) for buffered change records delivered to newly-armed feeds; 0 disables.
REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS: z.coerce.number().int().default(2_000),
// Cap on buffered recent records per env (latest record per run).
REALTIME_BACKEND_NATIVE_REPLAY_MAX_RUNS: z.coerce.number().int().default(512),
// Keep an env subscribed + buffering this long (ms) after its last feed closes; 0 disables.
REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS: z.coerce.number().int().default(5_000),
// Fallback per-env concurrent-connection limit when the org has none configured.
REALTIME_BACKEND_NATIVE_DEFAULT_CONCURRENCY_LIMIT: z.coerce.number().int().default(100_000),
// TTL/size of the single-run read-through cache that collapses duplicate refetch bursts.
REALTIME_BACKEND_NATIVE_RUN_CACHE_TTL_MS: z.coerce.number().int().default(250),
REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
// TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),

PUBSUB_REDIS_HOST: z
.string()
.optional()
Expand Down Expand Up @@ -332,6 +371,36 @@ const EnvironmentSchema = z
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

// Dedicated pub/sub Redis for the native realtime backend; falls back to PUBSUB_REDIS_* then REDIS_*.
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_HOST ?? process.env.REDIS_HOST),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => {
if (v !== undefined) return v;
const raw = process.env.PUBSUB_REDIS_PORT ?? process.env.REDIS_PORT;
return raw ? parseInt(raw) : undefined;
}),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_USERNAME ?? process.env.REDIS_USERNAME),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.PUBSUB_REDIS_PASSWORD ?? process.env.REDIS_PASSWORD),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_TLS_DISABLED: z
.string()
.default(process.env.PUBSUB_REDIS_TLS_DISABLED ?? process.env.REDIS_TLS_DISABLED ?? "false"),
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z
.string()
.default(process.env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED ?? "0"),
// Use sharded pub/sub (SSUBSCRIBE/SPUBLISH) in cluster mode; "0" forces classic pub/sub.
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_SHARDED_ENABLED: z.string().default("1"),

DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
DEFAULT_ENV_EXECUTION_CONCURRENCY_BURST_FACTOR: z.coerce.number().default(1.0),
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(300),
Expand Down Expand Up @@ -1610,6 +1679,18 @@ const EnvironmentSchema = z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
// Dedicated ClickHouse pool for the native backend's tag/batch id resolution; falls back to CLICKHOUSE_URL.
REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL: z
.string()
.optional()
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
REALTIME_BACKEND_NATIVE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),
Expand Down
21 changes: 19 additions & 2 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,20 @@ export async function findEnvironmentBySlug(
return environment ? toAuthenticated(environment) : null;
}

// The authenticated environment plus the run scalars the realtime publish needs.
// Both come from one taskRun read — see findEnvironmentFromRun.
export type EnvironmentFromRun = {
// The run's runtime environment, converted to its authenticated form.
environment: AuthenticatedEnvironment;
// The run's tags, taken as-is from the taskRun row (taskRun.runTags).
runTags: string[];
// The taskRun row's batchId; presumably null when the run is not part of a batch — confirm against the schema.
batchId: string | null;
};

export async function findEnvironmentFromRun(
runId: string,
tx?: PrismaClientOrTransaction
): Promise<AuthenticatedEnvironment | null> {
): Promise<EnvironmentFromRun | null> {
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
// ride along for free — no extra query for the realtime publish to send a full record.
const taskRun = await (tx ?? $replica).taskRun.findFirst({
where: {
id: runId,
Expand All @@ -249,7 +259,14 @@ export async function findEnvironmentFromRun(
runtimeEnvironment: { include: authIncludeBase },
},
});
return taskRun?.runtimeEnvironment ? toAuthenticated(taskRun.runtimeEnvironment) : null;
if (!taskRun?.runtimeEnvironment) {
return null;
}
return {
environment: toAuthenticated(taskRun.runtimeEnvironment),
runTags: taskRun.runTags,
batchId: taskRun.batchId,
};
}

export async function createNewSession(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute(
});

// Step 2: Register the waitpoint on the session channel so the next
// append fires it. Keyed by (addressingKey, io) — the canonical
// string for the row. The append handler drains by the same
// canonical key, so writers and readers converge regardless of
// which URL form the agent vs. the appending caller used.
// append fires it. Keyed by (environmentId, addressingKey, io) — the
// canonical string for the row, scoped to the environment because
// externalIds are only unique per environment. The append handler
// drains by the same key, so writers and readers converge regardless
// of which URL form the agent vs. the appending caller used.
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await addSessionStreamWaitpoint(
authentication.environment.id,
addressingKey,
body.io,
result.waitpoint.id,
Expand Down Expand Up @@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute(
});

await removeSessionStreamWaitpoint(
authentication.environment.id,
addressingKey,
body.io,
result.waitpoint.id
Expand Down
11 changes: 10 additions & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/common.server";
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
Expand Down Expand Up @@ -184,7 +185,15 @@ const { action } = createActionApiRoute(
return json({ error: "Internal Server Error" }, { status: 500 });
}
if (pgResult) {
return json(pgResult, { status: 200 });
// Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
// internal id (the router keys single-run feeds by it, not the friendly id from the URL).
publishChangeRecord({
runId: pgResult.runId,
envId: env.id,
tags: pgResult.runTags,
batchId: pgResult.batchId,
});
return json({ metadata: pgResult.metadata }, { status: 200 });
}

// PG miss. Target run is either buffered or genuinely absent.
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { logger } from "~/services/logger.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";

// Pull the existing tags out of a buffer entry's serialised payload so
Expand Down Expand Up @@ -90,6 +91,14 @@ export async function action({ request, params }: ActionFunctionArgs) {
},
data: { runTags: { push: newTags } },
});
// Publish a run-changed record with the NEW tag set so tag feeds reindex
// (no-op unless enabled).
publishChangeRecord({
runId: taskRun.id,
envId: env.id,
tags: existing.concat(newTags),
batchId: taskRun.batchId,
});
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
},
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates
Expand Down
18 changes: 18 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ const { action } = createActionApiRoute(
return json({ error: "Session not found" }, { status: 404 });
}

// The externalId is the canonical addressing key once set: the S2
// stream names, the waitpoint cache key, and the minted session PAT
// scope all derive from it. Re-keying a session would orphan its
// streams (the chat goes silent) and invalidate the PAT's scope, so
// reject any change. Same-value PATCHes stay idempotent.
if (
body.externalId !== undefined &&
body.externalId !== existing.externalId
) {
return json(
{
error:
"externalId cannot be changed after creation; close this session and create a new one with the desired externalId",
},
{ status: 422 }
);
}

try {
const updated = await prisma.session.update({
where: { id: existing.id },
Expand Down
36 changes: 34 additions & 2 deletions apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { $replica } from "~/db.server";
import { chatSnapshotStorageKey } from "~/services/realtime/chatSnapshot.server";
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
import {
anyResource,
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
Expand All @@ -21,8 +22,31 @@ const routeConfig = {
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId),
};

// Build the authorization resource for a session route: the union of the id
// used in the URL, the session's friendlyId, and its externalId (when set) —
// the same shape the sibling session routes use. A route without an
// authorization block gets no scope checks from the route builder, which
// would let any session-scoped JWT in the environment presign URLs for any
// other session's snapshot.
function sessionResource(
  paramId: string,
  session: { friendlyId: string; externalId: string | null } | null | undefined
) {
  // Collect candidates in a fixed order: URL form, friendlyId, externalId.
  const candidateIds: string[] = [paramId];
  if (session) {
    candidateIds.push(session.friendlyId);
    if (session.externalId) {
      candidateIds.push(session.externalId);
    }
  }

  // A Set keeps first-seen order, so duplicates collapse without reordering.
  const uniqueIds = Array.from(new Set<string>(candidateIds));
  const resources = uniqueIds.map((id) => ({ type: "sessions" as const, id }));
  return anyResource(resources);
}

export const { action } = createActionApiRoute(
{ ...routeConfig, method: "PUT" },
{
...routeConfig,
method: "PUT",
authorization: {
action: "write",
resource: (params, _, __, ___, session) => sessionResource(params.sessionId, session),
},
},
async ({ authentication, resource: session }) => {
if (!session) {
return json({ error: "Session not found" }, { status: 404 });
Expand All @@ -42,7 +66,15 @@ export const { action } = createActionApiRoute(
}
);

export const loader = createLoaderApiRoute(routeConfig, async ({ authentication, resource: session }) => {
export const loader = createLoaderApiRoute(
{
...routeConfig,
authorization: {
action: "read",
resource: (session, params) => sessionResource(params.sessionId, session),
},
},
async ({ authentication, resource: session }) => {
if (!session) {
return json({ error: "Session not found" }, { status: 404 });
}
Expand Down
Loading
Loading