Skip to content

Commit 4fead74

Browse files
committed
fix: resolve pr review comments
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
1 parent 498f0c1 commit 4fead74

6 files changed

Lines changed: 160 additions & 76 deletions

File tree

backend/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts",
3333
"script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts",
3434
"script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts",
35-
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts"
35+
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts",
36+
"script:remove-member-org-stint-hash-keys": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/remove-member-org-stint-hash-keys.ts"
3637
},
3738
"lint-staged": {
3839
"**/*.ts": [
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import commandLineArgs from 'command-line-args'
2+
3+
import {
4+
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
5+
MEMBER_ORG_STINT_CHANGES_QUEUE,
6+
} from '@crowd/common_services'
7+
import { getServiceLogger } from '@crowd/logging'
8+
import { getRedisClient, stopClient } from '@crowd/redis'
9+
10+
import { REDIS_CONFIG } from '@/conf'
11+
12+
const log = getServiceLogger()
13+
14+
const options = [
15+
{
16+
name: 'confirm',
17+
alias: 'c',
18+
type: Boolean,
19+
description: 'Actually delete old hash keys. Defaults to dry-run.',
20+
},
21+
{
22+
name: 'count',
23+
type: Number,
24+
defaultValue: 500,
25+
description: 'SCAN count hint.',
26+
},
27+
]
28+
29+
const parameters = commandLineArgs(options)
30+
31+
function memberIdFromDatesKey(key: string): string {
32+
return key.slice(`${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:`.length)
33+
}
34+
35+
setImmediate(async () => {
36+
const dryRun = !parameters.confirm
37+
const scanCount = parameters.count
38+
const redis = await getRedisClient(REDIS_CONFIG, true)
39+
const pattern = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:*`
40+
41+
let scanned = 0
42+
let hashKeys = 0
43+
let deleted = 0
44+
let cursor = 0
45+
46+
log.info({ dryRun, pattern, scanCount }, 'Removing old member organization stint hash keys.')
47+
48+
try {
49+
do {
50+
const result = await redis.scan(cursor, {
51+
MATCH: pattern,
52+
COUNT: scanCount,
53+
})
54+
55+
cursor = Number(result.cursor)
56+
scanned += result.keys.length
57+
58+
for (const key of result.keys) {
59+
const type = await redis.type(key)
60+
if (type === 'hash') {
61+
hashKeys++
62+
const memberId = memberIdFromDatesKey(key)
63+
64+
if (dryRun) {
65+
log.info({ key, memberId }, 'Would remove old hash key and queue member.')
66+
} else {
67+
await redis.multi().del(key).sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId).exec()
68+
69+
deleted++
70+
}
71+
}
72+
}
73+
} while (cursor !== 0)
74+
75+
log.info({ dryRun, scanned, hashKeys, deleted }, 'Finished removing old hash keys.')
76+
} finally {
77+
await stopClient(redis)
78+
}
79+
80+
process.exit(0)
81+
})

services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ import {
1515
} from '@crowd/data-access-layer'
1616
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
1717
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
18-
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'
19-
import { MemberOrgStintChange, OrganizationSource } from '@crowd/types'
18+
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
19+
import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types'
2020

2121
import { IJobDefinition } from '../types'
2222

@@ -41,40 +41,44 @@ const job: IJobDefinition = {
4141
for (const memberId of memberIds) {
4242
try {
4343
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
44-
const hash = await redis.hGetAll(datesKey)
44+
const rawMembers = await redis.sMembers(datesKey)
4545

46-
if (!hash || Object.keys(hash).length === 0) {
46+
if (!rawMembers?.length) {
4747
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
4848
continue
4949
}
5050

51-
const { activityDates, orgIds } = parseMemberActivityHash(hash)
51+
const orgDates = parseSetMembers(rawMembers)
5252

53-
if (activityDates.length > 0) {
53+
if (orgDates.length > 0) {
5454
const existingOrgs = await fetchMemberOrganizationsBySource(
5555
qx,
5656
memberId,
5757
OrganizationSource.EMAIL_DOMAIN,
5858
)
5959

60-
const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)
60+
const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, orgDates)
6161

6262
if (changes.length > 0) {
6363
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
64-
await applyStintChanges(qx, changes)
64+
await qx.tx((tx) => applyStintChanges(tx, changes))
6565
}
6666
}
6767

68-
// Remove only the fields we actually read
69-
await redis
70-
.multi()
71-
.hDel(datesKey, orgIds)
72-
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
73-
.exec()
68+
// Atomically remove only the values we read.
69+
// If no new values were added, remove the member from the queue.
70+
await RedisCache.ackSetMembers(
71+
redis,
72+
datesKey,
73+
MEMBER_ORG_STINT_CHANGES_QUEUE,
74+
memberId,
75+
rawMembers,
76+
)
7477

7578
processed++
7679
} catch (err) {
7780
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
81+
throw err
7882
}
7983
}
8084

@@ -83,23 +87,19 @@ const job: IJobDefinition = {
8387
}
8488

8589
/**
86-
* Parses the Redis hash into a clean, typed list of activity dates.
90+
* Parses set members of the form "orgId|date" into typed activity dates.
8791
*/
88-
function parseMemberActivityHash(hash: Record<string, string>) {
89-
const orgIds = Object.keys(hash)
90-
const activityDates = orgIds.flatMap((organizationId) => {
91-
try {
92-
const dates = JSON.parse(hash[organizationId])
93-
return Array.isArray(dates)
94-
? dates
95-
.filter((d): d is string => typeof d === 'string')
96-
.map((date) => ({ organizationId, date }))
97-
: []
98-
} catch {
99-
return []
92+
function parseSetMembers(members: string[]): MemberOrgDate[] {
93+
const results: MemberOrgDate[] = []
94+
95+
for (const m of members) {
96+
const idx = m.indexOf('|')
97+
if (idx > 0) {
98+
results.push({ organizationId: m.slice(0, idx), date: m.slice(idx + 1) })
10099
}
101-
})
102-
return { activityDates, orgIds }
100+
}
101+
102+
return results
103103
}
104104

105105
/**

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,56 +1065,26 @@ export default class MemberService extends LoggerBase {
10651065
}
10661066

10671067
/**
1068-
* Queues member activity dates in Redis as raw input for stint inference.
1069-
*
1070-
* To maximize throughput, this uses a non-atomic HGET/HSET pattern. While
1071-
* concurrent writes may occasionally drop a date, the system is self-healing
1072-
* as future activity will re-populate the buffer.
1068+
* Queues a member activity date in Redis for stint inference.
1069+
* Uses SADD for natural concurrency safety and deduplication.
10731070
*/
10741071
private async bufferMemberOrganizationActivityDates(
10751072
memberId: string,
10761073
organizationId: string,
10771074
activityTimestamp: string,
10781075
): Promise<void> {
10791076
const date = new Date(activityTimestamp).toISOString().split('T')[0]
1080-
const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
1081-
1082-
// Safe parser for existing Redis strings.
1083-
// Returns a valid string array even if data is corrupt or missing.
1084-
const parseExistingDates = (value: string | null | undefined): string[] => {
1085-
if (!value) return []
1086-
try {
1087-
const parsed = JSON.parse(value)
1088-
return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : []
1089-
} catch {
1090-
this.log.warn(
1091-
{ memberId, organizationId, key },
1092-
'Corrupt dates buffer value detected during buffering for member organization activity dates.',
1093-
)
1094-
return []
1095-
}
1096-
}
10971077

1098-
// 1. Fetch current dates for this specific organization
1099-
const existing = await this.redisClient.hGet(key, organizationId)
1100-
const dates: string[] = parseExistingDates(existing)
1078+
// Each member gets one flat set: values are "orgId|date"
1079+
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
1080+
const value = `${organizationId}|${date}`
11011081

1102-
// 2. If the date is new, update the set and the queue
1103-
if (!dates.includes(date)) {
1104-
dates.push(date)
1105-
dates.sort()
1082+
await this.redisClient
1083+
.multi()
1084+
.sAdd(datesKey, value)
1085+
.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
1086+
.exec()
11061087

1107-
await Promise.all([
1108-
// Update the specific org field in the hash
1109-
this.redisClient.hSet(key, organizationId, JSON.stringify(dates)),
1110-
// Ensure the member is in the processing queue
1111-
this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId),
1112-
])
1113-
1114-
this.log.debug(
1115-
{ memberId, organizationId, date, count: dates.length },
1116-
'Buffered activity date and queued member.',
1117-
)
1118-
}
1088+
this.log.debug({ memberId, organizationId, date }, 'Buffered activity date and queued member.')
11191089
}
11201090
}

services/libs/common_services/src/services/common.member.service.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,13 +248,12 @@ export class CommonMemberService extends LoggerBase {
248248
highestPrioritySourceExperiences.map((e) => e.organizationId),
249249
)
250250

251-
if (memberCounts[0].memberCount > memberCounts[1].memberCount) {
251+
// memberCounts is sorted desc by memberCount — pick the winner if it's strictly highest
252+
if (memberCounts?.length >= 2 && memberCounts[0].memberCount > memberCounts[1].memberCount) {
252253
return memberCounts[0].organizationId
253-
} else if (memberCounts[0].memberCount < memberCounts[1].memberCount) {
254-
return memberCounts[1].organizationId
255254
}
256255

257-
// if there's a draw in the member count, use the one with the longer period
256+
// tie or no data — fall back to longest date range
258257
return getLongestDateRange(highestPrioritySourceExperiences).organizationId
259258
}
260259

services/libs/redis/src/cache.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Logger, LoggerBase } from '@crowd/logging'
22
import { ICache } from '@crowd/types'
33

4-
import { RedisClient } from './types'
4+
import type { RedisClient } from './types'
55

66
export class RedisCache extends LoggerBase implements ICache {
77
private readonly prefixer: (key: string) => string
@@ -202,6 +202,39 @@ return cjson.encode(results)`
202202
return map
203203
}
204204

205+
/**
206+
* Atomically removes members from a set and cleans up the tracking queue
207+
* if the set is now empty. Prevents race conditions between SCARD and SREM.
208+
*
209+
* @returns 1 if the queue entry was removed, 0 otherwise.
210+
*/
211+
public static async ackSetMembers(
212+
client: RedisClient,
213+
setKey: string,
214+
queueKey: string,
215+
queueMember: string,
216+
members: string[],
217+
): Promise<number> {
218+
// Guard clause: Redis errors if you call SREM with no members
219+
if (members.length === 0) return 0
220+
221+
// Uses 'unpack' for O(1) script execution instead of a loop
222+
const script = `
223+
redis.call('SREM', KEYS[1], unpack(ARGV, 2))
224+
if redis.call('SCARD', KEYS[1]) == 0 then
225+
return redis.call('SREM', KEYS[2], ARGV[1])
226+
end
227+
return 0
228+
`
229+
230+
const result = await client.eval(script, {
231+
keys: [setKey, queueKey],
232+
arguments: [queueMember, ...members],
233+
})
234+
235+
return Number(result)
236+
}
237+
205238
public async setIfNotExistsOrGet(
206239
key: string,
207240
value: string,

0 commit comments

Comments
 (0)