11import CronTime from 'cron-time-generator'
2- import {
3- inferMemberOrganizationStintChanges ,
4- MEMBER_ORG_STINT_CHANGES_DATES_PREFIX ,
5- MEMBER_ORG_STINT_CHANGES_QUEUE
6- } from '@crowd/common_services'
2+
73import {
8- createMemberOrganization ,
9- fetchMemberOrganizationsBySource ,
10- updateMemberOrganization ,
11- } from '@crowd/data-access-layer'
4+ MEMBER_ORG_STINT_CHANGES_DATES_PREFIX ,
5+ MEMBER_ORG_STINT_CHANGES_QUEUE ,
6+ inferMemberOrganizationStintChanges ,
7+ } from '@crowd/common_services'
8+ import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer'
129import { WRITE_DB_CONFIG , getDbConnection } from '@crowd/data-access-layer/src/database'
13- import { pgpQx , QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
14- import { REDIS_CONFIG , RedisClient , getRedisClient } from '@crowd/redis'
15- import { MemberOrgDate , MemberOrgStintChange , OrganizationSource } from '@crowd/types'
10+ import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
11+ import { REDIS_CONFIG , getRedisClient } from '@crowd/redis'
12+ import { OrganizationSource } from '@crowd/types'
13+
1614import { IJobDefinition } from '../types'
1715
1816const job : IJobDefinition = {
@@ -24,106 +22,82 @@ const job: IJobDefinition = {
2422 const db = await getDbConnection ( WRITE_DB_CONFIG ( ) , 2 , 0 )
2523 const qx = pgpQx ( db )
2624
27- // 1. Fetch a batch of work triggers (Member IDs)
28- const memberIds = await redis . sPop ( MEMBER_ORG_STINT_CHANGES_QUEUE , 500 )
25+ // 1. Get a batch of work
26+ const memberIds = await redis . sRandMember ( MEMBER_ORG_STINT_CHANGES_QUEUE , 500 )
2927 if ( ! memberIds ?. length ) return
3028
3129 ctx . log . info ( { count : memberIds . length } , 'Processing pending members.' )
3230 const stats = { processed : 0 , inserts : 0 , updates : 0 }
3331
3432 for ( const memberId of memberIds ) {
3533 try {
36- // 2. Get the activity dates for this member
37- const activityDates = await popMemberOrganizationActivityDates ( redis , memberId )
38-
39- // If the member has no activity dates, move to the next member
40- if ( activityDates . length === 0 ) continue
41-
42- // 3. Sync with existing database state
43- const existingOrgs = await fetchMemberOrganizationsBySource (
44- qx ,
45- memberId ,
46- OrganizationSource . EMAIL_DOMAIN ,
47- )
48-
49- // 4. Calculate required stint changes
50- const stintChanges = inferMemberOrganizationStintChanges (
51- memberId ,
52- existingOrgs ,
53- activityDates ,
54- )
55-
56- if ( stintChanges . length === 0 ) continue
57-
58- const counts = {
59- inserts : stintChanges . filter ( ( c ) => c . type === 'insert' ) . length ,
60- updates : stintChanges . filter ( ( c ) => c . type === 'update' ) . length ,
34+ const datesKey = `${ MEMBER_ORG_STINT_CHANGES_DATES_PREFIX } :${ memberId } `
35+ const hash = await redis . hGetAll ( datesKey )
36+
37+ // If no data, just remove from queue and move on
38+ if ( ! hash || Object . keys ( hash ) . length === 0 ) {
39+ await redis . sRem ( MEMBER_ORG_STINT_CHANGES_QUEUE , memberId )
40+ continue
6141 }
6242
63- ctx . log . info ( { memberId, ...counts } , 'Stint changes identified.' )
43+ // 2. Parse Redis data into domain objects
44+ const { activityDates, orgIds } = parseMemberActivityHash ( hash )
6445
65- ctx . log . debug (
66- { memberId, activityDates, existingOrgs, stintChanges } ,
67- 'Stint inference trace.' ,
68- )
46+ if ( activityDates . length > 0 ) {
47+ // 3. Compare with DB and calculate delta
48+ const existingOrgs = await fetchMemberOrganizationsBySource (
49+ qx ,
50+ memberId ,
51+ OrganizationSource . EMAIL_DOMAIN ,
52+ )
6953
70- // @todo : Enable writes after dry-run validation
71- // await applyStintChanges(qx, stintChanges)
54+ const changes = inferMemberOrganizationStintChanges ( memberId , existingOrgs , activityDates )
55+
56+ if ( changes . length > 0 ) {
57+ ctx . log . info ( { memberId, count : changes . length } , 'Stint changes identified.' )
58+ stats . inserts += changes . filter ( ( c ) => c . type === 'insert' ) . length
59+ stats . updates += changes . filter ( ( c ) => c . type === 'update' ) . length
60+ }
61+ }
62+
63+ // 4. Cleanup: Remove only the fields we actually read
64+ await redis
65+ . multi ( )
66+ . hDel ( datesKey , ...orgIds )
67+ . sRem ( MEMBER_ORG_STINT_CHANGES_QUEUE , memberId )
68+ . exec ( )
7269
7370 stats . processed ++
74- stats . inserts += counts . inserts
75- stats . updates += counts . updates
7671 } catch ( err ) {
7772 ctx . log . error ( err , { memberId } , 'Failed to process member stint inference.' )
7873 }
7974 }
8075
81- ctx . log . info ( stats , 'Batch inference complete.' )
76+ ctx . log . info ( stats , 'Batch complete.' )
8277 } ,
8378}
8479
85- async function popMemberOrganizationActivityDates (
86- redis : RedisClient ,
87- memberId : string ,
88- ) : Promise < MemberOrgDate [ ] > {
89- const key = `${ MEMBER_ORG_STINT_CHANGES_DATES_PREFIX } :${ memberId } `
90-
91- // hGetAll + del in a multi block makes the "Pop" atomic for the entire Hash
92- const [ hash ] = ( await redis . multi ( ) . hGetAll ( key ) . del ( key ) . exec ( ) ) as [ Record < string , string > | null , number ]
93-
94- if ( ! hash || Object . keys ( hash ) . length === 0 ) return [ ]
95-
96- return Object . entries ( hash )
97- . flatMap ( ( [ organizationId , datesJson ] ) =>
98- ( JSON . parse ( datesJson ) as string [ ] ) . map ( ( date ) => ( { organizationId, date } ) ) ,
99- )
80+ /**
81+ * Parses the Redis hash into a clean, typed list of activity dates.
82+ */
83+ function parseMemberActivityHash ( hash : Record < string , string > ) {
84+ const orgIds = Object . keys ( hash )
85+ const activityDates = orgIds
86+ . flatMap ( ( organizationId ) => {
87+ try {
88+ const dates = JSON . parse ( hash [ organizationId ] )
89+ return Array . isArray ( dates )
90+ ? dates
91+ . filter ( ( d ) : d is string => typeof d === 'string' )
92+ . map ( ( date ) => ( { organizationId, date } ) )
93+ : [ ]
94+ } catch {
95+ return [ ]
96+ }
97+ } )
10098 . sort ( ( a , b ) => a . date . localeCompare ( b . date ) )
101- }
102-
103- async function applyStintChanges (
104- qx : QueryExecutor ,
105- stintChanges : MemberOrgStintChange [ ] ,
106- ) : Promise < void > {
107- for ( const change of stintChanges ) {
108- if ( change . type === 'insert' ) {
109- await createMemberOrganization ( qx , change . memberId , {
110- organizationId : change . organizationId ,
111- dateStart : change . dateStart ,
112- dateEnd : change . dateEnd ,
113- source : OrganizationSource . EMAIL_DOMAIN ,
114- } )
115- continue
116- }
117-
118- if ( ! change . id ) {
119- throw new Error ( 'Missing id for update stint change.' )
120- }
12199
122- await updateMemberOrganization ( qx , change . memberId , change . id , {
123- dateStart : change . dateStart ,
124- dateEnd : change . dateEnd ,
125- } )
126- }
100+ return { activityDates, orgIds }
127101}
128102
129- export default job
103+ export default job
0 commit comments