Skip to content

Commit 990dca5

Browse files
committed
chore: try to insert identities and merge until we are done
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 1e8f370 commit 990dca5

2 files changed

Lines changed: 78 additions & 20 deletions

File tree

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,7 +1196,6 @@ export default class ActivityService extends LoggerBase {
11961196
reach: value.member.reach,
11971197
},
11981198
value.platform,
1199-
undefined,
12001199
orgPromiseCache,
12011200
value.timestamp,
12021201
)
@@ -1342,7 +1341,6 @@ export default class ActivityService extends LoggerBase {
13421341
payload.dbMember,
13431342
dbMemberIdentities.get(payload.dbMember.id),
13441343
payload.platform,
1345-
undefined,
13461344
orgPromiseCache,
13471345
payload.activity.timestamp,
13481346
)
@@ -1404,7 +1402,6 @@ export default class ActivityService extends LoggerBase {
14041402
payload.dbObjectMember,
14051403
dbMemberIdentities.get(payload.dbObjectMember.id),
14061404
payload.platform,
1407-
undefined,
14081405
orgPromiseCache,
14091406
payload.activity.timestamp,
14101407
)
@@ -1828,6 +1825,17 @@ export default class ActivityService extends LoggerBase {
18281825
return metadata
18291826
}
18301827

1828+
if (error instanceof ApplicationError && error.metadata?.mergeCount !== undefined) {
1829+
return {
1830+
...error.metadata,
1831+
errorMessage: error.message,
1832+
memberType,
1833+
memberIdToUpdate: dbMember?.id,
1834+
memberSource:
1835+
memberType === 'member' ? payload.dbMemberSource : payload.dbObjectMemberSource,
1836+
}
1837+
}
1838+
18311839
if (error instanceof ApplicationError) {
18321840
let nextError: any = error.originalError
18331841

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,65 @@ export default class MemberService extends LoggerBase {
184184
}
185185
}
186186

187+
/**
188+
* After an initial identity conflict redirects to a surviving member, inserts all incoming
189+
* identities that the surviving member doesn't already have. If a further conflict triggers
190+
* another merge the loop follows the new survivor. Throws ApplicationError once maxMerges
191+
* is reached so the error surfaces in integration.results.
192+
*/
193+
private async syncIdentitiesAfterRedirect(
194+
survivingId: string,
195+
integrationId: string,
196+
incomingIdentities: IMemberIdentity[],
197+
maxMerges = 2,
198+
): Promise<string> {
199+
for (let mergeCount = 0; ; mergeCount++) {
200+
const freshMap = await findIdentitiesForMembers(this.pgQx, [survivingId])
201+
const freshIdentities = freshMap.get(survivingId) ?? []
202+
203+
const toInsert = incomingIdentities.filter(
204+
(incoming) =>
205+
!freshIdentities.some(
206+
(existing) =>
207+
existing.platform === incoming.platform &&
208+
existing.value === incoming.value &&
209+
existing.type === incoming.type &&
210+
existing.verified === incoming.verified,
211+
),
212+
)
213+
214+
if (toInsert.length === 0) {
215+
return survivingId
216+
}
217+
218+
const nextRedirectId = await this.insertIdentitiesWithConflictResolution(
219+
survivingId,
220+
integrationId,
221+
toInsert,
222+
true,
223+
)
224+
225+
if (!nextRedirectId) {
226+
return survivingId
227+
}
228+
229+
if (mergeCount >= maxMerges) {
230+
throw new ApplicationError('identity sync exceeded merge limit', undefined, {
231+
mergeCount: mergeCount + 1,
232+
survivingId,
233+
maxMerges,
234+
})
235+
}
236+
237+
survivingId = nextRedirectId
238+
}
239+
}
240+
187241
public async create(
188242
segmentIds: string[],
189243
integrationId: string,
190244
data: IMemberCreateData,
191245
platform: PlatformType,
192-
releaseMemberLock?: () => Promise<void>,
193246
orgPromiseCache?: Map<string, Promise<string | undefined>>,
194247
activityTimestamp?: string,
195248
): Promise<string> {
@@ -295,16 +348,18 @@ export default class MemberService extends LoggerBase {
295348
}
296349

297350
if (redirectId) {
351+
await this.scheduleOrphanMemberDeletion(id)
352+
const finalMemberId = await this.syncIdentitiesAfterRedirect(
353+
redirectId,
354+
integrationId,
355+
data.identities,
356+
)
298357
await logExecutionTimeV2(
299-
() => this.memberRepo.addToSegments(redirectId, segmentIds),
358+
() => this.memberRepo.addToSegments(finalMemberId, segmentIds),
300359
this.log,
301360
'memberService -> create -> addToSegments (conflict redirect)',
302361
)
303-
if (releaseMemberLock) {
304-
await releaseMemberLock()
305-
}
306-
await this.scheduleOrphanMemberDeletion(id)
307-
return redirectId
362+
return finalMemberId
308363
}
309364

310365
try {
@@ -323,10 +378,6 @@ export default class MemberService extends LoggerBase {
323378
throw err
324379
}
325380

326-
if (releaseMemberLock) {
327-
await releaseMemberLock()
328-
}
329-
330381
// we should prevent organization creation for bot members
331382
if (botDetection === MemberBotDetection.CONFIRMED_BOT) {
332383
this.log.debug('Skipping organization creation for bot member')
@@ -442,7 +493,6 @@ export default class MemberService extends LoggerBase {
442493
original: IDbMember,
443494
originalIdentities: IMemberIdentity[],
444495
platform: PlatformType,
445-
releaseMemberLock?: () => Promise<void>,
446496
orgPromiseCache?: Map<string, Promise<string | undefined>>,
447497
activityTimestamp?: string,
448498
): Promise<string | void> {
@@ -543,7 +593,11 @@ export default class MemberService extends LoggerBase {
543593
'memberService -> update -> insertIdentitiesWithConflictResolution',
544594
)
545595
if (redirectId) {
546-
effectiveMemberId = redirectId
596+
effectiveMemberId = await this.syncIdentitiesAfterRedirect(
597+
redirectId,
598+
integrationId,
599+
identitiesToCreate,
600+
)
547601
}
548602
}
549603

@@ -563,10 +617,6 @@ export default class MemberService extends LoggerBase {
563617
}
564618
}
565619

566-
if (releaseMemberLock) {
567-
await releaseMemberLock()
568-
}
569-
570620
if (this.botDetectionService.isFlaggedAsBot(toUpdate.attributes)) {
571621
this.log.debug({ memberId: id }, 'Skipping organization creation for bot member')
572622
return effectiveMemberId !== id ? effectiveMemberId : undefined

0 commit comments

Comments
 (0)