Skip to content

Commit 24226f2

Browse files
committed
Merge branch 'main' into better-error-message-handling-CM-1117
2 parents 01d19fb + f3b33a5 commit 24226f2

1 file changed

Lines changed: 136 additions & 135 deletions

File tree

  • services/apps/members_enrichment_worker/src/activities

services/apps/members_enrichment_worker/src/activities/enrichment.ts

Lines changed: 136 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ export async function updateMemberUsingSquashedPayload(
296296
isHighConfidenceSourceSelectedForWorkExperiences: boolean,
297297
): Promise<boolean> {
298298
const affectedOrgIds: string[] = []
299+
const orgIdsToSync: string[] = []
299300

300301
const wasUpdated = await svc.postgres.writer.transactionally(async (tx) => {
301302
let didUpdate = false
@@ -389,8 +390,6 @@ export async function updateMemberUsingSquashedPayload(
389390
)
390391

391392
if (squashedPayload.memberOrganizations.length > 0) {
392-
const orgPromises = []
393-
394393
// try matching member's existing organizations with the new ones
395394
// we'll be using displayName, title, dates
396395
for (const org of squashedPayload.memberOrganizations) {
@@ -429,156 +428,148 @@ export async function updateMemberUsingSquashedPayload(
429428

430429
const orgSource = OrganizationAttributeSource.ENRICHMENT
431430

432-
orgPromises.push(
433-
(async () => {
434-
let orgId: string | undefined
435-
const orgPayload = {
436-
displayName: org.name,
437-
description: org.organizationDescription,
438-
identities: identities.map((i) => ({ ...i, source: orgSource })),
439-
}
431+
let orgId: string | undefined
432+
const orgPayload = {
433+
displayName: org.name,
434+
description: org.organizationDescription,
435+
identities: identities.map((i) => ({ ...i, source: orgSource })),
436+
}
440437

441-
try {
442-
// Keep the org write in a savepoint: if this identity is already verified
443-
// on another org, we can recover without aborting the member update transaction.
444-
orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload))
445-
} catch (error) {
446-
const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
447-
const dbError = error as { constraint?: string; detail?: string }
448-
449-
if (
450-
error.constructor?.name !== 'DatabaseError' ||
451-
dbError.constraint !== constraint ||
452-
!dbError.detail
453-
) {
454-
throw error
455-
}
438+
try {
439+
// Keep the org write in a savepoint: if this identity is already verified
440+
// on another org, we can recover without aborting the member update transaction.
441+
orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload))
442+
} catch (error) {
443+
const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
444+
const dbError = error as { constraint?: string; detail?: string }
445+
446+
if (
447+
error.constructor?.name !== 'DatabaseError' ||
448+
dbError.constraint !== constraint ||
449+
!dbError.detail
450+
) {
451+
throw error
452+
}
456453

457-
const match = dbError.detail.match(/=\((.*?)\)/)
458-
if (!match) throw error
454+
const match = dbError.detail.match(/=\((.*?)\)/)
455+
if (!match) throw error
459456

460-
const [platform, value, type] = match[1].split(',').map((v) => v.trim())
461-
const erroredIdentity = {
462-
platform,
463-
value,
464-
type: type as OrganizationIdentityType,
465-
verified: true,
466-
}
457+
const [platform, value, type] = match[1].split(',').map((v) => v.trim())
458+
const erroredIdentity = {
459+
platform,
460+
value,
461+
type: type as OrganizationIdentityType,
462+
verified: true,
463+
}
467464

468-
const identityOwners = []
469-
const erroredIdentityOwner = await findOrgByVerifiedIdentity(qx, erroredIdentity)
470-
if (!erroredIdentityOwner) throw error
471-
472-
identityOwners.push({
473-
identity: erroredIdentity,
474-
organizationId: erroredIdentityOwner.id,
475-
})
476-
477-
// The first write normalizes domain identities before failing. Use that normalized
478-
// payload when checking the rest, so the retry won't hit the same index again.
479-
for (const identity of orgPayload.identities.filter((i) => i.verified)) {
480-
const isErroredIdentity =
481-
identity.platform === erroredIdentity.platform &&
482-
identity.type === erroredIdentity.type &&
483-
identity.value.toLowerCase() === erroredIdentity.value.toLowerCase()
484-
485-
if (!isErroredIdentity) {
486-
const owner = await findOrgByVerifiedIdentity(qx, identity)
487-
488-
if (owner) {
489-
identityOwners.push({ identity, organizationId: owner.id })
490-
}
491-
}
492-
}
465+
const identityOwners = []
466+
const erroredIdentityOwner = await findOrgByVerifiedIdentity(qx, erroredIdentity)
467+
if (!erroredIdentityOwner) throw error
493468

494-
// Keep the enriched org identity as an unverified signal. The verified version stays
495-
// with the existing owner, preserving the unique identity invariant.
496-
const identitiesToAddAsUnverified = identityOwners.map((owner) => owner.identity)
497-
const retryIdentities = orgPayload.identities.filter(
498-
(identity) =>
499-
!identitiesToAddAsUnverified.some(
500-
(identityToAddAsUnverified) =>
501-
identity.platform === identityToAddAsUnverified.platform &&
502-
identity.type === identityToAddAsUnverified.type &&
503-
identity.value.toLowerCase() ===
504-
identityToAddAsUnverified.value.toLowerCase(),
505-
),
506-
)
469+
identityOwners.push({
470+
identity: erroredIdentity,
471+
organizationId: erroredIdentityOwner.id,
472+
})
507473

508-
orgId = await qx.tx((trnx) =>
509-
findOrCreateOrganization(trnx, orgSource, {
510-
...orgPayload,
511-
identities: retryIdentities,
512-
}),
513-
)
474+
// The first write normalizes domain identities before failing. Use that normalized
475+
// payload when checking the rest, so the retry won't hit the same index again.
476+
for (const identity of orgPayload.identities.filter((i) => i.verified)) {
477+
const isErroredIdentity =
478+
identity.platform === erroredIdentity.platform &&
479+
identity.type === erroredIdentity.type &&
480+
identity.value.toLowerCase() === erroredIdentity.value.toLowerCase()
514481

515-
if (orgId) {
516-
const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository(
517-
tx.transaction(),
518-
svc.log,
519-
)
520-
const mergeSuggestions = []
521-
const suggestedOwnerIds = new Set<string>()
522-
523-
for (const identityOwner of identityOwners) {
524-
if (identityOwner.organizationId !== orgId) {
525-
await addOrgIdentity(qx, {
526-
organizationId: orgId,
527-
platform: identityOwner.identity.platform,
528-
value: identityOwner.identity.value,
529-
type: identityOwner.identity.type,
530-
verified: false,
531-
source: orgSource,
532-
})
533-
534-
const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(
535-
identityOwner.organizationId,
536-
)
537-
if (
538-
!noMergeIds.includes(orgId) &&
539-
!suggestedOwnerIds.has(identityOwner.organizationId)
540-
) {
541-
suggestedOwnerIds.add(identityOwner.organizationId)
542-
mergeSuggestions.push({
543-
similarity: 0.95,
544-
organizations: [identityOwner.organizationId, orgId] as [string, string],
545-
})
546-
}
547-
}
548-
}
482+
if (!isErroredIdentity) {
483+
const owner = await findOrgByVerifiedIdentity(qx, identity)
549484

550-
if (mergeSuggestions.length > 0) {
551-
// A shared verified identity is a strong merge signal, unless the pair was
552-
// explicitly marked as no-merge by a reviewer.
553-
await mergeSuggestionsRepo.addToMerge(
554-
mergeSuggestions,
555-
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
556-
)
557-
await mergeSuggestionsRepo.addToMerge(
558-
mergeSuggestions,
559-
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
560-
)
561-
}
485+
if (owner) {
486+
identityOwners.push({ identity, organizationId: owner.id })
562487
}
563488
}
489+
}
490+
491+
// Keep the enriched org identity as an unverified signal. The verified version stays
492+
// with the existing owner, preserving the unique identity invariant.
493+
const identitiesToAddAsUnverified = identityOwners.map((owner) => owner.identity)
494+
const retryIdentities = orgPayload.identities.filter(
495+
(identity) =>
496+
!identitiesToAddAsUnverified.some(
497+
(identityToAddAsUnverified) =>
498+
identity.platform === identityToAddAsUnverified.platform &&
499+
identity.type === identityToAddAsUnverified.type &&
500+
identity.value.toLowerCase() === identityToAddAsUnverified.value.toLowerCase(),
501+
),
502+
)
564503

565-
if (orgId) {
566-
org.organizationId = orgId
567-
if (org.identities) {
568-
for (const i of org.identities) {
569-
i.organizationId = orgId
504+
orgId = await qx.tx((trnx) =>
505+
findOrCreateOrganization(trnx, orgSource, {
506+
...orgPayload,
507+
identities: retryIdentities,
508+
}),
509+
)
510+
511+
if (orgId) {
512+
const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository(
513+
tx.transaction(),
514+
svc.log,
515+
)
516+
const mergeSuggestions = []
517+
const suggestedOwnerIds = new Set<string>()
518+
519+
for (const identityOwner of identityOwners) {
520+
if (identityOwner.organizationId !== orgId) {
521+
await addOrgIdentity(qx, {
522+
organizationId: orgId,
523+
platform: identityOwner.identity.platform,
524+
value: identityOwner.identity.value,
525+
type: identityOwner.identity.type,
526+
verified: false,
527+
source: orgSource,
528+
})
529+
530+
const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(
531+
identityOwner.organizationId,
532+
)
533+
if (
534+
!noMergeIds.includes(orgId) &&
535+
!suggestedOwnerIds.has(identityOwner.organizationId)
536+
) {
537+
suggestedOwnerIds.add(identityOwner.organizationId)
538+
mergeSuggestions.push({
539+
similarity: 0.95,
540+
organizations: [identityOwner.organizationId, orgId] as [string, string],
541+
})
570542
}
571543
}
544+
}
572545

573-
await syncOrganization(orgId).catch((error) => {
574-
svc.log.error({ orgId, error }, 'Failed to sync organization')
575-
})
546+
if (mergeSuggestions.length > 0) {
547+
// A shared verified identity is a strong merge signal, unless the pair was
548+
// explicitly marked as no-merge by a reviewer.
549+
await mergeSuggestionsRepo.addToMerge(
550+
mergeSuggestions,
551+
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
552+
)
553+
await mergeSuggestionsRepo.addToMerge(
554+
mergeSuggestions,
555+
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
556+
)
576557
}
577-
})(),
578-
)
558+
}
559+
}
560+
561+
if (orgId) {
562+
org.organizationId = orgId
563+
if (org.identities) {
564+
for (const i of org.identities) {
565+
i.organizationId = orgId
566+
}
567+
}
568+
569+
orgIdsToSync.push(orgId)
570+
}
579571
}
580572

581-
await Promise.all(orgPromises)
582573
// ignore all organizations that were not created
583574
squashedPayload.memberOrganizations = squashedPayload.memberOrganizations.filter(
584575
(o) => o.organizationId,
@@ -675,6 +666,16 @@ export async function updateMemberUsingSquashedPayload(
675666
return didUpdate
676667
})
677668

669+
if (orgIdsToSync.length > 0) {
670+
await Promise.all(
671+
[...new Set(orgIdsToSync)].map((orgId) =>
672+
syncOrganization(orgId).catch((error) => {
673+
svc.log.error({ orgId, error }, 'Failed to sync organization')
674+
}),
675+
),
676+
)
677+
}
678+
678679
if (affectedOrgIds.length > 0) {
679680
const commonMemberService = new CommonMemberService(
680681
pgpQx(svc.postgres.writer.connection()),

0 commit comments

Comments
 (0)