Skip to content

Commit 15d59ff

Browse files
authored
fix: prevent stale activityRelations in affiliations refresh (CM-1132) (#4088)
1 parent 68aeb10 commit 15d59ff

3 files changed

Lines changed: 86 additions & 50 deletions

File tree

  • services
    • apps/members_enrichment_worker/src/activities
    • libs/data-access-layer/src/member-organization-affiliation

services/apps/members_enrichment_worker/src/activities/member.ts

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -77,42 +77,30 @@ export async function getIdentitiesExistInOtherMembers(
7777
excludeMemberId: string,
7878
identities: IMemberIdentity[],
7979
): Promise<IMemberIdentity[]> {
80-
let rows: IMemberIdentity[] = []
81-
82-
try {
83-
const db = svc.postgres.reader
84-
rows = await getIdentitiesExistInOthers(db, excludeMemberId, identities)
85-
} catch (err) {
86-
throw err
87-
}
88-
89-
return rows
80+
const db = svc.postgres.reader
81+
return getIdentitiesExistInOthers(db, excludeMemberId, identities)
9082
}
9183

9284
export async function updateMemberWithEnrichmentData(
9385
memberId: string,
9486
identities: IMemberIdentity[],
9587
attributes?: IAttributes,
9688
): Promise<void> {
97-
try {
98-
await svc.postgres.writer.connection().tx(async (tx) => {
99-
for (const identity of identities) {
100-
await createMemberIdentity(new PgPromiseQueryExecutor(tx), {
101-
memberId,
102-
platform: identity.platform,
103-
value: identity.value,
104-
type: identity.type,
105-
verified: identity.verified || false,
106-
source: 'enrichment',
107-
})
108-
}
109-
if (attributes) {
110-
await updateMemberAttributes(tx, memberId, attributes)
111-
}
112-
})
113-
} catch (err) {
114-
throw err
115-
}
89+
await svc.postgres.writer.connection().tx(async (tx) => {
90+
for (const identity of identities) {
91+
await createMemberIdentity(new PgPromiseQueryExecutor(tx), {
92+
memberId,
93+
platform: identity.platform,
94+
value: identity.value,
95+
type: identity.type,
96+
verified: identity.verified || false,
97+
source: 'enrichment',
98+
})
99+
}
100+
if (attributes) {
101+
await updateMemberAttributes(tx, memberId, attributes)
102+
}
103+
})
116104
}
117105

118106
export async function mergeMembers(

services/libs/data-access-layer/src/member-organization-affiliation/index.ts

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,11 @@ async function prepareMemberOrganizationAffiliationTimeline(
118118

119119
// solves conflicts in timeranges, always decides on one org when there are overlapping ranges
120120
const buildTimeline = (
121-
memberOrganizations: MemberOrganizationWithOverrides[],
122-
manualAffiliations: IManualAffiliationData[],
121+
affiliations: AffiliationItem[],
123122
fallbackOrganizationId: string | null,
123+
includeFallback = true,
124124
): TimelineItem[] => {
125-
const allAffiliationsWithDates = [...memberOrganizations, ...manualAffiliations].filter(
126-
(row) => !!row.dateStart,
127-
)
125+
const allAffiliationsWithDates = affiliations.filter((row) => !!row.dateStart)
128126

129127
const earliestStartDate =
130128
allAffiliationsWithDates.length > 0
@@ -148,7 +146,7 @@ async function prepareMemberOrganizationAffiliationTimeline(
148146
let gapStartDate = null
149147

150148
for (let date = new Date(earliestStartDate); date <= now; date.setDate(date.getDate() + 1)) {
151-
const orgs = findOrgsWithRolesInDate(date, [...memberOrganizations, ...manualAffiliations])
149+
const orgs = findOrgsWithRolesInDate(date, affiliations)
152150

153151
if (orgs.length === 0) {
154152
// means there's a gap in the timeline, close the current range if there's one
@@ -226,13 +224,15 @@ async function prepareMemberOrganizationAffiliationTimeline(
226224
fallbackEnd = oneDayBefore(earliestStartDate)
227225
}
228226

229-
// prepend range to cover all activities before the earliest affiliation date
230-
// also handles edge case where fallback org is null and the timeline is empty.
231-
timeline.unshift({
232-
organizationId: fallbackOrganizationId,
233-
dateStart: fallbackStart.toISOString(),
234-
dateEnd: fallbackEnd.toISOString(),
235-
})
227+
if (includeFallback) {
228+
// prepend range to cover all activities before the earliest affiliation date
229+
// also handles edge case where fallback org is null and the timeline is empty.
230+
timeline.unshift({
231+
organizationId: fallbackOrganizationId,
232+
dateStart: fallbackStart.toISOString(),
233+
dateEnd: fallbackEnd.toISOString(),
234+
})
235+
}
236236

237237
return timeline
238238
}
@@ -308,7 +308,38 @@ async function prepareMemberOrganizationAffiliationTimeline(
308308
.value() ?? null
309309
}
310310

311-
return buildTimeline(memberOrganizations, manualAffiliations, fallbackOrganizationId)
311+
// We separate global and manual timelines to prevent 'stale' organizationIds
312+
// Member organizations apply globally, while member segment affiliations only override specific segments.
313+
const baseTimeline = buildTimeline(memberOrganizations, fallbackOrganizationId).map((item) => ({
314+
...item,
315+
skipManualAffiliationSegments: manualAffiliations.length > 0,
316+
}))
317+
318+
// Only keep items with an actual org. Gaps (null org) are already handled by the base timeline.
319+
const manualTimeline = _.flatMap(
320+
_.groupBy(manualAffiliations, 'segmentId'),
321+
(affiliations, segmentId) => {
322+
const items = buildTimeline(affiliations, null, false)
323+
.filter((item) => item.organizationId !== null)
324+
.map((item) => ({ ...item, segmentId }))
325+
326+
// Undated MSAs are invisible to buildTimeline (no dateStart to anchor the loop).
327+
// Create a catch-all so the base pass's NOT EXISTS still has a matching manual item.
328+
if (items.length === 0) {
329+
const primary = selectPrimaryWorkExperience(affiliations)
330+
items.push({
331+
organizationId: primary.organizationId,
332+
dateStart: new Date('1970-01-01').toISOString(),
333+
dateEnd: primary.dateEnd ? new Date(primary.dateEnd).toISOString() : null,
334+
segmentId,
335+
})
336+
}
337+
338+
return items
339+
},
340+
)
341+
342+
return [...baseTimeline, ...manualTimeline]
312343
}
313344

314345
async function processAffiliationActivities(
@@ -327,31 +358,47 @@ async function processAffiliationActivities(
327358
}
328359

329360
// Build the where conditions for the subquery
330-
const conditions = [`"memberId" = $(memberId)`]
361+
const conditions = [`ar."memberId" = $(memberId)`]
331362

332363
// Organization filtering
333364
if (affiliation.organizationId) {
334-
conditions.push(`("organizationId" is null or "organizationId" <> $(organizationId))`)
365+
conditions.push(`(ar."organizationId" is null or ar."organizationId" <> $(organizationId))`)
335366
} else {
336-
conditions.push(`"organizationId" is not null`)
367+
conditions.push(`ar."organizationId" is not null`)
337368
}
338369

339370
// Date filtering
340371
if (affiliation.dateStart) {
341-
conditions.push(`"timestamp" >= $(dateStart)::date`)
372+
conditions.push(`ar."timestamp" >= $(dateStart)::date`)
342373
params.dateStart = affiliation.dateStart
343374
}
344375
if (affiliation.dateEnd) {
345-
conditions.push(`"timestamp" < $(dateEnd)::date + interval '1 day'`)
376+
conditions.push(`ar."timestamp" < $(dateEnd)::date + interval '1 day'`)
346377
params.dateEnd = affiliation.dateEnd
347378
}
348379

349380
// Segment filtering (for manual affiliations)
350381
if (affiliation.segmentId) {
351-
conditions.push(`"segmentId" = $(segmentId)`)
382+
conditions.push(`ar."segmentId" = $(segmentId)`)
352383
params.segmentId = affiliation.segmentId
353384
}
354385

386+
// Don't overwrite activities that a member segment affiliation covers
387+
// Those are handled in the manual timeline.
388+
if (affiliation.skipManualAffiliationSegments) {
389+
conditions.push(`
390+
NOT EXISTS (
391+
SELECT 1
392+
FROM "memberSegmentAffiliations" msa
393+
WHERE msa."memberId" = $(memberId)
394+
AND msa."segmentId" = ar."segmentId"
395+
AND msa."organizationId" IS NOT NULL
396+
AND (msa."dateStart" IS NULL OR ar."timestamp" >= msa."dateStart"::date)
397+
AND (msa."dateEnd" IS NULL OR ar."timestamp" < msa."dateEnd"::date + interval '1 day')
398+
)
399+
`)
400+
}
401+
355402
const whereClause = conditions.join(' and ')
356403

357404
do {
@@ -360,7 +407,7 @@ async function processAffiliationActivities(
360407
UPDATE "activityRelations"
361408
SET "organizationId" = $(organizationId), "updatedAt" = CURRENT_TIMESTAMP
362409
WHERE "activityId" in (
363-
select "activityId" from "activityRelations"
410+
select ar."activityId" from "activityRelations" ar
364411
where ${whereClause}
365412
limit $(batchSize)
366413
)

services/libs/data-access-layer/src/member-organization-affiliation/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ export type TimelineItem = {
1010
dateEnd: string | null
1111
organizationId: string | null
1212
segmentId?: string
13+
skipManualAffiliationSegments?: boolean
1314
}

0 commit comments

Comments
 (0)