Skip to content

Commit 0c51d12

Browse files
authored
Merge branch 'improve/CM-1105' into script/CM-1107
2 parents f834a6b + 0adfab9 commit 0c51d12

4 files changed

Lines changed: 80 additions & 75 deletions

File tree

services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ import {
1515
} from '@crowd/data-access-layer'
1616
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
1717
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
18-
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'
19-
import { MemberOrgStintChange, OrganizationSource } from '@crowd/types'
18+
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
19+
import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types'
2020

2121
import { IJobDefinition } from '../types'
2222

@@ -41,40 +41,44 @@ const job: IJobDefinition = {
4141
for (const memberId of memberIds) {
4242
try {
4343
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
44-
const hash = await redis.hGetAll(datesKey)
44+
const rawMembers = await redis.sMembers(datesKey)
4545

46-
if (!hash || Object.keys(hash).length === 0) {
46+
if (!rawMembers?.length) {
4747
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
4848
continue
4949
}
5050

51-
const { activityDates, orgIds } = parseMemberActivityHash(hash)
51+
const orgDates = parseSetMembers(rawMembers)
5252

53-
if (activityDates.length > 0) {
53+
if (orgDates.length > 0) {
5454
const existingOrgs = await fetchMemberOrganizationsBySource(
5555
qx,
5656
memberId,
5757
OrganizationSource.EMAIL_DOMAIN,
5858
)
5959

60-
const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)
60+
const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, orgDates)
6161

6262
if (changes.length > 0) {
6363
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
64-
await applyStintChanges(qx, changes)
64+
await qx.tx((tx) => applyStintChanges(tx, changes))
6565
}
6666
}
6767

68-
// Remove only the fields we actually read
69-
await redis
70-
.multi()
71-
.hDel(datesKey, orgIds)
72-
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
73-
.exec()
68+
// Atomically remove only the values we read.
69+
// If no new values were added, remove the member from the queue.
70+
await RedisCache.ackSetMembers(
71+
redis,
72+
datesKey,
73+
MEMBER_ORG_STINT_CHANGES_QUEUE,
74+
memberId,
75+
rawMembers,
76+
)
7477

7578
processed++
7679
} catch (err) {
7780
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
81+
throw err
7882
}
7983
}
8084

@@ -83,23 +87,19 @@ const job: IJobDefinition = {
8387
}
8488

8589
/**
86-
* Parses the Redis hash into a clean, typed list of activity dates.
90+
* Parses set members of the form "orgId|date" into typed activity dates.
8791
*/
88-
function parseMemberActivityHash(hash: Record<string, string>) {
89-
const orgIds = Object.keys(hash)
90-
const activityDates = orgIds.flatMap((organizationId) => {
91-
try {
92-
const dates = JSON.parse(hash[organizationId])
93-
return Array.isArray(dates)
94-
? dates
95-
.filter((d): d is string => typeof d === 'string')
96-
.map((date) => ({ organizationId, date }))
97-
: []
98-
} catch {
99-
return []
92+
function parseSetMembers(members: string[]): MemberOrgDate[] {
93+
const results: MemberOrgDate[] = []
94+
95+
for (const m of members) {
// Split on the first '|': the text before it is the organization id, the text after it is the date.
96+
const idx = m.indexOf('|')
// idx > 0 skips members that have no separator (-1) or an empty organization id (0).
97+
if (idx > 0) {
98+
results.push({ organizationId: m.slice(0, idx), date: m.slice(idx + 1) })
10099
}
101-
})
102-
return { activityDates, orgIds }
100+
}
101+
102+
return results
103103
}
104104

105105
/**

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,56 +1065,26 @@ export default class MemberService extends LoggerBase {
10651065
}
10661066

10671067
/**
1068-
* Queues member activity dates in Redis as raw input for stint inference.
1069-
*
1070-
* To maximize throughput, this uses a non-atomic HGET/HSET pattern. While
1071-
* concurrent writes may occasionally drop a date, the system is self-healing
1072-
* as future activity will re-populate the buffer.
1068+
* Queues a member activity date in Redis for stint inference.
1069+
* Uses SADD for natural concurrency safety and deduplication.
10731070
*/
10741071
private async bufferMemberOrganizationActivityDates(
10751072
memberId: string,
10761073
organizationId: string,
10771074
activityTimestamp: string,
10781075
): Promise<void> {
// Reduce the timestamp to its UTC calendar day (YYYY-MM-DD).
// Note: toISOString() throws a RangeError when activityTimestamp cannot be parsed into a valid date.
10791076
const date = new Date(activityTimestamp).toISOString().split('T')[0]
1080-
const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
1081-
1082-
// Safe parser for existing Redis strings.
1083-
// Returns a valid string array even if data is corrupt or missing.
1084-
const parseExistingDates = (value: string | null | undefined): string[] => {
1085-
if (!value) return []
1086-
try {
1087-
const parsed = JSON.parse(value)
1088-
return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : []
1089-
} catch {
1090-
this.log.warn(
1091-
{ memberId, organizationId, key },
1092-
'Corrupt dates buffer value detected during buffering for member organization activity dates.',
1093-
)
1094-
return []
1095-
}
1096-
}
10971077

1098-
// 1. Fetch current dates for this specific organization
1099-
const existing = await this.redisClient.hGet(key, organizationId)
1100-
const dates: string[] = parseExistingDates(existing)
1078+
// Each member gets one flat set: values are "orgId|date"
1079+
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
1080+
const value = `${organizationId}|${date}`
11011081

1102-
// 2. If the date is new, update the set and the queue
1103-
if (!dates.includes(date)) {
1104-
dates.push(date)
1105-
dates.sort()
// Send both SADDs in a single MULTI/EXEC: add the "orgId|date" value to the member's set
// and add the member id to the processing queue.
1082+
await this.redisClient
1083+
.multi()
1084+
.sAdd(datesKey, value)
1085+
.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
1086+
.exec()
11061087

1107-
await Promise.all([
1108-
// Update the specific org field in the hash
1109-
this.redisClient.hSet(key, organizationId, JSON.stringify(dates)),
1110-
// Ensure the member is in the processing queue
1111-
this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId),
1112-
])
1113-
1114-
this.log.debug(
1115-
{ memberId, organizationId, date, count: dates.length },
1116-
'Buffered activity date and queued member.',
1117-
)
1118-
}
1088+
this.log.debug({ memberId, organizationId, date }, 'Buffered activity date and queued member.')
11191089
}
11201090
}

services/libs/common_services/src/services/common.member.service.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,13 +248,12 @@ export class CommonMemberService extends LoggerBase {
248248
highestPrioritySourceExperiences.map((e) => e.organizationId),
249249
)
250250

251-
if (memberCounts[0].memberCount > memberCounts[1].memberCount) {
251+
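// NOTE(review): assumes memberCounts is sorted desc by memberCount — confirm against the query; if it is not, a higher count at index 1 now falls through to the date-range fallback. Pick the winner only if it's strictly highest.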
252+
if (memberCounts?.length >= 2 && memberCounts[0].memberCount > memberCounts[1].memberCount) {
252253
return memberCounts[0].organizationId
253-
} else if (memberCounts[0].memberCount < memberCounts[1].memberCount) {
254-
return memberCounts[1].organizationId
255254
}
256255

257-
// if there's a draw in the member count, use the one with the longer period
256+
// tie or no data — fall back to longest date range
258257
return getLongestDateRange(highestPrioritySourceExperiences).organizationId
259258
}
260259

services/libs/redis/src/cache.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Logger, LoggerBase } from '@crowd/logging'
22
import { ICache } from '@crowd/types'
33

4-
import { RedisClient } from './types'
4+
import type { RedisClient } from './types'
55

66
export class RedisCache extends LoggerBase implements ICache {
77
private readonly prefixer: (key: string) => string
@@ -202,6 +202,42 @@ return cjson.encode(results)`
202202
return map
203203
}
204204

205+
/**
206+
* Atomically removes members from a set and cleans up the tracking queue
207+
* if the set is now empty. Prevents race conditions between SCARD and SREM.
208+
*
* @param client Redis client used to EVAL the script.
* @param setKey Set the members are removed from (KEYS[1] in the script).
* @param queueKey Queue set holding the tracking entry (KEYS[2] in the script).
* @param queueMember Entry removed from queueKey when setKey is empty after the removal (ARGV[1]).
* @param members Values to remove from setKey (ARGV[2] onward); an empty array returns 0 without calling Redis.
209+
* @returns 1 if the queue entry was removed, 0 otherwise.
210+
*/
211+
public static async ackSetMembers(
212+
client: RedisClient,
213+
setKey: string,
214+
queueKey: string,
215+
queueMember: string,
216+
members: string[],
217+
): Promise<number> {
218+
// Guard clause: Redis errors if you call SREM with no members
219+
if (members.length === 0) return 0
220+
// Lua script: SREM the given members from KEYS[1] in chunks of 500, then,
// only if KEYS[1] is empty afterwards, SREM ARGV[1] from KEYS[2]; otherwise return 0.
// NOTE(review): the chunking presumably keeps each unpack() call under Lua's argument limit — confirm.
221+
const script = `
222+
local chunkSize = 500
223+
for i = 2, #ARGV, chunkSize do
224+
redis.call('SREM', KEYS[1], unpack(ARGV, i, math.min(i + chunkSize - 1, #ARGV)))
225+
end
226+
227+
if redis.call('SCARD', KEYS[1]) == 0 then
228+
return redis.call('SREM', KEYS[2], ARGV[1])
229+
end
230+
return 0
231+
`
232+
// queueMember is passed first so it becomes ARGV[1]; the members to remove follow from ARGV[2].
233+
const result = await client.eval(script, {
234+
keys: [setKey, queueKey],
235+
arguments: [queueMember, ...members],
236+
})
237+
// Convert the EVAL reply to a number for the caller.
238+
return Number(result)
239+
}
240+
205241
public async setIfNotExistsOrGet(
206242
key: string,
207243
value: string,

0 commit comments

Comments
 (0)