Skip to content

Commit e8efd71

Browse files
authored
Merge branch 'main' into hipaa-updates
2 parents 2a4362a + 7b4443a commit e8efd71

91 files changed

Lines changed: 8475 additions & 253 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.changeset/chat-agent-hardening.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Reliability fixes for `chat.agent`. A user message sent while the agent is streaming is no longer delivered twice (which could run a duplicate turn), input appends now carry an idempotency key so a retried send can't duplicate a message, stopping a generation clears the streaming state so a page reload doesn't replay the stopped turn, and runs can now carry the full set of dashboard tags instead of being silently truncated. `onTurnComplete` now fires on errored turns (with the thrown error attached) and the failed turn's user message is persisted so it isn't lost on the next run. Custom agents and manual `chat.writeTurnComplete` callers now trim the output stream, sending a custom action no longer leaves a second stream reader running, and a long-lived `watch` subscription no longer grows its dedupe set without bound.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
area: supervisor
3+
type: improvement
4+
---
5+
6+
Compute workload manager now sets an `org` label on every run (create +
7+
restore) for network-policy selection, instead of a plan-gated label. The
8+
Kubernetes workload manager is unchanged.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the replica and served from the primary if it still hasn't caught up. Polling also now rejects a since-snapshot id that doesn't belong to the run being polled.

apps/supervisor/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"@kubernetes/client-node": "^1.0.0",
1919
"@trigger.dev/core": "workspace:*",
2020
"dockerode": "^4.0.6",
21-
"ioredis": "^5.3.2",
21+
"ioredis": "~5.6.0",
2222
"p-limit": "^6.2.0",
2323
"prom-client": "^15.1.0",
2424
"socket.io": "4.7.4",

