48 changes: 47 additions & 1 deletion .env.example
@@ -29,6 +29,52 @@ REDIS_TLS_DISABLED="true"
DEV_OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:3030/otel"
DEV_OTEL_BATCH_PROCESSING_ENABLED="0"

# Realtime streams v2 (Sessions, chat.agent, large stream backfills) backed
# by S2 (https://s2.dev). The `s2` service in docker/docker-compose.yml runs
# the open-source s2-lite binary and pre-creates a basin named `trigger-local`
# (see docker/config/s2-spec.json). Comment these out to fall back to v1
# (Redis-only) streams; Sessions and chat.agent then become unavailable.
REALTIME_STREAMS_S2_BASIN=trigger-local
REALTIME_STREAMS_S2_ACCESS_TOKEN=ignored
REALTIME_STREAMS_S2_ENDPOINT=http://localhost:4566/v1
REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS=true
REALTIME_STREAMS_DEFAULT_VERSION=v2

# Running multiple instances side by side (worktrees, branch experiments)
#
# Every host port in docker/docker-compose.yml is `${VAR:-default}` and the
# project name comes from `COMPOSE_PROJECT_NAME`. To stand up a second stack
# alongside the default one, uncomment the block below in this clone's `.env`
# (pick any offset that doesn't clash with anything else running), then update
# the URL/PORT vars further up to match. Default values are commented for
# reference.
#
# --- core (pnpm run docker) ---
# COMPOSE_PROJECT_NAME=triggerdotdev-docker-alt
# CONTAINER_PREFIX=alt-
# POSTGRES_HOST_PORT=15432 # default 5432
# REDIS_HOST_PORT=16379 # default 6379
# ELECTRIC_HOST_PORT=13060 # default 3060
# MINIO_API_HOST_PORT=19005 # default 9005
# MINIO_CONSOLE_HOST_PORT=19006 # default 9006
# CLICKHOUSE_HTTP_HOST_PORT=18123 # default 8123
# CLICKHOUSE_TCP_HOST_PORT=19000 # default 9000
# S2_HOST_PORT=14566 # default 4566
# REMIX_APP_PORT=13030 # default 3030
# --- extras (only needed if you also run `pnpm run docker:full`) ---
# ELECTRIC_SHARD_1_HOST_PORT=13061 # default 3061
# CH_UI_HOST_PORT=15521 # default 5521
# TOXIPROXY_PROXY_HOST_PORT=40303 # default 30303
# TOXIPROXY_API_HOST_PORT=18474 # default 8474
# NGINX_H2_HOST_PORT=18443 # default 8443
# OTEL_GRPC_HOST_PORT=14317 # default 4317
# OTEL_HTTP_HOST_PORT=14318 # default 4318
# OTEL_PROMETHEUS_HOST_PORT=18889 # default 8889
# PROMETHEUS_HOST_PORT=19090 # default 9090
# GRAFANA_HOST_PORT=13001 # default 3001
# (and update DATABASE_URL / CLICKHOUSE_URL / REDIS_PORT / APP_ORIGIN /
# LOGIN_ORIGIN / ELECTRIC_ORIGIN / REALTIME_STREAMS_S2_ENDPOINT to match)

# When the domain is set to `localhost` the CLI deploy command will only --load the image by default and not --push it
DEPLOY_REGISTRY_HOST=localhost:5000

@@ -106,7 +152,7 @@ POSTHOG_PROJECT_KEY=
# INTERNAL_OTEL_TRACE_LOGGING_ENABLED=1
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0

# Enable local observability stack (requires `pnpm run docker` to start otel-collector)
# Enable local observability stack (requires `pnpm run docker:full` to bring up otel-collector + prometheus + grafana)
# Uncomment these to send metrics to the local Prometheus via OTEL Collector:
# INTERNAL_OTEL_METRIC_EXPORTER_ENABLED=1
# INTERNAL_OTEL_METRIC_EXPORTER_URL=http://localhost:4318/v1/metrics
6 changes: 6 additions & 0 deletions .server-changes/realtimestreams-dedupe.md
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Dedupe the `realtimeStreams` array push on `PUT /realtime/v1/streams/:runId/:target/:streamId` so repeat stream-init calls for the same `(run, streamId)` skip the row UPDATE, mirroring the existing append handler.
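
The dedupe described above can be sketched roughly as follows. This is a minimal illustration, not the handler's actual code: `RunRow`, `registerStream`, and the `update` callback are hypothetical names, and an in-memory object stands in for the database row.

```typescript
// Hypothetical stand-in for the run row; the real handler reads and writes the database.
type RunRow = { id: string; realtimeStreams: string[] };

// Registers a stream id on the run. Returns true when a row UPDATE was issued,
// false when the id was already present and the write was skipped.
function registerStream(
  run: RunRow,
  streamId: string,
  update: (run: RunRow) => void
): boolean {
  // Repeat stream-init calls for the same (run, streamId) are a no-op.
  if (run.realtimeStreams.includes(streamId)) return false;

  run.realtimeStreams = [...run.realtimeStreams, streamId];
  update(run); // assumed to persist the row
  return true;
}
```

Calling `registerStream` twice with the same id issues one update. The second call returns `false` without touching the row.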
6 changes: 6 additions & 0 deletions .server-changes/webapp-sentry-fingerprint-p1001.md
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Group Prisma P1001 ("Can't reach database server") errors into a single Sentry issue via a `beforeSend` fingerprint rule, so DB outages no longer fan out into hundreds of distinct issues that bury other alerts. Adds a small extensible rule table for future collapsing rules.
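
A rule table of this kind might look like the sketch below. It is illustrative only and not the code from this PR. `SketchEvent` and `SketchHint` are local stand-ins for the Sentry event and hint objects, so the block compiles without the SDK. The rule name, the fingerprint value, and the choice to match on a `code` or `errorCode` property or on the message text are all assumptions.

```typescript
// Minimal local shapes; the real Sentry event and hint types carry many more fields.
type SketchEvent = { message?: string; fingerprint?: string[] };
type SketchHint = { originalException?: unknown };

// One entry per collapsing rule; new rules are added by appending to the table.
type FingerprintRule = {
  name: string;
  matches: (error: unknown) => boolean;
  fingerprint: string[];
};

const fingerprintRules: FingerprintRule[] = [
  {
    name: "prisma-p1001",
    matches: (error) => {
      if (typeof error !== "object" || error === null) return false;
      const e = error as { code?: unknown; errorCode?: unknown; message?: unknown };
      // Assumption: the error exposes the code on `code` or `errorCode`,
      // or at least mentions the standard P1001 message.
      return (
        e.code === "P1001" ||
        e.errorCode === "P1001" ||
        String(e.message ?? "").includes("Can't reach database server")
      );
    },
    fingerprint: ["prisma-p1001"],
  },
];

// Shaped like a `beforeSend` callback: the first matching rule sets a fixed
// fingerprint so every matching event groups into one issue.
function applyFingerprintRules(event: SketchEvent, hint: SketchHint): SketchEvent {
  const rule = fingerprintRules.find((r) => r.matches(hint.originalException));
  if (rule) event.fingerprint = rule.fingerprint;
  return event;
}
```

Events that match no rule pass through unchanged, so Sentry's default grouping still applies to everything else.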
1 change: 1 addition & 0 deletions AGENTS.md
@@ -19,6 +19,7 @@ See `ai/references/repo.md` for a more complete explanation of the workspaces.
```bash
pnpm run docker
```
Add `:full` (`pnpm run docker:full`) for the optional observability + chaos tooling. See `docker/docker-compose.extras.yml`.
4. Run database migrations:
```bash
pnpm run db:migrate
3 changes: 2 additions & 1 deletion CLAUDE.md
@@ -9,7 +9,8 @@ This is a pnpm 10.33.2 monorepo using Turborepo. Run commands from root with `pn
**Adding dependencies:** Edit `package.json` directly instead of using `pnpm add`, then run `pnpm i` from the repo root. See `.claude/rules/package-installation.md` for the full process.

```bash
pnpm run docker # Start Docker services (PostgreSQL, Redis, Electric)
pnpm run docker # Core dev services (Postgres, Redis, Electric, MinIO, ClickHouse, s2-lite)
# pnpm run docker:full # Same + observability stack (Prometheus, Grafana, OTEL) and chaos tooling
pnpm run db:migrate # Run database migrations
pnpm run db:seed # Seed the database (required for reference projects)

8 changes: 7 additions & 1 deletion CONTRIBUTING.md
@@ -71,12 +71,14 @@ branch are tagged into a release periodically.

Feel free to update `SESSION_SECRET` and `MAGIC_LINK_SECRET` as well using the same method.

8. Start Docker. This starts the required services: Postgres, Redis, Electric, and ClickHouse (the ClickHouse migrator runs once on first start). If this is your first time using Docker, consider going through this [guide](DOCKER_INSTALLATION.md).
8. Start Docker. This starts the core dev services (Postgres, Redis, Electric, MinIO, ClickHouse, s2-lite) and runs the ClickHouse migrator once on first start. If this is your first time using Docker, consider going through this [guide](DOCKER_INSTALLATION.md).

```
pnpm run docker
```

For the observability stack (Prometheus, Grafana, OTEL collector) and other optional tooling (Toxiproxy, nginx-h2, ch-ui, extra electric shard), use `pnpm run docker:full` instead. See `docker/docker-compose.extras.yml` for the full list.

9. Migrate the database
```
pnpm run db:migrate
@@ -300,3 +302,7 @@ The process running on port `3030` should be destroyed.
```sh
sudo kill -9 <PID>
```

### Running two clones side by side (worktree, branch experiment)

The default `pnpm run docker` uses the project name `triggerdotdev-docker` and the standard host ports (5432, 6379, 3060, 4566, 8123, 9000, 9005, 9006). To stand up a second instance in another clone without clashing, set a different `COMPOSE_PROJECT_NAME` and the offset host ports in that clone's `.env`. The "Running multiple instances side by side" block in `.env.example` lists every overridable env var with its default for reference; uncomment the lines you need and update `DATABASE_URL` / `CLICKHOUSE_URL` / `REDIS_PORT` / `APP_ORIGIN` / `LOGIN_ORIGIN` / `ELECTRIC_ORIGIN` / `REALTIME_STREAMS_S2_ENDPOINT` to match.
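
Forgetting to update one of the URL vars after changing a host port is an easy mistake. The sketch below is a hypothetical sanity check and not part of the repo. The pairing of each host-port variable with a URL variable is an assumption inferred from the variable names, and the sample values in the usage note are placeholders.

```typescript
type Env = Record<string, string | undefined>;

// Assumed pairing of host-port vars with the URL/port vars that should follow them.
const portPairs: Array<[string, string]> = [
  ["POSTGRES_HOST_PORT", "DATABASE_URL"],
  ["CLICKHOUSE_HTTP_HOST_PORT", "CLICKHOUSE_URL"],
  ["REDIS_HOST_PORT", "REDIS_PORT"],
  ["REMIX_APP_PORT", "APP_ORIGIN"],
  ["REMIX_APP_PORT", "LOGIN_ORIGIN"],
  ["ELECTRIC_HOST_PORT", "ELECTRIC_ORIGIN"],
  ["S2_HOST_PORT", "REALTIME_STREAMS_S2_ENDPOINT"],
];

// Extracts the port from either a bare number ("16379") or a URL-like string.
function portOf(value: string): string | undefined {
  if (/^\d+$/.test(value)) return value;
  const match = value.match(/:(\d+)(?:[\/?#]|$)/);
  return match ? match[1] : undefined;
}

// Returns one message per URL var whose port differs from its host-port var.
// Pairs where either side is unset are skipped.
function mismatchedPorts(env: Env): string[] {
  const problems: string[] = [];
  for (const [portVar, urlVar] of portPairs) {
    const port = env[portVar];
    const target = env[urlVar];
    if (port === undefined || target === undefined) continue;
    if (portOf(target) !== port) {
      problems.push(`${urlVar} does not use ${portVar}=${port}`);
    }
  }
  return problems;
}
```

For example, an env with `POSTGRES_HOST_PORT=15432` and a `DATABASE_URL` still pointing at port 5432 yields a single message naming `DATABASE_URL`.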
124 changes: 111 additions & 13 deletions apps/webapp/app/components/runs/v3/agent/AgentView.tsx
@@ -1,5 +1,5 @@
import type { UIMessage } from "@ai-sdk/react";
import { SSEStreamSubscription } from "@trigger.dev/core/v3";
import { ChatSnapshotV1Schema, SSEStreamSubscription } from "@trigger.dev/core/v3";
import { useEffect, useMemo, useRef, useState } from "react";
import { Paragraph } from "~/components/primitives/Paragraph";
import { Spinner } from "~/components/primitives/Spinner";
@@ -27,6 +27,15 @@ export type AgentViewAuth = {
* channel and is merged in by the AgentView subscription.
*/
initialMessages: UIMessage[];
/**
* Presigned GET URL for the session's chat-snapshot S3 blob (written
* by the agent after each turn-complete; see `ChatSnapshotV1`).
* Optional — sessions that registered a `hydrateMessages` hook skip
* snapshot writes and the URL fetch will 404. In that case the
* dashboard falls back to seq=0 SSE (which, post-trim, shows only the
* most recent turn). Generated server-side by `SessionPresenter`.
*/
snapshotPresignedUrl?: string;
};

/**
@@ -81,6 +90,7 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
projectSlug: project.slug,
envSlug: environment.slug,
initialMessages: agentView.initialMessages,
snapshotPresignedUrl: agentView.snapshotPresignedUrl,
});

// Sticky-bottom auto-scroll: walks up to find the inspector's scroll
@@ -120,13 +130,19 @@ export function AgentView({ agentView }: { agentView: AgentViewAuth }) {
* - `kind: "stop"` is a stop signal — no messages, nothing to render
* here, so it's filtered.
*
* Wire payloads are slim-wire (one new UIMessage per record, on
* `payload.message`). The legacy `payload.messages` array shape is kept
* here as a fallback so any historical records on a long-lived session
* still render.
*
* The server wraps records in `{data, id}` and writes `data` as a JSON
* string; SSE v2 delivers the parsed string back. {@link parseChunkPayload}
* re-parses to recover the object.
*/
type InputStreamChunk = {
kind?: "message" | "stop";
payload?: {
message?: { id?: string; role?: string; parts?: unknown[] };
messages?: Array<{ id?: string; role?: string; parts?: unknown[] }>;
trigger?: string;
};
@@ -217,13 +233,15 @@ function useAgentSessionMessages({
projectSlug,
envSlug,
initialMessages,
snapshotPresignedUrl,
}: {
sessionId: string;
apiOrigin: string;
orgSlug: string;
projectSlug: string;
envSlug: string;
initialMessages: UIMessage[];
snapshotPresignedUrl?: string;
}): UIMessage[] {
// Seed with the user messages from the run's task payload.
const seedMessages = useMemo(
@@ -285,27 +303,92 @@ function useAgentSessionMessages({
const outputUrl = `${sessionBase}/out`;
const inputUrl = `${sessionBase}/in`;

/**
* Try to seed `pendingRef` from the agent's S3 snapshot blob and return
* the snapshot's `lastOutEventId` so the `.out` SSE subscription resumes
* just past the snapshot. Returns undefined for sessions that don't
* have a snapshot (e.g. `hydrateMessages` customers, or sessions that
* have never completed a turn).
*/
const loadSnapshot = async (): Promise<string | undefined> => {
if (!snapshotPresignedUrl) return undefined;
try {
const resp = await fetch(snapshotPresignedUrl, { signal: abort.signal });
if (!resp.ok) return undefined;
const json = (await resp.json()) as unknown;
const parsed = ChatSnapshotV1Schema.safeParse(json);
if (!parsed.success) return undefined;
const snapshot = parsed.data;
// Preserve the snapshot's array order in the final render by
// giving each message a unique, monotonically increasing
// timestamp from `(savedAt - count + index)`. Real chunk
// timestamps from the SSE path use S2 arrival ms (positive
// numbers in the present), so anything below `savedAt` sorts
// before live chunks while preserving snapshot order among
// themselves.
const count = snapshot.messages.length;
snapshot.messages.forEach((raw, i) => {
const message = raw as UIMessage;
if (!message?.id) return;
// The snapshot's seed wins over the task-payload seed for any
// overlapping ids (the snapshot represents the agent's
// canonical accumulator, post-turn).
pendingRef.current.set(message.id, message);
if (!timestampsRef.current.has(message.id)) {
timestampsRef.current.set(message.id, snapshot.savedAt - count + i);
}
});
scheduleFlush.current();
return snapshot.lastOutEventId;
} catch {
// 404 / network / parse / abort — fall back to seq=0 SSE
return undefined;
}
};

const outputSubOptions = (lastEventId: string | undefined) =>
({
signal: abort.signal,
timeoutInSeconds: 120,
...(lastEventId !== undefined ? { lastEventId } : {}),
}) as const;

const commonSubOptions = {
signal: abort.signal,
timeoutInSeconds: 120,
} as const;

// ---- Output stream: assistant messages ---------------------------------
//
// The output stream delivers UIMessageChunks interleaved with
// Trigger-specific control chunks (`trigger:turn-complete`, etc.). We
// filter the control chunks and fold everything else into an assistant
// `UIMessage` via our own `applyOutputChunk` accumulator — the AI SDK's
// `readUIMessageStream` helper is only available in `ai@6`, and the
// webapp is pinned to `ai@4`, so we re-implement just the chunk types
// that `renderPart` actually displays.
// The output stream delivers data records (UIMessageChunks) interleaved
// with Trigger control records (`turn-complete`, `upgrade-required`) and
// S2 command records (`trim`). Control + command records ride on
// `record.headers` with empty bodies; the SSE parser strips S2 command
// records entirely, and control records arrive with `value.chunk ===
// undefined`, which `parseChunkPayload` drops below.
//
// We fold everything else into an assistant `UIMessage` via our own
// `applyOutputChunk` accumulator — the AI SDK's `readUIMessageStream`
// helper is only available in `ai@6`, and the webapp is pinned to
// `ai@4`, so we re-implement just the chunk types that `renderPart`
// actually displays.
//
// We capture the **server timestamp of each assistant message's first
// `start` chunk** so later sort-by-timestamp merges with the input
// stream correctly.
const runOutput = async () => {
try {
const sub = new SSEStreamSubscription(outputUrl, commonSubOptions);
// Seed messages from the snapshot first (if available), then
// resume the SSE from the snapshot's last event id so we don't
// re-stream chunks already represented in the snapshot. If no
// snapshot exists (no URL, 404, parse failure), the SSE opens
// at seq=0 — which, post-trim, contains roughly one turn of
// records (acceptable fallback for `hydrateMessages` sessions
// and fresh sessions).
const snapshotLastEventId = await loadSnapshot();
if (abort.signal.aborted) return;

const sub = new SSEStreamSubscription(outputUrl, outputSubOptions(snapshotLastEventId));
const raw = await sub.subscribe();
const reader = raw.getReader();

@@ -318,6 +401,12 @@ function useAgentSessionMessages({

const chunk = parseChunkPayload(value.chunk) as OutputChunk | null;
if (!chunk || typeof chunk.type !== "string") continue;
// Legacy belt-and-suspenders: prior versions of the SDK
// emitted `trigger:turn-complete` / `trigger:upgrade-required`
// as data records (`type` field). Current versions use
// header-form control records, which `parseChunkPayload`
// drops above. Keep this filter to handle any in-flight
// sessions whose `.out` was populated by the older SDK.
if (chunk.type.startsWith("trigger:")) continue;

if (chunk.type === "start") {
@@ -413,9 +502,18 @@ function useAgentSessionMessages({
const chunk = parseChunkPayload(value.chunk) as InputStreamChunk | null;
if (!chunk || chunk.kind !== "message") continue;
const payload = chunk.payload;
if (!payload || !Array.isArray(payload.messages)) continue;

const incomingUsers = payload.messages.filter(
if (!payload) continue;

// Slim-wire is one UIMessage on `payload.message`; legacy
// payloads carried an array on `payload.messages`. Accept
// either so historical records on a long-lived session still
// render.
const candidates = Array.isArray(payload.messages)
? payload.messages
: payload.message
? [payload.message]
: [];
const incomingUsers = candidates.filter(
(m): m is UIMessage =>
m != null && (m as { role?: string }).role === "user" && typeof m.id === "string"
);
@@ -454,7 +552,7 @@ function useAgentSessionMessages({
pendingTimerRef.current = null;
}
};
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug]);
}, [sessionId, apiOrigin, orgSlug, projectSlug, envSlug, snapshotPresignedUrl]);

return useMemo(() => {
const timestamps = timestampsRef.current;