Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/session-replica-race-fixes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Fix read-replica races on the session APIs: a fresh session's first append or subscribe no longer fails with a 404, and a just-triggered session run is no longer mistaken for dead, which could double-trigger the run and duplicate chat responses.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ import { json } from "@remix-run/server-runtime";
import { tryCatch } from "@trigger.dev/core/utils";
import { nanoid } from "nanoid";
import { z } from "zod";
import { $replica } from "~/db.server";
import { logger } from "~/services/logger.server";
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
import { ensureRunForSession } from "~/services/realtime/sessionRunManager.server";
import {
canonicalSessionAddressingKey,
resolveSessionByIdOrExternalId,
resolveSessionWithWriterFallback,
} from "~/services/realtime/sessions.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import {
Expand Down Expand Up @@ -55,9 +54,10 @@ const { action, loader } = createActionApiRoute(
// also triggers the first run). The row exists before any caller
// can reach `.in/append` — no row, no append. Resolved here so the
// authorization scope can expand to both addressing forms (friendlyId
// + externalId) and the handler can skip its own lookup.
// + externalId) and the handler can skip its own lookup. Writer
// fallback: a first append can land inside the replica apply window.
findResource: async (params, auth) =>
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session),
resolveSessionWithWriterFallback(auth.environment.id, params.session),
authorization: {
action: "write",
// Authorize against the union of the URL form, friendlyId, and
Expand Down
9 changes: 3 additions & 6 deletions apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
import {
canonicalSessionAddressingKey,
isSessionFriendlyIdForm,
resolveSessionByIdOrExternalId,
resolveSessionWithWriterFallback,
} from "~/services/realtime/sessions.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import {
Expand Down Expand Up @@ -43,8 +42,7 @@ const { action } = createActionApiRoute(
// when a row exists. The S2 stream key is built from the row's
// canonical key (externalId if set, else friendlyId) so writers
// and readers converge regardless of URL form.
const maybeSession = await resolveSessionByIdOrExternalId(
$replica,
const maybeSession = await resolveSessionWithWriterFallback(
authentication.environment.id,
params.session
);
Expand Down Expand Up @@ -100,8 +98,7 @@ const loader = createLoaderApiRoute(
allowJWT: true,
corsStrategy: "all",
findResource: async (params, auth) => {
const row = await resolveSessionByIdOrExternalId(
$replica,
const row = await resolveSessionWithWriterFallback(
auth.environment.id,
params.session
);
Expand Down
39 changes: 20 additions & 19 deletions apps/webapp/app/services/realtime/sessionRunManager.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,31 +111,32 @@ export async function ensureRunForSession(
// 1. Probe currentRunId.
let priorDeadRunFriendlyId: string | undefined;
if (session.currentRunId) {
const probe = await getRunStatusAndFriendlyId(session.currentRunId);
let probe = await getRunStatusAndFriendlyId(session.currentRunId);
if (!probe) {
// Replica miss on a row we just observed via `currentRunId` — the
// run was likely triggered moments ago and hasn't replicated yet.
// Re-probe the writer BEFORE deciding liveness: treating a lagging
// replica as "row vanished" double-triggers the session (a fast
// first append after session create races the replica apply delay
// and spawns a second live run consuming the same `.in`).
probe = await prisma.taskRun.findFirst({
where: { id: session.currentRunId },
select: { status: true, friendlyId: true },
});
}
if (probe && !isFinalRunStatus(probe.status)) {
return { runId: session.currentRunId, triggered: false };
}
// Either the row vanished (probe null) or its status is final. Either
// way the prior run isn't going to consume new appends — but the
// session may still hold conversation state on `session.out` and an
// S3 snapshot keyed on `session.friendlyId`. Forward the prior run's
// public-form id (friendlyId — same shape as `ctx.run.id`) to the
// agent as `previousRunId` so its boot gate flips
// Either the row vanished on the writer too (probe null) or its status
// is final. Either way the prior run isn't going to consume new
// appends — but the session may still hold conversation state on
// `session.out` and an S3 snapshot keyed on `session.friendlyId`.
// Forward the prior run's public-form id (friendlyId — same shape as
// `ctx.run.id`) to the agent as `previousRunId` so its boot gate flips
// `couldHavePriorState` and replays the persisted state instead of
// treating this as a fresh chat. See `chat.agent`'s boot orchestration
// in `packages/trigger-sdk/src/v3/ai.ts`.
if (probe?.friendlyId) {
priorDeadRunFriendlyId = probe.friendlyId;
} else {
// Replica miss on a row we just observed via `currentRunId`. Retry
// on the writer so the customer's `runs.retrieve(previousRunId)`
// gets the public `run_*` form rather than the internal cuid.
const writerProbe = await prisma.taskRun.findFirst({
where: { id: session.currentRunId },
select: { friendlyId: true },
});
priorDeadRunFriendlyId = writerProbe?.friendlyId ?? session.currentRunId;
}
priorDeadRunFriendlyId = probe?.friendlyId ?? session.currentRunId;
}

// 2. Validate config + trigger upfront. Continuation overrides
Expand Down
18 changes: 17 additions & 1 deletion apps/webapp/app/services/realtime/sessions.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { PrismaClient, Session } from "@trigger.dev/database";
import type { SessionItem } from "@trigger.dev/core/v3";
import { $replica } from "~/db.server";
import { $replica, prisma } from "~/db.server";

/**
* Prefix that {@link SessionId.generate} attaches to every Session friendlyId.
Expand Down Expand Up @@ -36,6 +36,22 @@ export async function resolveSessionByIdOrExternalId(
});
}

/**
 * Looks a session up on the read replica and, only when the replica has no
 * matching row, repeats the same lookup against the writer.
 *
 * Intended for the hot realtime routes (append / SSE subscribe /
 * end-and-continue). The first append or subscribe for a freshly created
 * session can arrive before the replica has applied the insert; without the
 * writer retry that miss would surface as a 404 (or a subscribe that never
 * finds its session) even though the row already exists on the writer.
 *
 * @param runtimeEnvironmentId - Environment the session lookup is scoped to.
 * @param idOrExternalId - Session identifier as given by the caller; passed
 *   unchanged to {@link resolveSessionByIdOrExternalId}.
 * @returns The session from the replica when present, otherwise whatever the
 *   writer lookup yields (`null` when neither has a matching row).
 */
export async function resolveSessionWithWriterFallback(
  runtimeEnvironmentId: string,
  idOrExternalId: string
): Promise<Session | null> {
  const fromReplica = await resolveSessionByIdOrExternalId(
    $replica,
    runtimeEnvironmentId,
    idOrExternalId
  );

  // Replica hit: done. Replica miss: the writer is the source of truth.
  return (
    fromReplica ?? resolveSessionByIdOrExternalId(prisma, runtimeEnvironmentId, idOrExternalId)
  );
}

/** True for `session_*` friendlyId form, false for everything else. */
export function isSessionFriendlyIdForm(value: string): boolean {
return value.startsWith(SESSION_FRIENDLY_ID_PREFIX);
Expand Down
5 changes: 3 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ services:
# docker exec database bash -c 'grep -q "host replication" "$PGDATA/pg_hba.conf" || echo "host replication all all md5" >> "$PGDATA/pg_hba.conf"'
# docker exec database psql -U postgres -c "SELECT pg_reload_conf()"
# Then point the webapp at it: DATABASE_READ_REPLICA_URL=postgresql://postgres:postgres@localhost:5433/postgres
# Tune the lag via REPLICA_APPLY_DELAY (e.g. 150ms, 2s). Wipe database-replica-data to re-init.
# Tune the lag via REPLICA_APPLY_DELAY. The default of 20ms approximates realistic production lag;
# raise it (e.g. to 150ms or 2s) to shake out replica races. Wipe database-replica-data to re-init.
database-replica:
container_name: ${CONTAINER_PREFIX:-}database-replica
profiles: ["replica"]
Expand All @@ -72,7 +73,7 @@ services:
- ${DB_REPLICA_VOLUME:-database-replica-data}:/var/lib/postgresql/data/
environment:
PGPASSWORD: postgres
REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-150ms}
REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-20ms}
networks:
- app_network
ports:
Expand Down