Skip to content

Commit 67f38c7

Browse files
committed
fix external db sync issues from PR #1267 review
- Narrow team invitation cascade to specific changed teamId instead of entire tenancy - Populate status dashboard stats for all 11 sync mappings, not just users and email_outboxes - Wrap all OAuth provider updates with withExternalDbSyncUpdate so allowSignIn and allowConnectedAccounts changes trigger sync - Remove session_replays from sequencer and column type map since no mapping exists yet - Add DeletedRow union to email_outboxes fetch queries for future safety
1 parent 4157998 commit 67f38c7

5 files changed

Lines changed: 215 additions & 134 deletions

File tree

apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
110110
didUpdate = true;
111111
}
112112

113-
const teamTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
113+
const teamTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string, teamId: string }[]>`
114114
WITH rows_to_update AS (
115115
SELECT "tenancyId", "teamId"
116116
FROM "Team"
@@ -126,9 +126,9 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
126126
FROM rows_to_update r
127127
WHERE t."tenancyId" = r."tenancyId"
128128
AND t."teamId" = r."teamId"
129-
RETURNING t."tenancyId"
129+
RETURNING t."tenancyId", t."teamId"
130130
)
131-
SELECT DISTINCT "tenancyId" FROM updated_rows
131+
SELECT DISTINCT "tenancyId", "teamId" FROM updated_rows
132132
`;
133133

