Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d997511
feat: infer memberOrganization stint dates from work-email activities
skwowet Apr 24, 2026
057464d
fix: resolve pr comments from ai bots
skwowet Apr 24, 2026
0af601c
fix: update redis member ID retrieval method and cleanup logic
skwowet Apr 24, 2026
34354a5
Update services/apps/data_sink_worker/src/service/member.service.ts
skwowet Apr 24, 2026
f74aef1
fix: resolve pr comments from ai bots
skwowet Apr 24, 2026
68a79fc
refactor: extract source rank logic into a separate func for clarity …
skwowet Apr 26, 2026
9553e2c
refactor: streamline email domain extraction logic in ActivityService
skwowet Apr 28, 2026
e0a9860
chore: backfill email-domain member organization dates
skwowet Apr 28, 2026
8a9f500
chore: enable debugger logs in prod
skwowet Apr 29, 2026
d7a18df
chore: add logging for member organization stint inference job start
skwowet Apr 29, 2026
15bd7ec
fix: debugger and info logs
skwowet Apr 29, 2026
8914f41
chore: remove unused constant and add TODO for applying stint changes…
skwowet Apr 29, 2026
498f0c1
feat: apply overrides in member organization stint changes job
skwowet Apr 29, 2026
f834a6b
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 29, 2026
4fead74
fix: resolve pr review comments
skwowet Apr 30, 2026
0f3d3a2
fix: redis key pruning script
skwowet Apr 30, 2026
21e42bf
chore: rm redis key clean up script
skwowet Apr 30, 2026
0adfab9
fix: lua script edge case
skwowet Apr 30, 2026
0c51d12
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 30, 2026
086253c
fix: resolve pr review comments
skwowet Apr 30, 2026
bdce7d1
fix: set organization source to UI for manual edits in member organiz…
skwowet Apr 30, 2026
3de432b
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 30, 2026
c2f76de
Merge branch 'main' into improve/CM-1105
skwowet Apr 30, 2026
4a4434a
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 30, 2026
afc5734
chore: rm debugger env
skwowet May 3, 2026
96e5871
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet May 3, 2026
c9fac03
Merge branch 'main' into script/CM-1107
skwowet May 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts",
"script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts",
"script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts",
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts"
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts",
"script:backfill-email-domain-member-organization-dates": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/backfill-email-domain-member-organization-dates.ts"
},
"lint-staged": {
"**/*.ts": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import commandLineArgs from 'command-line-args'

import { inferMemberOrganizationStintChanges } from '@crowd/common_services'
import {
createMemberOrganization,
fetchEmailDomainMemberOrganizationActivityDates,
fetchEmailDomainMemberOrganizationsWithoutDates,
fetchMemberOrganizationsBySource,
pgpQx,
updateMemberOrganization,
} from '@crowd/data-access-layer'
import { getDbConnection } from '@crowd/data-access-layer/src/database'
import { chunkArray } from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/utils'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { OrganizationSource } from '@crowd/types'

import { DB_CONFIG, REDIS_CONFIG } from '@/conf'

const log = getServiceLogger()

const options = [
{
name: 'testRun',
alias: 't',
type: Boolean,
description: 'Run in test mode (limit to 1 batch and 10 members).',
},
{
name: 'afterMemberId',
alias: 'a',
type: String,
description: 'The member ID to start processing after.',
},
{
name: 'batchSize',
alias: 'b',
type: Number,
description: 'The number of members to fetch in each batch.',
},
{
name: 'help',
alias: 'h',
type: Boolean,
description: 'Print this usage guide.',
},
]

const parameters = commandLineArgs(options)

setImmediate(async () => {
const testRun = parameters.testRun ?? false
const BATCH_SIZE = parameters.batchSize ?? (testRun ? 10 : 500)
let afterMemberId = parameters.afterMemberId ?? undefined

const db = await getDbConnection({
host: DB_CONFIG.writeHost,
port: DB_CONFIG.port,
database: DB_CONFIG.database,
user: DB_CONFIG.username,
password: DB_CONFIG.password,
})

const qx = pgpQx(db)
const redis = await getRedisClient(REDIS_CONFIG, true)

log.info({ testRun, BATCH_SIZE, afterMemberId }, 'Running script with the following parameters!')

let hasMore = true

while (hasMore) {
const memberIds = await fetchEmailDomainMemberOrganizationsWithoutDates(
qx,
BATCH_SIZE,
afterMemberId,
)

if (memberIds.length > 0) {
for (const chunk of chunkArray(memberIds, 50)) {
await Promise.all(
chunk.map(async (memberId) => {
if (testRun) {
log.info({ memberId }, 'Processing member!')
}

try {
const [existingMemberOrganizations, activityDates] = await Promise.all([
fetchMemberOrganizationsBySource(qx, memberId, OrganizationSource.EMAIL_DOMAIN),
fetchEmailDomainMemberOrganizationActivityDates(qx, memberId),
])

const changes = inferMemberOrganizationStintChanges(
memberId,
existingMemberOrganizations,
activityDates,
)

if (testRun) {
log.info(
{ existingMemberOrganizations, activityDates, changes },
'Previewing changes for member.',
)
}

if (changes.length > 0) {
await qx.tx(async (tx) => {
for (const change of changes) {
if (change.type === 'insert') {
await createMemberOrganization(tx, memberId, {
organizationId: change.organizationId,
dateStart: change.dateStart,
dateEnd: change.dateEnd,
source: OrganizationSource.EMAIL_DOMAIN,
})
Comment thread
skwowet marked this conversation as resolved.
} else if (change.type === 'update') {
await updateMemberOrganization(tx, memberId, change.id, {
dateStart: change.dateStart,
dateEnd: change.dateEnd,
})
}

if (testRun) {
log.info(
{ memberId, orgId: change.organizationId, type: change.type },
'Member organization updated.',
)
}
}
})
await redis.sAdd('recalculate-member-affiliations', [memberId])
} else if (testRun) {
log.info({ memberId }, 'No changes found for member!')
}
} catch (err) {
log.error({ memberId, err }, 'Failed to process for member!')
throw err
}
}),
)
}

const lastMemberId = memberIds[memberIds.length - 1]
afterMemberId = lastMemberId

log.info({ lastMemberId, count: memberIds.length }, 'Batch processed!')

if (testRun || memberIds.length < BATCH_SIZE) {
hasMore = false
}
} else {
hasMore = false
}
}

process.exit(0)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain"
ON "memberOrganizations" ("memberId")
WHERE "source" = 'email-domain' AND "deletedAt" IS NULL;
2 changes: 1 addition & 1 deletion services/apps/cron_service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/cron-service",
"private": true,
"scripts": {
"start": "SERVICE=cron-service tsx src/main.ts",
"start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import CronTime from 'cron-time-generator'

import {
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
MEMBER_ORG_STINT_CHANGES_QUEUE,
inferMemberOrganizationStintChanges,
} from '@crowd/common_services'
import {
QueryExecutor,
changeMemberOrganizationAffiliationOverrides,
checkOrganizationAffiliationPolicy,
createMemberOrganization,
fetchMemberOrganizationsBySource,
updateMemberOrganization,
} from '@crowd/data-access-layer'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'
import { MemberOrgStintChange, OrganizationSource } from '@crowd/types'

import { IJobDefinition } from '../types'

const job: IJobDefinition = {
name: 'infer-member-organization-stint-changes',
cronTime: CronTime.every(5).minutes(),
timeout: 10 * 60,
process: async (ctx) => {
const redis = await getRedisClient(REDIS_CONFIG())
const db = await getDbConnection(WRITE_DB_CONFIG())
const qx = pgpQx(db)

ctx.log.info('Starting member organization stint inference job.')

const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
if (!memberIds?.length) return

ctx.log.info({ count: memberIds.length }, 'Processing members from queue.')

let processed = 0

for (const memberId of memberIds) {
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const hash = await redis.hGetAll(datesKey)

if (!hash || Object.keys(hash).length === 0) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}

const { activityDates, orgIds } = parseMemberActivityHash(hash)

if (activityDates.length > 0) {
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)

const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)

if (changes.length > 0) {
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
await applyStintChanges(qx, changes)
}
}

// Remove only the fields we actually read
await redis
.multi()
.hDel(datesKey, orgIds)
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
.exec()

processed++
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
}
}

ctx.log.info({ processed }, 'Batch complete.')
},
}

