@@ -32,7 +32,11 @@ import {
3232 updateMemberEnrichmentCacheDb ,
3333 updateMemberOrg ,
3434} from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker'
35- import { findOrCreateOrganization } from '@crowd/data-access-layer/src/organizations'
35+ import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo'
36+ import {
37+ findOrCreateOrganization ,
38+ findOrgByVerifiedIdentity ,
39+ } from '@crowd/data-access-layer/src/organizations'
3640import { dbStoreQx , pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
3741import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
3842import { SearchSyncApiClient } from '@crowd/opensearch'
@@ -48,6 +52,7 @@ import {
4852 MemberIdentityType ,
4953 OrganizationAttributeSource ,
5054 OrganizationIdentityType ,
55+ OrganizationMergeSuggestionTable ,
5156 OrganizationSource ,
5257 PlatformType ,
5358} from '@crowd/types'
@@ -375,7 +380,6 @@ export async function updateMemberUsingSquashedPayload(
375380 }
376381 }
377382
378- const orgIdsToSync : string [ ] = [ ]
379383 const newOrUpdatedMemberOrgs = [ ]
380384
381385 if ( squashedPayload . memberOrganizations . length > 0 ) {
@@ -420,32 +424,105 @@ export async function updateMemberUsingSquashedPayload(
420424 const orgSource = OrganizationAttributeSource . ENRICHMENT
421425
422426 orgPromises . push (
423- findOrCreateOrganization ( qx , orgSource , {
424- displayName : org . name ,
425- description : org . organizationDescription ,
426- identities : identities . map ( ( i ) => ( { ...i , source : orgSource } ) ) ,
427- } )
428- . then ( ( orgId ) => {
429- // set the organization id for later use
427+ ( async ( ) => {
428+ let orgId : string | undefined
429+ const orgPayload = {
430+ displayName : org . name ,
431+ description : org . organizationDescription ,
432+ identities : identities . map ( ( i ) => ( { ...i , source : orgSource } ) ) ,
433+ }
434+
435+ try {
436+ // Keep the org write in a savepoint: if this identity is already verified
437+ // on another org, we can recover without aborting the member update transaction.
438+ orgId = await qx . tx ( ( trnx ) => findOrCreateOrganization ( trnx , orgSource , orgPayload ) )
439+ } catch ( error ) {
440+ const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
441+ const dbError = error as { constraint ?: string ; detail ?: string }
442+
443+ if (
444+ error . constructor ?. name !== 'DatabaseError' ||
445+ dbError . constraint !== constraint ||
446+ ! dbError . detail
447+ ) {
448+ throw error
449+ }
450+
451+ const match = dbError . detail . match ( / = \( ( .* ?) \) / )
452+ if ( ! match ) throw error
453+
454+ const [ platform , value , type ] = match [ 1 ] . split ( ',' ) . map ( ( v ) => v . trim ( ) )
455+ const erroredIdentity = {
456+ platform,
457+ value,
458+ type : type as OrganizationIdentityType ,
459+ verified : true ,
460+ }
461+
462+ const owner = await findOrgByVerifiedIdentity ( qx , erroredIdentity )
463+ if ( ! owner ) throw error
464+
465+ // Keep the enriched org identity as an unverified signal. The verified version stays
466+ // with the existing owner, preserving the unique identity invariant.
467+ const demotedIdentities = identities . map ( ( identity ) => {
468+ const isMatch =
469+ identity . platform === erroredIdentity . platform &&
470+ identity . type === erroredIdentity . type &&
471+ identity . value . toLowerCase ( ) === erroredIdentity . value . toLowerCase ( )
472+
473+ return isMatch
474+ ? { ...identity , verified : false , source : orgSource }
475+ : { ...identity , source : orgSource }
476+ } )
477+
478+ orgId = await qx . tx ( ( trnx ) =>
479+ findOrCreateOrganization ( trnx , orgSource , {
480+ ...orgPayload ,
481+ identities : demotedIdentities ,
482+ } ) ,
483+ )
484+
485+ if ( orgId && owner . id !== orgId ) {
486+ const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository (
487+ tx . transaction ( ) ,
488+ svc . log ,
489+ )
490+ const noMergeIds = await mergeSuggestionsRepo . findNoMergeIds ( owner . id )
491+ if ( ! noMergeIds . includes ( orgId ) ) {
492+ // A shared verified identity is a strong merge signal, unless the pair was
493+ // explicitly marked as no-merge by a reviewer.
494+ const mergeSuggestions = [
495+ {
496+ similarity : 0.95 ,
497+ organizations : [ owner . id , orgId ] as [ string , string ] ,
498+ } ,
499+ ]
500+
501+ await mergeSuggestionsRepo . addToMerge (
502+ mergeSuggestions ,
503+ OrganizationMergeSuggestionTable . ORGANIZATION_TO_MERGE_RAW ,
504+ )
505+ await mergeSuggestionsRepo . addToMerge (
506+ mergeSuggestions ,
507+ OrganizationMergeSuggestionTable . ORGANIZATION_TO_MERGE_FILTERED ,
508+ )
509+ }
510+ }
511+ }
512+
513+ if ( orgId ) {
430514 org . organizationId = orgId
431515 if ( org . identities ) {
432516 for ( const i of org . identities ) {
433517 i . organizationId = orgId
434518 }
435519 }
436- if ( orgId ) {
437- orgIdsToSync . push ( orgId )
438- }
439- } )
440- . then ( ( ) =>
441- Promise . all (
442- orgIdsToSync . map ( ( orgId ) =>
443- syncOrganization ( orgId ) . catch ( ( error ) => {
444- console . error ( `Failed to sync organization with ID ${ orgId } :` , error )
445- } ) ,
446- ) ,
447- ) ,
448- ) ,
520+
521+ await syncOrganization ( orgId ) . catch ( ( error ) => {
522+ svc . log . error ( { orgId, error } , 'Failed to sync organization' )
523+ } )
524+ }
525+ } ) ( ) ,
449526 )
450527 }
451528
0 commit comments