Skip to content

Commit 0c1a02e

Browse files
committed
fix flaky test and external sot sync
1 parent ac261d2 commit 0c1a02e

2 files changed

Lines changed: 118 additions & 8 deletions

File tree

apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { globalPrismaClient } from "@/prisma-client";
1+
import { getPrismaClientForTenancy, globalPrismaClient, type PrismaClientTransaction } from "@/prisma-client";
22
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
33
import {
44
yupBoolean,
@@ -10,6 +10,7 @@ import {
1010
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
1111
import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
1212
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
13+
import { getTenancy, type Tenancy } from "@/lib/tenancies";
1314

1415
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
1516

@@ -102,6 +103,112 @@ async function backfillSequenceIds() {
102103
}
103104
}
104105

106+
async function backfillSequenceIdsForTenancy(prisma: PrismaClientTransaction, tenancyId: string): Promise<boolean> {
107+
assertUuid(tenancyId, "tenancyId");
108+
let didUpdate = false;
109+
110+
const projectUserRows = await prisma.$queryRaw<{ tenancyId: string }[]>`
111+
WITH rows_to_update AS (
112+
SELECT "tenancyId", "projectUserId"
113+
FROM "ProjectUser"
114+
WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)
115+
AND "tenancyId" = ${tenancyId}::uuid
116+
LIMIT 1000
117+
FOR UPDATE SKIP LOCKED
118+
),
119+
updated_rows AS (
120+
UPDATE "ProjectUser" pu
121+
SET "sequenceId" = nextval('global_seq_id'),
122+
"shouldUpdateSequenceId" = FALSE
123+
FROM rows_to_update r
124+
WHERE pu."tenancyId" = r."tenancyId"
125+
AND pu."projectUserId" = r."projectUserId"
126+
RETURNING pu."tenancyId"
127+
)
128+
SELECT DISTINCT "tenancyId" FROM updated_rows
129+
`;
130+
if (projectUserRows.length > 0) {
131+
didUpdate = true;
132+
}
133+
134+
const contactChannelRows = await prisma.$queryRaw<{ tenancyId: string }[]>`
135+
WITH rows_to_update AS (
136+
SELECT "tenancyId", "projectUserId", "id"
137+
FROM "ContactChannel"
138+
WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)
139+
AND "tenancyId" = ${tenancyId}::uuid
140+
LIMIT 1000
141+
FOR UPDATE SKIP LOCKED
142+
),
143+
updated_rows AS (
144+
UPDATE "ContactChannel" cc
145+
SET "sequenceId" = nextval('global_seq_id'),
146+
"shouldUpdateSequenceId" = FALSE
147+
FROM rows_to_update r
148+
WHERE cc."tenancyId" = r."tenancyId"
149+
AND cc."projectUserId" = r."projectUserId"
150+
AND cc."id" = r."id"
151+
RETURNING cc."tenancyId"
152+
)
153+
SELECT DISTINCT "tenancyId" FROM updated_rows
154+
`;
155+
if (contactChannelRows.length > 0) {
156+
didUpdate = true;
157+
}
158+
159+
const deletedRowRows = await prisma.$queryRaw<{ tenancyId: string }[]>`
160+
WITH rows_to_update AS (
161+
SELECT "id", "tenancyId"
162+
FROM "DeletedRow"
163+
WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)
164+
AND "tenancyId" = ${tenancyId}::uuid
165+
LIMIT 1000
166+
FOR UPDATE SKIP LOCKED
167+
),
168+
updated_rows AS (
169+
UPDATE "DeletedRow" dr
170+
SET "sequenceId" = nextval('global_seq_id'),
171+
"shouldUpdateSequenceId" = FALSE
172+
FROM rows_to_update r
173+
WHERE dr."id" = r."id"
174+
RETURNING dr."tenancyId"
175+
)
176+
SELECT DISTINCT "tenancyId" FROM updated_rows
177+
`;
178+
if (deletedRowRows.length > 0) {
179+
didUpdate = true;
180+
}
181+
182+
return didUpdate;
183+
}
184+
185+
async function getNonHostedTenancies(): Promise<Tenancy[]> {
186+
const tenancyIds = await globalPrismaClient.tenancy.findMany({
187+
select: { id: true },
188+
});
189+
190+
const tenancies: Tenancy[] = [];
191+
for (const { id } of tenancyIds) {
192+
const tenancy = await getTenancy(id);
193+
if (!tenancy) continue;
194+
if (tenancy.config.sourceOfTruth.type !== "hosted") {
195+
tenancies.push(tenancy);
196+
}
197+
}
198+
199+
return tenancies;
200+
}
201+
202+
async function backfillSequenceIdsForNonHostedTenancies(tenancies: Tenancy[]): Promise<void> {
203+
for (const tenancy of tenancies) {
204+
const prisma = await getPrismaClientForTenancy(tenancy);
205+
const didUpdate = await backfillSequenceIdsForTenancy(prisma, tenancy.id);
206+
if (didUpdate) {
207+
await enqueueTenantSync(tenancy.id);
208+
}
209+
}
210+
}
211+
105212
// Queues a sync request for a specific tenant if one isn't already pending.
106213
// Prevents duplicate sync requests by checking for unfulfilled requests.
107214
async function enqueueTenantSync(tenancyId: string) {
@@ -155,6 +262,10 @@ export const GET = createSmartRouteHandler({
155262
throw new StatusError(401, "Unauthorized");
156263
}
157264

265+
let nonHostedTenancies = await getNonHostedTenancies();
266+
let lastTenancyRefreshMs = performance.now();
267+
const tenancyRefreshIntervalMs = 5_000;
268+
158269
const startTime = performance.now();
159270
const maxDurationMs = 3 * 60 * 1000;
160271
const pollIntervalMs = 50;
@@ -163,7 +274,12 @@ export const GET = createSmartRouteHandler({
163274

164275
while (performance.now() - startTime < maxDurationMs) {
165276
try {
277+
if (performance.now() - lastTenancyRefreshMs >= tenancyRefreshIntervalMs) {
278+
nonHostedTenancies = await getNonHostedTenancies();
279+
lastTenancyRefreshMs = performance.now();
280+
}
166281
await backfillSequenceIds();
282+
await backfillSequenceIdsForNonHostedTenancies(nonHostedTenancies);
167283
} catch (error) {
168284
captureError(
169285
`sequencer-iteration-error`,

apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,7 @@ describe.sequential('External DB Sync - Basic Tests', () => {
194194
// Wait for sync to create the table and populate data
195195
await waitForTable(client, 'users');
196196

197-
await waitForCondition(
198-
async () => {
199-
const res = await client.query(`SELECT * FROM "users" WHERE "primary_email" = $1`, ['sync-verify@example.com']);
200-
return res.rows.length > 0;
201-
},
202-
{ description: 'data to appear in external DB', timeoutMs: 90000 }
203-
);
197+
await waitForSyncedData(client, 'sync-verify@example.com', 'Sync Verify User');
204198
await verifyInExternalDb(client, 'sync-verify@example.com', 'Sync Verify User');
205199
}, TEST_TIMEOUT);
206200

0 commit comments

Comments
 (0)