@@ -5,11 +5,18 @@ import {
55 MEMBER_ORG_STINT_CHANGES_QUEUE ,
66 inferMemberOrganizationStintChanges ,
77} from '@crowd/common_services'
8- import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer'
8+ import {
9+ QueryExecutor ,
10+ changeMemberOrganizationAffiliationOverrides ,
11+ checkOrganizationAffiliationPolicy ,
12+ createMemberOrganization ,
13+ fetchMemberOrganizationsBySource ,
14+ updateMemberOrganization ,
15+ } from '@crowd/data-access-layer'
916import { WRITE_DB_CONFIG , getDbConnection } from '@crowd/data-access-layer/src/database'
1017import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
1118import { REDIS_CONFIG , getRedisClient } from '@crowd/redis'
12- import { OrganizationSource } from '@crowd/types'
19+ import { MemberOrgStintChange , OrganizationSource } from '@crowd/types'
1320
1421import { IJobDefinition } from '../types'
1522
@@ -19,32 +26,31 @@ const job: IJobDefinition = {
1926 timeout : 10 * 60 ,
2027 process : async ( ctx ) => {
2128 const redis = await getRedisClient ( REDIS_CONFIG ( ) )
22- const db = await getDbConnection ( WRITE_DB_CONFIG ( ) , 2 , 0 )
29+ const db = await getDbConnection ( WRITE_DB_CONFIG ( ) )
2330 const qx = pgpQx ( db )
2431
25- // 1. Get a batch of work
32+ ctx . log . info ( 'Starting member organization stint inference job.' )
33+
2634 const memberIds = await redis . sRandMemberCount ( MEMBER_ORG_STINT_CHANGES_QUEUE , 500 )
2735 if ( ! memberIds ?. length ) return
2836
29- ctx . log . info ( { count : memberIds . length } , 'Processing pending members.' )
30- const stats = { processed : 0 , inserts : 0 , updates : 0 }
37+ ctx . log . info ( { count : memberIds . length } , 'Processing members from queue.' )
38+
39+ let processed = 0
3140
3241 for ( const memberId of memberIds ) {
3342 try {
3443 const datesKey = `${ MEMBER_ORG_STINT_CHANGES_DATES_PREFIX } :${ memberId } `
3544 const hash = await redis . hGetAll ( datesKey )
3645
37- // If no data, just remove from queue and move on
3846 if ( ! hash || Object . keys ( hash ) . length === 0 ) {
3947 await redis . sRem ( MEMBER_ORG_STINT_CHANGES_QUEUE , memberId )
4048 continue
4149 }
4250
43- // 2. Parse Redis data into domain objects
4451 const { activityDates, orgIds } = parseMemberActivityHash ( hash )
4552
4653 if ( activityDates . length > 0 ) {
47- // 3. Compare with DB and calculate delta
4854 const existingOrgs = await fetchMemberOrganizationsBySource (
4955 qx ,
5056 memberId ,
@@ -54,26 +60,25 @@ const job: IJobDefinition = {
5460 const changes = inferMemberOrganizationStintChanges ( memberId , existingOrgs , activityDates )
5561
5662 if ( changes . length > 0 ) {
57- ctx . log . info ( { memberId, count : changes . length } , 'Stint changes identified.' )
58- stats . inserts += changes . filter ( ( c ) => c . type === 'insert' ) . length
59- stats . updates += changes . filter ( ( c ) => c . type === 'update' ) . length
63+ ctx . log . debug ( { memberId, changes } , 'Stint changes identified.' )
64+ await applyStintChanges ( qx , changes )
6065 }
6166 }
6267
63- // 4. Cleanup: Remove only the fields we actually read
68+ // Remove only the fields we actually read
6469 await redis
6570 . multi ( )
6671 . hDel ( datesKey , orgIds )
6772 . sRem ( MEMBER_ORG_STINT_CHANGES_QUEUE , memberId )
6873 . exec ( )
6974
70- stats . processed ++
75+ processed ++
7176 } catch ( err ) {
7277 ctx . log . error ( err , { memberId } , 'Failed to process member stint inference.' )
7378 }
7479 }
7580
76- ctx . log . info ( stats , 'Batch complete.' )
81+ ctx . log . info ( { processed } , 'Batch complete.' )
7782 } ,
7883}
7984
@@ -97,4 +102,40 @@ function parseMemberActivityHash(hash: Record<string, string>) {
97102 return { activityDates, orgIds }
98103}
99104
105+ /**
106+ * Applies the stint changes to the database.
107+ */
108+ async function applyStintChanges ( qx : QueryExecutor , changes : MemberOrgStintChange [ ] ) {
109+ for ( const change of changes ) {
110+ if ( change . type === 'insert' ) {
111+ const memberOrganizationId = await createMemberOrganization ( qx , change . memberId , {
112+ organizationId : change . organizationId ,
113+ dateStart : change . dateStart ,
114+ dateEnd : change . dateEnd ,
115+ source : OrganizationSource . EMAIL_DOMAIN ,
116+ } )
117+
118+ const isAffiliationBlocked = await checkOrganizationAffiliationPolicy (
119+ qx ,
120+ change . organizationId ,
121+ )
122+
123+ if ( memberOrganizationId && isAffiliationBlocked ) {
124+ await changeMemberOrganizationAffiliationOverrides ( qx , [
125+ {
126+ memberId : change . memberId ,
127+ memberOrganizationId,
128+ allowAffiliation : false ,
129+ } ,
130+ ] )
131+ }
132+ } else {
133+ await updateMemberOrganization ( qx , change . memberId , change . id , {
134+ dateStart : change . dateStart ,
135+ dateEnd : change . dateEnd ,
136+ } )
137+ }
138+ }
139+ }
140+
100141export default job
0 commit comments