Skip to content

Commit 9e6e96b

Browse files
authored
Merge branch 'main' into feat/tri-10431-attio-signup-sync
2 parents 0955014 + 7b4443a commit 9e6e96b

83 files changed

Lines changed: 7470 additions & 227 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.changeset/chat-agent-hardening.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn); input appends now carry an idempotency key so a retried send can't duplicate a message; stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn; and runs can now carry the full set of dashboard tags instead of having them silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached), and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream; sending a custom action no longer leaves a second stream reader running; and a long-lived `watch` subscription no longer grows its dedupe set without bound.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
area: supervisor
3+
type: improvement
4+
---
5+
6+
Compute workload manager now sets an `org` label on every run (create +
7+
restore) for network-policy selection, instead of a plan-gated label. The
8+
Kubernetes workload manager is unchanged.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids.

apps/supervisor/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"@kubernetes/client-node": "^1.0.0",
1919
"@trigger.dev/core": "workspace:*",
2020
"dockerode": "^4.0.6",
21-
"ioredis": "^5.3.2",
21+
"ioredis": "~5.6.0",
2222
"p-limit": "^6.2.0",
2323
"prom-client": "^15.1.0",
2424
"socket.io": "4.7.4",

apps/supervisor/src/workloadManager/compute.ts

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,11 @@ export class ComputeWorkloadManager implements WorkloadManager {
133133
// Strip image digest - resolve by tag, not digest
134134
const imageRef = stripImageDigest(opts.image);
135135

136-
// Labels forwarded to the compute provider for network-policy selection;
137-
// the provider promotes a configured subset to its network layer. Mirrors
138-
// the privatelink label the Kubernetes workload manager sets on the run pod.
139-
const labels: Record<string, string> = {};
140-
if (opts.hasPrivateLink) {
141-
labels.privatelink = opts.orgId;
142-
}
136+
// Labels forwarded to the compute provider for network-policy selection.
137+
// `org` is always set so every run carries its org identity.
138+
const labels: Record<string, string> = {
139+
org: opts.orgId,
140+
};
143141

144142
// Wide event: single canonical log line emitted in finally
145143
const event: Record<string, unknown> = {
@@ -319,12 +317,11 @@ export class ComputeWorkloadManager implements WorkloadManager {
319317
TRIGGER_WORKER_INSTANCE_NAME: this.opts.runner.instanceName,
320318
};
321319

322-
// Resupply the same labels on restore (mirror of the create path); the
323-
// provider doesn't persist them across a snapshot, so without this a
324-
// restored run would lose its policy-based network selection.
320+
// Resupply labels on restore (the provider doesn't persist them across a
321+
// snapshot). orgId is optional on the restore opts type, so guard it.
325322
const labels: Record<string, string> = {};
326-
if (opts.hasPrivateLink && opts.orgId) {
327-
labels.privatelink = opts.orgId;
323+
if (opts.orgId) {
324+
labels.org = opts.orgId;
328325
}
329326

330327
this.logger.verbose("restore request body", {

apps/webapp/app/entry.server.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
registerRunEngineEventBusHandlers,
2828
setupBatchQueueCallbacks,
2929
} from "./v3/runEngineHandlers.server";
30+
import { registerRunChangeNotifierHandlers } from "./services/realtime/runChangeNotifierHandlers.server";
3031
// Touch the sessions replication singleton at entry so it boots deterministically
3132
// on webapp startup. The singleton's initializer wires start (gated on
3233
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
@@ -269,6 +270,9 @@ process.on("uncaughtException", (error, origin) => {
269270

270271
singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
271272
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
273+
// Attach the realtime run-changed publish delegations to the engine event bus.
274+
// No-ops (registers nothing) unless REALTIME_BACKEND_NATIVE_ENABLED=1.
275+
singleton("RunChangeNotifierHandlers", registerRunChangeNotifierHandlers);
272276

273277
// Wrapped in singleton() so Remix's dev-mode CJS reloads don't append
274278
// duplicate copies of the processor — Sentry's processor list lives in

apps/webapp/app/env.server.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,45 @@ const EnvironmentSchema = z
295295
.int()
296296
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds
297297

298+
// Master switch for the native realtime backend; when off, Electric serves everything and publishes are no-ops.
299+
REALTIME_BACKEND_NATIVE_ENABLED: z.string().default("0"),
300+
// Live long-poll backstop hold (ms); matches Electric's ~20s cadence.
301+
REALTIME_BACKEND_NATIVE_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(20_000),
302+
// Jitter ratio on the live-poll hold (0.15 = ±15%) to avoid synchronized refetch herds.
303+
REALTIME_BACKEND_NATIVE_LIVE_POLL_JITTER_RATIO: z.coerce.number().default(0.15),
304+
// Hard cap on the tag-list snapshot size.
305+
REALTIME_BACKEND_NATIVE_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
306+
// TTL/size of the coalescing cache for the multi-run resolve+hydrate (same-filter feeds share one query).
307+
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_TTL_MS: z.coerce.number().int().default(1_000),
308+
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
309+
// Size/TTL of the per-handle working-set cache used to diff multi-run live polls.
310+
REALTIME_BACKEND_NATIVE_WORKING_SET_MAX_ENTRIES: z.coerce.number().int().default(10_000),
311+
REALTIME_BACKEND_NATIVE_WORKING_SET_TTL_MS: z.coerce.number().int().default(300_000),
312+
// Bucket (ms) the tag-list createdAt floor is quantized to so same-tag feeds share a cache entry; 0 disables.
313+
REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
314+
// Leading-edge throttle (ms) on per-env wake delivery; 0 wakes on every change.
315+
REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(250),
316+
// "1" shares per-connection replay cursors fleet-wide via Redis, so a load-balancer hop reads the connection's true inter-poll gap instead of cold-resolving.
317+
REALTIME_BACKEND_NATIVE_SHARED_REPLAY_CURSORS: z.string().default("1"),
318+
// "1" holds a multi-run live poll open on a non-matching wake instead of replying up-to-date.
319+
REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY: z.string().default("1"),
320+
// Max concurrent fresh ClickHouse resolves per instance (reconnect-stampede gate); 0 disables.
321+
REALTIME_BACKEND_NATIVE_RESOLVE_ADMISSION_LIMIT: z.coerce.number().int().default(16),
322+
// Replay window (ms) for buffered change records delivered to newly-armed feeds; 0 disables.
323+
REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS: z.coerce.number().int().default(2_000),
324+
// Cap on buffered recent records per env (latest record per run).
325+
REALTIME_BACKEND_NATIVE_REPLAY_MAX_RUNS: z.coerce.number().int().default(512),
326+
// Keep an env subscribed + buffering this long (ms) after its last feed closes; 0 disables.
327+
REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS: z.coerce.number().int().default(5_000),
328+
// Fallback per-env concurrent-connection limit when the org has none configured.
329+
REALTIME_BACKEND_NATIVE_DEFAULT_CONCURRENCY_LIMIT: z.coerce.number().int().default(100_000),
330+
// TTL/size of the single-run read-through cache that collapses duplicate refetch bursts.
331+
REALTIME_BACKEND_NATIVE_RUN_CACHE_TTL_MS: z.coerce.number().int().default(250),
332+
REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
333+
// TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
334+
REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
335+
REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),
336+
298337
PUBSUB_REDIS_HOST: z
299338
.string()
300339
.optional()
@@ -327,6 +366,36 @@ const EnvironmentSchema = z
327366
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
328367
PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
329368

369+
// Dedicated pub/sub Redis for the native realtime backend; falls back to PUBSUB_REDIS_* then REDIS_*.
370+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_HOST: z
371+
.string()
372+
.optional()
373+
.transform((v) => v ?? process.env.PUBSUB_REDIS_HOST ?? process.env.REDIS_HOST),
374+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PORT: z.coerce
375+
.number()
376+
.optional()
377+
.transform((v) => {
378+
if (v !== undefined) return v;
379+
const raw = process.env.PUBSUB_REDIS_PORT ?? process.env.REDIS_PORT;
380+
return raw ? parseInt(raw) : undefined;
381+
}),
382+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_USERNAME: z
383+
.string()
384+
.optional()
385+
.transform((v) => v ?? process.env.PUBSUB_REDIS_USERNAME ?? process.env.REDIS_USERNAME),
386+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PASSWORD: z
387+
.string()
388+
.optional()
389+
.transform((v) => v ?? process.env.PUBSUB_REDIS_PASSWORD ?? process.env.REDIS_PASSWORD),
390+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_TLS_DISABLED: z
391+
.string()
392+
.default(process.env.PUBSUB_REDIS_TLS_DISABLED ?? process.env.REDIS_TLS_DISABLED ?? "false"),
393+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z
394+
.string()
395+
.default(process.env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED ?? "0"),
396+
// Use sharded pub/sub (SSUBSCRIBE/SPUBLISH) in cluster mode; "0" forces classic pub/sub.
397+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_SHARDED_ENABLED: z.string().default("1"),
398+
330399
DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
331400
DEFAULT_ENV_EXECUTION_CONCURRENCY_BURST_FACTOR: z.coerce.number().default(1.0),
332401
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(300),
@@ -1603,6 +1672,18 @@ const EnvironmentSchema = z
16031672
.enum(["log", "error", "warn", "info", "debug"])
16041673
.default("info"),
16051674
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
1675+
// Dedicated ClickHouse pool for the native backend's tag/batch id resolution; falls back to CLICKHOUSE_URL.
1676+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL: z
1677+
.string()
1678+
.optional()
1679+
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
1680+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
1681+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
1682+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
1683+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_LOG_LEVEL: z
1684+
.enum(["log", "error", "warn", "info", "debug"])
1685+
.default("info"),
1686+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
16061687
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
16071688
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
16081689
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,20 @@ export async function findEnvironmentBySlug(
237237
return environment ? toAuthenticated(environment) : null;
238238
}
239239

240+
// The authenticated environment plus the run scalars the realtime publish needs.
241+
// Both come from one taskRun read — see findEnvironmentFromRun.
242+
export type EnvironmentFromRun = {
243+
environment: AuthenticatedEnvironment;
244+
runTags: string[];
245+
batchId: string | null;
246+
};
247+
240248
export async function findEnvironmentFromRun(
241249
runId: string,
242250
tx?: PrismaClientOrTransaction
243-
): Promise<AuthenticatedEnvironment | null> {
251+
): Promise<EnvironmentFromRun | null> {
252+
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
253+
// ride along for free — no extra query for the realtime publish to send a full record.
244254
const taskRun = await (tx ?? $replica).taskRun.findFirst({
245255
where: {
246256
id: runId,
@@ -249,7 +259,14 @@ export async function findEnvironmentFromRun(
249259
runtimeEnvironment: { include: authIncludeBase },
250260
},
251261
});
252-
return taskRun?.runtimeEnvironment ? toAuthenticated(taskRun.runtimeEnvironment) : null;
262+
if (!taskRun?.runtimeEnvironment) {
263+
return null;
264+
}
265+
return {
266+
environment: toAuthenticated(taskRun.runtimeEnvironment),
267+
runTags: taskRun.runTags,
268+
batchId: taskRun.batchId,
269+
};
253270
}
254271

255272
export async function createNewSession(

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute(
106106
});
107107

108108
// Step 2: Register the waitpoint on the session channel so the next
109-
// append fires it. Keyed by (addressingKey, io) — the canonical
110-
// string for the row. The append handler drains by the same
111-
// canonical key, so writers and readers converge regardless of
112-
// which URL form the agent vs. the appending caller used.
109+
// append fires it. Keyed by (environmentId, addressingKey, io) — the
110+
// canonical string for the row, scoped to the environment because
111+
// externalIds are only unique per environment. The append handler
112+
// drains by the same key, so writers and readers converge regardless
113+
// of which URL form the agent vs. the appending caller used.
113114
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
114115
await addSessionStreamWaitpoint(
116+
authentication.environment.id,
115117
addressingKey,
116118
body.io,
117119
result.waitpoint.id,
@@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute(
152154
});
153155

154156
await removeSessionStreamWaitpoint(
157+
authentication.environment.id,
155158
addressingKey,
156159
body.io,
157160
result.waitpoint.id

0 commit comments

Comments
 (0)