diff --git a/.server-changes/session-replica-race-fixes.md b/.server-changes/session-replica-race-fixes.md new file mode 100644 index 0000000000..677b4c4c3c --- /dev/null +++ b/.server-changes/session-replica-race-fixes.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Fix read-replica races on the session APIs: a fresh session's first append or subscribe no longer fails with a 404, and a just-triggered session run is no longer mistaken for dead, which could double-trigger the run and duplicate chat responses. diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index e5fc0d5c34..0bdcceb714 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -2,13 +2,12 @@ import { json } from "@remix-run/server-runtime"; import { tryCatch } from "@trigger.dev/core/utils"; import { nanoid } from "nanoid"; import { z } from "zod"; -import { $replica } from "~/db.server"; import { logger } from "~/services/logger.server"; import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server"; import { ensureRunForSession } from "~/services/realtime/sessionRunManager.server"; import { canonicalSessionAddressingKey, - resolveSessionByIdOrExternalId, + resolveSessionWithWriterFallback, } from "~/services/realtime/sessions.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { @@ -55,9 +54,10 @@ const { action, loader } = createActionApiRoute( // also triggers the first run). The row exists before any caller // can reach `.in/append` — no row, no append. Resolved here so the // authorization scope can expand to both addressing forms (friendlyId - // + externalId) and the handler can skip its own lookup. + // + externalId) and the handler can skip its own lookup. 
Writer + // fallback: a first append can land inside the replica apply window. findResource: async (params, auth) => - resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session), + resolveSessionWithWriterFallback(auth.environment.id, params.session), authorization: { action: "write", // Authorize against the union of the URL form, friendlyId, and diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts index 562b19fad9..ab64621750 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts @@ -1,12 +1,11 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; -import { $replica } from "~/db.server"; import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server"; import { canonicalSessionAddressingKey, isSessionFriendlyIdForm, - resolveSessionByIdOrExternalId, + resolveSessionWithWriterFallback, } from "~/services/realtime/sessions.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { @@ -43,8 +42,7 @@ const { action } = createActionApiRoute( // when a row exists. The S2 stream key is built from the row's // canonical key (externalId if set, else friendlyId) so writers // and readers converge regardless of URL form. 
- const maybeSession = await resolveSessionByIdOrExternalId( - $replica, + const maybeSession = await resolveSessionWithWriterFallback( authentication.environment.id, params.session ); @@ -100,8 +98,7 @@ const loader = createLoaderApiRoute( allowJWT: true, corsStrategy: "all", findResource: async (params, auth) => { - const row = await resolveSessionByIdOrExternalId( - $replica, + const row = await resolveSessionWithWriterFallback( auth.environment.id, params.session ); diff --git a/apps/webapp/app/services/realtime/sessionRunManager.server.ts b/apps/webapp/app/services/realtime/sessionRunManager.server.ts index 95fc87e201..1ad5174d1c 100644 --- a/apps/webapp/app/services/realtime/sessionRunManager.server.ts +++ b/apps/webapp/app/services/realtime/sessionRunManager.server.ts @@ -111,31 +111,32 @@ export async function ensureRunForSession( // 1. Probe currentRunId. let priorDeadRunFriendlyId: string | undefined; if (session.currentRunId) { - const probe = await getRunStatusAndFriendlyId(session.currentRunId); + let probe = await getRunStatusAndFriendlyId(session.currentRunId); + if (!probe) { + // Replica miss on a row we just observed via `currentRunId` — the + // run was likely triggered moments ago and hasn't replicated yet. + // Re-probe the writer BEFORE deciding liveness: treating a lagging + // replica as "row vanished" double-triggers the session (a fast + // first append after session create races the replica apply delay + // and spawns a second live run consuming the same `.in`). + probe = await prisma.taskRun.findFirst({ + where: { id: session.currentRunId }, + select: { status: true, friendlyId: true }, + }); + } if (probe && !isFinalRunStatus(probe.status)) { return { runId: session.currentRunId, triggered: false }; } - // Either the row vanished (probe null) or its status is final. 
Either - // way the prior run isn't going to consume new appends — but the - // session may still hold conversation state on `session.out` and an - // S3 snapshot keyed on `session.friendlyId`. Forward the prior run's - // public-form id (friendlyId — same shape as `ctx.run.id`) to the - // agent as `previousRunId` so its boot gate flips + // Either the row vanished on the writer too (probe null) or its status + // is final. Either way the prior run isn't going to consume new + // appends — but the session may still hold conversation state on + // `session.out` and an S3 snapshot keyed on `session.friendlyId`. + // Forward the prior run's public-form id (friendlyId — same shape as + // `ctx.run.id`) to the agent as `previousRunId` so its boot gate flips // `couldHavePriorState` and replays the persisted state instead of // treating this as a fresh chat. See `chat.agent`'s boot orchestration // in `packages/trigger-sdk/src/v3/ai.ts`. - if (probe?.friendlyId) { - priorDeadRunFriendlyId = probe.friendlyId; - } else { - // Replica miss on a row we just observed via `currentRunId`. Retry - // on the writer so the customer's `runs.retrieve(previousRunId)` - // gets the public `run_*` form rather than the internal cuid. - const writerProbe = await prisma.taskRun.findFirst({ - where: { id: session.currentRunId }, - select: { friendlyId: true }, - }); - priorDeadRunFriendlyId = writerProbe?.friendlyId ?? session.currentRunId; - } + priorDeadRunFriendlyId = probe?.friendlyId ?? session.currentRunId; } // 2. Validate config + trigger upfront. 
Continuation overrides diff --git a/apps/webapp/app/services/realtime/sessions.server.ts b/apps/webapp/app/services/realtime/sessions.server.ts index 226ec443d4..55b969e7e5 100644 --- a/apps/webapp/app/services/realtime/sessions.server.ts +++ b/apps/webapp/app/services/realtime/sessions.server.ts @@ -1,6 +1,6 @@ import type { PrismaClient, Session } from "@trigger.dev/database"; import type { SessionItem } from "@trigger.dev/core/v3"; -import { $replica } from "~/db.server"; +import { $replica, prisma } from "~/db.server"; /** * Prefix that {@link SessionId.generate} attaches to every Session friendlyId. @@ -36,6 +36,22 @@ export async function resolveSessionByIdOrExternalId( }); } +/** + * Replica-first session resolution with a writer fallback on miss. For the + * hot realtime routes (append / SSE subscribe / end-and-continue): a fresh + * session's first append or subscribe can arrive inside the replica's apply + * window, and a bare replica miss there surfaces as a 404 (or a subscribe + * that never finds its session) for a session that exists on the writer. + */ +export async function resolveSessionWithWriterFallback( + runtimeEnvironmentId: string, + idOrExternalId: string +): Promise<Session | null> { + const row = await resolveSessionByIdOrExternalId($replica, runtimeEnvironmentId, idOrExternalId); + if (row) return row; + return resolveSessionByIdOrExternalId(prisma, runtimeEnvironmentId, idOrExternalId); +} + /** True for `session_*` friendlyId form, false for everything else. 
*/ export function isSessionFriendlyIdForm(value: string): boolean { return value.startsWith(SESSION_FRIENDLY_ID_PREFIX); diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 1e70d94cd0..f5d0996fa8 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -58,7 +58,8 @@ services: # docker exec database bash -c 'grep -q "host replication" "$PGDATA/pg_hba.conf" || echo "host replication all all md5" >> "$PGDATA/pg_hba.conf"' # docker exec database psql -U postgres -c "SELECT pg_reload_conf()" # Then point the webapp at it: DATABASE_READ_REPLICA_URL=postgresql://postgres:postgres@localhost:5433/postgres - # Tune the lag via REPLICA_APPLY_DELAY (e.g. 150ms, 2s). Wipe database-replica-data to re-init. + # Tune the lag via REPLICA_APPLY_DELAY (default 20ms ~ realistic prod lag; crank to 150ms/2s to + # shake out replica races). Wipe database-replica-data to re-init. database-replica: container_name: ${CONTAINER_PREFIX:-}database-replica profiles: ["replica"] @@ -72,7 +73,7 @@ services: - ${DB_REPLICA_VOLUME:-database-replica-data}:/var/lib/postgresql/data/ environment: PGPASSWORD: postgres - REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-150ms} + REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-20ms} networks: - app_network ports: