Skip to content

Commit 6d1be2b

Browse files
authored
fix: refresh affiliations after email-domain stint changes (CM-1216) (#4181)
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
1 parent 36c8078 commit 6d1be2b

1 file changed

Lines changed: 13 additions & 2 deletions

File tree

services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
55
MEMBER_ORG_STINT_CHANGES_QUEUE,
66
inferMemberOrganizationStintChanges,
7+
signalMemberUpdate,
78
} from '@crowd/common_services'
89
import {
910
QueryExecutor,
@@ -18,6 +19,7 @@ import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/d
1819
import { deleteMemberSegmentAffiliations } from '@crowd/data-access-layer/src/member_segment_affiliations'
1920
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
2021
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
22+
import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal'
2123
import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types'
2224

2325
import { IJobDefinition } from '../types'
@@ -28,14 +30,17 @@ const job: IJobDefinition = {
2830
timeout: 10 * 60,
2931
process: async (ctx) => {
3032
const redis = await getRedisClient(REDIS_CONFIG())
31-
const db = await getDbConnection(WRITE_DB_CONFIG())
32-
const qx = pgpQx(db)
3333

3434
ctx.log.info('Starting member organization stint inference job.')
3535

3636
const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
37+
3738
if (!memberIds?.length) return
3839

40+
const db = await getDbConnection(WRITE_DB_CONFIG())
41+
const qx = pgpQx(db)
42+
const temporal = await getTemporalClient(TEMPORAL_CONFIG())
43+
3944
ctx.log.info({ count: memberIds.length }, 'Processing members from queue.')
4045

4146
let processed = 0
@@ -64,6 +69,12 @@ const job: IJobDefinition = {
6469
if (changes.length > 0) {
6570
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
6671
await qx.tx((tx) => applyStintChanges(tx, changes))
72+
73+
ctx.log.debug(
74+
{ memberId },
75+
'Triggering member update workflow to refresh affiliations.',
76+
)
77+
await signalMemberUpdate(temporal, memberId)
6778
}
6879
}
6980

0 commit comments

Comments
 (0)