Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .changeset/trigger-client.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"@trigger.dev/sdk": patch
---

Add `TriggerClient` for running multiple SDK clients side-by-side, each with its own auth, preview branch, and baseURL. Useful when a single process needs to trigger tasks or read runs across multiple projects, environments, or preview branches without mutating shared global state.

```ts
import { TriggerClient } from "@trigger.dev/sdk";

const prod = new TriggerClient({ accessToken: process.env.TRIGGER_PROD_KEY });
const preview = new TriggerClient({
accessToken: process.env.TRIGGER_PREVIEW_KEY,
previewBranch: "signup-flow",
});

await prod.tasks.trigger("send-email", payload);
await preview.runs.list({ status: ["COMPLETED"] });
```
6 changes: 5 additions & 1 deletion .cursor/mcp.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{
"mcpServers": {}
"mcpServers": {
"linear": {
"url": "https://mcp.linear.app/mcp"
}
}
}
6 changes: 6 additions & 0 deletions .server-changes/configurable-http-keepalive-timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Make the Express server's `keepAliveTimeout` configurable via `HTTP_KEEPALIVE_TIMEOUT_MS` (defaults to the previous hardcoded 65000 ms).
6 changes: 6 additions & 0 deletions .server-changes/drop-taskrun-scheduleid-createdat-idx.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Reduce primary database write load on `TaskRun` by dropping an unused composite index on `(scheduleId, createdAt)`. The schedule list view reads from ClickHouse, so this Postgres index served no Prisma query while still being maintained on every `TaskRun` INSERT/UPDATE.
6 changes: 6 additions & 0 deletions .server-changes/organization-scoped-clickhouse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add organization-scoped ClickHouse routing so that customers with HIPAA or other data-security requirements can use dedicated database instances.
6 changes: 6 additions & 0 deletions .server-changes/pending-version-clickhouse-lookup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

PendingVersionSystem now discovers PENDING_VERSION run ids via ClickHouse and re-validates each by primary key in Postgres, reducing read load on the TaskRun status index. Uses a dedicated `RUN_ENGINE_CLICKHOUSE_*` client so it doesn't contend with the main analytics pool.
6 changes: 6 additions & 0 deletions .server-changes/sentry-tenant-attribution.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Stamp Sentry events with the signed-in user so "Users Impacted" counts individual humans, and enrich events with org / project / environment tags when that context is available (dashboard URLs, authenticated API requests).
6 changes: 6 additions & 0 deletions .server-changes/supervisor-checkpoint-type-compat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: supervisor
type: fix
---

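Keep older workloads working when checkpoints are produced by the compute path: `COMPUTE` checkpoint types are reported to workloads as `KUBERNETES`, which older runners' response validation accepts.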
11 changes: 11 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ containerTest("should use both", async ({ prisma, redisOptions }) => {
});
```

## Code Style

### Imports

**Prefer static imports over dynamic imports.** Only use dynamic `import()` when:
- Circular dependencies cannot be resolved otherwise
- Code splitting is genuinely needed for performance
- The module must be loaded conditionally at runtime

Dynamic imports add unnecessary overhead in hot paths and make code harder to analyze. If you find yourself using `await import()`, ask if a regular `import` statement would work instead.

## Changesets and Server Changes

When modifying any public package (`packages/*` or `integrations/*`), add a changeset:
Expand Down
20 changes: 18 additions & 2 deletions apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ const WorkloadActionParams = z.object({
snapshotFriendlyId: z.string(),
});

/**
 * Rewrites a `COMPUTE` checkpoint type to `KUBERNETES` on a response item.
 *
 * Workloads bundled into customer task images before CLI v4.4.4 validate the
 * checkpoint type with a strict zod enum that only allows DOCKER and
 * KUBERNETES. The workload never reads this field - it only validates the
 * response shape - so substituting a known value keeps older runners working
 * without affecting the value stored in the database or seen by internal
 * services.
 *
 * @param item - Any object that may carry a `checkpoint` with a `type`.
 * @returns The same `item` reference when there is no checkpoint or its type
 *   is not `COMPUTE`; otherwise a shallow copy whose checkpoint is a shallow
 *   copy with `type` set to `"KUBERNETES"`. The input is never mutated.
 */
function legacifyCheckpointType<T extends { checkpoint?: { type: string } | null }>(item: T): T {
  const checkpoint = item.checkpoint;

  // Nothing to rewrite: missing/null checkpoint, or a type old runners accept.
  if (!checkpoint || checkpoint.type !== "COMPUTE") {
    return item;
  }

  const legacyCheckpoint = { ...checkpoint, type: "KUBERNETES" };
  return { ...item, checkpoint: legacyCheckpoint } as T;
}

type WorkloadServerEvents = {
runConnected: [
{
Expand Down Expand Up @@ -384,7 +396,9 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
return;
}

reply.json(sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody);
reply.json({
snapshots: sinceSnapshotResponse.data.snapshots.map(legacifyCheckpointType),
} satisfies WorkloadRunSnapshotsSinceResponseBody);
},
}
)
Expand All @@ -409,7 +423,9 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
return;
}

reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody);
reply.json(
dequeueResponse.data.map(legacifyCheckpointType) satisfies WorkloadDequeueFromVersionResponseBody
);
},
});

Expand Down
59 changes: 22 additions & 37 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { createReadableStreamFromReadable, type EntryContext } from "@remix-run/node"; // or cloudflare/deno
import { RemixServer } from "@remix-run/react";
import * as Sentry from "@sentry/remix";
import { wrapHandleErrorWithSentry } from "@sentry/remix";
import { addTenantContextToEvent } from "~/utils/sentryTenantContext.server";
import { parseAcceptLanguage } from "intl-parse-accept-language";
import isbot from "isbot";
import { renderToPipeableStream } from "react-dom/server";
Expand All @@ -24,44 +26,12 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
// Touch the sessions replication singleton at entry so it boots deterministically
// on webapp startup. The singleton's initializer wires start (gated on
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
// runsReplicationInstance.
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
import { signalsEmitter } from "./services/signals.server";

// Start the sessions replication service (subscribes to the logical replication
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
// runs deterministically on webapp boot rather than lazily via a singleton
// reference elsewhere in the module graph.
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
// Capture a non-nullable reference so the shutdown closure below
// doesn't need to re-null-check (TS narrowing doesn't follow through
// an inner function scope).
const replicator = sessionsReplicationInstance;
replicator
.start()
.then(() => {
console.log("🗃️ Sessions replication service started");
})
.catch((error) => {
console.error("🗃️ Sessions replication service failed to start", {
error,
});
});

// Wrap the async shutdown in a sync handler that catches rejections —
// SIGTERM/SIGINT fire during process teardown, and an unhandled
// promise rejection from `_replicationClient.stop()` there would
// bubble up past the process exit. Matches the pattern in
// dynamicFlushScheduler.server.ts.
const shutdownSessionsReplication = () => {
replicator.shutdown().catch((error) => {
console.error("🗃️ Sessions replication service shutdown error", {
error,
});
});
};
signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
signalsEmitter.on("SIGINT", shutdownSessionsReplication);
}
void sessionsReplicationInstance;

const ABORT_DELAY = 30000;

Expand Down Expand Up @@ -289,9 +259,24 @@ process.on("uncaughtException", (error, origin) => {
singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);

// Register the tenant-context event processor through singleton() so that
// Remix's dev-mode CJS reloads don't stack up duplicate processors: Sentry's
// processor list lives in node_modules and survives module reloads. The
// processor itself is a pure read+stamp (idempotent at runtime), but going
// through singleton() keeps this consistent with the rest of the file.
singleton("SentryTenantContextProcessor", () => {
  const sentryConfigured = Boolean(env.SENTRY_DSN);

  if (sentryConfigured) {
    Sentry.addEventProcessor(addTenantContextToEvent);
  }

  // Must return a truthy value: `singleton()` uses `??=`, so a callback that
  // returned `void` would run again (and re-register) on every dev reload.
  return true;
});

export { apiRateLimiter } from "./services/apiRateLimit.server";
export { engineRateLimiter } from "./services/engineRateLimit.server";
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
export { tenantContextMiddleware } from "./services/tenantContextResolver.server";
export { socketIo } from "./v3/handleSocketIo.server";
export { wss } from "./v3/handleWebsockets.server";

Expand Down
32 changes: 30 additions & 2 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ const EnvironmentSchema = z
ELECTRIC_ORIGIN_SHARDS: z.string().optional(),
APP_ENV: z.string().default(process.env.NODE_ENV),
SERVICE_NAME: z.string().default("trigger.dev webapp"),
SENTRY_DSN: z.string().optional(),
POSTHOG_PROJECT_KEY: z.string().default("phc_LFH7kJiGhdIlnO22hTAKgHpaKhpM8gkzWAFvHmf5vfS"),
TRIGGER_TELEMETRY_DISABLED: z.string().optional(),
AUTH_GITHUB_CLIENT_ID: z.string().optional(),
Expand Down Expand Up @@ -459,7 +460,10 @@ const EnvironmentSchema = z
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),
OBJECT_STORE_DEFAULT_PROTOCOL: z
.string()
.regex(/^[a-z0-9]+$/)
.optional(),

ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
Expand Down Expand Up @@ -1468,6 +1472,21 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
EVENTS_CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
// ClickHouse client used by @internal/run-engine's PendingVersionSystem.
// Kept on its own URL + pool so this low-QPS path can't contend with
// the main analytics client (CLICKHOUSE_URL). Falls back to the main
// URL when unset so unconfigured environments still work.
RUN_ENGINE_CLICKHOUSE_URL: z
.string()
.optional()
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(5),
RUN_ENGINE_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),
Expand All @@ -1489,9 +1508,18 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),

// Organization data stores registry
ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce
.number()
.int()
.default(60 * 1000), // 1 minute

// LLM cost tracking
LLM_COST_TRACKING_ENABLED: BoolEnv.default(true),
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce
.number()
.int()
.default(5 * 60 * 1000), // 5 minutes
LLM_PRICING_RELOAD_CHANNEL: z.string().default("llm-registry:reload"),
LLM_PRICING_RELOAD_DEBOUNCE_MS: z.coerce.number().int().default(1000),
// Whether to subscribe this process to the LLM_PRICING_RELOAD_CHANNEL.
Expand Down
36 changes: 21 additions & 15 deletions apps/webapp/app/presenters/v3/AgentListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import {
type RuntimeEnvironmentType,
type TaskTriggerSource,
} from "@trigger.dev/database";
import { ClickHouse } from "@internal/clickhouse";
import { type ClickHouse } from "@internal/clickhouse";
import { z } from "zod";
import { $replica } from "~/db.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { singleton } from "~/utils/singleton";
import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server";

Expand All @@ -24,10 +24,7 @@ export type AgentActiveState = {
};

export class AgentListPresenter {
constructor(
private readonly clickhouse: ClickHouse,
private readonly _replica: PrismaClientOrTransaction
) {}
constructor(private readonly _replica: PrismaClientOrTransaction) {}

public async call({
organizationId,
Expand All @@ -40,6 +37,11 @@ export class AgentListPresenter {
environmentId: string;
environmentType: RuntimeEnvironmentType;
}) {
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(
organizationId,
"standard"
);

const currentWorker = await findCurrentWorkerFromEnvironment(
{
id: environmentId,
Expand Down Expand Up @@ -89,20 +91,21 @@ export class AgentListPresenter {
}

// All queries are deferred for streaming
const activeStates = this.#getActiveStates(environmentId, slugs);
const conversationSparklines = this.#getConversationSparklines(environmentId, slugs);
const costSparklines = this.#getCostSparklines(environmentId, slugs);
const tokenSparklines = this.#getTokenSparklines(environmentId, slugs);
const activeStates = this.#getActiveStates(clickhouse, environmentId, slugs);
const conversationSparklines = this.#getConversationSparklines(clickhouse, environmentId, slugs);
const costSparklines = this.#getCostSparklines(clickhouse, environmentId, slugs);
const tokenSparklines = this.#getTokenSparklines(clickhouse, environmentId, slugs);

return { agents, activeStates, conversationSparklines, costSparklines, tokenSparklines };
}

/** Count runs currently executing vs suspended per agent */
async #getActiveStates(
clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, AgentActiveState>> {
const queryFn = this.clickhouse.reader.query({
const queryFn = clickhouse.reader.query({
name: "agentActiveStates",
query: `SELECT
task_identifier,
Expand Down Expand Up @@ -140,10 +143,11 @@ export class AgentListPresenter {

/** 24h hourly sparkline of conversation (run) count per agent */
async #getConversationSparklines(
clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, number[]>> {
const queryFn = this.clickhouse.reader.query({
const queryFn = clickhouse.reader.query({
name: "agentConversationSparklines",
query: `SELECT
task_identifier,
Expand Down Expand Up @@ -172,10 +176,11 @@ export class AgentListPresenter {

/** 24h hourly sparkline of LLM cost per agent */
async #getCostSparklines(
clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, number[]>> {
const queryFn = this.clickhouse.reader.query({
const queryFn = clickhouse.reader.query({
name: "agentCostSparklines",
query: `SELECT
task_identifier,
Expand Down Expand Up @@ -203,10 +208,11 @@ export class AgentListPresenter {

/** 24h hourly sparkline of total tokens per agent */
async #getTokenSparklines(
clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, number[]>> {
const queryFn = this.clickhouse.reader.query({
const queryFn = clickhouse.reader.query({
name: "agentTokenSparklines",
query: `SELECT
task_identifier,
Expand Down Expand Up @@ -284,5 +290,5 @@ export class AgentListPresenter {
export const agentListPresenter = singleton("agentListPresenter", setupAgentListPresenter);

function setupAgentListPresenter() {
return new AgentListPresenter(clickhouseClient, $replica);
return new AgentListPresenter($replica);
}
Loading
Loading