Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 136 additions & 135 deletions services/apps/members_enrichment_worker/src/activities/enrichment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ export async function updateMemberUsingSquashedPayload(
isHighConfidenceSourceSelectedForWorkExperiences: boolean,
): Promise<boolean> {
const affectedOrgIds: string[] = []
const orgIdsToSync: string[] = []

const wasUpdated = await svc.postgres.writer.transactionally(async (tx) => {
let didUpdate = false
Expand Down Expand Up @@ -389,8 +390,6 @@ export async function updateMemberUsingSquashedPayload(
)

if (squashedPayload.memberOrganizations.length > 0) {
const orgPromises = []

// try matching member's existing organizations with the new ones
// we'll be using displayName, title, dates
for (const org of squashedPayload.memberOrganizations) {
Expand Down Expand Up @@ -429,156 +428,148 @@ export async function updateMemberUsingSquashedPayload(

const orgSource = OrganizationAttributeSource.ENRICHMENT

orgPromises.push(
(async () => {
let orgId: string | undefined
const orgPayload = {
displayName: org.name,
description: org.organizationDescription,
identities: identities.map((i) => ({ ...i, source: orgSource })),
}
let orgId: string | undefined
const orgPayload = {
displayName: org.name,
description: org.organizationDescription,
identities: identities.map((i) => ({ ...i, source: orgSource })),
}

try {
// Keep the org write in a savepoint: if this identity is already verified
// on another org, we can recover without aborting the member update transaction.
orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload))
} catch (error) {
const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
const dbError = error as { constraint?: string; detail?: string }

if (
error.constructor?.name !== 'DatabaseError' ||
dbError.constraint !== constraint ||
!dbError.detail
) {
throw error
}
try {
// Keep the org write in a savepoint: if this identity is already verified
// on another org, we can recover without aborting the member update transaction.
orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload))
} catch (error) {
const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
const dbError = error as { constraint?: string; detail?: string }

if (
error.constructor?.name !== 'DatabaseError' ||
dbError.constraint !== constraint ||
!dbError.detail
) {
throw error
}

const match = dbError.detail.match(/=\((.*?)\)/)
if (!match) throw error
const match = dbError.detail.match(/=\((.*?)\)/)
if (!match) throw error

const [platform, value, type] = match[1].split(',').map((v) => v.trim())
const erroredIdentity = {
platform,
value,
type: type as OrganizationIdentityType,
verified: true,
}
const [platform, value, type] = match[1].split(',').map((v) => v.trim())
const erroredIdentity = {
platform,
value,
type: type as OrganizationIdentityType,
verified: true,
}

const identityOwners = []
const erroredIdentityOwner = await findOrgByVerifiedIdentity(qx, erroredIdentity)
if (!erroredIdentityOwner) throw error

identityOwners.push({
identity: erroredIdentity,
organizationId: erroredIdentityOwner.id,
})

// The first write normalizes domain identities before failing. Use that normalized
// payload when checking the rest, so the retry won't hit the same index again.
for (const identity of orgPayload.identities.filter((i) => i.verified)) {
const isErroredIdentity =
identity.platform === erroredIdentity.platform &&
identity.type === erroredIdentity.type &&
identity.value.toLowerCase() === erroredIdentity.value.toLowerCase()

if (!isErroredIdentity) {
const owner = await findOrgByVerifiedIdentity(qx, identity)

if (owner) {
identityOwners.push({ identity, organizationId: owner.id })
}
}
}
const identityOwners = []
const erroredIdentityOwner = await findOrgByVerifiedIdentity(qx, erroredIdentity)
if (!erroredIdentityOwner) throw error

// Keep the enriched org identity as an unverified signal. The verified version stays
// with the existing owner, preserving the unique identity invariant.
const identitiesToAddAsUnverified = identityOwners.map((owner) => owner.identity)
const retryIdentities = orgPayload.identities.filter(
(identity) =>
!identitiesToAddAsUnverified.some(
(identityToAddAsUnverified) =>
identity.platform === identityToAddAsUnverified.platform &&
identity.type === identityToAddAsUnverified.type &&
identity.value.toLowerCase() ===
identityToAddAsUnverified.value.toLowerCase(),
),
)
identityOwners.push({
identity: erroredIdentity,
organizationId: erroredIdentityOwner.id,
})

orgId = await qx.tx((trnx) =>
findOrCreateOrganization(trnx, orgSource, {
...orgPayload,
identities: retryIdentities,
}),
)
// The first write normalizes domain identities before failing. Use that normalized
// payload when checking the rest, so the retry won't hit the same index again.
for (const identity of orgPayload.identities.filter((i) => i.verified)) {
const isErroredIdentity =
identity.platform === erroredIdentity.platform &&
identity.type === erroredIdentity.type &&
identity.value.toLowerCase() === erroredIdentity.value.toLowerCase()

if (orgId) {
const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository(
tx.transaction(),
svc.log,
)
const mergeSuggestions = []
const suggestedOwnerIds = new Set<string>()

for (const identityOwner of identityOwners) {
if (identityOwner.organizationId !== orgId) {
await addOrgIdentity(qx, {
organizationId: orgId,
platform: identityOwner.identity.platform,
value: identityOwner.identity.value,
type: identityOwner.identity.type,
verified: false,
source: orgSource,
})

const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(
identityOwner.organizationId,
)
if (
!noMergeIds.includes(orgId) &&
!suggestedOwnerIds.has(identityOwner.organizationId)
) {
suggestedOwnerIds.add(identityOwner.organizationId)
mergeSuggestions.push({
similarity: 0.95,
organizations: [identityOwner.organizationId, orgId] as [string, string],
})
}
}
}
if (!isErroredIdentity) {
const owner = await findOrgByVerifiedIdentity(qx, identity)

if (mergeSuggestions.length > 0) {
// A shared verified identity is a strong merge signal, unless the pair was
// explicitly marked as no-merge by a reviewer.
await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
)
await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
)
}
if (owner) {
identityOwners.push({ identity, organizationId: owner.id })
}
}
}

// Keep the enriched org identity as an unverified signal. The verified version stays
// with the existing owner, preserving the unique identity invariant.
const identitiesToAddAsUnverified = identityOwners.map((owner) => owner.identity)
const retryIdentities = orgPayload.identities.filter(
(identity) =>
!identitiesToAddAsUnverified.some(
(identityToAddAsUnverified) =>
identity.platform === identityToAddAsUnverified.platform &&
identity.type === identityToAddAsUnverified.type &&
identity.value.toLowerCase() === identityToAddAsUnverified.value.toLowerCase(),
),
)

if (orgId) {
org.organizationId = orgId
if (org.identities) {
for (const i of org.identities) {
i.organizationId = orgId
orgId = await qx.tx((trnx) =>
findOrCreateOrganization(trnx, orgSource, {
...orgPayload,
identities: retryIdentities,
}),
)

if (orgId) {
const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository(
tx.transaction(),
svc.log,
)
const mergeSuggestions = []
const suggestedOwnerIds = new Set<string>()

for (const identityOwner of identityOwners) {
if (identityOwner.organizationId !== orgId) {
await addOrgIdentity(qx, {
organizationId: orgId,
platform: identityOwner.identity.platform,
value: identityOwner.identity.value,
type: identityOwner.identity.type,
verified: false,
source: orgSource,
})

const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(
identityOwner.organizationId,
)
if (
!noMergeIds.includes(orgId) &&
!suggestedOwnerIds.has(identityOwner.organizationId)
) {
suggestedOwnerIds.add(identityOwner.organizationId)
mergeSuggestions.push({
similarity: 0.95,
organizations: [identityOwner.organizationId, orgId] as [string, string],
})
}
}
}

await syncOrganization(orgId).catch((error) => {
svc.log.error({ orgId, error }, 'Failed to sync organization')
})
if (mergeSuggestions.length > 0) {
// A shared verified identity is a strong merge signal, unless the pair was
// explicitly marked as no-merge by a reviewer.
await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
)
await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
)
}
})(),
)
}
}

if (orgId) {
org.organizationId = orgId
if (org.identities) {
for (const i of org.identities) {
i.organizationId = orgId
}
}

orgIdsToSync.push(orgId)
}
}

await Promise.all(orgPromises)
// ignore all organizations that were not created
squashedPayload.memberOrganizations = squashedPayload.memberOrganizations.filter(
(o) => o.organizationId,
Expand Down Expand Up @@ -675,6 +666,16 @@ export async function updateMemberUsingSquashedPayload(
return didUpdate
})

if (orgIdsToSync.length > 0) {
await Promise.all(
[...new Set(orgIdsToSync)].map((orgId) =>
syncOrganization(orgId).catch((error) => {
svc.log.error({ orgId, error }, 'Failed to sync organization')
}),
),
)
}

if (affectedOrgIds.length > 0) {
const commonMemberService = new CommonMemberService(
pgpQx(svc.postgres.writer.connection()),
Expand Down
Loading