Commit c1c4d6a

d-cs and claude committed
fix(webapp): mollifier mutation routes — log silent buffer outcomes + writer fallback in degraded mode
Three CodeRabbit findings on PR #3756, bundled because they share the same regression class: post-PR the mutation routes read from the replica for offload, but several non-happy-path branches lost the writer-side safety net the pre-PR routes had (or never logged non-throw failures from helper outcomes).

1. **`api.v1.runs.$runId.metadata.ts` — silent failure on non-throw buffer outcomes.** The parent/root fan-out helper wrapped `applyMetadataMutationToBufferedRun` in `tryCatch` and only inspected the thrown error. The helper reports non-throw failures via outcome `kind` (`not_found`, `busy`, `version_exhausted`, `metadata_too_large`); those silently disappeared. Now warn-log each non-success kind so ops can trace where a parent/root op went. Best-effort behaviour preserved: still doesn't bubble to the customer response. Helper exported for unit-test reach.

2. **`mutateWithFallback.server.ts` — `!buffer` short-circuit returned false 404.** Pre-PR mutation routes read from the writer directly, so a fresh PG row was always visible regardless of replication lag. Post-PR the replica read became the primary lookup; if the buffer isn't available (mollifier disabled, boot-time init error), the helper returned `not_found` without probing the writer, regressing mutation behaviour in mollifier-disabled mode. Mirror the writer-disambiguation block already used in the buffer-says-not-found branch.

3. **`resolveRunForMutation.server.ts` — pre-handler resolver did the same.** Returns null if both replica and buffer miss; the route builder converts null to a hard 404 BEFORE the action handler runs, so the downstream `mutateWithFallback` writer recovery can never fire. Add a final writer probe before returning null, so replica-lag and degraded-buffer states are still served.

Tests:
- `metadataRouteOperationsLogging.test.ts` (new): 7 assertions: 4× non-success kind logs the warn, 1× happy path stays silent, 1× thrown-error branch unaffected, 1× missing-args short-circuit.
- `mollifierMutateWithFallback.test.ts`: +2 tests for the `!buffer + writer hit/miss` paths.
- `mollifierResolveRunForMutation.test.ts`: +4 tests for the writer fallback paths (replica+buffer miss → writer hit, !buffer + writer hit, all-miss legitimate 404, replica-hit short-circuits writer).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9ee389d commit c1c4d6a

6 files changed

Lines changed: 320 additions & 23 deletions

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 23 additions & 2 deletions
@@ -71,7 +71,12 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
 // `_ingestion_only` flag: a synthetic body that has the operations
 // promoted to top-level `operations` so the service applies them to
 // `targetRunId` directly.
-async function routeOperationsToRun(
+// Exported so the silent-failure logging behaviour can be unit-tested.
+// The route handler itself isn't an attractive test target (createActionApiRoute
+// wraps it in auth + body parsing + error-handler middleware), but the
+// fan-out helper carries the load-bearing logic — including the ops-
+// visibility branch this change adds.
+export async function routeOperationsToRun(
   targetRunId: string | undefined,
   operations: RunMetadataChangeOperation[] | undefined,
   env: AuthenticatedEnvironment
@@ -118,7 +123,7 @@ async function routeOperationsToRun(
   // Best-effort buffer fallback. Wrap so a transient Redis throw on
   // this auxiliary op can't 500 the request after the primary mutation
   // already succeeded.
-  const [bufferError] = await tryCatch(
+  const [bufferError, bufferOutcome] = await tryCatch(
     applyMetadataMutationToBufferedRun({
       runId: targetRunId,
       environmentId: env.id,
@@ -132,6 +137,22 @@ async function routeOperationsToRun(
       targetRunId,
       error: bufferError instanceof Error ? bufferError.message : String(bufferError),
     });
+    return;
+  }
+  // `applyMetadataMutationToBufferedRun` reports non-throw failures via
+  // its returned outcome kind: `not_found`, `busy`, `version_exhausted`,
+  // `metadata_too_large`. Without inspecting `.kind`, the parent/root
+  // operation can silently disappear — no PG row landed it (handled
+  // above) and the buffer rejected it for one of these reasons but the
+  // helper returned cleanly. Surface a warn log per non-success branch
+  // so ops can trace why a parent/root op went missing. The customer's
+  // primary mutation has already succeeded by this point; this remains
+  // best-effort, so we still don't bubble these to the response.
+  if (bufferOutcome && bufferOutcome.kind !== "applied") {
+    logger.warn("metadata route: parent/root buffer op did not apply", {
+      targetRunId,
+      kind: bufferOutcome.kind,
+    });
   }
 }
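
The distinction this change relies on (a thrown error versus a cleanly returned non-success outcome) can be shown in isolation. The sketch below is a simplified stand-in, not the webapp's code: `BufferOutcome`, this `tryCatch`, and `describeOutcome` are hypothetical names and shapes assumed for illustration, with only the outcome kinds taken from the commit message.

```typescript
// Hypothetical outcome union; the kinds mirror those listed in the commit message.
type BufferOutcome =
  | { kind: "applied" }
  | { kind: "not_found" }
  | { kind: "busy" }
  | { kind: "version_exhausted" }
  | { kind: "metadata_too_large" };

// Assumed shape of a tuple-returning helper: [error, null] on throw, [null, value] otherwise.
async function tryCatch<T>(promise: Promise<T>): Promise<[unknown, null] | [null, T]> {
  try {
    return [null, await promise];
  } catch (error) {
    return [error, null];
  }
}

// Returns the message a caller would warn-log, or null on the happy path.
async function describeOutcome(op: Promise<BufferOutcome>): Promise<string | null> {
  const [error, outcome] = await tryCatch(op);
  if (error) {
    // Throw branch: reported once, and the outcome check below never runs.
    return `threw: ${error instanceof Error ? error.message : String(error)}`;
  }
  if (outcome && outcome.kind !== "applied") {
    // Non-throw failure: invisible if only the first tuple element is inspected.
    return `did not apply: ${outcome.kind}`;
  }
  return null;
}
```

Destructuring only the first tuple element, as the pre-change code did, would make the `busy` and `not_found` cases indistinguishable from success.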

apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts

Lines changed: 13 additions & 2 deletions
@@ -91,8 +91,19 @@ export async function mutateWithFallback<TResponse>(
   }

   if (!buffer) {
-    // No buffer configured (mollifier disabled or boot-time error). PG
-    // missed; nothing else to consult.
+    // No buffer configured (mollifier disabled or boot-time error). The
+    // pre-PR mutation routes read from the writer directly, so a freshly-
+    // created PG row was always visible regardless of replication lag.
+    // Now that the read moved to the replica (line 87) for the offload,
+    // a `!buffer` short-circuit would regress: a real PG row + replica
+    // lag would return 404. Mirror the writer-disambiguation block below
+    // (line 148, the buffer-says-not-found path) so degraded mode
+    // (mollifier disabled) still matches pre-PR mutation behaviour.
+    const writerRow = await findRunInPg(writer, input.runId, input.environmentId);
+    if (writerRow) {
+      const response = await input.pgMutation(writerRow);
+      return { kind: "pg", response };
+    }
     return { kind: "not_found" };
   }
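
A minimal sketch of the degraded-mode path may help here. It assumes there is no buffer at all and models the replica and writer as plain async lookups; `Run`, `Lookup`, and `mutateWithoutBuffer` are hypothetical names, and the real helper works with Prisma clients and `findRunInPg` rather than these stand-ins.

```typescript
// Hypothetical minimal types standing in for the Prisma-backed lookups.
type Run = { friendlyId: string };
type Lookup = (runId: string) => Promise<Run | null>;

type MutateResult<T> = { kind: "pg"; response: T } | { kind: "not_found" };

// Models only the "no buffer configured" branch: replica first, then writer.
async function mutateWithoutBuffer<T>(
  runId: string,
  replica: Lookup,
  writer: Lookup,
  pgMutation: (run: Run) => Promise<T>
): Promise<MutateResult<T>> {
  const replicaRow = await replica(runId);
  if (replicaRow) {
    return { kind: "pg", response: await pgMutation(replicaRow) };
  }
  // Replica missed. A lagging replica and a genuinely missing run look the
  // same from here, so probe the writer before answering not_found.
  const writerRow = await writer(runId);
  if (writerRow) {
    return { kind: "pg", response: await pgMutation(writerRow) };
  }
  return { kind: "not_found" };
}
```

The extra writer query is only paid on a replica miss, so the offload benefit of reading from the replica is kept for the common case.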

Lines changed: 42 additions & 18 deletions
@@ -1,5 +1,5 @@
 import type { MollifierBuffer } from "@trigger.dev/redis-worker";
-import { $replica as defaultReplica } from "~/db.server";
+import { $replica as defaultReplica, prisma as defaultWriter } from "~/db.server";
 import { getMollifierBuffer as defaultGetBuffer } from "./mollifierBuffer.server";

 // Discriminated-union resolver used by mutation routes' `findResource`.
@@ -16,15 +16,18 @@ export type ResolvedRunForMutation =
   | { source: "pg"; friendlyId: string }
   | { source: "buffer"; friendlyId: string };

-export type ResolveRunForMutationDeps = {
-  prismaReplica?: {
-    taskRun: {
-      findFirst(args: {
-        where: { friendlyId: string; runtimeEnvironmentId: string };
-        select: { friendlyId: true };
-      }): Promise<{ friendlyId: string } | null>;
-    };
+type PrismaTaskRunFindFirst = {
+  taskRun: {
+    findFirst(args: {
+      where: { friendlyId: string; runtimeEnvironmentId: string };
+      select: { friendlyId: true };
+    }): Promise<{ friendlyId: string } | null>;
   };
+};
+
+export type ResolveRunForMutationDeps = {
+  prismaReplica?: PrismaTaskRunFindFirst;
+  prismaWriter?: PrismaTaskRunFindFirst;
   getBuffer?: () => MollifierBuffer | null;
 };

@@ -35,6 +38,7 @@ export async function resolveRunForMutation(input: {
   deps?: ResolveRunForMutationDeps;
 }): Promise<ResolvedRunForMutation | null> {
   const replica = input.deps?.prismaReplica ?? defaultReplica;
+  const writer = input.deps?.prismaWriter ?? defaultWriter;
   const getBuffer = input.deps?.getBuffer ?? defaultGetBuffer;

   const pgRun = await replica.taskRun.findFirst({
@@ -44,15 +48,35 @@ export async function resolveRunForMutation(input: {
   if (pgRun) return { source: "pg", friendlyId: pgRun.friendlyId };

   const buffer = getBuffer();
-  if (!buffer) return null;
-
-  const entry = await buffer.getEntry(input.runParam);
-  if (
-    entry &&
-    entry.envId === input.environmentId &&
-    entry.orgId === input.organizationId
-  ) {
-    return { source: "buffer", friendlyId: input.runParam };
+
+  if (buffer) {
+    const entry = await buffer.getEntry(input.runParam);
+    if (
+      entry &&
+      entry.envId === input.environmentId &&
+      entry.orgId === input.organizationId
+    ) {
+      return { source: "buffer", friendlyId: input.runParam };
+    }
   }
+
+  // Replica + buffer both missed. Before declaring "not found" (which the
+  // route builder converts to a hard 404 *before* the action handler runs,
+  // so the downstream `mutateWithFallback` writer-recovery never gets a
+  // chance to fire), do one final probe against the writer. This catches
+  // two cases:
+  //   1. Replica lag on a freshly-created PG row.
+  //   2. A buffered run that materialised in the window between the
+  //      replica read and our buffer check (the entry was ack'd and the
+  //      hash is mid-grace-TTL but our getEntry returned null due to
+  //      lookup-by-friendlyId timing).
+  // Without this, the resolver returns null in degraded states that the
+  // downstream mutateWithFallback flow would otherwise handle correctly.
+  const writerRun = await writer.taskRun.findFirst({
+    where: { friendlyId: input.runParam, runtimeEnvironmentId: input.environmentId },
+    select: { friendlyId: true },
+  });
+  if (writerRun) return { source: "pg", friendlyId: writerRun.friendlyId };
+
   return null;
 }
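
The lookup order after this change (replica, then buffer if one is configured, then writer) lends itself to testing with injected fakes, much as the `deps` parameter above allows. The sketch below is a trimmed-down model under assumptions: `resolveSketch`, `Deps`, and `countingFind` are hypothetical, and the buffer is reduced to a boolean membership check rather than the real `getEntry` with environment and organization matching.

```typescript
// Hypothetical, simplified model of the resolver's lookup order.
type Resolved = { source: "pg" | "buffer"; friendlyId: string };

type Deps = {
  replicaFind: (id: string) => Promise<string | null>;
  writerFind: (id: string) => Promise<string | null>;
  // null models "no buffer configured" (mollifier disabled or init failure).
  bufferHas: ((id: string) => Promise<boolean>) | null;
};

async function resolveSketch(id: string, deps: Deps): Promise<Resolved | null> {
  const fromReplica = await deps.replicaFind(id);
  if (fromReplica) return { source: "pg", friendlyId: fromReplica };

  if (deps.bufferHas && (await deps.bufferHas(id))) {
    return { source: "buffer", friendlyId: id };
  }

  // Final probe: only reached when replica and buffer both missed.
  const fromWriter = await deps.writerFind(id);
  return fromWriter ? { source: "pg", friendlyId: fromWriter } : null;
}

// Builds a fake lookup that records how many times it was called.
function countingFind(result: string | null) {
  const state = { calls: 0 };
  const find = async (_id: string): Promise<string | null> => {
    state.calls += 1;
    return result;
  };
  return { find, state };
}
```

With fakes like these, a test can assert both the returned source and that a replica hit never reaches the writer, which is the short-circuit case the commit message lists among the resolver tests.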
Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
+import { describe, expect, it, vi } from "vitest";
+
+// `vi.mock` factories are hoisted above regular top-level `const`s, so
+// any cross-references between the spy/mock fns and the factories have
+// to live inside `vi.hoisted`. See `mollifierDrainerHandler.test.ts`
+// for the same pattern.
+const { warnSpy, applyMetadataMutationToBufferedRunMock } = vi.hoisted(() => ({
+  warnSpy: vi.fn(),
+  applyMetadataMutationToBufferedRunMock: vi.fn(),
+}));
+
+// The route module's import graph (createActionApiRoute, the env, the
+// services singleton) is heavier than the helper actually needs. Stub
+// the leaf modules so only the helper under test executes; the route's
+// top-level `createActionApiRoute(...)` call runs against the stubbed
+// builder and never touches platform.v3.server / prisma.
+vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
+vi.mock("~/env.server", () => ({
+  env: { TASK_RUN_METADATA_MAXIMUM_SIZE: 256 * 1024 },
+}));
+vi.mock("~/services/routeBuilders/apiBuilder.server", () => ({
+  createActionApiRoute: () => ({ action: vi.fn() }),
+}));
+vi.mock("~/services/apiAuth.server", () => ({
+  authenticateApiRequest: vi.fn(),
+}));
+vi.mock("~/v3/services/common.server", () => ({
+  ServiceValidationError: class extends Error {
+    constructor(public override message: string, public status?: number) {
+      super(message);
+    }
+  },
+}));
+vi.mock("~/services/metadata/updateMetadataInstance.server", () => ({
+  updateMetadataService: { call: vi.fn(async () => undefined) },
+}));
+vi.mock("~/v3/mollifier/applyMetadataMutation.server", () => ({
+  applyMetadataMutationToBufferedRun: applyMetadataMutationToBufferedRunMock,
+}));
+vi.mock("~/v3/mollifier/readFallback.server", () => ({
+  findRunByIdWithMollifierFallback: vi.fn(),
+}));
+vi.mock("~/services/logger.server", () => ({
+  logger: {
+    warn: warnSpy,
+    info: vi.fn(),
+    error: vi.fn(),
+    debug: vi.fn(),
+  },
+}));
+
+import { routeOperationsToRun } from "~/routes/api.v1.runs.$runId.metadata";
+import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
+
+const env = {
+  id: "env_a",
+  organizationId: "org_1",
+} as unknown as AuthenticatedEnvironment;
+
+const opsFixture = [{ type: "set", key: "k", value: "v" }] as Parameters<
+  typeof routeOperationsToRun
+>[1];
+
+describe("routeOperationsToRun — non-throw buffer outcome logging", () => {
+  // Each non-success outcome `applyMetadataMutationToBufferedRun` can
+  // return (`not_found`, `busy`, `version_exhausted`, `metadata_too_large`)
+  // must produce a warn log so ops can trace silent drops. Without this
+  // branch the parent/root operation would disappear with no record —
+  // `tryCatch` only catches throws, and the outcome object was
+  // previously ignored.
+  for (const kind of ["not_found", "busy", "version_exhausted", "metadata_too_large"] as const) {
+    it(`warn-logs when buffer outcome is { kind: "${kind}" }`, async () => {
+      warnSpy.mockClear();
+      applyMetadataMutationToBufferedRunMock.mockResolvedValueOnce({ kind });
+
+      await routeOperationsToRun("run_buffered_1", opsFixture, env);
+
+      expect(warnSpy).toHaveBeenCalledWith(
+        "metadata route: parent/root buffer op did not apply",
+        expect.objectContaining({ targetRunId: "run_buffered_1", kind }),
+      );
+    });
+  }
+
+  it("does NOT warn on the happy path (kind: 'applied')", async () => {
+    warnSpy.mockClear();
+    applyMetadataMutationToBufferedRunMock.mockResolvedValueOnce({
+      kind: "applied",
+      newMetadata: { k: "v" },
+      parentTaskRunFriendlyId: undefined,
+      rootTaskRunFriendlyId: undefined,
+    });
+
+    await routeOperationsToRun("run_buffered_1", opsFixture, env);
+
+    expect(warnSpy).not.toHaveBeenCalledWith(
+      "metadata route: parent/root buffer op did not apply",
+      expect.anything(),
+    );
+  });
+
+  it("warn-logs once when the helper throws (the pre-existing throw branch keeps working)", async () => {
+    warnSpy.mockClear();
+    applyMetadataMutationToBufferedRunMock.mockRejectedValueOnce(new Error("ECONNRESET"));
+
+    await routeOperationsToRun("run_buffered_1", opsFixture, env);
+
+    // Pre-existing branch — the catch logs `buffer fallback for parent/root
+    // op failed`. The new non-throw branch must NOT also fire (we return
+    // early on bufferError).
+    expect(warnSpy).toHaveBeenCalledWith(
+      "metadata route: buffer fallback for parent/root op failed",
+      expect.objectContaining({ targetRunId: "run_buffered_1" }),
+    );
+    expect(warnSpy).not.toHaveBeenCalledWith(
+      "metadata route: parent/root buffer op did not apply",
+      expect.anything(),
+    );
+  });
+
+  it("skips both PG and buffer when targetRunId is missing or operations is empty", async () => {
+    warnSpy.mockClear();
+    applyMetadataMutationToBufferedRunMock.mockClear();
+
+    await routeOperationsToRun(undefined, opsFixture, env);
+    await routeOperationsToRun("run_x", undefined, env);
+    await routeOperationsToRun("run_x", [], env);
+
+    expect(applyMetadataMutationToBufferedRunMock).not.toHaveBeenCalled();
+    expect(warnSpy).not.toHaveBeenCalled();
+  });
+});

apps/webapp/test/mollifierMutateWithFallback.test.ts

Lines changed: 34 additions & 0 deletions
@@ -159,6 +159,40 @@ describe("mutateWithFallback", () => {
     expect(ctx?.bufferEntry?.orgId).toBe("org_1");
   });

+  // Symmetric writer-fallback in the `!buffer` short-circuit. Without
+  // this, mollifier-disabled deployments (or boot-time buffer init
+  // failures) would regress the pre-PR mutation routes — those read
+  // from the writer directly, so a fresh PG row was always visible.
+  // The replica offload introduced here moves the read to the lagging
+  // follower; if the buffer isn't available to disambiguate, we still
+  // probe the writer before returning 404.
+  it("replica miss + !buffer + writer hit → pgMutation (mollifier-disabled mode recovery)", async () => {
+    const row = fakeRun({ friendlyId: "run_1" });
+    const pgMutation = vi.fn(async () => "pg-recovered-no-buffer");
+    const result = await mutateWithFallback({
+      ...baseInput,
+      pgMutation,
+      synthesisedResponse: () => "snap",
+      prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
+      prismaWriter: fakePrisma([row]) as unknown as typeof import("~/db.server").prisma,
+      getBuffer: () => null,
+    });
+    expect(result).toEqual({ kind: "pg", response: "pg-recovered-no-buffer" });
+    expect(pgMutation).toHaveBeenCalledWith(row);
+  });
+
+  it("replica miss + !buffer + writer miss → not_found (genuine 404 in mollifier-disabled mode)", async () => {
+    const result = await mutateWithFallback({
+      ...baseInput,
+      pgMutation: async () => "pg",
+      synthesisedResponse: () => "snap",
+      prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
+      prismaWriter: fakePrisma([null]) as unknown as typeof import("~/db.server").prisma,
+      getBuffer: () => null,
+    });
+    expect(result).toEqual({ kind: "not_found" });
+  });
+
   it("replica miss + buffer not_found + writer miss → not_found", async () => {
     const result = await mutateWithFallback({
       ...baseInput,
