Skip to content

Commit f4dac20

Browse files
d-csclaude
andcommitted
fix(run-engine,testcontainers): address review - normalize retry bounds, tie-stable test assertions, shared clone/drop helpers
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 1157227 commit f4dac20

3 files changed

Lines changed: 64 additions & 43 deletions

File tree

internal-packages/run-engine/src/engine/index.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,12 @@ export class RunEngine {
288288
}
289289
);
290290

291-
this.snapshotsSinceReplicaRetryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? {
292-
minMs: 50,
293-
maxMs: 200,
291+
// Normalize the bounds, but keep maxMs <= 0 meaning "skip the replica retry".
292+
const retryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { minMs: 50, maxMs: 200 };
293+
const retryMinMs = Math.max(0, retryDelay.minMs);
294+
this.snapshotsSinceReplicaRetryDelay = {
295+
minMs: retryDelay.maxMs > 0 ? Math.min(retryMinMs, retryDelay.maxMs) : retryMinMs,
296+
maxMs: retryDelay.maxMs,
294297
};
295298

296299
const defaultHeartbeatTimeouts: HeartbeatTimeouts = {

internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,11 @@ describe("RunEngine getSnapshotsSince", () => {
890890
);
891891
expect(expectedSnapshots.length).toBeGreaterThan(0);
892892
expect(result!.length).toBe(expectedSnapshots.length);
893-
expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));
893+
// Compare as sorted lists: same-millisecond snapshots have unspecified relative
894+
// order in both the engine query and this test's query.
895+
expect(result!.map((s) => s.snapshot.id).sort()).toEqual(
896+
expectedSnapshots.map((s) => s.id).sort()
897+
);
894898

895899
expect(
896900
await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" })
@@ -1010,7 +1014,11 @@ describe("RunEngine getSnapshotsSince", () => {
10101014
);
10111015
expect(expectedSnapshots.length).toBeGreaterThan(0);
10121016
expect(result!.length).toBe(expectedSnapshots.length);
1013-
expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));
1017+
// Compare as sorted lists: same-millisecond snapshots have unspecified relative
1018+
// order in both the engine query and this test's query.
1019+
expect(result!.map((s) => s.snapshot.id).sort()).toEqual(
1020+
expectedSnapshots.map((s) => s.id).sort()
1021+
);
10141022

10151023
// Recovered on the replica retry - the writer was never consulted.
10161024
expect(
@@ -1183,7 +1191,11 @@ describe("RunEngine getSnapshotsSince", () => {
11831191
(s) => s.createdAt.getTime() > since.createdAt.getTime() && s.id !== tail.id
11841192
);
11851193
expect(result!.map((s) => s.snapshot.id)).not.toContain(tail.id);
1186-
expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));
1194+
// Compare as sorted lists: same-millisecond snapshots have unspecified relative
1195+
// order in both the engine query and this test's query.
1196+
expect(result!.map((s) => s.snapshot.id).sort()).toEqual(
1197+
expectedSnapshots.map((s) => s.id).sort()
1198+
);
11871199

11881200
expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0);
11891201
} finally {
@@ -1281,7 +1293,11 @@ describe("RunEngine getSnapshotsSince", () => {
12811293
(s) => s.createdAt.getTime() > since.createdAt.getTime()
12821294
);
12831295
expect(expectedSnapshots.length).toBeGreaterThan(0);
1284-
expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));
1296+
// Compare as sorted lists: same-millisecond snapshots have unspecified relative
1297+
// order in both the engine query and this test's query.
1298+
expect(result!.map((s) => s.snapshot.id).sort()).toEqual(
1299+
expectedSnapshots.map((s) => s.id).sort()
1300+
);
12851301

12861302
expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0);
12871303
} finally {
@@ -1381,7 +1397,11 @@ describe("RunEngine getSnapshotsSince", () => {
13811397
(s) => s.createdAt.getTime() > since.createdAt.getTime()
13821398
);
13831399
expect(expectedSnapshots.length).toBeGreaterThan(0);
1384-
expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));
1400+
// Compare as sorted lists: same-millisecond snapshots have unspecified relative
1401+
// order in both the engine query and this test's query.
1402+
expect(result!.map((s) => s.snapshot.id).sort()).toEqual(
1403+
expectedSnapshots.map((s) => s.id).sort()
1404+
);
13851405

