@@ -32,7 +32,12 @@ import {
3232 updateMemberEnrichmentCacheDb ,
3333 updateMemberOrg ,
3434} from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker'
35- import { findOrCreateOrganization } from '@crowd/data-access-layer/src/organizations'
35+ import OrganizationMergeSuggestionsRepository from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/organizationMergeSuggestions.repo'
36+ import {
37+ addOrgIdentity ,
38+ findOrCreateOrganization ,
39+ findOrgByVerifiedIdentity ,
40+ } from '@crowd/data-access-layer/src/organizations'
3641import { dbStoreQx , pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
3742import { refreshMaterializedView } from '@crowd/data-access-layer/src/utils'
3843import { SearchSyncApiClient } from '@crowd/opensearch'
@@ -48,6 +53,7 @@ import {
4853 MemberIdentityType ,
4954 OrganizationAttributeSource ,
5055 OrganizationIdentityType ,
56+ OrganizationMergeSuggestionTable ,
5157 OrganizationSource ,
5258 PlatformType ,
5359} from '@crowd/types'
@@ -375,7 +381,6 @@ export async function updateMemberUsingSquashedPayload(
375381 }
376382 }
377383
378- const orgIdsToSync : string [ ] = [ ]
379384 const newOrUpdatedMemberOrgs = [ ]
380385
381386 if ( squashedPayload . memberOrganizations . length > 0 ) {
@@ -420,32 +425,151 @@ export async function updateMemberUsingSquashedPayload(
420425 const orgSource = OrganizationAttributeSource . ENRICHMENT
421426
422427 orgPromises . push (
423- findOrCreateOrganization ( qx , orgSource , {
424- displayName : org . name ,
425- description : org . organizationDescription ,
426- identities : identities . map ( ( i ) => ( { ...i , source : orgSource } ) ) ,
427- } )
428- . then ( ( orgId ) => {
429- // set the organization id for later use
428+ ( async ( ) => {
429+ let orgId : string | undefined
430+ const orgPayload = {
431+ displayName : org . name ,
432+ description : org . organizationDescription ,
433+ identities : identities . map ( ( i ) => ( { ...i , source : orgSource } ) ) ,
434+ }
435+
436+ try {
437+ // Keep the org write in a savepoint: if this identity is already verified
438+ // on another org, we can recover without aborting the member update transaction.
439+ orgId = await qx . tx ( ( trnx ) => findOrCreateOrganization ( trnx , orgSource , orgPayload ) )
440+ } catch ( error ) {
441+ const constraint = 'uix_organizationIdentities_plat_val_typ_tenantId_verified'
442+ const dbError = error as { constraint ?: string ; detail ?: string }
443+
444+ if (
445+ error . constructor ?. name !== 'DatabaseError' ||
446+ dbError . constraint !== constraint ||
447+ ! dbError . detail
448+ ) {
449+ throw error
450+ }
451+
452+ const match = dbError . detail . match ( / = \( ( .* ?) \) / )
453+ if ( ! match ) throw error
454+
455+ const [ platform , value , type ] = match [ 1 ] . split ( ',' ) . map ( ( v ) => v . trim ( ) )
456+ const erroredIdentity = {
457+ platform,
458+ value,
459+ type : type as OrganizationIdentityType ,
460+ verified : true ,
461+ }
462+
463+ const identityOwners = [ ]
464+ const erroredIdentityOwner = await findOrgByVerifiedIdentity ( qx , erroredIdentity )
465+ if ( ! erroredIdentityOwner ) throw error
466+
467+ identityOwners . push ( {
468+ identity : erroredIdentity ,
469+ organizationId : erroredIdentityOwner . id ,
470+ } )
471+
472+ // The first write normalizes domain identities before failing. Use that normalized
473+ // payload when checking the rest, so the retry won't hit the same index again.
474+ for ( const identity of orgPayload . identities . filter ( ( i ) => i . verified ) ) {
475+ const isErroredIdentity =
476+ identity . platform === erroredIdentity . platform &&
477+ identity . type === erroredIdentity . type &&
478+ identity . value . toLowerCase ( ) === erroredIdentity . value . toLowerCase ( )
479+
480+ if ( ! isErroredIdentity ) {
481+ const owner = await findOrgByVerifiedIdentity ( qx , identity )
482+
483+ if ( owner ) {
484+ identityOwners . push ( { identity, organizationId : owner . id } )
485+ }
486+ }
487+ }
488+
489+ // Keep the enriched org identity as an unverified signal. The verified version stays
490+ // with the existing owner, preserving the unique identity invariant.
491+ const identitiesToAddAsUnverified = identityOwners . map ( ( owner ) => owner . identity )
492+ const retryIdentities = orgPayload . identities . filter (
493+ ( identity ) =>
494+ ! identitiesToAddAsUnverified . some (
495+ ( identityToAddAsUnverified ) =>
496+ identity . platform === identityToAddAsUnverified . platform &&
497+ identity . type === identityToAddAsUnverified . type &&
498+ identity . value . toLowerCase ( ) ===
499+ identityToAddAsUnverified . value . toLowerCase ( ) ,
500+ ) ,
501+ )
502+
503+ orgId = await qx . tx ( ( trnx ) =>
504+ findOrCreateOrganization ( trnx , orgSource , {
505+ ...orgPayload ,
506+ identities : retryIdentities ,
507+ } ) ,
508+ )
509+
510+ if ( orgId ) {
511+ const mergeSuggestionsRepo = new OrganizationMergeSuggestionsRepository (
512+ tx . transaction ( ) ,
513+ svc . log ,
514+ )
515+ const mergeSuggestions = [ ]
516+ const suggestedOwnerIds = new Set < string > ( )
517+
518+ for ( const identityOwner of identityOwners ) {
519+ if ( identityOwner . organizationId !== orgId ) {
520+ await addOrgIdentity ( qx , {
521+ organizationId : orgId ,
522+ platform : identityOwner . identity . platform ,
523+ value : identityOwner . identity . value ,
524+ type : identityOwner . identity . type ,
525+ verified : false ,
526+ source : orgSource ,
527+ } )
528+
529+ const noMergeIds = await mergeSuggestionsRepo . findNoMergeIds (
530+ identityOwner . organizationId ,
531+ )
532+ if (
533+ ! noMergeIds . includes ( orgId ) &&
534+ ! suggestedOwnerIds . has ( identityOwner . organizationId )
535+ ) {
536+ suggestedOwnerIds . add ( identityOwner . organizationId )
537+ mergeSuggestions . push ( {
538+ similarity : 0.95 ,
539+ organizations : [ identityOwner . organizationId , orgId ] as [ string , string ] ,
540+ } )
541+ }
542+ }
543+ }
544+
545+ if ( mergeSuggestions . length > 0 ) {
546+ // A shared verified identity is a strong merge signal, unless the pair was
547+ // explicitly marked as no-merge by a reviewer.
548+ await mergeSuggestionsRepo . addToMerge (
549+ mergeSuggestions ,
550+ OrganizationMergeSuggestionTable . ORGANIZATION_TO_MERGE_RAW ,
551+ )
552+ await mergeSuggestionsRepo . addToMerge (
553+ mergeSuggestions ,
554+ OrganizationMergeSuggestionTable . ORGANIZATION_TO_MERGE_FILTERED ,
555+ )
556+ }
557+ }
558+ }
559+
560+ if ( orgId ) {
430561 org . organizationId = orgId
431562 if ( org . identities ) {
432563 for ( const i of org . identities ) {
433564 i . organizationId = orgId
434565 }
435566 }
436- if ( orgId ) {
437- orgIdsToSync . push ( orgId )
438- }
439- } )
440- . then ( ( ) =>
441- Promise . all (
442- orgIdsToSync . map ( ( orgId ) =>
443- syncOrganization ( orgId ) . catch ( ( error ) => {
444- console . error ( `Failed to sync organization with ID ${ orgId } :` , error )
445- } ) ,
446- ) ,
447- ) ,
448- ) ,
567+
568+ await syncOrganization ( orgId ) . catch ( ( error ) => {
569+ svc . log . error ( { orgId, error } , 'Failed to sync organization' )
570+ } )
571+ }
572+ } ) ( ) ,
449573 )
450574 }
451575
0 commit comments