Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/snapshots-since-replica-primary-fallback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the replica and served from the primary if it still hasn't caught up. Polling also now rejects a since-snapshot id that doesn't belong to the run being polled.
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,8 @@ const EnvironmentSchema = z
.default("info"),
RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"),
RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"),
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS: z.coerce.number().int().default(50),
RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS: z.coerce.number().int().default(200),
RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"),

/** How long should the presence ttl last */
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ function createRunEngine() {
env.RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM === "1",
readReplicaSnapshotsSinceEnabled:
env.RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED === "1",
readReplicaSnapshotsSinceRetryDelay: {
minMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS,
maxMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS,
},
worker: {
disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
workers: env.RUN_ENGINE_WORKER_COUNT,
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
},
"devDependencies": {
"@internal/testcontainers": "workspace:*",
"@opentelemetry/sdk-metrics": "2.7.1",
"@types/seedrandom": "^3.0.8",
"rimraf": "6.0.1"
},
Expand Down
7 changes: 7 additions & 0 deletions internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,10 @@ export class RunOneTimeUseTokenError extends Error {
this.name = "RunOneTimeUseTokenError";
}
}

export class ExecutionSnapshotNotFoundError extends Error {
constructor(public readonly snapshotId: string) {
super(`No execution snapshot found for id ${snapshotId}`);
this.name = "ExecutionSnapshotNotFoundError";
}
}
92 changes: 86 additions & 6 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createRedisClient, Redis } from "@internal/redis";
import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
import { type Counter, getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";
import {
CheckpointInput,
Expand Down Expand Up @@ -33,6 +33,7 @@ import {
import { Worker } from "@trigger.dev/redis-worker";
import { assertNever } from "assert-never";
import { EventEmitter } from "node:events";
import { setTimeout } from "node:timers/promises";
import { BatchQueue } from "../batch-queue/index.js";
import type {
BatchItem,
Expand All @@ -46,7 +47,12 @@ import { RunQueue } from "../run-queue/index.js";
import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js";
import { BillingCache } from "./billingCache.js";
import { NotImplementedError, RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./errors.js";
import {
ExecutionSnapshotNotFoundError,
NotImplementedError,
RunDuplicateIdempotencyKeyError,
RunOneTimeUseTokenError,
} from "./errors.js";
import { EventBus, EventBusEvents } from "./eventBus.js";
import { RunLocker } from "./locking.js";
import { getFinalRunStatuses } from "./statuses.js";
Expand Down Expand Up @@ -88,6 +94,8 @@ export class RunEngine {
private logger: Logger;
private tracer: Tracer;
private meter: Meter;
private snapshotsSinceReplicaMissCounter: Counter;
private snapshotsSinceReplicaRetryDelay: { minMs: number; maxMs: number };
private heartbeatTimeouts: HeartbeatTimeouts;
private repairSnapshotTimeoutMs: number;
private batchQueue: BatchQueue;
Expand Down Expand Up @@ -272,6 +280,22 @@ export class RunEngine {
this.tracer = options.tracer;
this.meter = options.meter ?? getMeter("run-engine");

this.snapshotsSinceReplicaMissCounter = this.meter.createCounter(
"run_engine.snapshots_since.replica_miss",
{
description:
"getSnapshotsSince reads where the since snapshot was not yet on the read replica, recovered via a replica retry or served from the primary",
}
);

// Normalize the bounds, but keep maxMs <= 0 meaning "skip the replica retry".
const retryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { minMs: 50, maxMs: 200 };
const retryMinMs = Math.max(0, retryDelay.minMs);
this.snapshotsSinceReplicaRetryDelay = {
minMs: retryDelay.maxMs > 0 ? Math.min(retryMinMs, retryDelay.maxMs) : retryMinMs,
maxMs: retryDelay.maxMs,
};

const defaultHeartbeatTimeouts: HeartbeatTimeouts = {
PENDING_EXECUTING: 60_000,
PENDING_CANCEL: 60_000,
Expand Down Expand Up @@ -1918,13 +1942,69 @@ export class RunEngine {
snapshotId: string;
tx?: PrismaClientOrTransaction;
}): Promise<RunExecutionData[] | null> {
const prisma =
tx ?? (this.options.readReplicaSnapshotsSinceEnabled ? this.readOnlyPrisma : this.prisma);
const useReplica =
!tx &&
this.options.readReplicaSnapshotsSinceEnabled === true &&
this.readOnlyPrisma !== this.prisma;
const prisma = tx ?? (useReplica ? this.readOnlyPrisma : this.prisma);

const query = async (client: PrismaClientOrTransaction) => {
const snapshots = await getExecutionSnapshotsSince(client, runId, snapshotId);
return snapshots.map(executionDataFromSnapshot);
};

try {
const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId);
return snapshots.map(executionDataFromSnapshot);
return await query(prisma);
} catch (e) {
if (useReplica && e instanceof ExecutionSnapshotNotFoundError) {
// Replica lag: the runner learned this snapshot id from the writer before the
// replica caught up. Give the replica one jittered retry; if it's still missing,
// serve from the writer. Only not-found errors get this treatment - any other
// replica failure stays an error rather than shifting read load to the writer.
// A miss on the writer too is a real error, not lag.
const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay;
if (maxMs > 0) {
await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs));
try {
const result = await query(this.readOnlyPrisma);
this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "replica_retry" });
return result;
} catch (replicaRetryError) {
if (!(replicaRetryError instanceof ExecutionSnapshotNotFoundError)) {
this.logger.error("Failed to getSnapshotsSince", {
message:
replicaRetryError instanceof Error
? replicaRetryError.message
: replicaRetryError,
runId,
snapshotId,
failedDuring: "replica_retry",
});
return null;
}
// still not on the replica - fall through to the primary
}
}

try {
const result = await query(this.prisma);
this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "primary" });
this.logger.warn("getSnapshotsSince: snapshot not yet on replica, served from primary", {
runId,
snapshotId,
});
return result;
} catch (retryError) {
this.logger.error("Failed to getSnapshotsSince", {
message: retryError instanceof Error ? retryError.message : retryError,
runId,
snapshotId,
failedDuring: "primary_fallback",
});
return null;
}
}

this.logger.error("Failed to getSnapshotsSince", {
message: e instanceof Error ? e.message : e,
runId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
TaskRunStatus,
Waitpoint,
} from "@trigger.dev/database";
import { ExecutionSnapshotNotFoundError } from "../errors.js";
import { HeartbeatTimeouts } from "../types.js";
import { SystemResources } from "./systems.js";

Expand Down Expand Up @@ -273,12 +274,12 @@ export async function getExecutionSnapshotsSince(
): Promise<EnhancedExecutionSnapshot[]> {
// Step 1: Find the createdAt of the sinceSnapshotId
const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({
where: { id: sinceSnapshotId },
where: { id: sinceSnapshotId, runId },
select: { createdAt: true },
});

if (!sinceSnapshot) {
throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
throw new ExecutionSnapshotNotFoundError(sinceSnapshotId);
}

// Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion
Expand Down
Loading
Loading