Skip to content

Commit 10ef579

Browse files
authored
feat: infer memberOrganization stint dates from work-email activities (CM-1105) (#4054)
1 parent 4f9abf6 commit 10ef579

19 files changed

Lines changed: 584 additions & 325 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain"
2+
ON "memberOrganizations" ("memberId")
3+
WHERE "source" = 'email-domain' AND "deletedAt" IS NULL;

backend/src/services/member/memberOrganizationsService.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
IOrganization,
2323
IRenderFriendlyMemberOrganization,
2424
MemberOrganizationUpdate,
25+
OrganizationSource,
2526
} from '@crowd/types'
2627

2728
import SequelizeRepository from '@/database/repositories/sequelizeRepository'
@@ -219,14 +220,18 @@ export default class MemberOrganizationsService extends LoggerBase {
219220
title: data.title,
220221
dateStart: data.dateStart,
221222
dateEnd: data.dateEnd,
222-
source: data.source,
223223
verified: data.verified,
224224
verifiedBy: data.verifiedBy,
225225
}).filter(([, v]) => v !== undefined),
226226
)
227227

228228
await cleanSoftDeletedMemberOrganization(qx, memberId, data.organizationId, data)
229-
await updateMemberOrganization(qx, memberId, id, update)
229+
// Any manual edit from the frontend promotes ownership to UI so automated
230+
// sources (e.g. email-domain inference) no longer overwrite user intent.
231+
await updateMemberOrganization(qx, memberId, id, {
232+
...update,
233+
source: OrganizationSource.UI,
234+
})
230235

231236
await this.commonMemberService.startAffiliationRecalculation(memberId, [data.organizationId])
232237

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import CronTime from 'cron-time-generator'
2+
3+
import {
4+
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
5+
MEMBER_ORG_STINT_CHANGES_QUEUE,
6+
inferMemberOrganizationStintChanges,
7+
} from '@crowd/common_services'
8+
import {
9+
QueryExecutor,
10+
changeMemberOrganizationAffiliationOverrides,
11+
checkOrganizationAffiliationPolicy,
12+
createMemberOrganization,
13+
fetchMemberOrganizationsBySource,
14+
updateMemberOrganization,
15+
} from '@crowd/data-access-layer'
16+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
17+
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
18+
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
19+
import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types'
20+
21+
import { IJobDefinition } from '../types'
22+
23+
const job: IJobDefinition = {
24+
name: 'infer-member-organization-stint-changes',
25+
cronTime: CronTime.every(5).minutes(),
26+
timeout: 10 * 60,
27+
process: async (ctx) => {
28+
const redis = await getRedisClient(REDIS_CONFIG())
29+
const db = await getDbConnection(WRITE_DB_CONFIG())
30+
const qx = pgpQx(db)
31+
32+
ctx.log.info('Starting member organization stint inference job.')
33+
34+
const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
35+
if (!memberIds?.length) return
36+
37+
ctx.log.info({ count: memberIds.length }, 'Processing members from queue.')
38+
39+
let processed = 0
40+
41+
for (const memberId of memberIds) {
42+
try {
43+
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
44+
const rawMembers = await redis.sMembers(datesKey)
45+
46+
if (!rawMembers?.length) {
47+
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
48+
continue
49+
}
50+
51+
const orgDates = parseSetMembers(rawMembers)
52+
53+
if (orgDates.length > 0) {
54+
const existingOrgs = await fetchMemberOrganizationsBySource(
55+
qx,
56+
memberId,
57+
OrganizationSource.EMAIL_DOMAIN,
58+
)
59+
60+
const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, orgDates)
61+
62+
if (changes.length > 0) {
63+
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
64+
await qx.tx((tx) => applyStintChanges(tx, changes))
65+
}
66+
}
67+
68+
// Atomically remove only the values we read.
69+
// If no new values were added, remove the member from the queue.
70+
await RedisCache.ackSetMembers(
71+
redis,
72+
datesKey,
73+
MEMBER_ORG_STINT_CHANGES_QUEUE,
74+
memberId,
75+
rawMembers,
76+
)
77+
78+
processed++
79+
} catch (err) {
80+
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
81+
throw err
82+
}
83+
}
84+
85+
ctx.log.info({ processed }, 'Batch complete.')
86+
},
87+
}
88+
89+
/**
90+
* Parses set members of the form "orgId|date" into typed activity dates.
91+
*/
92+
function parseSetMembers(members: string[]): MemberOrgDate[] {
93+
const results: MemberOrgDate[] = []
94+
95+
for (const m of members) {
96+
const idx = m.indexOf('|')
97+
if (idx > 0) {
98+
results.push({ organizationId: m.slice(0, idx), date: m.slice(idx + 1) })
99+
}
100+
}
101+
102+
return results
103+
}
104+
105+
/**
106+
* Applies the stint changes to the database.
107+
*/
108+
async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) {
109+
for (const change of changes) {
110+
if (change.type === 'insert') {
111+
const memberOrganizationId = await createMemberOrganization(qx, change.memberId, {
112+
organizationId: change.organizationId,
113+
dateStart: change.dateStart,
114+
dateEnd: change.dateEnd,
115+
source: OrganizationSource.EMAIL_DOMAIN,
116+
})
117+
118+
const isAffiliationBlocked = await checkOrganizationAffiliationPolicy(
119+
qx,
120+
change.organizationId,
121+
)
122+
123+
if (memberOrganizationId && isAffiliationBlocked) {
124+
await changeMemberOrganizationAffiliationOverrides(qx, [
125+
{
126+
memberId: change.memberId,
127+
memberOrganizationId,
128+
allowAffiliation: false,
129+
},
130+
])
131+
}
132+
} else {
133+
await updateMemberOrganization(qx, change.memberId, change.id, {
134+
dateStart: change.dateStart,
135+
dateEnd: change.dateEnd,
136+
})
137+
}
138+
}
139+
}
140+
141+
export default job

services/apps/data_sink_worker/package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
"script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts",
1818
"script:process-results": "SERVICE=script tsx src/bin/process-results.ts",
1919
"script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts",
20-
"script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts",
21-
"script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts",
2220
"script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts",
2321
"script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts",
2422
"script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts",

services/apps/data_sink_worker/src/bin/map-member-to-org.ts

Lines changed: 0 additions & 97 deletions
This file was deleted.

services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts

Lines changed: 0 additions & 98 deletions
This file was deleted.

0 commit comments

Comments
 (0)