@@ -5,14 +5,21 @@ import {
55 MEMBER_ORG_STINT_CHANGES_QUEUE ,
66 inferMemberOrganizationStintChanges ,
77} from '@crowd/common_services'
8- import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer'
8+ import {
9+ QueryExecutor ,
10+ createMemberOrganization ,
11+ fetchMemberOrganizationsBySource ,
12+ updateMemberOrganization ,
13+ } from '@crowd/data-access-layer'
914import { WRITE_DB_CONFIG , getDbConnection } from '@crowd/data-access-layer/src/database'
1015import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
1116import { REDIS_CONFIG , getRedisClient } from '@crowd/redis'
12- import { OrganizationSource } from '@crowd/types'
17+ import { MemberOrgStintChange , OrganizationSource } from '@crowd/types'
1318
1419import { IJobDefinition } from '../types'
1520
21+ const APPLY_STINT_CHANGES = false
22+
1623const job : IJobDefinition = {
1724 name : 'infer-member-organization-stint-changes' ,
1825 cronTime : CronTime . every ( 5 ) . minutes ( ) ,
@@ -22,31 +29,28 @@ const job: IJobDefinition = {
2229 const db = await getDbConnection ( WRITE_DB_CONFIG ( ) )
2330 const qx = pgpQx ( db )
2431
25- ctx . log . info ( 'Starting member organization stint inference job! ' )
32+ ctx . log . info ( 'Starting member organization stint inference job. ' )
2633
27- // 1. Get a batch of work
2834 const memberIds = await redis . sRandMemberCount ( MEMBER_ORG_STINT_CHANGES_QUEUE , 500 )
2935 if ( ! memberIds ?. length ) return
3036
31- ctx . log . info ( { count : memberIds . length } , 'Processing pending members.' )
32- const stats = { processed : 0 , inserts : 0 , updates : 0 }
37+ ctx . log . info ( { count : memberIds . length } , 'Processing members from queue.' )
38+
39+ let processed = 0
3340
3441 for ( const memberId of memberIds ) {
3542 try {
3643 const datesKey = `${ MEMBER_ORG_STINT_CHANGES_DATES_PREFIX } :${ memberId } `
3744 const hash = await redis . hGetAll ( datesKey )
3845
39- // If no data, just remove from queue and move on
4046 if ( ! hash || Object . keys ( hash ) . length === 0 ) {
4147 await redis . sRem ( MEMBER_ORG_STINT_CHANGES_QUEUE , memberId )
4248 continue
4349 }
4450
45- // 2. Parse Redis data into domain objects
4651 const { activityDates, orgIds } = parseMemberActivityHash ( hash )
4752
4853 if ( activityDates . length > 0 ) {
49- // 3. Compare with DB and calculate delta
5054 const existingOrgs = await fetchMemberOrganizationsBySource (
5155 qx ,
5256 memberId ,
@@ -56,26 +60,25 @@ const job: IJobDefinition = {
5660 const changes = inferMemberOrganizationStintChanges ( memberId , existingOrgs , activityDates )
5761
5862 if ( changes . length > 0 ) {
59- ctx . log . info ( { memberId, count : changes . length } , 'Stint changes identified.' )
60- stats . inserts += changes . filter ( ( c ) => c . type === 'insert' ) . length
61- stats . updates += changes . filter ( ( c ) => c . type === 'update' ) . length
63+ ctx . log . debug ( { memberId, changes } , 'Stint changes identified.' )
64+ // await applyStintChanges(qx, changes)
6265 }
6366 }
6467
65- // 4. Cleanup: Remove only the fields we actually read
68+ // Remove only the fields we actually read
6669 await redis
6770 . multi ( )
6871 . hDel ( datesKey , orgIds )
6972 . sRem ( MEMBER_ORG_STINT_CHANGES_QUEUE , memberId )
7073 . exec ( )
7174
72- stats . processed ++
75+ processed ++
7376 } catch ( err ) {
7477 ctx . log . error ( err , { memberId } , 'Failed to process member stint inference.' )
7578 }
7679 }
7780
78- ctx . log . info ( stats , 'Batch complete.' )
81+ ctx . log . info ( { processed } , 'Batch complete.' )
7982 } ,
8083}
8184
@@ -99,4 +102,25 @@ function parseMemberActivityHash(hash: Record<string, string>) {
99102 return { activityDates, orgIds }
100103}
101104
105+ /**
106+ * Applies the stint changes to the database.
107+ */
108+ async function applyStintChanges ( qx : QueryExecutor , changes : MemberOrgStintChange [ ] ) {
109+ for ( const change of changes ) {
110+ if ( change . type === 'insert' ) {
111+ await createMemberOrganization ( qx , change . memberId , {
112+ organizationId : change . organizationId ,
113+ dateStart : change . dateStart ,
114+ dateEnd : change . dateEnd ,
115+ source : OrganizationSource . EMAIL_DOMAIN ,
116+ } )
117+ } else {
118+ await updateMemberOrganization ( qx , change . memberId , change . id , {
119+ dateStart : change . dateStart ,
120+ dateEnd : change . dateEnd ,
121+ } )
122+ }
123+ }
124+ }
125+
102126export default job
0 commit comments