diff --git a/.changeset/ai-prompts.md b/.changeset/ai-prompts.md new file mode 100644 index 00000000000..511aa303097 --- /dev/null +++ b/.changeset/ai-prompts.md @@ -0,0 +1,52 @@ +--- +"@trigger.dev/sdk": minor +--- + +**AI Prompts** — define prompt templates as code alongside your tasks, version them on deploy, and override the text or model from the dashboard without redeploying. Prompts integrate with the Vercel AI SDK via `toAISDKTelemetry()` (links every generation span back to the prompt) and with `chat.agent` via `chat.prompt.set()` + `chat.toStreamTextOptions()`. + +```ts +import { prompts } from "@trigger.dev/sdk"; +import { generateText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { z } from "zod"; + +export const supportPrompt = prompts.define({ + id: "customer-support", + model: "gpt-4o", + config: { temperature: 0.7 }, + variables: z.object({ + customerName: z.string(), + plan: z.string(), + issue: z.string(), + }), + content: `You are a support agent for Acme. + +Customer: {{customerName}} ({{plan}} plan) +Issue: {{issue}}`, +}); + +const resolved = await supportPrompt.resolve({ + customerName: "Alice", + plan: "Pro", + issue: "Can't access billing", +}); + +const result = await generateText({ + model: openai(resolved.model ?? "gpt-4o"), + system: resolved.text, + prompt: "Can't access billing", + ...resolved.toAISDKTelemetry(), +}); +``` + +**What you get:** + +- **Code-defined, deploy-versioned templates** — define with `prompts.define({ id, model, config, variables, content })`. Every deploy creates a new version visible in the dashboard. Mustache-style placeholders (`{{var}}`, `{{#cond}}...{{/cond}}`) with Zod / ArkType / Valibot-typed variables. +- **Dashboard overrides** — change a prompt's text or model from the dashboard without redeploying. Overrides take priority over the deployed "current" version and are environment-scoped (dev / staging / production independent). 
+- **Resolve API** — `prompt.resolve(vars, { version?, label? })` returns the compiled `text`, resolved `model`, `version`, and labels. Standalone `prompts.resolve(slug, vars)` for cross-file resolution with full type inference on slug and variable shape. +- **AI SDK integration** — spread `resolved.toAISDKTelemetry({ ...extra })` into any `generateText` / `streamText` call and every generation span links to the prompt in the dashboard alongside its input variables, model, tokens, and cost. +- **`chat.agent` integration** — `chat.prompt.set(resolved)` stores the resolved prompt run-scoped; `chat.toStreamTextOptions({ registry })` pulls `system`, `model` (resolved via the AI SDK provider registry), `temperature` / `maxTokens` / etc., and telemetry into a single spread for `streamText`. +- **Management SDK** — `prompts.list()`, `prompts.versions(slug)`, `prompts.promote(slug, version)`, `prompts.createOverride(slug, body)`, `prompts.updateOverride(slug, body)`, `prompts.removeOverride(slug)`, `prompts.reactivateOverride(slug, version)`. +- **Dashboard** — prompts list with per-prompt usage sparklines; per-prompt detail with Template / Details / Versions / Generations / Metrics tabs. AI generation spans get a custom inspector showing the linked prompt's metadata, input variables, and template content alongside model, tokens, cost, and the message thread. + +See [/docs/ai/prompts](https://trigger.dev/docs/ai/prompts) for the full reference — template syntax, version resolution order, override workflow, and type utilities (`PromptHandle`, `PromptIdentifier`, `PromptVariables`). diff --git a/.changeset/chat-actions-no-turn.md b/.changeset/chat-actions-no-turn.md deleted file mode 100644 index a0113441520..00000000000 --- a/.changeset/chat-actions-no-turn.md +++ /dev/null @@ -1,33 +0,0 @@ ---- -"@trigger.dev/sdk": minor ---- - -`chat.agent` actions are no longer treated as turns. 
They fire `hydrateMessages` and `onAction` only — no `onTurnStart` / `prepareMessages` / `onBeforeTurnComplete` / `onTurnComplete`, no `run()`, no turn-counter increment. The trace span is named `chat action` instead of `chat turn N`. - -`onAction` can now return a `StreamTextResult`, `string`, or `UIMessage` to produce a model response from the action; returning `void` (the previous and now default) is side-effect-only. - -**Migration**: if you previously had `run()` branching on `payload.trigger === "action"`, return your `streamText(...)` from `onAction` instead. If you persisted in `onTurnComplete`, do that work inside `onAction`. For any other state-only action, just remove your skip-the-model workaround — the default is now correct. - -```ts -// before -onAction: async ({ action }) => { - if (action.type === "regenerate") { - chat.store.set({ skipModelCall: false }); - chat.history.slice(0, -1); - } -}, -run: async ({ messages, signal }) => { - if (chat.store.get()?.skipModelCall) return; - return streamText({ model, messages, abortSignal: signal }); -}, - -// after -onAction: async ({ action, messages, signal }) => { - if (action.type === "regenerate") { - chat.history.slice(0, -1); - return streamText({ model, messages, abortSignal: signal }); - } -}, -run: async ({ messages, signal }) => - streamText({ model, messages, abortSignal: signal }), -``` diff --git a/.changeset/chat-agent-delta-wire-snapshots.md b/.changeset/chat-agent-delta-wire-snapshots.md deleted file mode 100644 index 21a8fd01fa4..00000000000 --- a/.changeset/chat-agent-delta-wire-snapshots.md +++ /dev/null @@ -1,8 +0,0 @@ ---- -"@trigger.dev/sdk": patch -"@trigger.dev/core": patch ---- - -`chat.agent` wire is now delta-only — clients ship at most one new message per `.in/append` instead of the full `UIMessage[]` history. The agent rebuilds prior history at run boot from a JSON snapshot in object storage plus a `wait=0` replay of the `session.out` tail. 
Long chats stop hitting the 512 KiB body cap on `/realtime/v1/sessions/{id}/in/append`. Snapshot writes happen after every `onTurnComplete`, awaited so they survive idle suspend; reads happen only at run boot. Registering a `hydrateMessages` hook short-circuits both the snapshot read/write and the replay — the customer is the source of truth for history. - -Custom transports that constructed `ChatTaskWirePayload` directly need to drop the `messages: UIMessage[]` field and use `message?: UIMessage` (singular). Built-in transports (`TriggerChatTransport`, `AgentChat`) handle the change below the customer-facing surface — most apps need no changes. Configure object-store env vars (`OBJECT_STORE_*`) on your webapp deployment if you haven't already; without an object store and without `hydrateMessages`, conversations don't survive run boundaries. diff --git a/.changeset/chat-agent-on-boot-hook.md b/.changeset/chat-agent-on-boot-hook.md index 86715b31b74..5eaa078e65e 100644 --- a/.changeset/chat-agent-on-boot-hook.md +++ b/.changeset/chat-agent-on-boot-hook.md @@ -18,4 +18,4 @@ export const myChat = chat.agent({ }); ``` -If you previously initialized `chat.local` in `onChatStart`, move it to `onBoot` — `onChatStart` is once-per-chat and won't fire on a continuation, leaving `chat.local` uninitialized when `run()` tries to use it. See the upgrade guide for the migration pattern. +Use `onBoot` (not `onChatStart`) for state setup that must run every time a worker picks up the chat — `onChatStart` fires once per chat and won't run on continuation, leaving `chat.local` uninitialized when `run()` tries to use it. diff --git a/.changeset/chat-agent.md b/.changeset/chat-agent.md index 9ca65682da7..733a8ab22e4 100644 --- a/.changeset/chat-agent.md +++ b/.changeset/chat-agent.md @@ -3,7 +3,7 @@ "@trigger.dev/core": patch --- -Run AI chats as durable Trigger.dev tasks. 
Define the agent in one function, wire `useChat` to it from React, and the conversation survives page refreshes, network blips, and process restarts — with built-in support for tools, HITL approvals, multi-turn state, and stop-mid-stream cancellation. +**AI Agents** — run AI SDK chat completions as durable Trigger.dev agents instead of fragile API routes. Define an agent in one function, point `useChat` at it from React, and the conversation survives page refreshes, network blips, and process restarts. ```ts import { chat } from "@trigger.dev/sdk/ai"; @@ -21,10 +21,24 @@ export const myChat = chat.agent({ import { useChat } from "@ai-sdk/react"; import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react"; -const transport = useTriggerChatTransport({ task: "my-chat", accessToken }); +const transport = useTriggerChatTransport({ task: "my-chat", accessToken, startSession }); const { messages, sendMessage } = useChat({ transport }); ``` -Lifecycle hooks (`onPreload`, `onTurnStart`, `onTurnComplete`, etc.) cover the common needs around persistence, validation, and post-turn work. `chat.store` gives you a typed shared-data slot the agent and client both read and write. `chat.endRun()` exits cleanly when the agent decides it's done. The transport's `watch` mode lets a dashboard tab observe a run without driving it. +**What you get:** -Drops the pre-Sessions chat stream constants (`CHAT_STREAM_KEY`, `CHAT_MESSAGES_STREAM_ID`, `CHAT_STOP_STREAM_ID`) — migrate to `sessions.open(id).out` / `.in`. +- **AI SDK `useChat` integration** — a custom [`ChatTransport`](https://sdk.vercel.ai/docs/ai-sdk-ui/transport) (`useTriggerChatTransport`) plugs straight into Vercel AI SDK's `useChat` hook. Text streaming, tool calls, reasoning, and `data-*` parts all work natively over Trigger.dev's realtime streams. No custom API routes needed. 
+- **First-turn fast path (`chat.headStart`)** — opt-in handler that runs the first turn's `streamText` step in your warm server process while the agent run boots in parallel, cutting cold-start TTFC by roughly half (measured 2801ms → 1218ms on `claude-sonnet-4-6`). The agent owns step 2+ (tool execution, persistence, hooks) so heavy deps stay where they belong. Web Fetch handler works natively in Next.js, Hono, SvelteKit, Remix, Workers, etc.; bridge to Express/Fastify/Koa via `chat.toNodeListener`. New `@trigger.dev/sdk/chat-server` subpath. +- **Multi-turn durability via Sessions** — every chat is backed by a durable Session that outlives any individual run. Conversations resume across page refreshes, idle timeout, crashes, and deploys; `resume: true` reconnects via `lastEventId` so clients only see new chunks. `sessions.list` enumerates chats for inbox-style UIs. +- **Auto-accumulated history, delta-only wire** — the backend accumulates the full conversation across turns; clients only ship the new message each turn. Long chats never hit the 512 KiB body cap. Register `hydrateMessages` to be the source of truth yourself. +- **Lifecycle hooks** — `onPreload`, `onChatStart`, `onValidateMessages`, `hydrateMessages`, `onTurnStart`, `onBeforeTurnComplete`, `onTurnComplete`, `onChatSuspend`, `onChatResume` — for persistence, validation, and post-turn work. +- **Stop generation** — client-driven `transport.stopGeneration(chatId)` aborts mid-stream; the run stays alive for the next message, partial response is captured, and aborted parts (stuck `partial-call` tools, in-progress reasoning) are auto-cleaned. +- **Tool approvals (HITL)** — tools with `needsApproval: true` pause until the user approves or denies via `addToolApprovalResponse`. The runtime reconciles the updated assistant message by ID and continues `streamText`. 
+- **Steering and background injection** — `pendingMessages` injects user messages between tool-call steps so users can steer the agent mid-execution; `chat.inject()` + `chat.defer()` adds context from background work (self-review, RAG, safety checks) between turns. +- **Actions** — non-turn frontend commands (undo, rollback, regenerate, edit) sent via `transport.sendAction`. They fire `hydrateMessages` + `onAction` only — no turn hooks, no `run()`. `onAction` can return a `StreamTextResult` for a model response, or `void` for side-effect-only. +- **Typed state primitives** — `chat.local` for per-run state accessible from hooks, `run()`, tools, and subtasks (auto-serialized through `ai.toolExecute`); `chat.store` for typed shared data between agent and client; `chat.history` for reading and mutating the message chain; `clientDataSchema` for typed `clientData` in every hook. +- **`chat.toStreamTextOptions()`** — one spread into `streamText` wires up versioned system [Prompts](https://trigger.dev/docs/ai/prompts), model resolution, telemetry metadata, compaction, steering, and background injection. +- **Multi-tab coordination** — `multiTab: true` + `useMultiTabChat` prevents duplicate sends and syncs state across browser tabs via `BroadcastChannel`. Non-active tabs go read-only with live updates. +- **Network resilience** — built-in indefinite retry with bounded backoff, reconnect on `online` / tab refocus / bfcache restore, `Last-Event-ID` mid-stream resume. No app code needed. + +See [/docs/ai-chat](https://trigger.dev/docs/ai-chat/overview) for the full surface — quick start, three backend approaches (`chat.agent`, `chat.createSession`, raw task), persistence and code-sandbox patterns, type-level guides, and API reference. 
diff --git a/.changeset/chat-head-start.md b/.changeset/chat-head-start.md deleted file mode 100644 index 5e33344493f..00000000000 --- a/.changeset/chat-head-start.md +++ /dev/null @@ -1,34 +0,0 @@ ---- -"@trigger.dev/sdk": minor ---- - -Add `chat.headStart` — an opt-in fast-path that runs the first turn's `streamText` step in your warm Next.js / Hono / Workers / Express handler while the trigger agent run boots in parallel. Cold-start TTFC drops by ~50% on the first message; the agent owns step 2+ (tool execution, persistence, hooks) so heavy deps stay where they belong. - -```ts -// app/api/chat/route.ts (Next.js / any Web Fetch framework) -import { chat } from "@trigger.dev/sdk/chat-server"; -import { streamText } from "ai"; -import { openai } from "@ai-sdk/openai"; -import { headStartTools } from "@/lib/chat-tools-schemas"; // schema-only - -export const POST = chat.headStart({ - agentId: "ai-chat", - run: async ({ chat: chatHelper }) => - streamText({ - ...chatHelper.toStreamTextOptions({ tools: headStartTools }), - model: openai("gpt-4o-mini"), - system: "You are a helpful AI assistant.", - }), -}); -``` - -```tsx -// browser — opt in by pointing the transport at your handler -const transport = useTriggerChatTransport({ - task: "ai-chat", - accessToken, - headStart: "/api/chat", // first-turn-only; turn 2+ bypasses the endpoint -}); -``` - -For Node-only frameworks (Express, Fastify, Koa, raw `node:http`) use `chat.toNodeListener(handler)` to bridge the Web Fetch handler to `(req, res)`. Adds a new `@trigger.dev/sdk/chat-server` subpath; bundle stays Web Fetch–only with no `node:*` imports. diff --git a/.changeset/chat-ready-core-additions.md b/.changeset/chat-ready-core-additions.md deleted file mode 100644 index e06db5e2c9f..00000000000 --- a/.changeset/chat-ready-core-additions.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@trigger.dev/core": patch ---- - -Add `ChatChunkTooLargeError` and ApiClient methods for subscribing to session streams. 
Lays the groundwork for the upcoming `chat.agent`. diff --git a/.changeset/mcp-list-runs-region.md b/.changeset/mcp-list-runs-region.md new file mode 100644 index 00000000000..b72cfb23c97 --- /dev/null +++ b/.changeset/mcp-list-runs-region.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +MCP `list_runs` tool: add a `region` filter input and surface each run's executing region in the formatted summary. diff --git a/.changeset/pre.json b/.changeset/pre.json new file mode 100644 index 00000000000..a5d1b75f8c7 --- /dev/null +++ b/.changeset/pre.json @@ -0,0 +1,22 @@ +{ + "mode": "pre", + "tag": "rc", + "initialVersions": { + "coordinator": "0.0.1", + "docker-provider": "0.0.1", + "kubernetes-provider": "0.0.1", + "supervisor": "0.0.1", + "webapp": "1.0.0", + "@trigger.dev/build": "4.4.6", + "trigger.dev": "4.4.6", + "@trigger.dev/core": "4.4.6", + "@trigger.dev/plugins": "4.4.6", + "@trigger.dev/python": "4.4.6", + "@trigger.dev/react-hooks": "4.4.6", + "@trigger.dev/redis-worker": "4.4.6", + "@trigger.dev/rsc": "4.4.6", + "@trigger.dev/schema-to-json": "4.4.6", + "@trigger.dev/sdk": "4.4.6" + }, + "changesets": [] +} diff --git a/.changeset/runs-list-region-filter.md b/.changeset/runs-list-region-filter.md new file mode 100644 index 00000000000..c487e2d632c --- /dev/null +++ b/.changeset/runs-list-region-filter.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +"@trigger.dev/sdk": patch +--- + +Add `region` to the runs list / retrieve API: filter runs by region (`runs.list({ region: "..." })` / `filter[region]=`) and read each run's executing region from the new `region` field on the response. diff --git a/.changeset/sessions-primitive.md b/.changeset/sessions-primitive.md index 20690235c9a..79a6ca48f65 100644 --- a/.changeset/sessions-primitive.md +++ b/.changeset/sessions-primitive.md @@ -3,7 +3,24 @@ "@trigger.dev/core": patch --- -Adds the Sessions primitive — a durable, run-aware stream channel keyed -on a stable `externalId`. 
Public SDK additions: `tasks.triggerAndSubscribe()` -and the `chat.agent` runtime built on top of Sessions. See -https://trigger.dev/docs/ai-chat/overview for the full feature surface. +**Sessions** — a durable, run-aware stream channel keyed on a stable `externalId`. A Session is the unit of state that owns a multi-run conversation: messages flow through `.in`, responses through `.out`, both survive run boundaries. Sessions back the new `chat.agent` runtime, and you can build on them directly for any pattern that needs durable bi-directional streaming across runs. + +```ts +import { sessions, tasks } from "@trigger.dev/sdk"; + +// Trigger a task and subscribe to its session output in one call +const { runId, stream } = await tasks.triggerAndSubscribe("my-task", payload, { + externalId: "user-456", +}); + +for await (const chunk of stream) { + // ... +} + +// Enumerate existing sessions (powers inbox-style UIs without a separate index) +for await (const s of sessions.list({ type: "chat.agent", tag: "user:user-456" })) { + console.log(s.id, s.externalId, s.createdAt, s.closedAt); +} +``` + +See [/docs/ai-chat/overview](https://trigger.dev/docs/ai-chat/overview) for the full surface — Sessions powers the durable, resumable chat runtime described there. 
diff --git a/.github/workflows/publish-webapp.yml b/.github/workflows/publish-webapp.yml index 036d65728ed..466eaf855c0 100644 --- a/.github/workflows/publish-webapp.yml +++ b/.github/workflows/publish-webapp.yml @@ -53,12 +53,6 @@ jobs: ref_without_tag=ghcr.io/triggerdotdev/trigger.dev image_tags=$ref_without_tag:${STEPS_GET_TAG_OUTPUTS_TAG} - # if tag is a semver, also tag it as v4 - if [[ "${STEPS_GET_TAG_OUTPUTS_IS_SEMVER}" == true ]]; then - # TODO: switch to v4 tag on GA - image_tags=$image_tags,$ref_without_tag:v4-beta - fi - # when pushing the mutable main tag, also push an immutable-by-convention # full-commit-sha tag so a commit can be resolved to a specific digest if [[ "${STEPS_GET_TAG_OUTPUTS_TAG}" == "main" ]]; then diff --git a/.github/workflows/publish-worker-v4.yml b/.github/workflows/publish-worker-v4.yml index c3b72c6b7d9..6ed490c9471 100644 --- a/.github/workflows/publish-worker-v4.yml +++ b/.github/workflows/publish-worker-v4.yml @@ -68,12 +68,6 @@ jobs: ref_without_tag=ghcr.io/triggerdotdev/${STEPS_GET_REPOSITORY_OUTPUTS_REPO} image_tags=$ref_without_tag:${STEPS_GET_TAG_OUTPUTS_TAG} - # if tag is a semver, also tag it as v4 - if [[ "${STEPS_GET_TAG_OUTPUTS_IS_SEMVER}" == true ]]; then - # TODO: switch to v4 tag on GA - image_tags=$image_tags,$ref_without_tag:v4-beta - fi - echo "image_tags=${image_tags}" >> "$GITHUB_OUTPUT" env: STEPS_GET_REPOSITORY_OUTPUTS_REPO: ${{ steps.get_repository.outputs.repo }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 07af45a8a40..d352752fb0d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -64,6 +64,7 @@ jobs: published: ${{ steps.changesets.outputs.published }} published_packages: ${{ steps.changesets.outputs.publishedPackages }} published_package_version: ${{ steps.get_version.outputs.package_version }} + is_prerelease: ${{ steps.get_version.outputs.is_prerelease }} steps: - name: Checkout repo uses: 
actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 # zizmor: ignore[artipacked] needs persisted git creds for tag push; no artifact upload here so no leak path @@ -124,6 +125,12 @@ jobs: run: | package_version=$(echo "${STEPS_CHANGESETS_OUTPUTS_PUBLISHEDPACKAGES}" | jq -r '.[0].version') echo "package_version=${package_version}" >> "$GITHUB_OUTPUT" + # Any semver with a hyphen is a prerelease (e.g. 4.5.0-rc.0, 0.0.0-snapshot-...) + if [[ "${package_version}" == *-* ]]; then + echo "is_prerelease=true" >> "$GITHUB_OUTPUT" + else + echo "is_prerelease=false" >> "$GITHUB_OUTPUT" + fi env: STEPS_CHANGESETS_OUTPUTS_PUBLISHEDPACKAGES: ${{ steps.changesets.outputs.publishedPackages }} @@ -133,13 +140,19 @@ jobs: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} RELEASE_PR_BODY: ${{ github.event.pull_request.body }} STEPS_GET_VERSION_OUTPUTS_PACKAGE_VERSION: ${{ steps.get_version.outputs.package_version }} + STEPS_GET_VERSION_OUTPUTS_IS_PRERELEASE: ${{ steps.get_version.outputs.is_prerelease }} run: | VERSION="${STEPS_GET_VERSION_OUTPUTS_PACKAGE_VERSION}" node scripts/generate-github-release.mjs "$VERSION" > /tmp/release-body.md + PRERELEASE_FLAG="" + if [ "${STEPS_GET_VERSION_OUTPUTS_IS_PRERELEASE}" = "true" ]; then + PRERELEASE_FLAG="--prerelease" + fi gh release create "v${VERSION}" \ --title "trigger.dev v${VERSION}" \ --notes-file /tmp/release-body.md \ - --target main + --target main \ + $PRERELEASE_FLAG - name: Create and push Docker tag if: steps.changesets.outputs.published == 'true' @@ -239,7 +252,7 @@ jobs: dispatch-changelog: name: 📝 Dispatch changelog PR needs: [release, update-release] - if: needs.release.outputs.published == 'true' + if: needs.release.outputs.published == 'true' && needs.release.outputs.is_prerelease != 'true' runs-on: ubuntu-latest permissions: {} steps: diff --git a/.server-changes/agent-playground.md b/.server-changes/agent-playground.md new file mode 100644 index 00000000000..f2e0852add7 --- /dev/null +++ 
b/.server-changes/agent-playground.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +New Agent Playground for testing `chat.agent` tasks interactively — multi-turn chat with tool-call visualization, a side panel for payload / schema / clientData configuration, and trigger-config controls for `maxDuration`, version pin, and region. diff --git a/.server-changes/agent-view-sessions.md b/.server-changes/agent-view-sessions.md deleted file mode 100644 index 757dcdc2f40..00000000000 --- a/.server-changes/agent-view-sessions.md +++ /dev/null @@ -1,12 +0,0 @@ ---- -area: webapp -type: improvement ---- - -Migrate the dashboard Agent tab (span inspector) to subscribe to the backing Session's `.out` and `.in` channels instead of the run-scoped chat output + chat-messages input streams. Pairs with the SDK + MCP migrations on the ai-chat branch. - -- `SpanPresenter.server.ts` extracts `agentSession` from the run payload (prefers `sessionId`, falls back to `chatId` for pre-Sessions agent runs — matches `resolveSessionByIdOrExternalId`). -- Span route threads `agentSession` through `AgentViewAuth` and gates `agentView` creation on having one. -- New dashboard resource route `resources.orgs.../runs.$runParam/realtime/v1/sessions/$sessionId/$io` proxies `S2RealtimeStreams.streamResponseFromSessionStream` under dashboard session auth. The run param binds resource hierarchy; the session identity is verified against the environment. -- `AgentView.tsx` subscribes to `/out` and `/in` URLs, drops local `CHAT_STREAM_KEY`/`CHAT_MESSAGES_STREAM_ID` constants, and parses the `.in` stream as `ChatInputChunk` (`{kind: "message", payload}` for user turns; `{kind: "stop"}` ignored). Output-stream parsing is unchanged — session v2 SSE already delivers UIMessageChunk objects from `record.body.data`. -- Smoke: opened a prior `test-agent` run in the dashboard, Agent tab rendered user + assistant messages end-to-end with zero console errors. Both SSE endpoints (`/out`, `/in`) returned 200. 
diff --git a/.server-changes/agents-dashboard.md b/.server-changes/agents-dashboard.md new file mode 100644 index 00000000000..1aca65320bb --- /dev/null +++ b/.server-changes/agents-dashboard.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +New Agents page in the dashboard listing every `chat.agent` task in the environment with active/inactive status and run counts, plus fuzzy search for navigating large agent catalogs. diff --git a/.server-changes/ai-span-inspector.md b/.server-changes/ai-span-inspector.md new file mode 100644 index 00000000000..41f7a5dea90 --- /dev/null +++ b/.server-changes/ai-span-inspector.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +AI generation spans in the run trace get a dedicated inspector showing model, provider, token counts, cost, token speed, finish reason, service tier, tool count, and a link to the prompt version that produced the generation. diff --git a/.server-changes/models-registry.md b/.server-changes/models-registry.md new file mode 100644 index 00000000000..ee87f625868 --- /dev/null +++ b/.server-changes/models-registry.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +New Models page in the dashboard: a provider-grouped catalog of LLMs (OpenAI, Anthropic, Google, etc.) with pricing, capabilities, and cross-tenant usage metrics, plus per-model detail pages with token / cost / latency charts and a side-by-side compare panel. diff --git a/.server-changes/playground-trigger-config-fields.md b/.server-changes/playground-trigger-config-fields.md deleted file mode 100644 index 8a811c9dd1d..00000000000 --- a/.server-changes/playground-trigger-config-fields.md +++ /dev/null @@ -1,8 +0,0 @@ ---- -area: webapp -type: fix ---- - -Playground action now forwards `maxDuration`, `version` (as `lockToVersion`), and `region` from the sidebar form into the Session's `triggerConfig`. 
Previously the form fields rendered as working controls but were silently dropped (`void`-suppressed) because `SessionTriggerConfig` didn't accept them — runs ignored the user's max duration, version pin, and region selection. With the schema extended in core, the playground now plumbs them through to `ensureRunForSession`. - -Also fixes stale `clientData` in the playground transport: the JSON editor's value was captured at construction and never updated, so per-turn `metadata` merges used the original value across the whole conversation. Added a `useEffect` that calls `transport.setClientData(...)` whenever `clientDataJson` changes. diff --git a/.server-changes/prompts-dashboard.md b/.server-changes/prompts-dashboard.md new file mode 100644 index 00000000000..10397b9da22 --- /dev/null +++ b/.server-changes/prompts-dashboard.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +New Prompts page in the dashboard: list view with per-prompt usage sparklines, detail view with the template alongside Generations / Metrics / Versions tabs, and a dashboard override UI for changing the template text or model without redeploying. diff --git a/.server-changes/run-agent-view.md b/.server-changes/run-agent-view.md index 90833d9729f..570351f89ed 100644 --- a/.server-changes/run-agent-view.md +++ b/.server-changes/run-agent-view.md @@ -3,4 +3,4 @@ area: webapp type: feature --- -Add an Agent view to the run details page for runs whose `taskKind` annotation is `AGENT`. The view renders the agent's `UIMessage` conversation by subscribing to the backing Session's `.out` and `.in` channels — the same data source as the Agent Playground content view. Switching is via a `Trace view` / `Agent view` segmented control above the run body, and the selected view is reflected in the URL via `?view=agent` so it's shareable. 
+Run detail page gains an Agent view alongside the Trace view, rendering the agent's `UIMessage` conversation in real time from the backing Session for any run whose `taskKind` is `AGENT`. diff --git a/.server-changes/runs-task-source-filter.md b/.server-changes/runs-task-source-filter.md new file mode 100644 index 00000000000..70c8e2ff895 --- /dev/null +++ b/.server-changes/runs-task-source-filter.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Task source filter on the Runs list — slice runs by Standard, Scheduled, or Agent so agent runs can be separated from mixed workloads at a glance. diff --git a/.server-changes/sessions-dashboard-and-task-source-filter.md b/.server-changes/sessions-dashboard-and-task-source-filter.md deleted file mode 100644 index c3a727c4325..00000000000 --- a/.server-changes/sessions-dashboard-and-task-source-filter.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: feature ---- - -New Sessions page in the dashboard for inspecting `chat.agent` Session rows alongside their underlying runs, plus a "Task source" filter on the Runs list (Standard / Scheduled / Agent) so agent runs can be sliced out of mixed workloads at a glance. diff --git a/.server-changes/sessions-dashboard.md b/.server-changes/sessions-dashboard.md new file mode 100644 index 00000000000..7adc299aec6 --- /dev/null +++ b/.server-changes/sessions-dashboard.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +New Sessions page in the dashboard for inspecting `chat.agent` Session rows alongside their underlying runs, with filters by status, type, task identifier, and period, and a detail view that streams the live conversation from the backing Session's `.out` and `.in` channels. 
diff --git a/.server-changes/task-metadata-cache.md b/.server-changes/task-metadata-cache.md new file mode 100644 index 00000000000..a71bbdf347b --- /dev/null +++ b/.server-changes/task-metadata-cache.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Cache task defaults in Redis so the trigger API skips per-request database lookups, restoring the fast trigger path when callers pass queue and TTL options. diff --git a/apps/webapp/app/components/BulkActionFilterSummary.tsx b/apps/webapp/app/components/BulkActionFilterSummary.tsx index c5d1a2f48d7..a2eabc879de 100644 --- a/apps/webapp/app/components/BulkActionFilterSummary.tsx +++ b/apps/webapp/app/components/BulkActionFilterSummary.tsx @@ -215,6 +215,19 @@ export function BulkActionFilterSummary({ /> ); } + case "regions": { + const values = Array.isArray(value) ? value : [`${value}`]; + return ( + + ); + } case "machines": { const values = Array.isArray(value) ? value : [`${value}`]; return ( diff --git a/apps/webapp/app/components/runs/v3/RegionLabel.tsx b/apps/webapp/app/components/runs/v3/RegionLabel.tsx new file mode 100644 index 00000000000..015e8fe9152 --- /dev/null +++ b/apps/webapp/app/components/runs/v3/RegionLabel.tsx @@ -0,0 +1,22 @@ +import { FlagIcon } from "~/assets/icons/RegionIcons"; +import { cn } from "~/utils/cn"; + +type RegionLabelProps = { + region: { + name: string; + location?: string | null; + }; + className?: string; + iconClassName?: string; +}; + +export function RegionLabel({ region, className, iconClassName }: RegionLabelProps) { + return ( + + {region.location ? 
( + + ) : null} + {region.name} + + ); +} diff --git a/apps/webapp/app/components/runs/v3/RunFilters.tsx b/apps/webapp/app/components/runs/v3/RunFilters.tsx index c27ac1bc187..097b388caaa 100644 --- a/apps/webapp/app/components/runs/v3/RunFilters.tsx +++ b/apps/webapp/app/components/runs/v3/RunFilters.tsx @@ -4,6 +4,7 @@ import { ClockIcon, CpuChipIcon, FingerPrintIcon, + GlobeAltIcon, PlusIcon, RectangleStackIcon, Squares2X2Icon, @@ -61,6 +62,8 @@ import { useShortcutKeys } from "~/hooks/useShortcutKeys"; import { ShortcutKey } from "~/components/primitives/ShortcutKey"; import { type loader as tagsLoader } from "~/routes/resources.environments.$envId.runs.tags"; import { type loader as queuesLoader } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues"; +import { useRegions } from "~/hooks/useRegions"; +import { RegionLabel } from "./RegionLabel"; import { type loader as versionsLoader } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.versions"; import { Button } from "../../primitives/Buttons"; import { AIFilterInput } from "./AIFilterInput"; @@ -187,6 +190,9 @@ export const TaskRunListSearchFilters = z.object({ "Schedule ID to filter by - shows runs from a specific schedule. 
They start with sched_" ), queues: StringOrStringArray.describe("Queue names to filter by (these are user-defined names)"), + regions: StringOrStringArray.describe( + "Region master-queue identifiers to filter by (the worker instance group masterQueue values)" + ), machines: MachinePresetOrMachinePresetArray.describe( `Machine presets to filter by (${machines.join(", ")})` ), @@ -229,6 +235,8 @@ export function filterTitle(filterKey: string) { return "Schedule ID"; case "queues": return "Queues"; + case "regions": + return "Region"; case "machines": return "Machine"; case "versions": @@ -271,6 +279,8 @@ export function filterIcon(filterKey: string): ReactNode | undefined { return ; case "queues": return ; + case "regions": + return ; case "machines": return ; case "versions": @@ -317,6 +327,10 @@ export function getRunFiltersFromSearchParams( searchParams.getAll("queues").filter((v) => v.length > 0).length > 0 ? searchParams.getAll("queues") : undefined, + regions: + searchParams.getAll("regions").filter((v) => v.length > 0).length > 0 + ? searchParams.getAll("regions") + : undefined, machines: searchParams.getAll("machines").filter((v) => v.length > 0).length > 0 ? 
searchParams.getAll("machines") @@ -369,6 +383,7 @@ export function RunsFilters(props: RunFiltersProps) { searchParams.has("runId") || searchParams.has("scheduleId") || searchParams.has("queues") || + searchParams.has("regions") || searchParams.has("machines") || searchParams.has("versions") || searchParams.has("errorId") || @@ -402,6 +417,7 @@ const filterTypes = [ { name: "tags", title: "Tags", icon: }, { name: "versions", title: "Versions", icon: }, { name: "queues", title: "Queues", icon: }, + { name: "regions", title: "Region", icon: }, { name: "machines", title: "Machines", icon: }, { name: "run", title: "Run ID", icon: }, { name: "batch", title: "Batch ID", icon: }, @@ -456,6 +472,7 @@ function AppliedFilters({ bulkActions }: RunFiltersProps) { + @@ -485,6 +502,8 @@ function Menu(props: MenuProps) { return props.setFilterType(undefined)} {...props} />; case "queues": return props.setFilterType(undefined)} {...props} />; + case "regions": + return props.setFilterType(undefined)} {...props} />; case "machines": return props.setFilterType(undefined)} {...props} />; case "run": @@ -503,11 +522,14 @@ function Menu(props: MenuProps) { } function MainMenu({ searchValue, trigger, clearSearchValue, setFilterType }: MenuProps) { + const environment = useEnvironment(); + const showRegion = environment.type !== "DEVELOPMENT"; const filtered = useMemo(() => { return filterTypes.filter((item) => { + if (item.name === "regions" && !showRegion) return false; return item.title.toLowerCase().includes(searchValue.toLowerCase()); }); - }, [searchValue]); + }, [searchValue, showRegion]); return ( @@ -1260,6 +1282,138 @@ function AppliedQueuesFilter() { ); } +function RegionsDropdown({ + trigger, + clearSearchValue, + searchValue, + onClose, +}: { + trigger: ReactNode; + clearSearchValue: () => void; + searchValue: string; + onClose?: () => void; +}) { + const { values, replace } = useSearchParams(); + const regions = useRegions(); + + const handleChange = (values: string[]) => { 
+ clearSearchValue(); + replace({ + regions: values.length > 0 ? values : undefined, + cursor: undefined, + direction: undefined, + }); + }; + + const selected = values("regions").filter((v) => v !== ""); + + const filtered = useMemo(() => { + type RegionItem = { masterQueue: string; name: string; location?: string }; + const items: RegionItem[] = []; + + for (const masterQueue of selected) { + const known = regions.find((r) => r.masterQueue === masterQueue); + if (!known) { + items.push({ masterQueue, name: masterQueue }); + } + } + + for (const region of regions) { + if (!items.some((i) => i.masterQueue === region.masterQueue)) { + items.push({ + masterQueue: region.masterQueue, + name: region.name, + location: region.location, + }); + } + } + + return matchSorter(items, searchValue, { keys: ["name", "masterQueue"] }); + }, [searchValue, regions, selected.join(",")]); + + return ( + + {trigger} + { + if (onClose) { + onClose(); + return false; + } + return true; + }} + > + ( +
+ +
+ )} + /> + + {filtered.length > 0 + ? filtered.map((region) => ( + + + + )) + : null} + {filtered.length === 0 && No regions found} + +
+
+ ); +} + +function AppliedRegionsFilter() { + const { values, del } = useSearchParams(); + const environment = useEnvironment(); + const knownRegions = useRegions(); + + const regions = values("regions"); + + if (environment.type === "DEVELOPMENT") { + return null; + } + + if (regions.length === 0 || regions.every((v) => v === "")) { + return null; + } + + const labels = regions.map((mq) => { + const match = knownRegions.find((r) => r.masterQueue === mq); + return match?.name ?? mq; + }); + + return ( + + {(search, setSearch) => ( + }> + del(["regions", "cursor", "direction"])} + variant="secondary/small" + /> + + } + searchValue={search} + clearSearchValue={() => setSearch("")} + /> + )} + + ); +} + function MachinesDropdown({ trigger, clearSearchValue, diff --git a/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx b/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx index bf8337baa10..5e645dab877 100644 --- a/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx +++ b/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx @@ -23,6 +23,7 @@ import { useSelectedItems } from "~/components/primitives/SelectedItemsProvider" import { SimpleTooltip } from "~/components/primitives/Tooltip"; import { TruncatedCopyableValue } from "~/components/primitives/TruncatedCopyableValue"; import { useEnvironment } from "~/hooks/useEnvironment"; +import { useRegions } from "~/hooks/useRegions"; import { useFeatures } from "~/hooks/useFeatures"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; @@ -47,6 +48,7 @@ import { type TableVariant, } from "../../primitives/Table"; import { CancelRunDialog } from "./CancelRunDialog"; +import { RegionLabel } from "./RegionLabel"; import { LiveTimer } from "./LiveTimer"; import { ReplayRunDialog } from "./ReplayRunDialog"; import { RunTag } from "./RunTag"; @@ -86,8 +88,11 @@ export function TaskRunsTable({ variant = "dimmed", additionalTableState, }: RunsTableProps) { + const regions 
= useRegions(); + const regionByMasterQueue = new Map(regions.map((r) => [r.masterQueue, r] as const)); const organization = useOrganization(); const project = useProject(); + const environment = useEnvironment(); const checkboxes = useRef<(HTMLInputElement | null)[]>([]); const { has, hasAll, select, deselect, toggle } = useSelectedItems(allowSelection); const { isManagedCloud } = useFeatures(); @@ -107,6 +112,7 @@ export function TaskRunsTable({ const tableStateParam = disableAdjacentRows ? "" : encodeURIComponent(search); const showCompute = isManagedCloud; + const showRegion = environment.type !== "DEVELOPMENT"; const navigateCheckboxes = useCallback( (event: React.KeyboardEvent, index: number) => { @@ -233,6 +239,7 @@ export function TaskRunsTable({ Machine Queue + {showRegion && Region} Test Created at {total === 0 && !hasFilters ? ( - + {!isLoading && } ) : runs.length === 0 ? ( - + ) : ( runs.map((run, index) => { const searchParams = new URLSearchParams(); @@ -441,6 +448,20 @@ export function TaskRunsTable({ {run.queue.name} + {showRegion && ( + + {run.region ? ( + + ) : ( + "–" + )} + + )} {run.isTest ? ( @@ -467,7 +488,7 @@ export function TaskRunsTable({ )} {isLoading && ( Loading… @@ -603,11 +624,16 @@ function NoRuns({ title }: { title: string }) { ); } -function BlankState({ isLoading, filters }: Pick) { +function BlankState({ + isLoading, + filters, + showRegion, +}: Pick & { showRegion: boolean }) { const organization = useOrganization(); const project = useProject(); const environment = useEnvironment(); - if (isLoading) return ; + const colSpan = showRegion ? 16 : 15; + if (isLoading) return ; const { tasks, from, to, ...otherFilters } = filters; const singleTaskFromFilters = filters.tasks.length === 1 ? 
filters.tasks[0] : null; @@ -622,7 +648,7 @@ function BlankState({ isLoading, filters }: Pick filterArray.length === 0) ) { return ( - + There are no runs for {filters.tasks[0]} @@ -650,7 +676,7 @@ function BlankState({ isLoading, filters }: Pick +
No runs match your filters. Try refreshing, modifying your filters or run a test. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 97cccbc1710..8eacb9634e1 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -235,6 +235,30 @@ const EnvironmentSchema = z CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + TASK_META_CACHE_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + TASK_META_CACHE_REDIS_PORT: z.coerce + .number() + .optional() + .transform( + (v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined) + ), + TASK_META_CACHE_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + TASK_META_CACHE_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + TASK_META_CACHE_REDIS_TLS_DISABLED: z + .string() + .default(process.env.REDIS_TLS_DISABLED ?? 
"false"), + TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS: z.coerce.number().default(86400), + TASK_META_CACHE_BY_WORKER_TTL_SECONDS: z.coerce.number().default(2592000), + REALTIME_STREAMS_REDIS_HOST: z .string() .optional() diff --git a/apps/webapp/app/hooks/useRegions.tsx b/apps/webapp/app/hooks/useRegions.tsx new file mode 100644 index 00000000000..fe696440473 --- /dev/null +++ b/apps/webapp/app/hooks/useRegions.tsx @@ -0,0 +1,16 @@ +import { type UIMatch } from "@remix-run/react"; +import { type UseDataFunctionReturn } from "remix-typedjson"; +import type { loader as orgLoader } from "~/routes/_app.orgs.$organizationSlug/route"; +import { organizationMatchId } from "./useOrganizations"; +import { useTypedMatchesData } from "./useTypedMatchData"; + +export type MatchedRegion = UseDataFunctionReturn["regions"][number]; + +export function useRegions(matches?: UIMatch[]): MatchedRegion[] { + const routeMatch = useTypedMatchesData({ + id: organizationMatchId, + matches, + }); + + return routeMatch?.regions ?? []; +} diff --git a/apps/webapp/app/presenters/RunFilters.server.ts b/apps/webapp/app/presenters/RunFilters.server.ts index 44bb4c01f50..4254dc83e61 100644 --- a/apps/webapp/app/presenters/RunFilters.server.ts +++ b/apps/webapp/app/presenters/RunFilters.server.ts @@ -34,6 +34,7 @@ export async function getRunFiltersFromRequest(request: Request): Promise { return value ? value.split(",") : undefined; }), + "filter[region]": z + .string() + .optional() + .transform((value) => { + return value ? 
value.split(",") : undefined; + }), "filter[machine]": z .string() .optional() @@ -255,6 +261,10 @@ export class ApiRunListPresenter extends BasePresenter { options.queues = searchParams["filter[queue]"]; } + if (searchParams["filter[region]"]) { + options.regions = searchParams["filter[region]"]; + } + if (searchParams["filter[machine]"]) { options.machines = searchParams["filter[machine]"]; } @@ -308,6 +318,7 @@ export class ApiRunListPresenter extends BasePresenter { // Match `NextRunListPresenter`'s "STANDARD" fallback so API // consumers and the dashboard see the same value. taskKind: run.taskKind || "STANDARD", + region: run.region ?? undefined, ...ApiRetrieveRunPresenter.apiBooleanHelpersFromRunStatus( ApiRetrieveRunPresenter.apiStatusFromRunStatus(run.status, apiVersion) ), diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index f0a2d363d61..ffa8d5df91e 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -33,6 +33,7 @@ export type RunListOptions = { batchId?: string; runId?: string[]; queues?: string[]; + regions?: string[]; machines?: MachinePresetName[]; errorId?: string; sources?: string[]; @@ -72,6 +73,7 @@ export class NextRunListPresenter { batchId, runId, queues, + regions, machines, errorId, sources, @@ -102,6 +104,7 @@ export class NextRunListPresenter { batchId !== undefined || (runId !== undefined && runId.length > 0) || (queues !== undefined && queues.length > 0) || + (regions !== undefined && regions.length > 0) || (machines !== undefined && machines.length > 0) || (errorId !== undefined && errorId !== "") || typeof isTest === "boolean" || @@ -188,6 +191,7 @@ export class NextRunListPresenter { runId, bulkId, queues, + regions, machines, errorId, taskKinds: sources, @@ -255,6 +259,7 @@ export class NextRunListPresenter { name: run.queue.replace("task/", ""), type: 
run.queue.startsWith("task/") ? "task" : "custom", }, + region: run.workerQueue ? run.workerQueue : undefined, taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD", }; }), diff --git a/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts b/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts index 2feb29a9968..2dd5a448cb4 100644 --- a/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RegionsPresenter.server.ts @@ -10,6 +10,7 @@ import { getCurrentPlan } from "~/services/platform.v3.server"; export type Region = { id: string; name: string; + masterQueue: string; description?: string; cloudProvider?: string; location?: string; @@ -73,6 +74,7 @@ export class RegionsPresenter extends BasePresenter { select: { id: true, name: true, + masterQueue: true, description: true, cloudProvider: true, location: true, @@ -96,6 +98,7 @@ export class RegionsPresenter extends BasePresenter { const regions: Region[] = visibleRegions.map((region) => ({ id: region.id, name: region.name, + masterQueue: region.masterQueue, description: region.description ?? undefined, cloudProvider: region.cloudProvider ?? undefined, location: region.location ?? undefined, @@ -110,6 +113,7 @@ export class RegionsPresenter extends BasePresenter { select: { id: true, name: true, + masterQueue: true, description: true, cloudProvider: true, location: true, @@ -130,6 +134,7 @@ export class RegionsPresenter extends BasePresenter { regions.push({ id: defaultWorkerGroup.id, name: defaultWorkerGroup.name, + masterQueue: defaultWorkerGroup.masterQueue, description: defaultWorkerGroup.description ?? undefined, cloudProvider: defaultWorkerGroup.cloudProvider ?? undefined, location: defaultWorkerGroup.location ?? 
undefined, diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx index 6fea018af54..1ad36854dcf 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx @@ -7,6 +7,7 @@ import { prisma } from "~/db.server"; import { useOptionalOrganization } from "~/hooks/useOrganizations"; import { useTypedMatchesData } from "~/hooks/useTypedMatchData"; import { OrganizationsPresenter } from "~/presenters/OrganizationsPresenter.server"; +import { RegionsPresenter, type Region } from "~/presenters/v3/RegionsPresenter.server"; import { getImpersonationId } from "~/services/impersonation.server"; import { getCachedUsage, getCurrentPlan } from "~/services/platform.v3.server"; import { requireUser } from "~/services/session.server"; @@ -88,7 +89,10 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { firstDayOfNextMonth.setUTCDate(1); firstDayOfNextMonth.setUTCHours(0, 0, 0, 0); - const [plan, usage, customDashboards] = await Promise.all([ + const shouldLoadRegions = + !!projectParam && !!environment && environment.type !== "DEVELOPMENT"; + + const [plan, usage, customDashboards, regions] = await Promise.all([ getCurrentPlan(organization.id), getCachedUsage(organization.id, { from: firstDayOfMonth, to: firstDayOfNextMonth }), prisma.metricsDashboard.findMany({ @@ -100,6 +104,12 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }, orderBy: { createdAt: "desc" }, }), + shouldLoadRegions + ? new RegionsPresenter() + .call({ userId: user.id, projectSlug: projectParam! 
}) + .then(({ regions }) => regions) + .catch(() => [] as Region[]) + : Promise.resolve([] as Region[]), ]); let hasExceededFreeTier = false; @@ -147,6 +157,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { organization, project, environment, + regions, isImpersonating: !!impersonationId, currentPlan: { ...plan, v3Usage: { ...usage, hasExceededFreeTier, usagePercentage } }, customDashboards: customDashboardsWithWidgetCount, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 3e4c231cc1f..09f3f33fcb3 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -22,7 +22,7 @@ import { assertNever } from "assert-never"; import { useEffect, useState } from "react"; import { typedjson, useTypedFetcher } from "remix-typedjson"; import { ExitIcon } from "~/assets/icons/ExitIcon"; -import { FlagIcon } from "~/assets/icons/RegionIcons"; +import { RegionLabel } from "~/components/runs/v3/RegionLabel"; import { AdminDebugRun } from "~/components/admin/debugRun"; import { CodeBlock } from "~/components/code/CodeBlock"; import { EnvironmentCombo } from "~/components/environments/EnvironmentLabel"; @@ -972,12 +972,7 @@ function RunBody({ Region - - {run.region.location ? 
( - - ) : null} - {run.region.name} - + )} diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 4b4298bc935..2fc35fc8435 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -17,6 +17,8 @@ import { tryCatch } from "@trigger.dev/core/v3"; import { ServiceValidationError } from "~/v3/services/common.server"; import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache"; import { singleton } from "~/utils/singleton"; +import type { TaskMetadataCache, TaskMetadataEntry } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; // LRU cache for environment queue sizes to reduce Redis calls const queueSizeCache = singleton("queueSizeCache", () => { @@ -63,13 +65,16 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef export class DefaultQueueManager implements QueueManager { private readonly replicaPrisma: PrismaClientOrTransaction; + private readonly taskMetaCache: TaskMetadataCache; constructor( private readonly prisma: PrismaClientOrTransaction, private readonly engine: RunEngine, - replicaPrisma?: PrismaClientOrTransaction + replicaPrisma?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance ) { this.replicaPrisma = replicaPrisma ?? prisma; + this.taskMetaCache = taskMetaCache; } async resolveQueueProperties( @@ -87,7 +92,10 @@ export class DefaultQueueManager implements QueueManager { const specifiedQueueName = extractQueueName(request.body.options?.queue); if (specifiedQueueName) { - // A specific queue name is provided, validate it exists for the locked worker + // A specific queue name is provided, validate it exists for the locked worker. 
+ // Pre-existing query — not cached because TaskQueue rows can be added or + // removed independently of BackgroundWorkerTask, and a stale "queue exists" + // claim would silently route to the wrong queue. const specifiedQueue = await this.prisma.taskQueue.findFirst({ where: { name: specifiedQueueName, @@ -107,49 +115,45 @@ export class DefaultQueueManager implements QueueManager { queueName = specifiedQueue.name; lockedQueueId = specifiedQueue.id; - // Always fetch the task so we can resolve `triggerSource` (which - // becomes `taskKind` on annotations and replicates to ClickHouse). - // Without this, AGENT/SCHEDULED runs triggered with - // `lockToVersion` + a queue override would be annotated as - // STANDARD and disappear from the run-list "Source" filter. - // `ttl` is read from the same row but only used when the caller - // didn't specify a per-trigger TTL. - const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: lockedBackgroundWorker.id, - runtimeEnvironmentId: request.environment.id, - slug: request.taskId, - }, - select: { ttl: true, triggerSource: true }, - }); + // Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache. + // On cache hit this is 0 PG queries; on miss the helper falls back to + // a BackgroundWorkerTask lookup and back-fills the cache. + // + // If the task slug isn't on this locked worker version, we tolerate + // the missing row and fall through with `taskKind = undefined` + // (coalesced to "STANDARD" downstream) and `taskTtl = undefined`. + // This matches main's pre-PR behavior — the no-override branch below + // still throws because there's no queue to route to in that case, + // but here the caller already named the queue. + const lockedMeta = await this.resolveLockedTaskMetadata( + lockedBackgroundWorker.id, + request.environment.id, + request.taskId + ); if (request.body.options?.ttl === undefined) { - taskTtl = lockedTask?.ttl; + taskTtl = lockedMeta?.ttl ?? 
undefined; } - taskKind = lockedTask?.triggerSource; + taskKind = lockedMeta?.triggerSource; } else { - // No queue override - fetch task with queue to get both default queue and TTL - const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: lockedBackgroundWorker.id, - runtimeEnvironmentId: request.environment.id, - slug: request.taskId, - }, - include: { - queue: true, - }, - }); + // No queue override - resolve default queue + TTL + triggerSource via cache, + // falling back to a single BackgroundWorkerTask lookup on miss. + const lockedMeta = await this.resolveLockedTaskMetadata( + lockedBackgroundWorker.id, + request.environment.id, + request.taskId + ); - if (!lockedTask) { + if (!lockedMeta) { throw new ServiceValidationError( `Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "" }'.` ); } - taskTtl = lockedTask.ttl; + taskTtl = lockedMeta.ttl; - if (!lockedTask.queue) { + if (!lockedMeta.queueName) { // This case should ideally be prevented by earlier checks or schema constraints, // but handle it defensively. logger.error("Task found on locked version, but has no associated queue record", { @@ -164,9 +168,9 @@ export class DefaultQueueManager implements QueueManager { } // Use the task's default queue name - queueName = lockedTask.queue.name; - lockedQueueId = lockedTask.queue.id; - taskKind = lockedTask.triggerSource; + queueName = lockedMeta.queueName; + lockedQueueId = lockedMeta.queueId ?? undefined; + taskKind = lockedMeta.triggerSource; } } else { // Task is not locked to a specific version, use regular logic @@ -213,76 +217,130 @@ export class DefaultQueueManager implements QueueManager { const defaultQueueName = `task/${taskId}`; - // Even when the caller provides both a queue override and a - // per-trigger TTL, we still need to fetch the task so `triggerSource` - // (which becomes `taskKind` on annotations and replicates to - // ClickHouse) is populated. 
Without it, AGENT/SCHEDULED runs hitting - // this path get stamped as STANDARD and disappear from the - // dashboard's `Source` filter. Mirrors the locked-worker fix above - // — `taskTtl` is harmless in the returned value because the call - // site coalesces `body.options.ttl ?? taskTtl`. - - // Find the current worker for the environment. Replica is fine here — - // the adjacent `backgroundWorkerTask` lookups below already use - // `replicaPrisma` (replica lag for "just deployed" is bounded the same - // way for both queries; reading the worker from the writer and the - // task from the replica would only widen the inconsistency window). - const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma); + // Resolve the current worker's task metadata via cache (HGET on warm path, + // BackgroundWorkerTask findFirst + cache back-fill on miss). When this hits, + // both the queue-override + TTL caller and the default-queue caller satisfy + // their full result without any database query. + const meta = await this.resolveCurrentTaskMetadata(environment, taskId); + + if (overriddenQueueName) { + // Caller already named the queue. We only need triggerSource (for taskKind) + // and ttl (for the call site to coalesce against body.options.ttl). + return { + queueName: overriddenQueueName, + taskTtl: meta?.ttl ?? undefined, + taskKind: meta?.triggerSource, + }; + } - if (!worker) { - logger.debug("Failed to get queue name: No worker found", { + if (!meta) { + logger.debug("Failed to get queue name: No worker or task found", { taskId, environmentId: environment.id, }); - - return { queueName: overriddenQueueName ?? 
defaultQueueName, taskTtl: undefined }; + return { queueName: defaultQueueName, taskTtl: undefined }; } - // When queue is overridden, we only need TTL from the task (no queue join needed) - if (overriddenQueueName) { - const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - runtimeEnvironmentId: environment.id, - slug: taskId, - }, - select: { ttl: true, triggerSource: true }, + if (!meta.queueName) { + logger.debug("Failed to get queue name: No queue found", { + taskId, + environmentId: environment.id, }); - - return { queueName: overriddenQueueName, taskTtl: task?.ttl, taskKind: task?.triggerSource }; + return { queueName: defaultQueueName, taskTtl: meta.ttl, taskKind: meta.triggerSource }; } - const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { - workerId: worker.id, - runtimeEnvironmentId: environment.id, - slug: taskId, - }, - include: { - queue: true, + return { queueName: meta.queueName, taskTtl: meta.ttl, taskKind: meta.triggerSource }; + } + + /** + * Resolve task metadata for a locked-version trigger. Reads from the + * `task-meta:by-worker:{workerId}` Redis hash; falls back to a single + * BackgroundWorkerTask findFirst on miss and back-fills the cache. + * + * Returns null when no BackgroundWorkerTask row exists. 
+ */ + private async resolveLockedTaskMetadata( + workerId: string, + environmentId: string, + slug: string + ): Promise { + const cached = await this.taskMetaCache.getByWorker(workerId, slug); + if (cached) return cached; + + const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({ + where: { workerId, runtimeEnvironmentId: environmentId, slug }, + select: { + ttl: true, + triggerSource: true, + queue: { select: { id: true, name: true } }, }, }); - if (!task) { - console.log("Failed to get queue name: No task found", { - taskId, - environmentId: environment.id, - }); + if (!row) return null; - return { queueName: defaultQueueName, taskTtl: undefined }; - } + const entry: TaskMetadataEntry = { + slug, + ttl: row.ttl, + triggerSource: row.triggerSource, + queueId: row.queue?.id ?? null, + queueName: row.queue?.name ?? "", + }; - if (!task.queue) { - console.log("Failed to get queue name: No queue found", { - taskId, - environmentId: environment.id, - queueConfig: task.queueConfig, - }); + // Fire-and-forget back-fill — `setByWorker` upserts the single field and + // refreshes the hash TTL. Errors are logged inside the cache and swallowed. + void this.taskMetaCache.setByWorker(workerId, entry); - return { queueName: defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource }; - } + return entry; + } + + /** + * Resolve task metadata for a non-locked trigger. Reads from the + * `task-meta:env:{envId}` Redis hash; falls back to + * findCurrentWorkerFromEnvironment + a single BackgroundWorkerTask findFirst + * on miss and back-fills both keyspaces. + * + * Returns null when no current worker or task can be resolved. + */ + private async resolveCurrentTaskMetadata( + environment: AuthenticatedEnvironment, + slug: string + ): Promise { + const cached = await this.taskMetaCache.getCurrent(environment.id, slug); + if (cached) return cached; + + // Cold cache: discover the current worker for the env. 
Replica is fine — + // the adjacent BackgroundWorkerTask lookup below uses `replicaPrisma` too + // (replica lag for "just deployed" is bounded the same way for both + // queries; reading from the writer here would only widen the window). + const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma); + if (!worker) return null; + + const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({ + where: { workerId: worker.id, runtimeEnvironmentId: environment.id, slug }, + select: { + ttl: true, + triggerSource: true, + queue: { select: { id: true, name: true } }, + }, + }); + + if (!row) return null; + + const entry: TaskMetadataEntry = { + slug, + ttl: row.ttl, + triggerSource: row.triggerSource, + queueId: row.queue?.id ?? null, + queueName: row.queue?.name ?? "", + }; + + // Fire-and-forget back-fill — atomically upserts the slug into both + // keyspaces so a subsequent locked-or-not trigger hits the cache. The + // env-keyspace TTL is preserved (promotion owns it); the by-worker TTL + // is refreshed (sliding window keeps active workers warm). + void this.taskMetaCache.setByCurrentWorker(environment.id, worker.id, entry); - return { queueName: task.queue.name ?? 
defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource }; + return entry; } async validateQueueLimits( diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 49725d2cefb..7a59179c55e 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -151,6 +151,7 @@ export class ClickHouseRunsRepository implements IRunsRepository { metadataType: true, machinePreset: true, queue: true, + workerQueue: true, annotations: true, }, }); @@ -324,6 +325,10 @@ function applyRunFiltersToQueryBuilder( queryBuilder.where("queue IN {queues: Array(String)}", { queues: options.queues }); } + if (options.regions && options.regions.length > 0) { + queryBuilder.where("worker_queue IN {regions: Array(String)}", { regions: options.regions }); + } + if (options.machines && options.machines.length > 0) { queryBuilder.where("machine_preset IN {machines: Array(String)}", { machines: options.machines, diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 68c9da63098..9a0a4a19746 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -40,6 +40,7 @@ const RunListInputOptionsSchema = z.object({ runId: z.array(z.string()).optional(), bulkId: z.string().optional(), queues: z.array(z.string()).optional(), + regions: z.array(z.string()).optional(), machines: MachinePresetName.array().optional(), errorId: z.string().optional(), taskKinds: z.array(z.string()).optional(), @@ -104,6 +105,7 @@ export type ListedRun = Prisma.TaskRunGetPayload<{ metadataType: true; machinePreset: true; queue: true; + workerQueue: true; annotations: true; }; }>; diff --git 
a/apps/webapp/app/services/taskMetadataCache.server.ts b/apps/webapp/app/services/taskMetadataCache.server.ts new file mode 100644 index 00000000000..6130295a73f --- /dev/null +++ b/apps/webapp/app/services/taskMetadataCache.server.ts @@ -0,0 +1,427 @@ +import type { Redis, Result, Callback } from "ioredis"; +import type { TaskTriggerSource } from "@trigger.dev/database"; +import { logger } from "./logger.server"; + +export type TaskMetadataEntry = { + slug: string; + ttl: string | null; + triggerSource: TaskTriggerSource; + queueId: string | null; + queueName: string; +}; + +export interface TaskMetadataCache { + /** Read a slug's metadata from the env keyspace (current pointer). */ + getCurrent(envId: string, slug: string): Promise; + /** Read a slug's metadata from the by-worker keyspace (locked-version lookups). */ + getByWorker(workerId: string, slug: string): Promise; + /** + * Atomically replace both `task-meta:env:{envId}` and + * `task-meta:by-worker:{workerId}` with the given entries. Used at deploy + * promotion sites where the worker just became current for the env. + */ + populateByCurrentWorker( + envId: string, + workerId: string, + entries: TaskMetadataEntry[] + ): Promise; + /** + * Replace `task-meta:by-worker:{workerId}` only. Used at deploy build sites + * (V4) where the worker is created but not yet promoted. + */ + populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise; + /** + * Atomically upsert one slug in both keyspaces. Used by the non-locked + * read-path back-fill. The env-keyspace TTL is only set when no TTL is + * present (preserves the promotion boundary); the by-worker TTL is + * refreshed on every call (sliding expiry). + */ + setByCurrentWorker(envId: string, workerId: string, entry: TaskMetadataEntry): Promise; + /** + * Upsert one slug in `task-meta:by-worker:{workerId}` only. Used by the + * locked-version read-path back-fill; refreshes the by-worker TTL. 
+ */ + setByWorker(workerId: string, entry: TaskMetadataEntry): Promise; +} + +export type RedisTaskMetadataCacheOptions = { + redis: Redis; + /** Safety TTL on `task-meta:env:{envId}`. Default 24h. Use 0 for no expiry. */ + currentEnvTtlSeconds?: number; + /** Idle TTL on `task-meta:by-worker:{workerId}`. Default 30d. Use 0 for no expiry. */ + byWorkerTtlSeconds?: number; +}; + +type EncodedEntry = { + t: string | null; + k: TaskTriggerSource; + q: string | null; + n: string; +}; + +function encode(entry: TaskMetadataEntry): string { + const payload: EncodedEntry = { + t: entry.ttl, + k: entry.triggerSource, + q: entry.queueId, + n: entry.queueName, + }; + return JSON.stringify(payload); +} + +function decode(slug: string, raw: string): TaskMetadataEntry | null { + try { + const parsed = JSON.parse(raw) as EncodedEntry; + return { + slug, + ttl: parsed.t, + triggerSource: parsed.k, + queueId: parsed.q, + queueName: parsed.n, + }; + } catch (error) { + logger.error("Failed to decode task metadata cache entry", { slug, error }); + return null; + } +} + +function currentEnvKey(envId: string): string { + return `task-meta:env:${envId}`; +} + +function byWorkerKey(workerId: string): string { + return `task-meta:by-worker:${workerId}`; +} + +/** + * Atomically replace a single HASH's contents and reset its TTL. + * + * KEYS[1] = hash key + * ARGV[1] = ttl seconds (0 = no TTL) + * ARGV[2..N] = alternating field, value pairs + */ +const REPLACE_HASH_LUA = ` +redis.call("DEL", KEYS[1]) +if #ARGV > 1 then + local fv = {} + for i = 2, #ARGV do + fv[#fv + 1] = ARGV[i] + end + redis.call("HSET", KEYS[1], unpack(fv)) +end +local ttl = tonumber(ARGV[1]) +if ttl and ttl > 0 then + redis.call("EXPIRE", KEYS[1], ttl) +end +return 1 +`; + +/** + * Reserved field name on env hashes that records the worker currently + * "owning" the env keyspace. 
The back-fill Lua script reads this and skips + * its env-side write if the owner has flipped — closing the race where a + * concurrent promotion atomically replaces the env hash between a resolver's + * PG read and its back-fill write. Customer task slugs are kebab/camelCase + * and never start with `__`, so collisions are not a concern; an accidental + * `getCurrent(envId, "__owner_worker_id")` would JSON.parse-fail and fall + * back to PG, not corrupt anything. + */ +const OWNER_FIELD = "__owner_worker_id"; + +/** + * Atomically replace BOTH keyspaces in one Redis transaction. Used at deploy + * promotion — the worker just became current for the env, so the env keyspace + * and the worker keyspace get the same field set, and the env hash is + * stamped with the new owner workerId. + * + * KEYS[1] = env hash key + * KEYS[2] = by-worker hash key + * ARGV[1] = env ttl seconds (0 = no TTL) + * ARGV[2] = by-worker ttl seconds (0 = no TTL) + * ARGV[3] = workerId (env-hash owner marker) + * ARGV[4..N] = alternating field, value pairs (same for both hashes) + */ +const REPLACE_TWO_HASHES_LUA = ` +redis.call("DEL", KEYS[1]) +redis.call("DEL", KEYS[2]) +if #ARGV > 3 then + local fv = {} + for i = 4, #ARGV do + fv[#fv + 1] = ARGV[i] + end + redis.call("HSET", KEYS[1], unpack(fv)) + redis.call("HSET", KEYS[2], unpack(fv)) +end +redis.call("HSET", KEYS[1], "${OWNER_FIELD}", ARGV[3]) +local envTtl = tonumber(ARGV[1]) +if envTtl and envTtl > 0 then + redis.call("EXPIRE", KEYS[1], envTtl) +end +local workerTtl = tonumber(ARGV[2]) +if workerTtl and workerTtl > 0 then + redis.call("EXPIRE", KEYS[2], workerTtl) +end +return 1 +`; + +/** + * Set a single field and refresh the HASH TTL. Used by the locked-version + * back-fill path — sliding expiry keeps active workers warm. 
+ * + * KEYS[1] = hash key + * ARGV[1] = ttl seconds (0 = no TTL refresh) + * ARGV[2] = field + * ARGV[3] = value + */ +const SET_FIELD_REFRESH_TTL_LUA = ` +redis.call("HSET", KEYS[1], ARGV[2], ARGV[3]) +local ttl = tonumber(ARGV[1]) +if ttl and ttl > 0 then + redis.call("EXPIRE", KEYS[1], ttl) +end +return 1 +`; + +/** + * Atomically upsert one field in BOTH keyspaces. Used by the non-locked + * back-fill path. + * + * The by-worker hash always gets written (the key contains the workerId, so + * stale data lands in a dead worker's keyspace and is never read by anyone + * not pinned to that version). + * + * The env hash is CAS-guarded by `${OWNER_FIELD}`: if a concurrent promotion + * has replaced the hash between this resolver's PG read and this write, the + * stored owner won't match the workerId the back-filler resolved to, so the + * env write is skipped — preventing the back-fill from overwriting a freshly + * promoted slug with stale data from the previous worker. + * + * KEYS[1] = env hash key + * KEYS[2] = by-worker hash key + * ARGV[1] = env ttl seconds (0 = no TTL) + * ARGV[2] = by-worker ttl seconds (0 = no TTL) + * ARGV[3] = writer's expected env-hash owner workerId + * ARGV[4] = field + * ARGV[5] = value + */ +const SET_TWO_FIELDS_LUA = ` +redis.call("HSET", KEYS[2], ARGV[4], ARGV[5]) +local workerTtl = tonumber(ARGV[2]) +if workerTtl and workerTtl > 0 then + redis.call("EXPIRE", KEYS[2], workerTtl) +end + +local owner = redis.call("HGET", KEYS[1], "${OWNER_FIELD}") +if owner == false or owner == ARGV[3] then + redis.call("HSET", KEYS[1], ARGV[4], ARGV[5]) + if owner == false then + redis.call("HSET", KEYS[1], "${OWNER_FIELD}", ARGV[3]) + end + local envTtl = tonumber(ARGV[1]) + if envTtl and envTtl > 0 and redis.call("TTL", KEYS[1]) == -1 then + redis.call("EXPIRE", KEYS[1], envTtl) + end +end +return 1 +`; + +declare module "ioredis" { + interface RedisCommander { + taskMetaReplaceHash( + key: string, + ttlSeconds: string, + ...fieldValues: string[] 
+ ): Result; + taskMetaReplaceTwoHashes( + envKey: string, + workerKey: string, + envTtlSeconds: string, + workerTtlSeconds: string, + workerId: string, + ...fieldValues: string[] + ): Result; + taskMetaSetFieldRefreshTtl( + key: string, + ttlSeconds: string, + field: string, + value: string, + callback?: Callback + ): Result; + taskMetaSetTwoFields( + envKey: string, + workerKey: string, + envTtlSeconds: string, + workerTtlSeconds: string, + workerId: string, + field: string, + value: string, + callback?: Callback + ): Result; + } +} + +export class RedisTaskMetadataCache implements TaskMetadataCache { + private readonly redis: Redis; + private readonly currentEnvTtlSeconds: number; + private readonly byWorkerTtlSeconds: number; + + constructor(options: RedisTaskMetadataCacheOptions) { + this.redis = options.redis; + this.currentEnvTtlSeconds = options.currentEnvTtlSeconds ?? 86400; + this.byWorkerTtlSeconds = options.byWorkerTtlSeconds ?? 30 * 24 * 60 * 60; + + this.redis.defineCommand("taskMetaReplaceHash", { + numberOfKeys: 1, + lua: REPLACE_HASH_LUA, + }); + this.redis.defineCommand("taskMetaReplaceTwoHashes", { + numberOfKeys: 2, + lua: REPLACE_TWO_HASHES_LUA, + }); + this.redis.defineCommand("taskMetaSetFieldRefreshTtl", { + numberOfKeys: 1, + lua: SET_FIELD_REFRESH_TTL_LUA, + }); + this.redis.defineCommand("taskMetaSetTwoFields", { + numberOfKeys: 2, + lua: SET_TWO_FIELDS_LUA, + }); + } + + async getCurrent(envId: string, slug: string): Promise { + return this.#get(currentEnvKey(envId), slug); + } + + async getByWorker(workerId: string, slug: string): Promise { + return this.#get(byWorkerKey(workerId), slug); + } + + async populateByCurrentWorker( + envId: string, + workerId: string, + entries: TaskMetadataEntry[] + ): Promise { + try { + // Always invoke the script — empty `entries` is valid and clears every + // task field from both keyspaces (DEL + no field HSET). The env hash is + // still re-created holding only the owner marker for `workerId`; the + // by-worker hash is left deleted. That is the right behavior when + // promoting a worker with no tasks.
+ const fieldValues: string[] = []; + for (const entry of entries) { + fieldValues.push(entry.slug, encode(entry)); + } + await this.redis.taskMetaReplaceTwoHashes( + currentEnvKey(envId), + byWorkerKey(workerId), + String(this.currentEnvTtlSeconds), + String(this.byWorkerTtlSeconds), + workerId, + ...fieldValues + ); + } catch (error) { + logger.error("Failed to populate task metadata cache (current worker)", { + envId, + workerId, + error, + }); + } + } + + async populateByWorker(workerId: string, entries: TaskMetadataEntry[]): Promise { + try { + // Always invoke the script — empty `entries` clears the keyspace. + const fieldValues: string[] = []; + for (const entry of entries) { + fieldValues.push(entry.slug, encode(entry)); + } + await this.redis.taskMetaReplaceHash( + byWorkerKey(workerId), + String(this.byWorkerTtlSeconds), + ...fieldValues + ); + } catch (error) { + logger.error("Failed to populate task metadata cache (by worker)", { + workerId, + error, + }); + } + } + + async setByCurrentWorker( + envId: string, + workerId: string, + entry: TaskMetadataEntry + ): Promise { + try { + await this.redis.taskMetaSetTwoFields( + currentEnvKey(envId), + byWorkerKey(workerId), + String(this.currentEnvTtlSeconds), + String(this.byWorkerTtlSeconds), + workerId, + entry.slug, + encode(entry) + ); + } catch (error) { + logger.error("Failed to set task metadata cache field (current worker)", { + envId, + workerId, + slug: entry.slug, + error, + }); + } + } + + async setByWorker(workerId: string, entry: TaskMetadataEntry): Promise { + try { + await this.redis.taskMetaSetFieldRefreshTtl( + byWorkerKey(workerId), + String(this.byWorkerTtlSeconds), + entry.slug, + encode(entry) + ); + } catch (error) { + logger.error("Failed to set task metadata cache field (by worker)", { + workerId, + slug: entry.slug, + error, + }); + } + } + + async #get(key: string, slug: string): Promise { + try { + const raw = await this.redis.hget(key, slug); + if (!raw) return null; + return 
decode(slug, raw); + } catch (error) { + logger.error("Failed to read task metadata from cache", { key, slug, error }); + return null; + } + } +} + +export class NoopTaskMetadataCache implements TaskMetadataCache { + async getCurrent(): Promise { + return null; + } + + async getByWorker(): Promise { + return null; + } + + async populateByCurrentWorker(): Promise { + // intentionally empty + } + + async populateByWorker(): Promise { + // intentionally empty + } + + async setByCurrentWorker(): Promise { + // intentionally empty + } + + async setByWorker(): Promise { + // intentionally empty + } +} diff --git a/apps/webapp/app/services/taskMetadataCacheInstance.server.ts b/apps/webapp/app/services/taskMetadataCacheInstance.server.ts new file mode 100644 index 00000000000..b673cf122ed --- /dev/null +++ b/apps/webapp/app/services/taskMetadataCacheInstance.server.ts @@ -0,0 +1,38 @@ +import { Redis } from "ioredis"; +import { defaultReconnectOnError } from "@internal/redis"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; +import { + NoopTaskMetadataCache, + RedisTaskMetadataCache, + type TaskMetadataCache, +} from "./taskMetadataCache.server"; + +export const taskMetadataCacheInstance: TaskMetadataCache = singleton( + "taskMetadataCacheInstance", + initializeTaskMetadataCache +); + +function initializeTaskMetadataCache(): TaskMetadataCache { + if (!env.TASK_META_CACHE_REDIS_HOST) { + return new NoopTaskMetadataCache(); + } + + const redis = new Redis({ + connectionName: "taskMetadataCache", + host: env.TASK_META_CACHE_REDIS_HOST, + port: env.TASK_META_CACHE_REDIS_PORT, + username: env.TASK_META_CACHE_REDIS_USERNAME, + password: env.TASK_META_CACHE_REDIS_PASSWORD, + keyPrefix: "tr:", + enableAutoPipelining: true, + reconnectOnError: defaultReconnectOnError, + ...(env.TASK_META_CACHE_REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), + }); + + return new RedisTaskMetadataCache({ + redis, + currentEnvTtlSeconds: env.TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS, + byWorkerTtlSeconds: env.TASK_META_CACHE_BY_WORKER_TTL_SECONDS, + }); +} diff --git a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts index 288374ec61a..6022c172908 100644 --- a/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts +++ b/apps/webapp/app/v3/services/changeCurrentDeployment.server.ts @@ -1,8 +1,13 @@ import { BackgroundWorkerMetadata, tryCatch } from "@trigger.dev/core/v3"; import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic"; -import { WorkerDeployment } from "@trigger.dev/database"; +import { PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { + type TaskMetadataCache, + type TaskMetadataEntry, +} from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { syncDeclarativeSchedules } from "./createBackgroundWorker.server"; import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy"; @@ -11,6 +16,17 @@ import { compareDeploymentVersions } from "../utils/deploymentVersions"; export type ChangeCurrentDeploymentDirection = "promote" | "rollback"; export class ChangeCurrentDeploymentService extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( deployment: WorkerDeployment, direction: 
ChangeCurrentDeploymentDirection, @@ -96,23 +112,59 @@ export class ChangeCurrentDeploymentService extends BaseService { }, }); - const [syncError] = await tryCatch( - (async () => { - const tasks = await this._prisma.backgroundWorkerTask.findMany({ - where: { workerId: deployment.workerId! }, - select: { slug: true, triggerSource: true }, - }); - await syncTaskIdentifiers( + const [fetchTasksError, tasks] = await tryCatch( + this._prisma.backgroundWorkerTask.findMany({ + where: { workerId: deployment.workerId! }, + select: { + slug: true, + triggerSource: true, + ttl: true, + queue: { select: { id: true, name: true } }, + }, + }) + ); + + if (fetchTasksError) { + logger.error("Error fetching worker tasks on deployment change", { + error: fetchTasksError, + }); + } + + if (tasks) { + // Side effect 1: refresh the `TaskIdentifier` table and the existing + // `tids:` Redis cache so the task-listing UI reflects the new deploy. + const [syncIdentifiersError] = await tryCatch( + syncTaskIdentifiers( deployment.environmentId, deployment.projectId, deployment.workerId!, tasks.map((t) => ({ id: t.slug, triggerSource: t.triggerSource })) - ); - })() - ); + ) + ); - if (syncError) { - logger.error("Error syncing task identifiers on deployment change", { error: syncError }); + if (syncIdentifiersError) { + logger.error("Error syncing task identifiers on deployment change", { + error: syncIdentifiersError, + }); + } + + // Side effect 2: refresh the `task-meta:` cache that the queue resolver + // reads from. Independent of side effect 1 — if `syncTaskIdentifiers` + // throws, the queue resolver still gets a warm cache for the new worker. + const metadataEntries: TaskMetadataEntry[] = tasks.map((t) => ({ + slug: t.slug, + ttl: t.ttl, + triggerSource: t.triggerSource, + queueId: t.queue?.id ?? null, + queueName: t.queue?.name ?? "", + })); + + // Cache calls log+swallow internally. 
+ await this._taskMetaCache.populateByCurrentWorker( + deployment.environmentId, + deployment.workerId!, + metadataEntries + ); } const [scheduleSyncError] = await tryCatch(this.#syncSchedulesForDeployment(deployment)); diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index da79e386afb..ec3f6d077ad 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -14,6 +14,11 @@ import { sanitizeQueueName } from "~/models/taskQueue.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { + type TaskMetadataCache, + type TaskMetadataEntry, +} from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { removeQueueConcurrencyLimits, @@ -56,6 +61,17 @@ export function stripBackgroundWorkerMetadataForStorage( } export class CreateBackgroundWorkerService extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( projectRef: string, environment: AuthenticatedEnvironment, @@ -147,7 +163,7 @@ export class CreateBackgroundWorkerService extends BaseService { throw new ServiceValidationError("Error creating background worker files"); } - const [resourcesError] = await tryCatch( + const [resourcesError, workerTaskEntries] = await tryCatch( createWorkerResources( body.metadata, backgroundWorker, @@ -212,6 +228,26 @@ export class 
CreateBackgroundWorkerService extends BaseService { }); } + // Populate task metadata cache. DEV workers are always "current" because + // `findCurrentWorkerFromEnvironment` resolves DEV current as the latest + // worker by createdAt. Non-DEV (deploy-built) workers are not promoted + // here — promotion writes the `:env:` keyspace later in + // changeCurrentDeployment / createDeploymentBackgroundWorkerV3. + // Cache calls log+swallow internally, so a Redis blip can't break + // anything else here. Empty `workerTaskEntries` is intentional — the + // populate methods clear stale hashes for zero-task deploys. + if (workerTaskEntries) { + if (environment.type === "DEVELOPMENT") { + await this._taskMetaCache.populateByCurrentWorker( + environment.id, + backgroundWorker.id, + workerTaskEntries + ); + } else { + await this._taskMetaCache.populateByWorker(backgroundWorker.id, workerTaskEntries); + } + } + const [updateConcurrencyLimitsError] = await tryCatch( updateEnvConcurrencyLimits(environment) ); @@ -265,17 +301,26 @@ export async function createWorkerResources( environment: AuthenticatedEnvironment, prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map -) { +): Promise { // Create the queues const queues = await createWorkerQueues(metadata, worker, environment, prisma); // Create the tasks - await createWorkerTasks(metadata, queues, worker, environment, prisma, tasksToBackgroundFiles); + const taskEntries = await createWorkerTasks( + metadata, + queues, + worker, + environment, + prisma, + tasksToBackgroundFiles + ); // Register prompts if (metadata.prompts && metadata.prompts.length > 0) { await createWorkerPrompts(metadata.prompts, worker, environment, prisma); } + + return taskEntries; } async function createWorkerTasks( @@ -285,17 +330,22 @@ async function createWorkerTasks( environment: AuthenticatedEnvironment, prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map -) { +): Promise { // Create tasks in chunks of 20 const CHUNK_SIZE = 20; + 
const entries: TaskMetadataEntry[] = []; for (let i = 0; i < metadata.tasks.length; i += CHUNK_SIZE) { const chunk = metadata.tasks.slice(i, i + CHUNK_SIZE); - await Promise.all( + const chunkEntries = await Promise.all( chunk.map((task) => createWorkerTask(task, queues, worker, environment, prisma, tasksToBackgroundFiles) ) ); + for (const entry of chunkEntries) { + if (entry) entries.push(entry); + } } + return entries; } async function createWorkerTask( @@ -305,7 +355,7 @@ async function createWorkerTask( environment: AuthenticatedEnvironment, prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map -) { +): Promise { try { let queue = queues.find((queue) => queue.name === task.queue?.name); @@ -331,6 +381,9 @@ async function createWorkerTask( ? ("AGENT" as const) : ("STANDARD" as const); + const resolvedTtl = + typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null; + await prisma.backgroundWorkerTask.create({ data: { friendlyId: generateFriendlyId("task"), @@ -348,12 +401,19 @@ async function createWorkerTask( config: task.agentConfig ? (task.agentConfig as any) : undefined, fileId: tasksToBackgroundFiles?.get(task.id) ?? null, maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null, - ttl: - typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? 
null, + ttl: resolvedTtl, queueId: queue.id, payloadSchema: task.payloadSchema as any, }, }); + + return { + slug: task.id, + ttl: resolvedTtl, + triggerSource: resolvedTriggerSource, + queueId: queue.id, + queueName: queue.name, + }; } catch (error) { if (error instanceof Prisma.PrismaClientKnownRequestError) { // The error code for unique constraint violation in Prisma is P2002 @@ -389,6 +449,7 @@ async function createWorkerTask( worker, }); } + return null; } } diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts index f20a024d2d6..d8f13227d78 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV3.server.ts @@ -1,8 +1,10 @@ import { CreateBackgroundWorkerRequestBody, tryCatch } from "@trigger.dev/core/v3"; -import type { BackgroundWorker } from "@trigger.dev/database"; +import type { BackgroundWorker, PrismaClientOrTransaction } from "@trigger.dev/database"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server"; +import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; import { socketIo } from "../handleSocketIo.server"; import { updateEnvConcurrencyLimits } from "../runQueue.server"; import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server"; @@ -24,6 +26,17 @@ import { CURRENT_DEPLOYMENT_LABEL, BackgroundWorkerId } from "@trigger.dev/core/ * @deprecated */ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: 
PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( projectRef: string, environment: AuthenticatedEnvironment, @@ -74,8 +87,14 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { }); } + let workerTaskEntries: Awaited> = []; try { - await createWorkerResources(body.metadata, backgroundWorker, environment, this._prisma); + workerTaskEntries = await createWorkerResources( + body.metadata, + backgroundWorker, + environment, + this._prisma + ); await syncDeclarativeSchedules( body.metadata.tasks, backgroundWorker, @@ -147,6 +166,16 @@ export class CreateDeploymentBackgroundWorkerServiceV3 extends BaseService { logger.error("Error syncing task identifiers", { error: syncIdError }); } + // V3 promotes the deployment immediately above, so this worker is now + // current for the env — write both keyspaces atomically. Cache calls + // log+swallow internally. Empty `workerTaskEntries` is intentional: the + // populate methods clear stale hashes for zero-task deploys. 
+ await this._taskMetaCache.populateByCurrentWorker( + environment.id, + backgroundWorker.id, + workerTaskEntries + ); + try { //send a notification that a new worker has been created await projectPubSub.publish( diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index f764d39dc7b..ff041359bb0 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -1,7 +1,9 @@ import { CreateBackgroundWorkerRequestBody, logger, tryCatch } from "@trigger.dev/core/v3"; import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; -import type { BackgroundWorker, WorkerDeployment } from "@trigger.dev/database"; +import type { BackgroundWorker, PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; +import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { createBackgroundFiles, @@ -13,6 +15,17 @@ import { TimeoutDeploymentService } from "./timeoutDeployment.server"; import { env } from "~/env.server"; export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { + private readonly _taskMetaCache: TaskMetadataCache; + + constructor( + prisma?: PrismaClientOrTransaction, + replica?: PrismaClientOrTransaction, + taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance + ) { + super(prisma, replica); + this._taskMetaCache = taskMetaCache; + } + public async call( environment: AuthenticatedEnvironment, deploymentId: string, @@ -110,7 +123,7 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { throw serviceError; } - const 
[resourcesError] = await tryCatch( + const [resourcesError, workerTaskEntries] = await tryCatch( createWorkerResources( body.metadata, backgroundWorker, @@ -134,6 +147,16 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { throw serviceError; } + // V4 build path: worker created but NOT yet promoted to current. Write + // only the `task-meta:by-worker:{workerId}` keyspace so locked-version + // triggers against this build hit the cache. Promotion (which writes the + // env keyspace) happens later via finalizeDeployment → changeCurrentDeployment. + // Cache calls log+swallow internally, so a Redis blip can't stall the + // deployment state machine. Empty entries clears stale hashes. + if (workerTaskEntries) { + await this._taskMetaCache.populateByWorker(backgroundWorker.id, workerTaskEntries); + } + const [schedulesError] = await tryCatch( syncDeclarativeSchedules(body.metadata.tasks, backgroundWorker, environment, this._prisma) ); diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index ddceb8754c1..798e39e0601 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -20,8 +20,10 @@ import { assertNonNullable, containerTest } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; import { IOPacket } from "@trigger.dev/core/v3"; import { TaskRun } from "@trigger.dev/database"; +import { Redis } from "ioredis"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; +import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server"; import { EntitlementValidationParams, MaxAttemptsValidationParams, @@ -1173,3 +1175,295 @@ describe("RunEngineTriggerTaskService", () => { } ); }); + +describe("DefaultQueueManager task metadata cache", () => { + containerTest( + "warm cache returns metadata without falling 
through to PG", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "cached-task"; + const setup = await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Pre-populate cache with AGENT triggerSource; DB row has the default STANDARD. + // If the read path hits the cache, the resulting TaskRun.taskKind reflects the + // cached value. If it falls through to PG, it reflects STANDARD. 
+ await cache.populateByCurrentWorker(environment.id, setup.worker.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { payload: { test: "x" } }, + }); + + assertNonNullable(result); + expect(result.run.taskIdentifier).toBe(taskIdentifier); + expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "cache miss falls through to PG and back-fills the cache", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "miss-task"; + await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Cache starts empty. Sanity-check the env keyspace.
+ expect(await cache.getCurrent(environment.id, taskIdentifier)).toBeNull(); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { payload: { test: "x" } }, + }); + + assertNonNullable(result); + expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("STANDARD"); + + // Back-fill is fire-and-forget; poll with a bounded timeout to avoid CI flakes. + let backfilled = await cache.getCurrent(environment.id, taskIdentifier); + for (let i = 0; i < 40 && !backfilled; i++) { + await setTimeout(25); + backfilled = await cache.getCurrent(environment.id, taskIdentifier); + } + expect(backfilled).not.toBeNull(); + expect(backfilled?.triggerSource).toBe("STANDARD"); + expect(backfilled?.queueName).toBe(`task/${taskIdentifier}`); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "queue-override + ttl path returns taskKind from cache without a BWT lookup", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await 
setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "override-task"; + const setup = await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Cache says AGENT; DB row says STANDARD. Caller provides both a queue + // override and an explicit TTL — the hot path the PR regressed. + await cache.populateByCurrentWorker(environment.id, setup.worker.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { + payload: { test: "x" }, + options: { + queue: { name: "caller-queue" }, + ttl: "5m", + }, + }, + }); + + assertNonNullable(result); + expect(result.run.queue).toBe("caller-queue"); + expect((result.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + await redis.quit(); + await engine.quit(); + } + ); + + containerTest( + "locked-version trigger reads from by-worker keyspace, not env keyspace", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: 
"small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "keyspace-task"; + const worker = await setupBackgroundWorker(engine, environment, taskIdentifier); + + const redis = new Redis(redisOptions); + const cache = new RedisTaskMetadataCache({ redis }); + + // Populate the two keyspaces with conflicting triggerSource values so we + // can tell which keyspace the read used. The real worker's by-worker + // hash gets AGENT; the env hash gets SCHEDULED (seeded via a throwaway + // worker id since `populateByCurrentWorker` writes both keyspaces and + // we want the real worker's by-worker hash untouched). + await cache.populateByWorker(worker.worker.id, [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "AGENT", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + await cache.populateByCurrentWorker(environment.id, "dummy-worker-for-env-seed", [ + { + slug: taskIdentifier, + ttl: null, + triggerSource: "SCHEDULED", + queueId: null, + queueName: `task/${taskIdentifier}`, + }, + ]); + + const queuesManager = new DefaultQueueManager(prisma, engine, undefined, cache); + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024, + }); + + // Locked → by-worker keyspace → AGENT + const locked = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { + payload: { test: "x" }, + options: { lockToVersion: worker.worker.version }, + }, + }); + assertNonNullable(locked); 
+ expect((locked.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("AGENT"); + + // Not locked → env keyspace → SCHEDULED + const current = await triggerTaskService.call({ + taskId: taskIdentifier, + environment, + body: { payload: { test: "y" } }, + }); + assertNonNullable(current); + expect((current.run.annotations as { taskKind?: string } | null)?.taskKind).toBe("SCHEDULED"); + + await redis.quit(); + await engine.quit(); + } + ); +}); diff --git a/packages/cli-v3/src/mcp/formatters.ts b/packages/cli-v3/src/mcp/formatters.ts index c90ae0570df..131124cc9a4 100644 --- a/packages/cli-v3/src/mcp/formatters.ts +++ b/packages/cli-v3/src/mcp/formatters.ts @@ -427,6 +427,11 @@ function formatRunSummary(run: ListRunResponseItem): string { parts.push(`v${run.version}`); } + // Region if available + if (run.region) { + parts.push(`region:${run.region}`); + } + return parts.join(" | "); } diff --git a/packages/cli-v3/src/mcp/schemas.ts b/packages/cli-v3/src/mcp/schemas.ts index b5b7d7518da..88d578fd9b7 100644 --- a/packages/cli-v3/src/mcp/schemas.ts +++ b/packages/cli-v3/src/mcp/schemas.ts @@ -183,6 +183,12 @@ export const ListRunsInput = CommonProjectsInput.extend({ .describe("Filter for runs created in the last N time period. e.g. 7d, 30d, 365d") .optional(), machine: MachinePresetName.describe("Filter for runs that match this machine preset").optional(), + region: z + .string() + .describe( + "Filter for runs that executed in this region (the worker instance group masterQueue identifier, e.g. 
'us-east-1' or 'main')" + ) + .optional(), }); export type ListRunsInput = z.output; diff --git a/packages/cli-v3/src/mcp/tools/runs.ts b/packages/cli-v3/src/mcp/tools/runs.ts index 5b576cfc41d..d1d43df5f6f 100644 --- a/packages/cli-v3/src/mcp/tools/runs.ts +++ b/packages/cli-v3/src/mcp/tools/runs.ts @@ -374,6 +374,7 @@ export const listRunsTool = { to: $to, period: input.period, machine: input.machine, + region: input.region, }); const formattedRuns = formatRunList(result); diff --git a/packages/cli-v3/src/utilities/initialBanner.ts b/packages/cli-v3/src/utilities/initialBanner.ts index c30deeee7fe..74e217c6e61 100644 --- a/packages/cli-v3/src/utilities/initialBanner.ts +++ b/packages/cli-v3/src/utilities/initialBanner.ts @@ -1,5 +1,6 @@ import chalk from "chalk"; import { getLatestVersion } from "fast-npm-meta"; +import * as semver from "semver"; import { VERSION } from "../version.js"; import { chalkGrey, chalkRun, chalkTask, chalkWorker, logo } from "./cliOutput.js"; import { logger } from "./logger.js"; @@ -105,18 +106,16 @@ async function doUpdateCheck(): Promise { return; } - const compareVersions = (a: string, b: string) => - a.localeCompare(b, "en-US", { numeric: true }); - - const comparison = compareVersions(VERSION, meta.version); - - if (comparison === -1) { + // Use real semver comparison (loose) so prereleases sort correctly against + // their stable counterpart — e.g. a user on `4.5.0-rc.0` sees `4.5.0` as + // newer. String/locale comparison gets this wrong for `X.Y.Z-rc.N` vs `X.Y.Z`. 
+ if (semver.lt(VERSION, meta.version, true)) { return meta.version; } return; } catch (err) { - // ignore error + // ignore error (covers both network failures and any version-parse oddities) logger.debug(err); return; diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 04a9009e356..b304300f145 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -2080,6 +2080,13 @@ function createSearchQueryForListRuns(query?: ListRunsQueryParams): URLSearchPar Array.isArray(query.machine) ? query.machine.join(",") : query.machine ); } + + if (query.region) { + searchParams.append( + "filter[region]", + Array.isArray(query.region) ? query.region.join(",") : query.region + ); + } } return searchParams; diff --git a/packages/core/src/v3/apiClient/types.ts b/packages/core/src/v3/apiClient/types.ts index 79baaf74ef7..aa21e2022d0 100644 --- a/packages/core/src/v3/apiClient/types.ts +++ b/packages/core/src/v3/apiClient/types.ts @@ -58,6 +58,8 @@ export interface ListRunsQueryParams extends CursorPageParams { queue?: Array | QueueTypeName; /** The machine name, or multiple of them. */ machine?: Array | MachinePresetName; + /** The region master-queue identifier, or multiple of them. */ + region?: Array | string; } export interface ListProjectRunsQueryParams extends CursorPageParams, ListRunsQueryParams { diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 6317d816503..6cb746762c0 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1134,6 +1134,7 @@ const CommonRunFields = { durationMs: z.number(), metadata: z.record(z.any()).optional(), taskKind: z.string().optional(), + region: z.string().optional(), }; const RetrieveRunCommandFields = {