Skip to content

Commit 5b81551

Browse files
d-cs and claude committed
test(run-engine): replica catches up during jittered retry window (red)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 139f179 commit 5b81551

2 files changed

Lines changed: 145 additions & 4 deletions

File tree

internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts

Lines changed: 132 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -808,6 +808,9 @@ describe("RunEngine getSnapshotsSince", () => {
808808
// primary instead of failing the poll.
809809
readOnlyPrisma: schemaOnlyPrisma,
810810
readReplicaSnapshotsSinceEnabled: true,
811+
// Tiny jitter window: the replica is permanently empty here, so the retry
812+
// always misses - no need to pay a realistic delay.
813+
readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 },
811814
worker: {
812815
redis: redisOptions,
813816
workers: 1,
@@ -889,7 +892,131 @@ describe("RunEngine getSnapshotsSince", () => {
889892
expect(result!.length).toBe(expectedSnapshots.length);
890893
expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));
891894

892-
expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(1);
895+
expect(
896+
await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" })
897+
).toBe(1);
898+
// The replica retry never succeeds against a permanently empty replica.
899+
expect(
900+
await getCounterValue("run_engine.snapshots_since.replica_miss", {
901+
outcome: "replica_retry",
902+
})
903+
).toBe(0);
904+
} finally {
905+
await engine.quit();
906+
}
907+
}
908+
);
909+
910+
containerTest(
911+
"serves the read from the replica after a jittered retry when it catches up",
912+
async ({ prisma, schemaOnlyPrisma, redisOptions }) => {
913+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
914+
const { meter, getCounterValue } = createTestMetricsMeter();
915+
916+
const engine = new RunEngine({
917+
prisma,
918+
// The schema-only database stands in for a lagging replica: empty when the
919+
// poll first arrives, caught up by the time the jittered retry fires.
920+
readOnlyPrisma: schemaOnlyPrisma,
921+
readReplicaSnapshotsSinceEnabled: true,
922+
// A near-deterministic ~400ms window: long enough to seed the replica
923+
// mid-flight (below), short enough to keep the test fast.
924+
readReplicaSnapshotsSinceRetryDelay: { minMs: 400, maxMs: 401 },
925+
worker: {
926+
redis: redisOptions,
927+
workers: 1,
928+
tasksPerWorker: 10,
929+
pollIntervalMs: 100,
930+
},
931+
queue: {
932+
redis: redisOptions,
933+
},
934+
runLock: {
935+
redis: redisOptions,
936+
},
937+
machines: {
938+
defaultMachine: "small-1x",
939+
machines: {
940+
"small-1x": {
941+
name: "small-1x" as const,
942+
cpu: 0.5,
943+
memory: 0.5,
944+
centsPerMs: 0.0001,
945+
},
946+
},
947+
baseCostInCents: 0.0001,
948+
},
949+
tracer: trace.getTracer("test", "0.0.0"),
950+
meter,
951+
});
952+
953+
try {
954+
const taskIdentifier = "test-task";
955+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
956+
957+
const runFriendlyId = generateFriendlyId("run");
958+
const run = await engine.trigger(
959+
{
960+
number: 1,
961+
friendlyId: runFriendlyId,
962+
environment: authenticatedEnvironment,
963+
taskIdentifier,
964+
payload: "{}",
965+
payloadType: "application/json",
966+
context: {},
967+
traceContext: {},
968+
traceId: "t_replica_retry",
969+
spanId: "s_replica_retry",
970+
workerQueue: "main",
971+
queue: "task/test-task",
972+
isTest: false,
973+
tags: [],
974+
},
975+
prisma
976+
);
977+
978+
await setTimeout(500);
979+
await engine.dequeueFromWorkerQueue({
980+
consumerId: "test_replica_retry",
981+
workerQueue: "main",
982+
});
983+
984+
const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({
985+
where: { runId: run.id, isValid: true },
986+
orderBy: { createdAt: "asc" },
987+
});
988+
expect(allSnapshots.length).toBeGreaterThan(1);
989+
990+
const firstSnapshot = allSnapshots[0];
991+
992+
// Kick off the poll against the still-empty replica, then seed the replica
993+
// well before the ~400ms jittered retry fires - simulating the replica
994+
// catching up while the engine waits.
995+
const resultPromise = engine.getSnapshotsSince({
996+
runId: run.id,
997+
snapshotId: firstSnapshot.id,
998+
});
999+
await setTimeout(100);
1000+
await copySnapshotsToReplica(prisma, schemaOnlyPrisma, run.id);
1001+
const result = await resultPromise;
1002+
1003+
expect(result).not.toBeNull();
1004+
const expectedSnapshots = allSnapshots.filter(
1005+
(s) => s.createdAt.getTime() > firstSnapshot.createdAt.getTime()
1006+
);
1007+
expect(expectedSnapshots.length).toBeGreaterThan(0);
1008+
expect(result!.length).toBe(expectedSnapshots.length);
1009+
expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));
1010+
1011+
// Recovered on the replica retry - the writer was never consulted.
1012+
expect(
1013+
await getCounterValue("run_engine.snapshots_since.replica_miss", {
1014+
outcome: "replica_retry",
1015+
})
1016+
).toBe(1);
1017+
expect(
1018+
await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" })
1019+
).toBe(0);
8931020
} finally {
8941021
await engine.quit();
8951022
}
@@ -905,6 +1032,7 @@ describe("RunEngine getSnapshotsSince", () => {
9051032
prisma,
9061033
readOnlyPrisma: schemaOnlyPrisma,
9071034
readReplicaSnapshotsSinceEnabled: true,
1035+
readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 },
9081036
worker: {
9091037
redis: redisOptions,
9101038
workers: 1,
@@ -963,6 +1091,7 @@ describe("RunEngine getSnapshotsSince", () => {
9631091
// but lags behind the primary by one snapshot (the newest one is excluded below).
9641092
readOnlyPrisma: schemaOnlyPrisma,
9651093
readReplicaSnapshotsSinceEnabled: true,
1094+
readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 },
9661095
worker: {
9671096
redis: redisOptions,
9681097
workers: 1,
@@ -1071,6 +1200,7 @@ describe("RunEngine getSnapshotsSince", () => {
10711200
// from the primary.
10721201
readOnlyPrisma: schemaOnlyPrisma,
10731202
readReplicaSnapshotsSinceEnabled: false,
1203+
readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 },
10741204
worker: {
10751205
redis: redisOptions,
10761206
workers: 1,
@@ -1169,6 +1299,7 @@ describe("RunEngine getSnapshotsSince", () => {
11691299
// the counter.
11701300
readOnlyPrisma: schemaOnlyPrisma,
11711301
readReplicaSnapshotsSinceEnabled: true,
1302+
readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 },
11721303
worker: {
11731304
redis: redisOptions,
11741305
workers: 1,

internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -71,18 +71,28 @@ export function createTestMetricsMeter() {
7171
const meterProvider = new MeterProvider({ readers: [reader] });
7272
const meter = meterProvider.getMeter("test");
7373

74-
const getCounterValue = async (name: string): Promise<number> => {
74+
const getCounterValue = async (
75+
name: string,
76+
attributes?: Record<string, string>
77+
): Promise<number> => {
7578
await reader.forceFlush();
7679
const resourceMetrics = exporter.getMetrics();
7780

7881
// Cumulative temporality: every export batch carries the full running total,
7982
// so read the most recent batch that contains the metric. A counter that was
80-
// never added to exports no data points - treat that as 0.
83+
// never added to exports no data points - treat that as 0. When `attributes`
84+
// is provided, only data points whose attributes match are summed.
8185
for (let i = resourceMetrics.length - 1; i >= 0; i--) {
8286
for (const scopeMetrics of resourceMetrics[i].scopeMetrics) {
8387
for (const metric of scopeMetrics.metrics) {
8488
if (metric.descriptor.name === name && metric.dataPoints.length > 0) {
85-
return metric.dataPoints.reduce((sum, dp) => sum + (dp.value as number), 0);
89+
return metric.dataPoints
90+
.filter(
91+
(dp) =>
92+
!attributes ||
93+
Object.entries(attributes).every(([key, value]) => dp.attributes[key] === value)
94+
)
95+
.reduce((sum, dp) => sum + (dp.value as number), 0);
8696
}
8797
}
8898
}

0 commit comments

Comments
 (0)