@@ -33,6 +33,7 @@ import {
3333import { Worker } from "@trigger.dev/redis-worker" ;
3434import { assertNever } from "assert-never" ;
3535import { EventEmitter } from "node:events" ;
36+ import { setTimeout } from "node:timers/promises" ;
3637import { BatchQueue } from "../batch-queue/index.js" ;
3738import type {
3839 BatchItem ,
@@ -94,6 +95,7 @@ export class RunEngine {
9495 private tracer : Tracer ;
9596 private meter : Meter ;
9697 private snapshotsSinceReplicaMissCounter : Counter ;
98+ private snapshotsSinceReplicaRetryDelay : { minMs : number ; maxMs : number } ;
9799 private heartbeatTimeouts : HeartbeatTimeouts ;
98100 private repairSnapshotTimeoutMs : number ;
99101 private batchQueue : BatchQueue ;
@@ -282,10 +284,15 @@ export class RunEngine {
282284 "run_engine.snapshots_since.replica_miss" ,
283285 {
284286 description :
285- "getSnapshotsSince reads where the since snapshot was not yet on the read replica and was served from the primary" ,
287+ "getSnapshotsSince reads where the since snapshot was not yet on the read replica, recovered via a replica retry or served from the primary" ,
286288 }
287289 ) ;
288290
291+ this . snapshotsSinceReplicaRetryDelay = options . readReplicaSnapshotsSinceRetryDelay ?? {
292+ minMs : 50 ,
293+ maxMs : 200 ,
294+ } ;
295+
289296 const defaultHeartbeatTimeouts : HeartbeatTimeouts = {
290297 PENDING_EXECUTING : 60_000 ,
291298 PENDING_CANCEL : 60_000 ,
@@ -1948,11 +1955,36 @@ export class RunEngine {
19481955 } catch ( e ) {
19491956 if ( useReplica && e instanceof ExecutionSnapshotNotFoundError ) {
19501957 // Replica lag: the runner learned this snapshot id from the writer before the
1951- // replica caught up. Serve from the writer; only count/warn if the writer has it
1952- // (a permanent miss is a real error, not lag).
1958+ // replica caught up. Give the replica one jittered retry, then serve from the
1959+ // writer; only count/warn if a retry succeeds (a permanent miss is a real error,
1960+ // not lag).
1961+ const { minMs, maxMs } = this . snapshotsSinceReplicaRetryDelay ;
1962+ if ( maxMs > 0 ) {
1963+ await setTimeout ( minMs + Math . random ( ) * Math . max ( 0 , maxMs - minMs ) ) ;
1964+ try {
1965+ const result = await query ( this . readOnlyPrisma ) ;
1966+ this . snapshotsSinceReplicaMissCounter . add ( 1 , { outcome : "replica_retry" } ) ;
1967+ return result ;
1968+ } catch ( replicaRetryError ) {
1969+ if ( ! ( replicaRetryError instanceof ExecutionSnapshotNotFoundError ) ) {
1970+ this . logger . error ( "Failed to getSnapshotsSince" , {
1971+ message :
1972+ replicaRetryError instanceof Error
1973+ ? replicaRetryError . message
1974+ : replicaRetryError ,
1975+ runId,
1976+ snapshotId,
1977+ retriedFromReplica : true ,
1978+ } ) ;
1979+ return null ;
1980+ }
1981+ // still not on the replica - fall through to the primary
1982+ }
1983+ }
1984+
19531985 try {
19541986 const result = await query ( this . prisma ) ;
1955- this . snapshotsSinceReplicaMissCounter . add ( 1 ) ;
1987+ this . snapshotsSinceReplicaMissCounter . add ( 1 , { outcome : "primary" } ) ;
19561988 this . logger . warn ( "getSnapshotsSince: snapshot not yet on replica, served from primary" , {
19571989 runId,
19581990 snapshotId,
0 commit comments