134134
span.setAttribute("stack.external-db-sync.team-tenants", teamTenants.length);
@@ -143,7 +143,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
143143
UPDATE "VerificationCode"
144144
SET "shouldUpdateSequenceId" = TRUE
145145
FROM (
146-
SELECT DISTINCT "Tenancy"."projectId", "Tenancy"."branchId"
146+
SELECT DISTINCT "Tenancy"."projectId", "Tenancy"."branchId", "Team"."teamId"
147147
FROM "Team"
148148
JOIN "Tenancy" ON "Tenancy"."id" = "Team"."tenancyId"
149149
WHERE "Team"."tenancyId" IN (${Prisma.join(teamTenants.map(t => t.tenancyId))})
@@ -153,6 +153,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
153153
WHERE "VerificationCode"."projectId" = changed_teams."projectId"
154154
AND "VerificationCode"."branchId" = changed_teams."branchId"
155155
AND "VerificationCode"."type" = 'TEAM_INVITATION'
156+
AND "VerificationCode"."data"->>'team_id' = changed_teams."teamId"
156157
AND "VerificationCode"."shouldUpdateSequenceId" = FALSE
157158
`;
158159
}
@@ -274,34 +275,6 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
274275
didUpdate = true;
275276
}
276277

277-
const sessionReplayTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
278-
WITH rows_to_update AS (
279-
SELECT "tenancyId", "id"
280-
FROM "SessionReplay"
281-
WHERE "shouldUpdateSequenceId" = TRUE
282-
ORDER BY "tenancyId"
283-
LIMIT ${batchSize}
284-
FOR UPDATE SKIP LOCKED
285-
),
286-
updated_rows AS (
287-
UPDATE "SessionReplay" sr
288-
SET "sequenceId" = nextval('global_seq_id'),
289-
"shouldUpdateSequenceId" = FALSE
290-
FROM rows_to_update r
291-
WHERE sr."tenancyId" = r."tenancyId"
292-
AND sr."id" = r."id"
293-
RETURNING sr."tenancyId"
294-
)
295-
SELECT DISTINCT "tenancyId" FROM updated_rows
296-
`;
297-
298-
span.setAttribute("stack.external-db-sync.session-replay-tenants", sessionReplayTenants.length);
299-
300-
if (sessionReplayTenants.length > 0) {
301-
await enqueueExternalDbSyncBatch(sessionReplayTenants.map(t => t.tenancyId));
302-
didUpdate = true;
303-
}
304-
305278
const projectPermissionTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
306279
WITH rows_to_update AS (
307280
SELECT "id"
@@ -442,7 +415,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
442415

443416
span.setAttribute("stack.external-db-sync.did-update", didUpdate);
444417
if (didUpdate) {
445-
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, TP=${teamPermissionTenants.length}, TI=${teamInvitationTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, PP=${projectPermissionTenants.length}, NP=${notificationPreferenceTenants.length}, RT=${refreshTokenTenants.length}, CA=${oauthAccountTenants.length}, DR=${deletedRowTenants.length}`);
418+
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, TP=${teamPermissionTenants.length}, TI=${teamInvitationTenants.length}, EO=${emailOutboxTenants.length}, PP=${projectPermissionTenants.length}, NP=${notificationPreferenceTenants.length}, RT=${refreshTokenTenants.length}, CA=${oauthAccountTenants.length}, DR=${deletedRowTenants.length}`);
446419
}
447420

448421
return didUpdate;

apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -250,45 +250,51 @@ function maxBigIntString(values: Array<string | null | undefined>): string | nul
250250
}
251251

252252
function buildMappingInternalStats(
253-
projectUsersStats: SequenceStats,
254-
emailOutboxStats: SequenceStats,
253+
stats: {
254+
projectUsersStats: SequenceStats,
255+
contactChannelStats: SequenceStats,
256+
teamStats: SequenceStats,
257+
teamMemberStats: SequenceStats,
258+
teamPermissionStats: SequenceStats,
259+
teamInvitationStats: SequenceStats,
260+
emailOutboxStats: SequenceStats,
261+
projectPermissionStats: SequenceStats,
262+
notificationPreferenceStats: SequenceStats,
263+
refreshTokenStats: SequenceStats,
264+
connectedAccountStats: SequenceStats,
265+
},
255266
deletedRowsByTable: DeletedRowSummary[],
256267
) {
257-
const deletedProjectUserStats = deletedRowsByTable.find((row) => row.table_name === "ProjectUser") ?? null;
258-
259268
const mappingInternalStats = new Map<string, {
260269
mapping_id: string,
261270
internal_min_sequence_id: string | null,
262271
internal_max_sequence_id: string | null,
263272
internal_pending_count: string,
264273
}>();
265274

266-
const usersMappingMin = minBigIntString([
267-
projectUsersStats.min_sequence_id,
268-
deletedProjectUserStats?.min_sequence_id,
269-
]);
270-
const usersMappingMax = maxBigIntString([
271-
projectUsersStats.max_sequence_id,
272-
deletedProjectUserStats?.max_sequence_id,
273-
]);
274-
const usersMappingPending = addBigIntStrings(
275-
projectUsersStats.pending,
276-
deletedProjectUserStats?.pending,
277-
);
278-
279-
mappingInternalStats.set("users", {
280-
mapping_id: "users",
281-
internal_min_sequence_id: usersMappingMin,
282-
internal_max_sequence_id: usersMappingMax,
283-
internal_pending_count: usersMappingPending,
284-
});
275+
function addMapping(mappingId: string, primaryStats: SequenceStats, deletedRowTableName: string | null) {
276+
const deletedStats = deletedRowTableName
277+
? deletedRowsByTable.find((row) => row.table_name === deletedRowTableName) ?? null
278+
: null;
279+
mappingInternalStats.set(mappingId, {
280+
mapping_id: mappingId,
281+
internal_min_sequence_id: minBigIntString([primaryStats.min_sequence_id, deletedStats?.min_sequence_id]),
282+
internal_max_sequence_id: maxBigIntString([primaryStats.max_sequence_id, deletedStats?.max_sequence_id]),
283+
internal_pending_count: addBigIntStrings(primaryStats.pending, deletedStats?.pending),
284+
});
285+
}
285286

286-
mappingInternalStats.set("email_outboxes", {
287-
mapping_id: "email_outboxes",
288-
internal_min_sequence_id: emailOutboxStats.min_sequence_id,
289-
internal_max_sequence_id: emailOutboxStats.max_sequence_id,
290-
internal_pending_count: emailOutboxStats.pending,
291-
});
287+
addMapping("users", stats.projectUsersStats, "ProjectUser");
288+
addMapping("contact_channels", stats.contactChannelStats, "ContactChannel");
289+
addMapping("teams", stats.teamStats, "Team");
290+
addMapping("team_member_profiles", stats.teamMemberStats, "TeamMember");
291+
addMapping("team_permissions", stats.teamPermissionStats, "TeamMemberDirectPermission");
292+
addMapping("team_invitations", stats.teamInvitationStats, "VerificationCode_TEAM_INVITATION");
293+
addMapping("email_outboxes", stats.emailOutboxStats, "EmailOutbox");
294+
addMapping("project_permissions", stats.projectPermissionStats, "ProjectUserDirectPermission");
295+
addMapping("notification_preferences", stats.notificationPreferenceStats, "UserNotificationPreference");
296+
addMapping("refresh_tokens", stats.refreshTokenStats, "ProjectUserRefreshToken");
297+
addMapping("connected_accounts", stats.connectedAccountStats, "ProjectUserOAuthAccount");
292298

293299
const mappings = Array.from(mappingInternalStats.values());
294300
const mappingStatuses = mappings.map((mapping) => ({
@@ -489,7 +495,19 @@ async function fetchInternalStats(tenancyId: string | null) {
489495
...formatSequenceStats(row),
490496
}));
491497

492-
const { mappings, mappingStatuses } = buildMappingInternalStats(projectUsersStats, emailOutboxStats, deletedRowsByTable);
498+
const { mappings, mappingStatuses } = buildMappingInternalStats({
499+
projectUsersStats,
500+
contactChannelStats,
501+
teamStats,
502+
teamMemberStats,
503+
teamPermissionStats,
504+
teamInvitationStats,
505+
emailOutboxStats,
506+
projectPermissionStats,
507+
notificationPreferenceStats,
508+
refreshTokenStats,
509+
connectedAccountStats,
510+
}, deletedRowsByTable);
493511

494512
return {
495513
projectUsersStats,

apps/backend/src/app/api/latest/oauth-providers/crud.tsx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,9 @@ export const oauthProviderCrudHandlers = createLazyProxy(() => createCrudHandler
253253
id: params.provider_id,
254254
},
255255
},
256-
data: {
256+
data: withExternalDbSyncUpdate({
257257
allowSignIn: data.allow_sign_in,
258-
},
258+
}),
259259
});
260260

261261
if (data.allow_sign_in) {
@@ -297,9 +297,9 @@ export const oauthProviderCrudHandlers = createLazyProxy(() => createCrudHandler
297297
id: params.provider_id,
298298
},
299299
},
300-
data: {
300+
data: withExternalDbSyncUpdate({
301301
allowConnectedAccounts: data.allow_connected_accounts,
302-
},
302+
}),
303303
});
304304
}
305305

apps/backend/src/lib/external-db-sync.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,10 +1146,6 @@ export const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json'
11461146
is_paused: 'boolean',
11471147
sync_is_deleted: 'boolean',
11481148
},
1149-
session_replays: {
1150-
chunk_count: 'bigint',
1151-
sync_is_deleted: 'boolean',
1152-
},
11531149
project_permissions: {
11541150
sync_is_deleted: 'boolean',
11551151
},

0 commit comments

Comments
 (0)