apps/supervisor/src/workloadManager/compute.ts

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,11 @@ export class ComputeWorkloadManager implements WorkloadManager {
133133
// Strip image digest - resolve by tag, not digest
134134
const imageRef = stripImageDigest(opts.image);
135135

136-
// Labels forwarded to the compute provider for network-policy selection;
137-
// the provider promotes a configured subset to its network layer. Mirrors
138-
// the privatelink label the Kubernetes workload manager sets on the run pod.
139-
const labels: Record<string, string> = {};
140-
if (opts.hasPrivateLink) {
141-
labels.privatelink = opts.orgId;
142-
}
136+
// Labels forwarded to the compute provider for network-policy selection.
137+
// `org` is always set so every run carries its org identity.
138+
const labels: Record<string, string> = {
139+
org: opts.orgId,
140+
};
143141

144142
// Wide event: single canonical log line emitted in finally
145143
const event: Record<string, unknown> = {
@@ -319,12 +317,11 @@ export class ComputeWorkloadManager implements WorkloadManager {
319317
TRIGGER_WORKER_INSTANCE_NAME: this.opts.runner.instanceName,
320318
};
321319

322-
// Resupply the same labels on restore (mirror of the create path); the
323-
// provider doesn't persist them across a snapshot, so without this a
324-
// restored run would lose its policy-based network selection.
320+
// Resupply labels on restore (the provider doesn't persist them across a
321+
// snapshot). orgId is optional on the restore opts type, so guard it.
325322
const labels: Record<string, string> = {};
326-
if (opts.hasPrivateLink && opts.orgId) {
327-
labels.privatelink = opts.orgId;
323+
if (opts.orgId) {
324+
labels.org = opts.orgId;
328325
}
329326

330327
this.logger.verbose("restore request body", {

apps/webapp/app/entry.server.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
registerRunEngineEventBusHandlers,
2828
setupBatchQueueCallbacks,
2929
} from "./v3/runEngineHandlers.server";
30+
import { registerRunChangeNotifierHandlers } from "./services/realtime/runChangeNotifierHandlers.server";
3031
// Touch the sessions replication singleton at entry so it boots deterministically
3132
// on webapp startup. The singleton's initializer wires start (gated on
3233
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
@@ -269,6 +270,9 @@ process.on("uncaughtException", (error, origin) => {
269270

270271
singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
271272
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
273+
// Attach the realtime run-changed publish delegations to the engine event bus.
274+
// No-ops (registers nothing) unless REALTIME_BACKEND_NATIVE_ENABLED=1.
275+
singleton("RunChangeNotifierHandlers", registerRunChangeNotifierHandlers);
272276

273277
// Wrapped in singleton() so Remix's dev-mode CJS reloads don't append
274278
// duplicate copies of the processor — Sentry's processor list lives in

apps/webapp/app/env.server.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,45 @@ const EnvironmentSchema = z
300300
.int()
301301
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds
302302

303+
// Master switch for the native realtime backend; off = Electric serves everything, publishes no-op.
304+
REALTIME_BACKEND_NATIVE_ENABLED: z.string().default("0"),
305+
// Live long-poll backstop hold (ms); matches Electric's ~20s cadence.
306+
REALTIME_BACKEND_NATIVE_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(20_000),
307+
// Jitter ratio on the live-poll hold (0.15 = ±15%) to avoid synchronized refetch herds.
308+
REALTIME_BACKEND_NATIVE_LIVE_POLL_JITTER_RATIO: z.coerce.number().default(0.15),
309+
// Hard cap on the tag-list snapshot size.
310+
REALTIME_BACKEND_NATIVE_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
311+
// TTL/size of the coalescing cache for the multi-run resolve+hydrate (same-filter feeds share one query).
312+
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_TTL_MS: z.coerce.number().int().default(1_000),
313+
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
314+
// Size/TTL of the per-handle working-set cache used to diff multi-run live polls.
315+
REALTIME_BACKEND_NATIVE_WORKING_SET_MAX_ENTRIES: z.coerce.number().int().default(10_000),
316+
REALTIME_BACKEND_NATIVE_WORKING_SET_TTL_MS: z.coerce.number().int().default(300_000),
317+
// Bucket (ms) the tag-list createdAt floor is quantized to so same-tag feeds share a cache entry; 0 disables.
318+
REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
319+
// Leading-edge throttle (ms) on per-env wake delivery; 0 wakes on every change.
320+
REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(250),
321+
// "1" shares per-connection replay cursors fleet-wide via Redis, so a load-balancer hop reads the connection's true inter-poll gap instead of cold-resolving.
322+
REALTIME_BACKEND_NATIVE_SHARED_REPLAY_CURSORS: z.string().default("1"),
323+
// "1" holds a multi-run live poll open on a non-matching wake instead of replying up-to-date.
324+
REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY: z.string().default("1"),
325+
// Max concurrent fresh ClickHouse resolves per instance (reconnect-stampede gate); 0 disables.
326+
REALTIME_BACKEND_NATIVE_RESOLVE_ADMISSION_LIMIT: z.coerce.number().int().default(16),
327+
// Replay window (ms) for buffered change records delivered to newly-armed feeds; 0 disables.
328+
REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS: z.coerce.number().int().default(2_000),
329+
// Cap on buffered recent records per env (latest record per run).
330+
REALTIME_BACKEND_NATIVE_REPLAY_MAX_RUNS: z.coerce.number().int().default(512),
331+
// Keep an env subscribed + buffering this long (ms) after its last feed closes; 0 disables.
332+
REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS: z.coerce.number().int().default(5_000),
333+
// Fallback per-env concurrent-connection limit when the org has none configured.
334+
REALTIME_BACKEND_NATIVE_DEFAULT_CONCURRENCY_LIMIT: z.coerce.number().int().default(100_000),
335+
// TTL/size of the single-run read-through cache that collapses duplicate refetch bursts.
336+
REALTIME_BACKEND_NATIVE_RUN_CACHE_TTL_MS: z.coerce.number().int().default(250),
337+
REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
338+
// TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
339+
REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
340+
REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),
341+
303342
PUBSUB_REDIS_HOST: z
304343
.string()
305344
.optional()
@@ -332,6 +371,36 @@ const EnvironmentSchema = z
332371
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
333372
PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
334373

374+
// Dedicated pub/sub Redis for the native realtime backend; falls back to PUBSUB_REDIS_* then REDIS_*.
375+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_HOST: z
376+
.string()
377+
.optional()
378+
.transform((v) => v ?? process.env.PUBSUB_REDIS_HOST ?? process.env.REDIS_HOST),
379+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PORT: z.coerce
380+
.number()
381+
.optional()
382+
.transform((v) => {
383+
if (v !== undefined) return v;
384+
const raw = process.env.PUBSUB_REDIS_PORT ?? process.env.REDIS_PORT;
385+
return raw ? parseInt(raw) : undefined;
386+
}),
387+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_USERNAME: z
388+
.string()
389+
.optional()
390+
.transform((v) => v ?? process.env.PUBSUB_REDIS_USERNAME ?? process.env.REDIS_USERNAME),
391+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PASSWORD: z
392+
.string()
393+
.optional()
394+
.transform((v) => v ?? process.env.PUBSUB_REDIS_PASSWORD ?? process.env.REDIS_PASSWORD),
395+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_TLS_DISABLED: z
396+
.string()
397+
.default(process.env.PUBSUB_REDIS_TLS_DISABLED ?? process.env.REDIS_TLS_DISABLED ?? "false"),
398+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z
399+
.string()
400+
.default(process.env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED ?? "0"),
401+
// Use sharded pub/sub (SSUBSCRIBE/SPUBLISH) in cluster mode; "0" forces classic pub/sub.
402+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_SHARDED_ENABLED: z.string().default("1"),
403+
335404
DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
336405
DEFAULT_ENV_EXECUTION_CONCURRENCY_BURST_FACTOR: z.coerce.number().default(1.0),
337406
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(300),
@@ -950,6 +1019,8 @@ const EnvironmentSchema = z
9501019
.default("info"),
9511020
RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"),
9521021
RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"),
1022+
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS: z.coerce.number().int().default(50),
1023+
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS: z.coerce.number().int().default(200),
9531024
RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"),
9541025

9551026
/** How long should the presence ttl last */
@@ -1608,6 +1679,18 @@ const EnvironmentSchema = z
16081679
.enum(["log", "error", "warn", "info", "debug"])
16091680
.default("info"),
16101681
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
1682+
// Dedicated ClickHouse pool for the native backend's tag/batch id resolution; falls back to CLICKHOUSE_URL.
1683+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL: z
1684+
.string()
1685+
.optional()
1686+
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
1687+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
1688+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
1689+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
1690+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_LOG_LEVEL: z
1691+
.enum(["log", "error", "warn", "info", "debug"])
1692+
.default("info"),
1693+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
16111694
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
16121695
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
16131696
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,20 @@ export async function findEnvironmentBySlug(
237237
return environment ? toAuthenticated(environment) : null;
238238
}
239239

240+
// The authenticated environment plus the run scalars the realtime publish needs.
241+
// Both come from one taskRun read — see findEnvironmentFromRun.
242+
export type EnvironmentFromRun = {
243+
environment: AuthenticatedEnvironment;
244+
runTags: string[];
245+
batchId: string | null;
246+
};
247+
240248
export async function findEnvironmentFromRun(
241249
runId: string,
242250
tx?: PrismaClientOrTransaction
243-
): Promise<AuthenticatedEnvironment | null> {
251+
): Promise<EnvironmentFromRun | null> {
252+
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
253+
// ride along for free — no extra query for the realtime publish to send a full record.
244254
const taskRun = await (tx ?? $replica).taskRun.findFirst({
245255
where: {
246256
id: runId,
@@ -249,7 +259,14 @@ export async function findEnvironmentFromRun(
249259
runtimeEnvironment: { include: authIncludeBase },
250260
},
251261
});
252-
return taskRun?.runtimeEnvironment ? toAuthenticated(taskRun.runtimeEnvironment) : null;
262+
if (!taskRun?.runtimeEnvironment) {
263+
return null;
264+
}
265+
return {
266+
environment: toAuthenticated(taskRun.runtimeEnvironment),
267+
runTags: taskRun.runTags,
268+
batchId: taskRun.batchId,
269+
};
253270
}
254271

255272
export async function createNewSession(

0 commit comments

Comments
 (0)