Skip to content

Commit 0cbaddc

Browse files
d-cs and claude committed
feat(run-engine): jittered replica retry before primary fallback in getSnapshotsSince
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 5b81551 commit 0cbaddc

5 files changed

Lines changed: 48 additions & 5 deletions

File tree

.server-changes/snapshots-since-replica-primary-fallback.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ type: improvement
55

66
Run snapshot polling no longer errors or pays extra latency when the database read replica
77
hasn't yet replicated the snapshot the runner is polling from
8-
(`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is served from the primary instead.
8+
(`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the
9+
replica and served from the primary if it still hasn't caught up.

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,8 @@ const EnvironmentSchema = z
950950
.default("info"),
951951
RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"),
952952
RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"),
953+
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS: z.coerce.number().int().default(50),
954+
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS: z.coerce.number().int().default(200),
953955
RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"),
954956

955957
/** How long should the presence ttl last */

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ function createRunEngine() {
2222
env.RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM === "1",
2323
readReplicaSnapshotsSinceEnabled:
2424
env.RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED === "1",
25+
readReplicaSnapshotsSinceRetryDelay: {
26+
minMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS,
27+
maxMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS,
28+
},
2529
worker: {
2630
disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
2731
workers: env.RUN_ENGINE_WORKER_COUNT,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
import { Worker } from "@trigger.dev/redis-worker";
3434
import { assertNever } from "assert-never";
3535
import { EventEmitter } from "node:events";
36+
import { setTimeout } from "node:timers/promises";
3637
import { BatchQueue } from "../batch-queue/index.js";
3738
import type {
3839
BatchItem,
@@ -94,6 +95,7 @@ export class RunEngine {
9495
private tracer: Tracer;
9596
private meter: Meter;
9697
private snapshotsSinceReplicaMissCounter: Counter;
98+
private snapshotsSinceReplicaRetryDelay: { minMs: number; maxMs: number };
9799
private heartbeatTimeouts: HeartbeatTimeouts;
98100
private repairSnapshotTimeoutMs: number;
99101
private batchQueue: BatchQueue;
@@ -282,10 +284,15 @@ export class RunEngine {
282284
"run_engine.snapshots_since.replica_miss",
283285
{
284286
description:
285-
"getSnapshotsSince reads where the since snapshot was not yet on the read replica and was served from the primary",
287+
"getSnapshotsSince reads where the since snapshot was not yet on the read replica, recovered via a replica retry or served from the primary",
286288
}
287289
);
288290

291+
this.snapshotsSinceReplicaRetryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? {
292+
minMs: 50,
293+
maxMs: 200,
294+
};
295+
289296
const defaultHeartbeatTimeouts: HeartbeatTimeouts = {
290297
PENDING_EXECUTING: 60_000,
291298
PENDING_CANCEL: 60_000,
@@ -1948,11 +1955,36 @@ export class RunEngine {
19481955
} catch (e) {
19491956
if (useReplica && e instanceof ExecutionSnapshotNotFoundError) {
19501957
// Replica lag: the runner learned this snapshot id from the writer before the
1951-
// replica caught up. Serve from the writer; only count/warn if the writer has it
1952-
// (a permanent miss is a real error, not lag).
1958+
// replica caught up. Give the replica one jittered retry, then serve from the
1959+
// writer; only count/warn if a retry succeeds (a permanent miss is a real error,
1960+
// not lag).
1961+
const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay;
1962+
if (maxMs > 0) {
1963+
await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs));
1964+
try {
1965+
const result = await query(this.readOnlyPrisma);
1966+
this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "replica_retry" });
1967+
return result;
1968+
} catch (replicaRetryError) {
1969+
if (!(replicaRetryError instanceof ExecutionSnapshotNotFoundError)) {
1970+
this.logger.error("Failed to getSnapshotsSince", {
1971+
message:
1972+
replicaRetryError instanceof Error
1973+
? replicaRetryError.message
1974+
: replicaRetryError,
1975+
runId,
1976+
snapshotId,
1977+
retriedFromReplica: true,
1978+
});
1979+
return null;
1980+
}
1981+
// still not on the replica - fall through to the primary
1982+
}
1983+
}
1984+
19531985
try {
19541986
const result = await query(this.prisma);
1955-
this.snapshotsSinceReplicaMissCounter.add(1);
1987+
this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "primary" });
19561988
this.logger.warn("getSnapshotsSince: snapshot not yet on replica, served from primary", {
19571989
runId,
19581990
snapshotId,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ export type RunEngineOptions = {
207207
* of the primary. Defaults to false. Callers passing an explicit `tx` always use
208208
* that client regardless of this flag. */
209209
readReplicaSnapshotsSinceEnabled?: boolean;
210+
/** Jittered delay bounds for the single replica retry `getSnapshotsSince` performs when
211+
* the since snapshot is not yet on the replica, before falling back to the primary.
212+
* Set maxMs to 0 to skip the replica retry and go straight to the primary. */
213+
readReplicaSnapshotsSinceRetryDelay?: { minMs: number; maxMs: number };
210214
tracer: Tracer;
211215
meter?: Meter;
212216
logger?: Logger;

0 commit comments

Comments
 (0)