Skip to content

Commit 8856bc6

Browse files
committed
fix(webapp): stop replica lag from double-triggering session runs and 404ing fresh sessions
ensureRunForSession probed run liveness on the read replica, so a probe miss on a just-triggered run was judged dead and a second live run was spawned for the same session, duplicating turns and responses. The append and subscribe/init session routes also resolved the Session row on the replica only, failing a fresh session's first append or subscribe inside the replication window. Liveness now re-probes the writer before declaring a run dead, and session resolution on those routes falls back to the writer on a replica miss.
1 parent 954ee5c commit 8856bc6

5 files changed

Lines changed: 50 additions & 30 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix read-replica races on the session APIs: a fresh session's first append or subscribe no longer fails with a 404, and a just-triggered session run is no longer mistaken for dead, which could double-trigger the run and duplicate chat responses.

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ import { json } from "@remix-run/server-runtime";
22
import { tryCatch } from "@trigger.dev/core/utils";
33
import { nanoid } from "nanoid";
44
import { z } from "zod";
5-
import { $replica } from "~/db.server";
65
import { logger } from "~/services/logger.server";
76
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
87
import { ensureRunForSession } from "~/services/realtime/sessionRunManager.server";
98
import {
109
canonicalSessionAddressingKey,
11-
resolveSessionByIdOrExternalId,
10+
resolveSessionWithWriterFallback,
1211
} from "~/services/realtime/sessions.server";
1312
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
1413
import {
@@ -55,9 +54,10 @@ const { action, loader } = createActionApiRoute(
5554
// also triggers the first run). The row exists before any caller
5655
// can reach `.in/append` — no row, no append. Resolved here so the
5756
// authorization scope can expand to both addressing forms (friendlyId
58-
// + externalId) and the handler can skip its own lookup.
57+
// + externalId) and the handler can skip its own lookup. Writer
58+
// fallback: a first append can land inside the replica apply window.
5959
findResource: async (params, auth) =>
60-
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session),
60+
resolveSessionWithWriterFallback(auth.environment.id, params.session),
6161
authorization: {
6262
action: "write",
6363
// Authorize against the union of the URL form, friendlyId, and

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
3-
import { $replica } from "~/db.server";
43
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
54
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
65
import {
76
canonicalSessionAddressingKey,
87
isSessionFriendlyIdForm,
9-
resolveSessionByIdOrExternalId,
8+
resolveSessionWithWriterFallback,
109
} from "~/services/realtime/sessions.server";
1110
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
1211
import {
@@ -43,8 +42,7 @@ const { action } = createActionApiRoute(
4342
// when a row exists. The S2 stream key is built from the row's
4443
// canonical key (externalId if set, else friendlyId) so writers
4544
// and readers converge regardless of URL form.
46-
const maybeSession = await resolveSessionByIdOrExternalId(
47-
$replica,
45+
const maybeSession = await resolveSessionWithWriterFallback(
4846
authentication.environment.id,
4947
params.session
5048
);
@@ -100,8 +98,7 @@ const loader = createLoaderApiRoute(
10098
allowJWT: true,
10199
corsStrategy: "all",
102100
findResource: async (params, auth) => {
103-
const row = await resolveSessionByIdOrExternalId(
104-
$replica,
101+
const row = await resolveSessionWithWriterFallback(
105102
auth.environment.id,
106103
params.session
107104
);

apps/webapp/app/services/realtime/sessionRunManager.server.ts

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,31 +111,32 @@ export async function ensureRunForSession(
111111
// 1. Probe currentRunId.
112112
let priorDeadRunFriendlyId: string | undefined;
113113
if (session.currentRunId) {
114-
const probe = await getRunStatusAndFriendlyId(session.currentRunId);
114+
let probe = await getRunStatusAndFriendlyId(session.currentRunId);
115+
if (!probe) {
116+
// Replica miss on a row we just observed via `currentRunId` — the
117+
// run was likely triggered moments ago and hasn't replicated yet.
118+
// Re-probe the writer BEFORE deciding liveness: treating a lagging
119+
// replica as "row vanished" double-triggers the session (a fast
120+
// first append after session create races the replica apply delay
121+
// and spawns a second live run consuming the same `.in`).
122+
probe = await prisma.taskRun.findFirst({
123+
where: { id: session.currentRunId },
124+
select: { status: true, friendlyId: true },
125+
});
126+
}
115127
if (probe && !isFinalRunStatus(probe.status)) {
116128
return { runId: session.currentRunId, triggered: false };
117129
}
118-
// Either the row vanished (probe null) or its status is final. Either
119-
// way the prior run isn't going to consume new appends — but the
120-
// session may still hold conversation state on `session.out` and an
121-
// S3 snapshot keyed on `session.friendlyId`. Forward the prior run's
122-
// public-form id (friendlyId — same shape as `ctx.run.id`) to the
123-
// agent as `previousRunId` so its boot gate flips
130+
// Either the row vanished on the writer too (probe null) or its status
131+
// is final. Either way the prior run isn't going to consume new
132+
// appends — but the session may still hold conversation state on
133+
// `session.out` and an S3 snapshot keyed on `session.friendlyId`.
134+
// Forward the prior run's public-form id (friendlyId — same shape as
135+
// `ctx.run.id`) to the agent as `previousRunId` so its boot gate flips
124136
// `couldHavePriorState` and replays the persisted state instead of
125137
// treating this as a fresh chat. See `chat.agent`'s boot orchestration
126138
// in `packages/trigger-sdk/src/v3/ai.ts`.
127-
if (probe?.friendlyId) {
128-
priorDeadRunFriendlyId = probe.friendlyId;
129-
} else {
130-
// Replica miss on a row we just observed via `currentRunId`. Retry
131-
// on the writer so the customer's `runs.retrieve(previousRunId)`
132-
// gets the public `run_*` form rather than the internal cuid.
133-
const writerProbe = await prisma.taskRun.findFirst({
134-
where: { id: session.currentRunId },
135-
select: { friendlyId: true },
136-
});
137-
priorDeadRunFriendlyId = writerProbe?.friendlyId ?? session.currentRunId;
138-
}
139+
priorDeadRunFriendlyId = probe?.friendlyId ?? session.currentRunId;
139140
}
140141

141142
// 2. Validate config + trigger upfront. Continuation overrides

apps/webapp/app/services/realtime/sessions.server.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { PrismaClient, Session } from "@trigger.dev/database";
22
import type { SessionItem } from "@trigger.dev/core/v3";
3-
import { $replica } from "~/db.server";
3+
import { $replica, prisma } from "~/db.server";
44

55
/**
66
* Prefix that {@link SessionId.generate} attaches to every Session friendlyId.
@@ -36,6 +36,22 @@ export async function resolveSessionByIdOrExternalId(
3636
});
3737
}
3838

39+
/**
40+
* Replica-first session resolution with a writer fallback on miss. For the
41+
* hot realtime routes (append / SSE subscribe / end-and-continue): a fresh
42+
* session's first append or subscribe can arrive inside the replica's apply
43+
* window, and a bare replica miss there surfaces as a 404 (or a subscribe
44+
* that never finds its session) for a session that exists on the writer.
45+
*/
46+
export async function resolveSessionWithWriterFallback(
47+
runtimeEnvironmentId: string,
48+
idOrExternalId: string
49+
): Promise<Session | null> {
50+
const row = await resolveSessionByIdOrExternalId($replica, runtimeEnvironmentId, idOrExternalId);
51+
if (row) return row;
52+
return resolveSessionByIdOrExternalId(prisma, runtimeEnvironmentId, idOrExternalId);
53+
}
54+
3955
/** True for `session_*` friendlyId form, false for everything else. */
4056
export function isSessionFriendlyIdForm(value: string): boolean {
4157
return value.startsWith(SESSION_FRIENDLY_ID_PREFIX);

0 commit comments

Comments
 (0)