Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Partial index for fast lookup of a member's non-deleted organization stints
-- that were inferred from email domains (the access pattern used by
-- fetchMemberOrganizationsBySource with source = 'email-domain').
-- CONCURRENTLY avoids locking writes during the build.
-- NOTE(review): CREATE INDEX CONCURRENTLY cannot run inside a transaction —
-- confirm the migration runner executes this statement outside one.
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain"
ON "memberOrganizations" ("memberId")
WHERE "source" = 'email-domain' AND "deletedAt" IS NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import CronTime from 'cron-time-generator'

import {
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
MEMBER_ORG_STINT_CHANGES_QUEUE,
inferMemberOrganizationStintChanges,
} from '@crowd/common_services'
import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'
import { OrganizationSource } from '@crowd/types'

import { IJobDefinition } from '../types'

const job: IJobDefinition = {
name: 'infer-member-organization-stint-changes',
cronTime: CronTime.every(5).minutes(),
timeout: 10 * 60,
process: async (ctx) => {
const redis = await getRedisClient(REDIS_CONFIG())
const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0)
const qx = pgpQx(db)

// 1. Get a batch of work
const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
Comment thread
skwowet marked this conversation as resolved.
if (!memberIds?.length) return

ctx.log.info({ count: memberIds.length }, 'Processing pending members.')
const stats = { processed: 0, inserts: 0, updates: 0 }

for (const memberId of memberIds) {
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const hash = await redis.hGetAll(datesKey)

// If no data, just remove from queue and move on
if (!hash || Object.keys(hash).length === 0) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}

// 2. Parse Redis data into domain objects
const { activityDates, orgIds } = parseMemberActivityHash(hash)

if (activityDates.length > 0) {
// 3. Compare with DB and calculate delta
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)

const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)

if (changes.length > 0) {
ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.')
stats.inserts += changes.filter((c) => c.type === 'insert').length
stats.updates += changes.filter((c) => c.type === 'update').length
}
}
Comment thread
skwowet marked this conversation as resolved.
Comment thread
skwowet marked this conversation as resolved.
Comment on lines +46 to +61
Copy link

Copilot AI Apr 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cron job computes changes via inferMemberOrganizationStintChanges(...) but never applies them to Postgres (no INSERT/UPDATE of memberOrganizations). As written, it will log and then delete the buffered Redis dates, effectively dropping the inference signal. Persist the computed changes (ideally in a transaction per member) before cleaning up Redis.

Copilot uses AI. Check for mistakes.

// 4. Cleanup: Remove only the fields we actually read
await redis
.multi()
.hDel(datesKey, orgIds)
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
.exec()
Comment thread
skwowet marked this conversation as resolved.

stats.processed++
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
}
Comment thread
skwowet marked this conversation as resolved.
}

ctx.log.info(stats, 'Batch complete.')
},
}

/**
 * Converts a raw Redis hash (organizationId -> JSON-encoded array of date
 * strings) into a flat, typed list of { organizationId, date } pairs.
 * Values that are not valid JSON arrays, and array elements that are not
 * strings, are silently skipped.
 */
function parseMemberActivityHash(hash: Record<string, string>) {
  const orgIds = Object.keys(hash)
  const activityDates: { organizationId: string; date: string }[] = []

  for (const organizationId of orgIds) {
    let decoded: unknown
    try {
      decoded = JSON.parse(hash[organizationId])
    } catch {
      continue // malformed JSON — skip this organization
    }
    if (!Array.isArray(decoded)) continue

    for (const entry of decoded) {
      if (typeof entry === 'string') {
        activityDates.push({ organizationId, date: entry })
      }
    }
  }

  return { activityDates, orgIds }
}

export default job
2 changes: 0 additions & 2 deletions services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
"script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts",
"script:process-results": "SERVICE=script tsx src/bin/process-results.ts",
"script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts",
"script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts",
"script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts",
"script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts",
"script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts",
"script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts",
Expand Down
97 changes: 0 additions & 97 deletions services/apps/data_sink_worker/src/bin/map-member-to-org.ts

This file was deleted.

This file was deleted.

11 changes: 11 additions & 0 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ export default class ActivityService extends LoggerBase {
value.platform,
undefined,
orgPromiseCache,
value.timestamp,
)
.then((memberId) => {
// map ids for members
Expand Down Expand Up @@ -1342,6 +1343,7 @@ export default class ActivityService extends LoggerBase {
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.memberId = payload.dbMember.id
Expand Down Expand Up @@ -1400,6 +1402,7 @@ export default class ActivityService extends LoggerBase {
payload.platform,
undefined,
orgPromiseCache,
payload.activity.timestamp,
)
.then(() => {
payload.objectMemberId = payload.dbObjectMember.id
Expand Down Expand Up @@ -1447,11 +1450,19 @@ export default class ActivityService extends LoggerBase {
) as boolean

if (!isBot) {
const verifiedEmailIdentity = payload.activity.member.identities?.find(
(i) => i.type === MemberIdentityType.EMAIL && i.verified,
)
const emailDomain = verifiedEmailIdentity
? verifiedEmailIdentity.value.split('@')[1]
: undefined
Comment thread
skwowet marked this conversation as resolved.

// associate activity with organization
payload.organizationId = await this.commonMemberService.findAffiliation(
payload.memberId,
payload.segmentId,
payload.activity.timestamp,
emailDomain,
)
} else {
// for bot members, we don't want to affiliate the activity with an organization
Expand Down
Loading
Loading