Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/org-scoped-clickhouse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@trigger.dev/core": patch
"@trigger.dev/database": patch
---

Support for org-scoped ClickHouse

Implements OrganizationDataStore system allowing organizations to have data stored in specific separate ClickHouse instances. Adds factory-based client resolution, registry system for organization data store configurations, caching by organization and type, and admin UI routes for dynamic configuration.
6 changes: 5 additions & 1 deletion .cursor/mcp.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{
"mcpServers": {}
"mcpServers": {
"linear": {
"url": "https://mcp.linear.app/mcp"
}
}
}
6 changes: 6 additions & 0 deletions .server-changes/organization-scoped-clickhouse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Organization-scoped ClickHouse routing enables customers with HIPAA and other data security requirements to use dedicated database instances
11 changes: 11 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ containerTest("should use both", async ({ prisma, redisOptions }) => {
});
```

## Code Style

### Imports

**Prefer static imports over dynamic imports.** Only use dynamic `import()` when:
- Circular dependencies cannot be resolved otherwise
- Code splitting is genuinely needed for performance
- The module must be loaded conditionally at runtime

Dynamic imports add unnecessary overhead in hot paths and make code harder to analyze. If you find yourself using `await import()`, ask if a regular `import` statement would work instead.

## Changesets and Server Changes

When modifying any public package (`packages/*` or `integrations/*`), add a changeset:
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,9 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),

// Organization data stores registry
ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes

// LLM cost tracking
LLM_COST_TRACKING_ENABLED: BoolEnv.default(true),
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes
Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { type Project, type RuntimeEnvironment, type TaskRunStatus } from "@trig
import assertNever from "assert-never";
import { z } from "zod";
import { API_VERSIONS, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
import { logger } from "~/services/logger.server";
import { CoercedDate } from "~/utils/zod";
import { ServiceValidationError } from "~/v3/services/baseService.server";
Expand Down Expand Up @@ -259,7 +259,8 @@ export class ApiRunListPresenter extends BasePresenter {
options.machines = searchParams["filter[machine]"];
}

const presenter = new NextRunListPresenter(this._replica, clickhouseClient);
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const presenter = new NextRunListPresenter(this._replica, clickhouse);

logger.debug("Calling RunListPresenter", { options });

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type PrismaClient } from "@trigger.dev/database";
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { getRunFiltersFromRequest } from "../RunFilters.server";
import { BasePresenter } from "./basePresenter.server";
Expand All @@ -24,8 +24,9 @@ export class CreateBulkActionPresenter extends BasePresenter {
Object.fromEntries(new URL(request.url).searchParams)
);

const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const runsRepository = new RunsRepository({
clickhouse: clickhouseClient,
clickhouse,
prisma: this._replica as PrismaClient,
});

Expand Down
11 changes: 7 additions & 4 deletions apps/webapp/app/presenters/v3/RunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
import { prisma, type PrismaClient } from "~/db.server";
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
import { getUsername } from "~/utils/username";
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { env } from "~/env.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";

type Result = Awaited<ReturnType<RunPresenter["call"]>>;
export type Run = Result["run"];
Expand Down Expand Up @@ -145,10 +145,13 @@ export class RunPresenter {
};
}

const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
const { repository } = await clickhouseFactory.getEventRepositoryForOrganization(
run.taskEventStore,
run.runtimeEnvironment.organizationId
);

// get the events
let traceSummary = await eventRepository.getTraceSummary(
let traceSummary = await repository.getTraceSummary(
getTaskEventStoreTableForRun(run),
run.runtimeEnvironment.id,
run.traceId,
Expand Down Expand Up @@ -272,7 +275,7 @@ export class RunPresenter {
overridesBySpanId: traceSummary.overridesBySpanId,
linkedRunIdBySpanId,
},
maximumLiveReloadingSetting: eventRepository.maximumLiveReloadingSetting,
maximumLiveReloadingSetting: repository.maximumLiveReloadingSetting,
};
}
}
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { BasePresenter } from "./basePresenter.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
import { type PrismaClient } from "@trigger.dev/database";
import { timeFilters } from "~/components/runs/v3/SharedFilters";

Expand Down Expand Up @@ -37,8 +37,9 @@ export class RunTagListPresenter extends BasePresenter {
}: TagListOptions) {
const hasFilters = Boolean(name?.trim());

const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const runsRepository = new RunsRepository({
clickhouse: clickhouseClient,
clickhouse,
prisma: this._replica as PrismaClient,
});

Expand Down
25 changes: 10 additions & 15 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { WaitpointPresenter } from "./WaitpointPresenter.server";
import { engine } from "~/v3/runEngine.server";
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
import { safeJsonParse } from "~/utils/json";
import {
Expand All @@ -30,6 +29,7 @@ import {
extractAIToolCallData,
extractAIEmbedData,
} from "~/components/runs/v3/ai";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";

export type PromptSpanData = {
slug: string;
Expand All @@ -42,9 +42,7 @@ export type PromptSpanData = {
config?: string;
};

function extractPromptSpanData(
properties: Record<string, unknown>
): PromptSpanData | undefined {
function extractPromptSpanData(properties: Record<string, unknown>): PromptSpanData | undefined {
// Properties come as an unflattened nested object from ClickHouse,
// e.g. { prompt: { slug: "...", version: 3, ... } }
const prompt = properties.prompt;
Expand Down Expand Up @@ -132,14 +130,17 @@ export class SpanPresenter extends BasePresenter {

const { traceId } = parentRun;

const eventRepository = resolveEventRepositoryForStore(parentRun.taskEventStore);
const { repository } = await clickhouseFactory.getEventRepositoryForOrganization(
parentRun.taskEventStore,
project.organizationId
);

const eventStore = getTaskEventStoreTableForRun(parentRun);

const run = await this.getRun({
eventStore,
traceId,
eventRepository,
eventRepository: repository,
spanId,
linkedRunId,
createdAt: parentRun.createdAt,
Expand All @@ -161,7 +162,7 @@ export class SpanPresenter extends BasePresenter {
projectId: parentRun.projectId,
createdAt: parentRun.createdAt,
completedAt: parentRun.completedAt,
eventRepository,
eventRepository: repository,
});

if (!span) {
Expand Down Expand Up @@ -592,10 +593,7 @@ export class SpanPresenter extends BasePresenter {
triggeredRuns,
aiData:
span.properties && typeof span.properties === "object"
? extractAISpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
)
? extractAISpanData(span.properties as Record<string, unknown>, span.duration / 1_000_000)
: undefined,
};

Expand Down Expand Up @@ -739,10 +737,7 @@ export class SpanPresenter extends BasePresenter {
"ai.streamObject",
];

if (
typeof span.message === "string" &&
AI_SUMMARY_MESSAGES.includes(span.message)
) {
if (typeof span.message === "string" && AI_SUMMARY_MESSAGES.includes(span.message)) {
const aiSummaryData = extractAISummarySpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
Expand Down
25 changes: 12 additions & 13 deletions apps/webapp/app/presenters/v3/TaskListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type TaskTriggerSource,
} from "@trigger.dev/database";
import { $replica } from "~/db.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
import {
type AverageDurations,
ClickHouseEnvironmentMetricsRepository,
Expand All @@ -25,10 +25,7 @@ export type TaskListItem = {
export type TaskActivity = DailyTaskActivity[string];

export class TaskListPresenter {
constructor(
private readonly environmentMetricsRepository: EnvironmentMetricsRepository,
private readonly _replica: PrismaClientOrTransaction
) {}
constructor(private readonly _replica: PrismaClientOrTransaction) {}

public async call({
organizationId,
Expand Down Expand Up @@ -76,25 +73,31 @@ export class TaskListPresenter {

const slugs = tasks.map((t) => t.slug);

// Create org-specific environment metrics repository
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({
clickhouse,
});

// IMPORTANT: Don't await these, we want to return the promises
// so we can defer the loading of the data
const activity = this.environmentMetricsRepository.getDailyTaskActivity({
const activity = environmentMetricsRepository.getDailyTaskActivity({
organizationId,
projectId,
environmentId,
days: 6, // This actually means 7 days, because we want to show the current day too
tasks: slugs,
});

const runningStats = this.environmentMetricsRepository.getCurrentRunningStats({
const runningStats = environmentMetricsRepository.getCurrentRunningStats({
organizationId,
projectId,
environmentId,
days: 6,
tasks: slugs,
});

const durations = this.environmentMetricsRepository.getAverageDurations({
const durations = environmentMetricsRepository.getAverageDurations({
organizationId,
projectId,
environmentId,
Expand All @@ -109,9 +112,5 @@ export class TaskListPresenter {
export const taskListPresenter = singleton("taskListPresenter", setupTaskListPresenter);

function setupTaskListPresenter() {
const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({
clickhouse: clickhouseClient,
});

return new TaskListPresenter(environmentMetricsRepository, $replica);
return new TaskListPresenter($replica);
}
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/UsagePresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getUsage, getUsageSeries } from "~/services/platform.v3.server";
import { createTimeSeriesData } from "~/utils/graphs";
import { BasePresenter } from "./basePresenter.server";
import { DataPoint, linear } from "regression";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";

type Options = {
organizationId: string;
Expand Down Expand Up @@ -124,7 +124,8 @@ async function getTaskUsageByOrganization(
endOfMonth: Date,
replica: PrismaClientOrTransaction
) {
const [queryError, tasks] = await clickhouseClient.taskRuns.getTaskUsageByOrganization({
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const [queryError, tasks] = await clickhouse.taskRuns.getTaskUsageByOrganization({
startTime: startOfMonth.getTime(),
endTime: endOfMonth.getTime(),
organizationId,
Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ScheduleObject } from "@trigger.dev/core/v3";
import { PrismaClient, prisma } from "~/db.server";
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
import { nextScheduledTimestamps } from "~/v3/utils/calculateNextSchedule.server";
import { NextRunListPresenter } from "./NextRunListPresenter.server";
import { scheduleWhereClause } from "~/models/schedules.server";
Expand Down Expand Up @@ -75,7 +75,8 @@ export class ViewSchedulePresenter {
? nextScheduledTimestamps(schedule.generatorExpression, schedule.timezone, new Date(), 5)
: [];

const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouseClient);
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(schedule.project.organizationId, "standard");
const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouse);
const { runs } = await runPresenter.call(schedule.project.organizationId, environmentId, {
projectId: schedule.project.id,
scheduleId: schedule.id,
Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { isWaitpointOutputTimeout, prettyPrintPacket } from "@trigger.dev/core/v3";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
import { generateHttpCallbackUrl } from "~/services/httpCallback.server";
import { logger } from "~/services/logger.server";
import { BasePresenter } from "./basePresenter.server";
Expand Down Expand Up @@ -79,7 +79,8 @@ export class WaitpointPresenter extends BasePresenter {
const connectedRuns: NextRunListItem[] = [];

if (connectedRunIds.length > 0) {
const runPresenter = new NextRunListPresenter(this._prisma, clickhouseClient);
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(waitpoint.environment.organizationId, "standard");
const runPresenter = new NextRunListPresenter(this._prisma, clickhouse);
const { runs } = await runPresenter.call(
waitpoint.environment.organizationId,
environmentId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
MetricDashboardPresenter,
} from "~/presenters/v3/MetricDashboardPresenter.server";
import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
import { requireUser } from "~/services/session.server";
import { cn } from "~/utils/cn";
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
Expand Down Expand Up @@ -75,10 +75,12 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {

const filters = dashboard.filters ?? ["tasks", "queues"];

const clickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "standard");

// Load distinct models from ClickHouse if the dashboard has a models filter
let possibleModels: { model: string; system: string }[] = [];
if (filters.includes("models")) {
const queryFn = clickhouseClient.reader.query({
const queryFn = clickhouse.reader.query({
name: "getDistinctModels",
query: `SELECT response_model, any(gen_ai_system) AS gen_ai_system FROM trigger_dev.llm_metrics_v1 WHERE organization_id = {organizationId: String} AND project_id = {projectId: String} AND environment_id = {environmentId: String} AND response_model != '' GROUP BY response_model ORDER BY response_model`,
params: z.object({
Expand All @@ -98,7 +100,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
}
}

const promptPresenter = new PromptPresenter(clickhouseClient);
const promptPresenter = new PromptPresenter(clickhouse);
const [possiblePrompts, possibleOperations, possibleProviders] = await Promise.all([
filters.includes("prompts")
? promptPresenter.getDistinctPromptSlugs(project.organizationId, project.id, environment.id)
Expand Down
Loading