Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d997511
feat: infer memberOrganization stint dates from work-email activities
skwowet Apr 24, 2026
057464d
fix: resolve pr comments from ai bots
skwowet Apr 24, 2026
0af601c
fix: update redis member ID retrieval method and cleanup logic
skwowet Apr 24, 2026
34354a5
Update services/apps/data_sink_worker/src/service/member.service.ts
skwowet Apr 24, 2026
f74aef1
fix: resolve pr comments from ai bots
skwowet Apr 24, 2026
68a79fc
refactor: extract source rank logic into a separate func for clarity …
skwowet Apr 26, 2026
9553e2c
refactor: streamline email domain extraction logic in ActivityService
skwowet Apr 28, 2026
e0a9860
chore: backfill email-domain member organization dates
skwowet Apr 28, 2026
8a9f500
chore: enable debugger logs in prod
skwowet Apr 29, 2026
d7a18df
chore: add logging for member organization stint inference job start
skwowet Apr 29, 2026
15bd7ec
fix: debugger and info logs
skwowet Apr 29, 2026
8914f41
chore: remove unused constant and add TODO for applying stint changes…
skwowet Apr 29, 2026
498f0c1
feat: apply overrides in member organization stint changes job
skwowet Apr 29, 2026
f834a6b
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 29, 2026
4fead74
fix: resolve pr review comments
skwowet Apr 30, 2026
0f3d3a2
fix: redis key pruning script
skwowet Apr 30, 2026
21e42bf
chore: rm redis key clean up script
skwowet Apr 30, 2026
0adfab9
fix: lua script edge case
skwowet Apr 30, 2026
0c51d12
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 30, 2026
086253c
fix: resolve pr review comments
skwowet Apr 30, 2026
bdce7d1
fix: set organization source to UI for manual edits in member organiz…
skwowet Apr 30, 2026
3de432b
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 30, 2026
c2f76de
Merge branch 'main' into improve/CM-1105
skwowet Apr 30, 2026
4a4434a
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet Apr 30, 2026
afc5734
chore: rm debugger env
skwowet May 3, 2026
96e5871
Merge branch 'improve/CM-1105' into script/CM-1107
skwowet May 3, 2026
c9fac03
Merge branch 'main' into script/CM-1107
skwowet May 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts",
"script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts",
"script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts",
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts"
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts",
"script:backfill-email-domain-member-organization-dates": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/backfill-email-domain-member-organization-dates.ts"
},
"lint-staged": {
"**/*.ts": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import commandLineArgs from 'command-line-args'

import { inferMemberOrganizationStintChanges } from '@crowd/common_services'
import {
changeMemberOrganizationAffiliationOverrides,
checkOrganizationAffiliationPolicy,
createMemberOrganization,
fetchEmailDomainMemberOrganizationActivityDates,
fetchEmailDomainMemberOrganizationsWithoutDates,
fetchMemberOrganizationsBySource,
pgpQx,
updateMemberOrganization,
} from '@crowd/data-access-layer'
import { getDbConnection } from '@crowd/data-access-layer/src/database'
import { chunkArray } from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/utils'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { OrganizationSource } from '@crowd/types'

import { DB_CONFIG, REDIS_CONFIG } from '@/conf'

// Logger shared by the whole script.
const log = getServiceLogger()

/**
 * Command-line option definitions passed to `commandLineArgs`.
 * Each entry declares the long name, the single-letter alias, the constructor
 * used to coerce the raw value, and a human-readable description.
 */
const options = [
{
// When set, the script logs per-member details and stops after the first batch.
name: 'testRun',
alias: 't',
type: Boolean,
description: 'Run in test mode (limit to 1 batch and 10 members).',
},
{
// Pagination cursor: only members with an ID greater than this one are fetched.
name: 'afterMemberId',
alias: 'a',
type: String,
description: 'The member ID to start processing after.',
},
{
// Overrides the default number of member IDs fetched per batch.
name: 'batchSize',
alias: 'b',
type: Number,
description: 'The number of members to fetch in each batch.',
},
{
name: 'help',
alias: 'h',
type: Boolean,
description: 'Print this usage guide.',
},
]

// Parsed command-line values, keyed by option name.
const parameters = commandLineArgs(options)

/**
 * Backfills `dateStart` / `dateEnd` on email-domain member organizations.
 *
 * Flow:
 *  1. Page through member IDs returned by
 *     `fetchEmailDomainMemberOrganizationsWithoutDates`, using the last ID of
 *     each batch as the cursor for the next one.
 *  2. For every member (50 at a time), load the existing email-domain member
 *     organizations and the activity dates, and let
 *     `inferMemberOrganizationStintChanges` compute the inserts/updates.
 *  3. Apply all changes of one member inside a single transaction, adding an
 *     `allowAffiliation: false` override for inserted rows whose organization
 *     is blocked by `checkOrganizationAffiliationPolicy`.
 *  4. Add the member to the `recalculate-member-affiliations` Redis set.
 *
 * Exit codes: 0 on success (or after printing usage), 1 on invalid arguments
 * or on any processing failure.
 */
setImmediate(async () => {
  // `--help` must never start the backfill: print the usage and leave before
  // any database or Redis connection is opened.
  if (parameters.help) {
    for (const { name, alias, description } of options) {
      log.info(`--${name}, -${alias}: ${description}`)
    }
    process.exit(0)
  }

  const testRun = parameters.testRun ?? false
  const BATCH_SIZE = parameters.batchSize ?? (testRun ? 10 : 500)
  let afterMemberId = parameters.afterMemberId ?? undefined

  // A non-numeric `--batchSize` is coerced to NaN by the `Number` option type;
  // reject it (and zero/negative values) before it reaches the SQL LIMIT.
  if (!Number.isInteger(BATCH_SIZE) || BATCH_SIZE <= 0) {
    log.error({ batchSize: parameters.batchSize }, 'batchSize must be a positive integer!')
    process.exit(1)
  }

  try {
    const db = await getDbConnection({
      host: DB_CONFIG.writeHost,
      port: DB_CONFIG.port,
      database: DB_CONFIG.database,
      user: DB_CONFIG.username,
      password: DB_CONFIG.password,
    })

    const qx = pgpQx(db)
    const redis = await getRedisClient(REDIS_CONFIG, true)

    log.info(
      { testRun, BATCH_SIZE, afterMemberId },
      'Running script with the following parameters!',
    )

    let hasMore = true

    while (hasMore) {
      const memberIds = await fetchEmailDomainMemberOrganizationsWithoutDates(
        qx,
        BATCH_SIZE,
        afterMemberId,
      )

      if (memberIds.length > 0) {
        for (const chunk of chunkArray(memberIds, 50)) {
          // allSettled instead of all: when one member fails, the others in the
          // chunk still finish their transaction AND their Redis enqueue, so the
          // process never exits between a commit and its matching `sAdd`.
          const results = await Promise.allSettled(
            chunk.map(async (memberId) => {
              if (testRun) {
                log.info({ memberId }, 'Processing member!')
              }

              try {
                const [existingMemberOrganizations, activityDates] = await Promise.all([
                  fetchMemberOrganizationsBySource(qx, memberId, OrganizationSource.EMAIL_DOMAIN),
                  fetchEmailDomainMemberOrganizationActivityDates(qx, memberId),
                ])

                const changes = inferMemberOrganizationStintChanges(
                  memberId,
                  existingMemberOrganizations,
                  activityDates,
                )

                if (testRun) {
                  log.info(
                    { existingMemberOrganizations, activityDates, changes },
                    'Previewing changes for member.',
                  )
                }

                if (changes.length > 0) {
                  // All changes of one member are applied atomically.
                  await qx.tx(async (tx) => {
                    for (const change of changes) {
                      if (change.type === 'insert') {
                        const memberOrganizationId = await createMemberOrganization(tx, memberId, {
                          organizationId: change.organizationId,
                          dateStart: change.dateStart,
                          dateEnd: change.dateEnd,
                          source: OrganizationSource.EMAIL_DOMAIN,
                        })

                        const isAffiliationBlocked = await checkOrganizationAffiliationPolicy(
                          tx,
                          change.organizationId,
                        )

                        // Newly inserted rows for a blocked organization get an
                        // explicit override that disallows affiliation.
                        if (memberOrganizationId && isAffiliationBlocked) {
                          await changeMemberOrganizationAffiliationOverrides(tx, [
                            {
                              memberId,
                              memberOrganizationId,
                              allowAffiliation: false,
                            },
                          ])
                        }
                      } else if (change.type === 'update') {
                        await updateMemberOrganization(tx, memberId, change.id, {
                          dateStart: change.dateStart,
                          dateEnd: change.dateEnd,
                        })
                      }

                      if (testRun) {
                        log.info(
                          { memberId, orgId: change.organizationId, type: change.type },
                          'Member organization updated.',
                        )
                      }
                    }
                  })

                  // Enqueue only after the transaction has committed.
                  await redis.sAdd('recalculate-member-affiliations', [memberId])
                } else if (testRun) {
                  log.info({ memberId }, 'No changes found for member!')
                }
              } catch (err) {
                log.error({ memberId, err }, 'Failed to process for member!')
                throw err
              }
            }),
          )

          // Abort the run on the first failure, but only once the chunk settled.
          const failure = results.find(
            (result): result is PromiseRejectedResult => result.status === 'rejected',
          )
          if (failure) {
            throw failure.reason
          }
        }

        const lastMemberId = memberIds[memberIds.length - 1]
        afterMemberId = lastMemberId

        log.info({ lastMemberId, count: memberIds.length }, 'Batch processed!')

        // A short batch means the table is exhausted; test runs stop after one batch.
        if (testRun || memberIds.length < BATCH_SIZE) {
          hasMore = false
        }
      } else {
        hasMore = false
      }
    }
  } catch (err) {
    // `afterMemberId` only advances after a whole batch succeeded, so the value
    // logged here can be passed back via `--afterMemberId` to resume the run.
    log.error({ afterMemberId, err }, 'Backfill aborted!')
    process.exit(1)
  }

  process.exit(0)
})
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export async function blockMemberOrganizationAffiliation(

export async function markMemberForAffiliationRecalc(memberIds: string[]): Promise<void> {
try {
await svc.redis.sAdd('queue:recalculate:members:affiliation', memberIds)
await svc.redis.sAdd('recalculate-member-affiliations', memberIds)
Comment thread
skwowet marked this conversation as resolved.
Comment thread
skwowet marked this conversation as resolved.
} catch (error) {
Comment thread
skwowet marked this conversation as resolved.
svc.log.error(error, 'Error marking member for affiliation recalc!')
throw error
Expand All @@ -67,7 +67,7 @@ export async function markMemberForAffiliationRecalc(memberIds: string[]): Promi

export async function getMembersForAffiliationRecalc(batchSize: number): Promise<string[]> {
try {
return svc.redis.sPop('queue:recalculate:members:affiliation', batchSize)
return svc.redis.sPop('recalculate-member-affiliations', batchSize)
Comment thread
skwowet marked this conversation as resolved.
} catch (error) {
svc.log.error(error, 'Error getting members for affiliation recalc!')
throw error
Expand Down
67 changes: 67 additions & 0 deletions services/libs/data-access-layer/src/members/organizations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
import { EntityType } from '../old/apps/script_executor_worker/types'
import { QueryExecutor } from '../queryExecutor'

import { EmailDomainMemberOrganizationActivityDate } from './types'

/* eslint-disable @typescript-eslint/no-explicit-any */

export async function fetchMemberOrganizations(
Expand Down Expand Up @@ -59,6 +61,71 @@ export async function fetchMemberOrganizationsBySource(
)
}

/**
 * Lists the distinct IDs of members that own at least one non-deleted
 * `email-domain` member organization with neither `dateStart` nor `dateEnd`.
 *
 * Results are ordered by member ID so the last returned ID can be fed back as
 * `afterMemberId` to fetch the next page.
 *
 * @param qx - Query executor used to run the statement.
 * @param limit - Maximum number of member IDs to return.
 * @param afterMemberId - Optional cursor; when given, only IDs strictly greater
 *   than it are returned.
 * @returns The matching member IDs in ascending order.
 */
export async function fetchEmailDomainMemberOrganizationsWithoutDates(
  qx: QueryExecutor,
  limit: number,
  afterMemberId?: string,
): Promise<string[]> {
  // The cursor predicate is only part of the statement when a cursor was supplied.
  const cursorClause = afterMemberId ? `AND "memberId" > $(afterMemberId)` : ''

  const sql = `
SELECT DISTINCT "memberId"
FROM "memberOrganizations"
WHERE "source" = 'email-domain'
AND "dateStart" IS NULL
AND "dateEnd" IS NULL
AND "deletedAt" IS NULL
${cursorClause}
ORDER BY "memberId"
LIMIT $(limit)
`

  const records = await qx.select(sql, { limit, afterMemberId })

  const ids: string[] = []
  for (const record of records) {
    ids.push(record.memberId)
  }
  return ids
}

/**
 * Returns the distinct calendar dates on which a member was active under an
 * email address whose domain matches one of their email-domain organizations.
 *
 * The query works in two steps:
 *  - the CTE collects the member's non-deleted `email-domain` member
 *    organizations together with each organization's verified
 *    `primary-domain` identity (lower-cased);
 *  - the main select joins the member's verified, non-deleted `email`
 *    identities whose domain part equals that domain, then the activity
 *    relations recorded for the same member, platform and (case-insensitive)
 *    username, keeping only rows that have a timestamp.
 *
 * @param qx - Query executor used to run the statement.
 * @param memberId - Member whose activity dates are fetched.
 * @returns One row per distinct (memberId, organizationId, date), where `date`
 *   is the activity timestamp truncated to a date and rendered as text, ordered
 *   by member, organization and date.
 */
export async function fetchEmailDomainMemberOrganizationActivityDates(
  qx: QueryExecutor,
  memberId: string,
): Promise<EmailDomainMemberOrganizationActivityDate[]> {
  return qx.select(
    `
    WITH email_domain_member_orgs AS (
      SELECT DISTINCT
        mo."memberId",
        mo."organizationId",
        lower(oi.value) AS domain
      FROM "memberOrganizations" mo
      INNER JOIN "organizationIdentities" oi
        ON oi."organizationId" = mo."organizationId"
        AND oi.type = 'primary-domain'
        AND oi.verified = true
      WHERE mo."memberId" = $(memberId)
        AND mo."source" = 'email-domain'
        AND mo."deletedAt" IS NULL
    )
    SELECT DISTINCT
      edmo."memberId",
      edmo."organizationId",
      ar."timestamp"::date::text AS date
    FROM email_domain_member_orgs edmo
    INNER JOIN "memberIdentities" mi
      ON mi."memberId" = edmo."memberId"
      AND mi.verified = true
      AND mi.type = 'email'
      AND mi."deletedAt" IS NULL
      AND lower(split_part(mi.value, '@', 2)) = edmo.domain
    INNER JOIN "activityRelations" ar
      ON ar."memberId" = mi."memberId"
      AND ar.platform = mi.platform
      AND lower(ar.username) = lower(mi.value)
      AND ar."timestamp" IS NOT NULL
    ORDER BY edmo."memberId", edmo."organizationId", date
    `,
    { memberId },
  )
}

export async function fetchOrganizationMemberIds(
qx: QueryExecutor,
organizationId: string,
Expand Down
6 changes: 5 additions & 1 deletion services/libs/data-access-layer/src/members/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IAttributes, IMemberAttribute, MemberAttributeType } from '@crowd/types'
import { IAttributes, IMemberAttribute, MemberAttributeType, MemberOrgDate } from '@crowd/types'

export interface IQueryNumberOfNewMembers {
segmentIds?: string[]
Expand Down Expand Up @@ -93,3 +93,7 @@ export interface IDbMemberBotSuggestionBySegment {
avatarUrl: string
attributes: IAttributes
}

/**
 * A `MemberOrgDate` tagged with the member it belongs to.
 *
 * NOTE(review): the remaining fields come from `MemberOrgDate` in
 * `@crowd/types`, which is not visible here — presumably the organization ID
 * and the date; confirm against that definition.
 */
export interface EmailDomainMemberOrganizationActivityDate extends MemberOrgDate {
// ID of the member the date entry refers to.
memberId: string
}
Loading