Skip to content

Commit f7f9693

Browse files
authored
fix: pcc sync - multiple segments have the same sourceId causing errors (#4046)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 375a48e commit f7f9693

3 files changed

Lines changed: 152 additions & 67 deletions

File tree

services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts

Lines changed: 123 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,11 @@ export class PccProjectConsumer {
156156
// project — useful for triage even when the depth rule is unsupported.
157157
if (parsed.pccProjectId) {
158158
const matched = await findSegmentBySourceId(tx, parsed.pccProjectId)
159-
if (matched) {
159+
if (isAmbiguousMatch(matched)) {
160+
errorDetails.matchedVia =
161+
'sourceId (ambiguous — multiple subprojects share this sourceId)'
162+
errorDetails.candidates = matched.candidates
163+
} else if (matched) {
160164
schemaMismatchMatchedCount++
161165
errorDetails.matchedSegmentId = matched.id
162166
errorDetails.matchedSegmentName = matched.name
@@ -230,7 +234,6 @@ export class PccProjectConsumer {
230234
})
231235
}
232236
} catch (err) {
233-
const errorMessage = err instanceof Error ? err.message : String(err)
234237
log.error({ jobId: job.id, err }, 'PCC job failed')
235238

236239
if (this.dryRun) {
@@ -239,7 +242,9 @@ export class PccProjectConsumer {
239242
await this.releaseClaimBestEffort(job.id)
240243
} else {
241244
try {
242-
await this.metadataStore.markFailed(job.id, errorMessage, {
245+
await this.metadataStore.markFailed(job.id, err, {
246+
transformedCount: upsertedCount,
247+
skippedCount: skippedCount + schemaMismatchCount + missingProjectIdCount,
243248
processingDurationMs: Date.now() - startTime,
244249
})
245250
} catch (updateErr) {
@@ -272,7 +277,28 @@ export class PccProjectConsumer {
272277

273278
// Step 2: sourceId fallback
274279
if (!segment) {
275-
segment = await findSegmentBySourceId(tx, project.pccProjectId)
280+
const fallback = await findSegmentBySourceId(tx, project.pccProjectId)
281+
if (isAmbiguousMatch(fallback)) {
282+
log.warn(
283+
{
284+
pccProjectId: project.pccProjectId,
285+
pccSlug: project.pccSlug,
286+
candidates: fallback.candidates,
287+
},
288+
'Multiple subproject segments share this sourceId — cannot determine match, skipping',
289+
)
290+
await this.recordSyncError(
291+
project.pccProjectId,
292+
project.pccSlug,
293+
'AMBIGUOUS_SEGMENT_MATCH',
294+
{
295+
sourceId: project.pccProjectId,
296+
candidates: fallback.candidates,
297+
},
298+
)
299+
return { action: 'SKIPPED' }
300+
}
301+
segment = fallback as SegmentRow | null
276302
}
277303

278304
// Step 3: no match → SKIP (Phase 1: project doesn't exist in CDP yet)
@@ -419,6 +445,17 @@ interface SegmentRow {
419445
grandparentName: string | null
420446
}
421447

448+
// Result shape returned by a sourceId lookup when more than one subproject
// segment matched, so no single segment can be chosen.
interface AmbiguousSegmentMatch {
  // Literal discriminator; a plain SegmentRow has no such field.
  ambiguous: true
  // Id and name of each matched segment, kept for logging and error details.
  candidates: Pick<SegmentRow, 'id' | 'name'>[]
}
452+
453+
// Type guard: narrows a sourceId lookup result to the "several segments matched" case.
// Returns false for null (no match) and for a single SegmentRow.
function isAmbiguousMatch(
  result: SegmentRow | null | AmbiguousSegmentMatch,
): result is AmbiguousSegmentMatch {
  if (result === null) {
    return false
  }
  // Only the ambiguous variant carries the literal `ambiguous: true` marker.
  const marker = (result as AmbiguousSegmentMatch).ambiguous
  return marker === true
}
458+
422459
async function findSegmentById(db: DbConnOrTx, segmentId: string): Promise<SegmentRow | null> {
423460
return db.oneOrNone<SegmentRow>(
424461
`SELECT id, name, slug, "parentName", "grandparentName"
@@ -428,13 +465,20 @@ async function findSegmentById(db: DbConnOrTx, segmentId: string): Promise<Segme
428465
)
429466
}
430467

431-
async function findSegmentBySourceId(db: DbConnOrTx, sourceId: string): Promise<SegmentRow | null> {
432-
return db.oneOrNone<SegmentRow>(
468+
async function findSegmentBySourceId(
469+
db: DbConnOrTx,
470+
sourceId: string,
471+
): Promise<SegmentRow | null | AmbiguousSegmentMatch> {
472+
const rows = await db.manyOrNone<SegmentRow>(
433473
`SELECT id, name, slug, "parentName", "grandparentName"
434474
FROM segments
435-
WHERE "sourceId" = $(sourceId) AND type = 'subproject' AND "tenantId" = $(tenantId)`,
475+
WHERE "sourceId" = $(sourceId) AND type = 'subproject' AND "tenantId" = $(tenantId)
476+
LIMIT 2`,
436477
{ sourceId, tenantId: DEFAULT_TENANT_ID },
437478
)
479+
if (rows.length === 0) return null
480+
if (rows.length === 1) return rows[0]
481+
return { ambiguous: true, candidates: rows.map((r) => ({ id: r.id, name: r.name })) }
438482
}
439483

440484
function detectHierarchyMismatch(segment: SegmentRow, cdpTarget: CdpHierarchyTarget): string[] {
@@ -483,61 +527,65 @@ async function upsertSegment(
483527
)
484528
}
485529

486-
// Returns true if a name conflict prevented creating the insightsProject row.
487-
// Updates insightsProject rows for ALL segment levels sharing the same sourceId
488-
// (group, project, subproject). The INSERT is restricted to the matched subproject
489-
// segment (identified by segmentId) to avoid duplicating insights projects for
490-
// hierarchy-only segments.
530+
// Returns true if a name conflict prevented writing the insightsProject row.
531+
// The INSERT is restricted to the matched subproject segment (identified by segmentId)
532+
// to avoid duplicating insights projects for hierarchy-only segments.
491533
async function upsertInsightsProject(
492534
db: DbConnOrTx,
493535
segmentId: string,
494536
sourceId: string,
495537
project: ParsedPccProject,
496538
): Promise<boolean> {
497-
// Check for a cross-family name conflict — another PCC project (different sourceId)
498-
// already holds this name in an active insightsProject row.
499-
const conflicting = await db.oneOrNone<{ id: string }>(
500-
`SELECT ip.id
501-
FROM "insightsProjects" ip
502-
JOIN segments s ON s.id = ip."segmentId"
503-
WHERE ip.name = $(name)
504-
AND ip."deletedAt" IS NULL
505-
AND s."sourceId" IS DISTINCT FROM $(sourceId)
506-
AND s."tenantId" = $(tenantId)`,
507-
{ name: project.name, sourceId, tenantId: DEFAULT_TENANT_ID },
539+
// Split UPDATE vs INSERT paths upfront — each needs a different name-collision guard.
540+
const exists = await db.oneOrNone<{ id: string }>(
541+
`SELECT id FROM "insightsProjects" WHERE "segmentId" = $(segmentId) AND "deletedAt" IS NULL`,
542+
{ segmentId },
508543
)
509-
if (conflicting) return true
510544

511-
// Update the matched subproject segment's insightsProject only.
512-
// Scoped to segmentId rather than all siblings sharing sourceId — the bulk-sibling
513-
// approach causes unique-name constraint violations when siblings have pre-existing
514-
// insightsProjects rows with different names that would all be renamed to the same value.
515-
// Slug is intentionally not updated — it is a stable identifier referenced by FK from
516-
// securityInsightsEvaluations and related tables.
517-
// logoUrl won't be updated in InsightsProject until we confirm that the format is
518-
// compatible with the Insights Squared standard. Do NOT reintroduce it as a
519-
// `--`-commented SQL line: pg-promise scans placeholders textually and would still
520-
// require the `logoUrl` param, triggering "Property 'logoUrl' doesn't exist".
521-
await db.none(
522-
`UPDATE "insightsProjects"
523-
SET name = $(name),
524-
description = $(description),
525-
"updatedAt" = NOW()
526-
WHERE "segmentId" = $(segmentId)
527-
AND "deletedAt" IS NULL`,
528-
{
529-
segmentId,
530-
name: project.name,
531-
description: project.description,
532-
},
533-
)
545+
if (exists) {
546+
// UPDATE path. The partial unique index unique_insightsProjects_name is global, so any
547+
// other active row with the target name will collide. This includes same-sourceId duplicate
548+
// subproject segments (data anomaly — e.g. FIDOPower / OpenFIDO where two CDP subprojects
549+
// share one PCC project_id) as well as cross-family conflicts and NULL-segmentId rows.
550+
// We exclude by PK (never null) rather than by segmentId to stay NULL-safe.
551+
const conflicting = await db.oneOrNone<{ id: string }>(
552+
`SELECT ip.id
553+
FROM "insightsProjects" ip
554+
WHERE ip.name = $(name)
555+
AND ip."deletedAt" IS NULL
556+
AND ip.id <> $(id)`,
557+
{ name: project.name, id: exists.id },
558+
)
559+
if (conflicting) return true
560+
561+
// Slug is intentionally not updated — it is a stable identifier referenced by FK from
562+
// securityInsightsEvaluations and related tables.
563+
// logoUrl won't be updated in InsightsProject until we confirm that the format is
564+
// compatible with the Insights Squared standard. Do NOT reintroduce it as a
565+
// `--`-commented SQL line: pg-promise scans placeholders textually and would still
566+
// require the `logoUrl` param, triggering "Property 'logoUrl' doesn't exist".
567+
try {
568+
await db.none(
569+
`UPDATE "insightsProjects"
570+
SET name = $(name),
571+
description = $(description),
572+
"updatedAt" = NOW()
573+
WHERE "segmentId" = $(segmentId)
574+
AND "deletedAt" IS NULL`,
575+
{ segmentId, name: project.name, description: project.description },
576+
)
577+
} catch (err) {
578+
if (isDuplicateKeyError(err)) return true
579+
throw err
580+
}
581+
return false
582+
}
534583

535-
// INSERT for the subproject segment only (the matched leaf).
536-
// Before inserting, check if a same-family sibling (group/project level sharing the same
537-
// sourceId) already holds this name. Shallow hierarchies (eff=1/2) have group+project+subproject
538-
// all sharing the same name and sourceId — the group/project rows are written first and would
539-
// cause a name conflict on the subproject INSERT. Skip the INSERT in that case; the family is
540-
// already represented.
584+
// INSERT path. Two guards before writing:
585+
//
586+
// 1. Same-family skip: a group/project-level segment sharing this sourceId already holds the
587+
// canonical name (shallow eff=1/2 hierarchy). The family is already represented — skip the
588+
// INSERT without recording a conflict.
541589
const sameFamilyNameHolder = await db.oneOrNone(
542590
`SELECT 1
543591
FROM "insightsProjects" ip
@@ -551,22 +599,34 @@ async function upsertInsightsProject(
551599
)
552600
if (sameFamilyNameHolder) return false
553601

554-
const exists = await db.oneOrNone<{ id: string }>(
555-
`SELECT id FROM "insightsProjects" WHERE "segmentId" = $(segmentId) AND "deletedAt" IS NULL`,
556-
{ segmentId },
602+
// 2. Any remaining active row with this name is a conflict — cross-family, different PCC
603+
// project, or a NULL-segmentId orphan row. The unique index is global and includes those.
604+
// No join needed here: sameFamilyNameHolder above already cleared the same-sourceId case,
605+
// so anything found now is genuinely incompatible.
606+
const conflicting = await db.oneOrNone<{ id: string }>(
607+
`SELECT id FROM "insightsProjects" WHERE name = $(name) AND "deletedAt" IS NULL LIMIT 1`,
608+
{ name: project.name },
557609
)
558-
if (exists) return false
610+
if (conflicting) return true
559611

560612
// logoUrl intentionally omitted from the INSERT column list — see note above.
561-
await db.none(
562-
`INSERT INTO "insightsProjects" (name, slug, description, "segmentId", "isLF")
563-
VALUES ($(name), generate_slug('insightsProjects', $(name)), $(description), $(segmentId), TRUE)`,
564-
{ name: project.name, description: project.description, segmentId },
565-
)
566-
613+
try {
614+
await db.none(
615+
`INSERT INTO "insightsProjects" (name, slug, description, "segmentId", "isLF")
616+
VALUES ($(name), generate_slug('insightsProjects', $(name)), $(description), $(segmentId), TRUE)`,
617+
{ name: project.name, description: project.description, segmentId },
618+
)
619+
} catch (err) {
620+
if (isDuplicateKeyError(err)) return true
621+
throw err
622+
}
567623
return false
568624
}
569625

626+
// True when `err` is an Error instance whose `code` property is exactly the string
// '23505' (PostgreSQL SQLSTATE for unique_violation). Anything else yields false.
function isDuplicateKeyError(err: unknown): boolean {
  if (!(err instanceof Error)) {
    return false
  }
  if (!('code' in err)) {
    return false
  }
  const sqlState = (err as { code: unknown }).code
  return sqlState === '23505'
}
629+
570630
async function insertSyncError(
571631
db: DbConnOrTx,
572632
externalProjectId: string | null,

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,10 @@ export class TransformerConsumer {
126126

127127
log.info({ jobId: job.id, ...processingMetrics }, 'Job completed')
128128
} catch (err) {
129-
const errorMessage = err instanceof Error ? err.message : String(err)
130129
log.error({ jobId: job.id, err }, 'Job failed')
131130

132131
try {
133-
await this.metadataStore.markFailed(job.id, errorMessage, {
132+
await this.metadataStore.markFailed(job.id, err, {
134133
processingDurationMs: Date.now() - startTime,
135134
})
136135
} catch (updateErr) {

services/libs/snowflake/src/metadataStore.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,14 @@ export class MetadataStore {
177177
)
178178
}
179179

180-
async markFailed(jobId: number, error: string, metrics?: Partial<JobMetrics>): Promise<void> {
180+
async markFailed(jobId: number, error: unknown, metrics?: Partial<JobMetrics>): Promise<void> {
181181
await this.db.none(
182182
`UPDATE integration."snowflakeExportJobs"
183183
SET error = $(error), "completedAt" = NOW(),
184184
metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($(metrics)::jsonb, '{}'::jsonb),
185185
"updatedAt" = NOW()
186186
WHERE id = $(jobId)`,
187-
{ jobId, error, metrics: metrics ? JSON.stringify(metrics) : null },
187+
{ jobId, error: serializeJobError(error), metrics: metrics ? JSON.stringify(metrics) : null },
188188
)
189189
}
190190

@@ -202,6 +202,32 @@ export class MetadataStore {
202202
}
203203
}
204204

205+
// Serializes an arbitrary thrown value into a JSON string for storage in the job's
// `error` column. Never throws and always returns a string.
//
// - Error instances become { message, name, stack, cause?, ...own enumerable props }.
// - Other objects are stringified as-is; primitives are wrapped as { message }.
// - A field that JSON cannot encode (circular reference, BigInt, ...) is replaced by
//   its string description, so one bad property does not discard the stack and the
//   remaining fields.
function serializeJobError(err: unknown): string {
  // String(value) throws for objects without a usable toString / Symbol.toPrimitive
  // (e.g. Object.create(null)); fall back to the generic "[object X]" tag.
  const describe = (value: unknown): string => {
    try {
      return String(value)
    } catch {
      return Object.prototype.toString.call(value)
    }
  }

  // Returns `value` untouched when JSON can encode it, otherwise a string description.
  const jsonSafe = (value: unknown): unknown => {
    try {
      JSON.stringify(value)
      return value
    } catch {
      return describe(value)
    }
  }

  try {
    if (err instanceof Error) {
      const obj: Record<string, unknown> = {
        message: err.message,
        name: err.name,
        stack: err.stack,
      }
      // cause is non-enumerable on Error instances — capture it explicitly
      const cause = (err as { cause?: unknown }).cause
      if (cause !== undefined) {
        obj.cause = cause instanceof Error ? cause.message : jsonSafe(cause)
      }
      // Copy driver-specific extras (e.g. a database error `code`) without
      // overwriting the canonical fields set above.
      for (const key of Object.keys(err)) {
        if (!(key in obj)) {
          obj[key] = jsonSafe((err as unknown as Record<string, unknown>)[key])
        }
      }
      return JSON.stringify(obj)
    }

    // Non-Error throwables: preserve all fields if it's an object, otherwise wrap in {message}
    const payload = typeof err === 'object' && err !== null ? err : { message: String(err) }
    // JSON.stringify yields undefined at runtime when toJSON() returns undefined.
    const encoded: string | undefined = JSON.stringify(payload)
    return encoded ?? JSON.stringify({ message: describe(err) })
  } catch {
    return JSON.stringify({ message: describe(err) })
  }
}
230+
205231
function mapRowToJob(row: {
206232
id: number
207233
platform: string

0 commit comments

Comments
 (0)