Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 99 additions & 22 deletions services/apps/members_enrichment_worker/src/activities/enrichment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ import {
updateMemberEnrichmentCacheDb,
updateMemberOrg,
} from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker'
import { findOrCreateOrganization } from '@crowd/data-access-layer/src/organizations'
import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo'
import {
findOrCreateOrganization,
findOrgByVerifiedIdentity,
} from '@crowd/data-access-layer/src/organizations'
import { dbStoreQx, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
import { SearchSyncApiClient } from '@crowd/opensearch'
Expand All @@ -48,6 +52,7 @@ import {
MemberIdentityType,
OrganizationAttributeSource,
OrganizationIdentityType,
OrganizationMergeSuggestionTable,
OrganizationSource,
PlatformType,
} from '@crowd/types'
Expand Down Expand Up @@ -375,7 +380,6 @@ export async function updateMemberUsingSquashedPayload(
}
}

const orgIdsToSync: string[] = []
const newOrUpdatedMemberOrgs = []

if (squashedPayload.memberOrganizations.length > 0) {
Expand Down Expand Up @@ -420,32 +424,105 @@ export async function updateMemberUsingSquashedPayload(
const orgSource = OrganizationAttributeSource.ENRICHMENT

orgPromises.push(
findOrCreateOrganization(qx, orgSource, {
displayName: org.name,
description: org.organizationDescription,
identities: identities.map((i) => ({ ...i, source: orgSource })),
})
.then((orgId) => {
// set the organization id for later use
(async () => {
let orgId: string | undefined
const orgPayload = {
displayName: org.name,
description: org.organizationDescription,
identities: identities.map((i) => ({ ...i, source: orgSource })),
}

try {
// Keep the org write in a savepoint: if this identity is already verified
// on another org, we can recover without aborting the member update transaction.
orgId = await qx.tx((trnx) => findOrCreateOrganization(trnx, orgSource, orgPayload))
} catch (error) {
const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
const dbError = error as { constraint?: string; detail?: string }

if (
error.constructor?.name !== 'DatabaseError' ||
dbError.constraint !== constraint ||
!dbError.detail
) {
throw error
}

const match = dbError.detail.match(/=\((.*?)\)/)
if (!match) throw error

const [platform, value, type] = match[1].split(',').map((v) => v.trim())
const erroredIdentity = {
platform,
value,
type: type as OrganizationIdentityType,
verified: true,
}

const owner = await findOrgByVerifiedIdentity(qx, erroredIdentity)
if (!owner) throw error

// Keep the enriched org identity as an unverified signal. The verified version stays
// with the existing owner, preserving the unique identity invariant.
const demotedIdentities = identities.map((identity) => {
const isMatch =
identity.platform === erroredIdentity.platform &&
identity.type === erroredIdentity.type &&
identity.value.toLowerCase() === erroredIdentity.value.toLowerCase()

return isMatch
? { ...identity, verified: false, source: orgSource }
: { ...identity, source: orgSource }
})

orgId = await qx.tx((trnx) =>
findOrCreateOrganization(trnx, orgSource, {
...orgPayload,
identities: demotedIdentities,
Comment thread
skwowet marked this conversation as resolved.
Outdated
}),
)

if (orgId && owner.id !== orgId) {
const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository(
tx.transaction(),
svc.log,
)
const noMergeIds = await mergeSuggestionsRepo.findNoMergeIds(owner.id)
if (!noMergeIds.includes(orgId)) {
// A shared verified identity is a strong merge signal, unless the pair was
// explicitly marked as no-merge by a reviewer.
const mergeSuggestions = [
{
similarity: 0.95,
organizations: [owner.id, orgId] as [string, string],
},
]

await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_RAW,
)
await mergeSuggestionsRepo.addToMerge(
mergeSuggestions,
OrganizationMergeSuggestionTable.ORGANIZATION_TO_MERGE_FILTERED,
)
Comment thread
skwowet marked this conversation as resolved.
}
}
}

if (orgId) {
org.organizationId = orgId
if (org.identities) {
for (const i of org.identities) {
i.organizationId = orgId
}
}
if (orgId) {
orgIdsToSync.push(orgId)
}
})
.then(() =>
Promise.all(
orgIdsToSync.map((orgId) =>
syncOrganization(orgId).catch((error) => {
console.error(`Failed to sync organization with ID ${orgId}:`, error)
}),
),
),
),

await syncOrganization(orgId).catch((error) => {
svc.log.error({ orgId, error }, 'Failed to sync organization')
})
}
})(),
)
}

Expand Down
Loading