13861406
expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0);
13871407
} finally {

internal-packages/testcontainers/src/index.ts

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,7 @@ const clonedPostgresContainer = async ({}, use: Use<StartedPostgreSqlContainer>)
181181
const baseUri = container.getConnectionUri();
182182
const cloneDb = `test_${pgCloneCounter++}`;
183183

184-
const admin = new PrismaClient({
185-
datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } },
186-
});
187-
await admin.$executeRawUnsafe(`CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"`);
188-
await admin.$disconnect();
184+
await createDatabaseFromTemplate(baseUri, cloneDb);
189185

190186
const cloneUri = postgresUriWithDatabase(baseUri, cloneDb);
191187
const view = new Proxy(container, {
@@ -200,19 +196,36 @@ const clonedPostgresContainer = async ({}, use: Use<StartedPostgreSqlContainer>)
200196
try {
201197
await use(view);
202198
} finally {
203-
// Best-effort drop so clones don't pile up in the worker's pg over a long suite. WITH (FORCE)
204-
// terminates any lingering backends (pg 13+). A failed drop is harmless - the whole container is
205-
// reaped on worker exit - so we never let cleanup fail the test.
206-
const cleanup = new PrismaClient({
207-
datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } },
208-
});
209-
try {
210-
await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`);
211-
} catch {
212-
// ignore - reaped with the container anyway
213-
} finally {
214-
await cleanup.$disconnect();
215-
}
199+
await dropCloneDatabase(baseUri, cloneDb);
200+
}
201+
};
202+
203+
const createDatabaseFromTemplate = async (baseUri: string, cloneDb: string) => {
204+
const admin = new PrismaClient({
205+
datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } },
206+
});
207+
try {
208+
await admin.$executeRawUnsafe(
209+
`CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"`
210+
);
211+
} finally {
212+
await admin.$disconnect();
213+
}
214+
};
215+
216+
// Best-effort drop so clones don't pile up in the worker's pg over a long suite. WITH (FORCE)
217+
// terminates any lingering backends (pg 13+). A failed drop is harmless - the whole container is
218+
// reaped on worker exit - so we never let cleanup fail the test.
219+
const dropCloneDatabase = async (baseUri: string, cloneDb: string) => {
220+
const cleanup = new PrismaClient({
221+
datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } },
222+
});
223+
try {
224+
await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`);
225+
} catch {
226+
// ignore - reaped with the container anyway
227+
} finally {
228+
await cleanup.$disconnect();
216229
}
217230
};
218231

@@ -224,11 +237,7 @@ const schemaOnlyPrismaFixture = async ({}: {}, use: Use<PrismaClient>) => {
224237
const baseUri = container.getConnectionUri();
225238
const cloneDb = `schema_only_${pgCloneCounter++}`;
226239

227-
const admin = new PrismaClient({
228-
datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } },
229-
});
230-
await admin.$executeRawUnsafe(`CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"`);
231-
await admin.$disconnect();
240+
await createDatabaseFromTemplate(baseUri, cloneDb);
232241

233242
const prisma = new PrismaClient({
234243
datasources: { db: { url: postgresUriWithDatabase(baseUri, cloneDb) } },
@@ -237,18 +246,7 @@ const schemaOnlyPrismaFixture = async ({}: {}, use: Use<PrismaClient>) => {
237246
await use(prisma);
238247
} finally {
239248
await logCleanup("schemaOnlyPrisma", prisma.$disconnect());
240-
// Best-effort drop, mirroring clonedPostgresContainer cleanup - the container is reaped on
241-
// worker exit anyway, so never let cleanup fail the test.
242-
const cleanup = new PrismaClient({
243-
datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } },
244-
});
245-
try {
246-
await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`);
247-
} catch {
248-
// ignore - reaped with the container anyway
249-
} finally {
250-
await cleanup.$disconnect();
251-
}
249+
await dropCloneDatabase(baseUri, cloneDb);
252250
}
253251
};
254252

0 commit comments

Comments
 (0)