/**
* Parses the Redis hash into a clean, typed list of activity dates.
*/
function parseMemberActivityHash(hash: Record<string, string>) {
const orgIds = Object.keys(hash)
const activityDates = orgIds.flatMap((organizationId) => {
try {
const dates = JSON.parse(hash[organizationId])
return Array.isArray(dates)
? dates
.filter((d): d is string => typeof d === 'string')
.map((date) => ({ organizationId, date }))
: []
} catch {
return []
}
})
return { activityDates, orgIds }
}

/**
* Applies the stint changes to the database.
*/
async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) {
for (const change of changes) {
if (change.type === 'insert') {
const memberOrganizationId = await createMemberOrganization(qx, change.memberId, {
organizationId: change.organizationId,
dateStart: change.dateStart,
dateEnd: change.dateEnd,
source: OrganizationSource.EMAIL_DOMAIN,
})

const isAffiliationBlocked = await checkOrganizationAffiliationPolicy(
qx,
change.organizationId,
)

if (memberOrganizationId && isAffiliationBlocked) {
await changeMemberOrganizationAffiliationOverrides(qx, [
{
memberId: change.memberId,
memberOrganizationId,
allowAffiliation: false,
},
])
}
} else {
await updateMemberOrganization(qx, change.memberId, change.id, {
dateStart: change.dateStart,
dateEnd: change.dateEnd,
})
}
}
}

export default job
4 changes: 1 addition & 3 deletions services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/data-sink-worker",
"private": true,
"scripts": {
"start": "SERVICE=data-sink-worker tsx src/main.ts",
"start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand All @@ -17,8 +17,6 @@
"script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts",
"script:process-results": "SERVICE=script tsx src/bin/process-results.ts",
"script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts",
"script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts",
"script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts",
"script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts",
"script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts",
"script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts",
Expand Down
Loading
Loading