Skip to content

Commit e0a9860

Browse files
committed
chore: backfill email-domain member organization dates
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
1 parent 9553e2c commit e0a9860

5 files changed

Lines changed: 232 additions & 4 deletions

File tree

backend/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts",
3333
"script:fix-duplicate-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-duplicate-members.ts",
3434
"script:fix-members-activities-after-unaffilation": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/fix-members-activities-after-unaffilation.ts",
35-
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts"
35+
"script:process-bot-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/process-bot-members.ts",
36+
"script:backfill-email-domain-member-organization-dates": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/backfill-email-domain-member-organization-dates.ts"
3637
},
3738
"lint-staged": {
3839
"**/*.ts": [
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import commandLineArgs from 'command-line-args'
2+
3+
import { inferMemberOrganizationStintChanges } from '@crowd/common_services'
4+
import {
5+
createMemberOrganization,
6+
fetchEmailDomainMemberOrganizationActivityDates,
7+
fetchEmailDomainMemberOrganizationsWithoutDates,
8+
fetchMemberOrganizationsBySource,
9+
pgpQx,
10+
updateMemberOrganization,
11+
} from '@crowd/data-access-layer'
12+
import { getDbConnection } from '@crowd/data-access-layer/src/database'
13+
import { chunkArray } from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/utils'
14+
import { getServiceLogger } from '@crowd/logging'
15+
import { getRedisClient } from '@crowd/redis'
16+
import { OrganizationSource } from '@crowd/types'
17+
18+
import { DB_CONFIG, REDIS_CONFIG } from '@/conf'
19+
20+
const log = getServiceLogger()
21+
22+
// Command-line options understood by this script (parsed by `command-line-args`).
const options = [
  {
    name: 'testRun',
    alias: 't',
    type: Boolean,
    description: 'Run in test mode (limit to 1 batch and 10 members).',
  },
  {
    name: 'afterMemberId',
    alias: 'a',
    type: String,
    description: 'The member ID to start processing after.',
  },
  {
    name: 'batchSize',
    alias: 'b',
    type: Number,
    description: 'The number of members to fetch in each batch.',
  },
  {
    name: 'help',
    alias: 'h',
    type: Boolean,
    description: 'Print this usage guide.',
  },
]

const parameters = commandLineArgs(options)

/**
 * Backfills dates on `email-domain` member organizations.
 *
 * Flow:
 *  1. Page through member ids returned by `fetchEmailDomainMemberOrganizationsWithoutDates`,
 *     using the last id of each batch as the cursor for the next one.
 *  2. Process each batch in chunks of 50 members, the members of a chunk concurrently.
 *  3. For every member, load the existing `email-domain` member organizations and the
 *     activity dates, let `inferMemberOrganizationStintChanges` compute the inserts/updates,
 *     and apply them inside a single transaction.
 *  4. After the transaction, add the member id to the `recalculate-member-affiliations`
 *     redis set.
 *
 * NOTE: `--testRun` is NOT a dry run. It only limits the run to a single batch (default
 * size 10) and enables verbose logging; changes are still written.
 */
setImmediate(async () => {
  // `--help` must short-circuit before any connection is opened; otherwise asking for
  // usage would silently run the backfill against the write database.
  if (parameters.help) {
    const usageLines = options.map(
      ({ name, alias, description }) => `  --${name}, -${alias}\t${description}`,
    )
    // eslint-disable-next-line no-console
    console.log(['Options:', ...usageLines].join('\n'))
    process.exit(0)
  }

  const testRun = parameters.testRun ?? false
  const BATCH_SIZE = parameters.batchSize ?? (testRun ? 10 : 500)
  let afterMemberId = parameters.afterMemberId ?? undefined

  // The batch size ends up in a SQL LIMIT, so reject NaN / zero / negative / fractional
  // values up front with a readable message instead of failing inside the query.
  if (!Number.isInteger(BATCH_SIZE) || BATCH_SIZE <= 0) {
    log.error({ batchSize: BATCH_SIZE }, 'batchSize must be a positive integer!')
    process.exit(1)
  }

  const db = await getDbConnection({
    host: DB_CONFIG.writeHost,
    port: DB_CONFIG.port,
    database: DB_CONFIG.database,
    user: DB_CONFIG.username,
    password: DB_CONFIG.password,
  })

  const qx = pgpQx(db)
  const redis = await getRedisClient(REDIS_CONFIG, true)

  log.info({ testRun, BATCH_SIZE, afterMemberId }, 'Running script with the following parameters!')

  // Applies the inferred changes for one member. Errors are logged with the member id and
  // rethrown so that the whole run stops on the first failure.
  const processMember = async (memberId: string): Promise<void> => {
    if (testRun) {
      log.info({ memberId }, 'Processing member!')
    }

    try {
      const [existingMemberOrganizations, activityDates] = await Promise.all([
        fetchMemberOrganizationsBySource(qx, memberId, OrganizationSource.EMAIL_DOMAIN),
        fetchEmailDomainMemberOrganizationActivityDates(qx, memberId),
      ])

      const changes = inferMemberOrganizationStintChanges(
        memberId,
        existingMemberOrganizations,
        activityDates,
      )

      if (testRun) {
        log.info(
          { existingMemberOrganizations, activityDates, changes },
          'Previewing changes for member.',
        )
      }

      if (changes.length === 0) {
        if (testRun) {
          log.info({ memberId }, 'No changes found for member!')
        }
        return
      }

      // All changes of a member are written in one transaction.
      await qx.tx(async (tx) => {
        for (const change of changes) {
          if (change.type === 'insert') {
            await createMemberOrganization(tx, memberId, {
              organizationId: change.organizationId,
              dateStart: change.dateStart,
              dateEnd: change.dateEnd,
              source: OrganizationSource.EMAIL_DOMAIN,
            })
          } else if (change.type === 'update') {
            await updateMemberOrganization(tx, memberId, change.id, {
              dateStart: change.dateStart,
              dateEnd: change.dateEnd,
            })
          }

          if (testRun) {
            log.info(
              { memberId, orgId: change.organizationId, type: change.type },
              'Member organization updated.',
            )
          }
        }
      })

      // Only reached when the transaction succeeded.
      await redis.sAdd('recalculate-member-affiliations', [memberId])
    } catch (err) {
      log.error({ memberId, err }, 'Failed to process for member!')
      throw err
    }
  }

  let hasMore = true

  while (hasMore) {
    const memberIds = await fetchEmailDomainMemberOrganizationsWithoutDates(
      qx,
      BATCH_SIZE,
      afterMemberId,
    )

    if (memberIds.length === 0) {
      break
    }

    // Bound the concurrency: at most 50 members are processed at the same time.
    for (const chunk of chunkArray(memberIds, 50)) {
      await Promise.all(chunk.map((memberId) => processMember(memberId)))
    }

    const lastMemberId = memberIds[memberIds.length - 1]
    afterMemberId = lastMemberId

    log.info({ lastMemberId, count: memberIds.length }, 'Batch processed!')

    // A short batch means the cursor reached the end; test runs stop after one batch.
    if (testRun || memberIds.length < BATCH_SIZE) {
      hasMore = false
    }
  }

  process.exit(0)
})

services/apps/script_executor_worker/src/activities/block-project-organization-affiliations.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export async function blockMemberOrganizationAffiliation(
5858

5959
export async function markMemberForAffiliationRecalc(memberIds: string[]): Promise<void> {
6060
try {
61-
await svc.redis.sAdd('queue:recalculate:members:affiliation', memberIds)
61+
await svc.redis.sAdd('recalculate-member-affiliations', memberIds)
6262
} catch (error) {
6363
svc.log.error(error, 'Error marking member for affiliation recalc!')
6464
throw error
@@ -67,7 +67,7 @@ export async function markMemberForAffiliationRecalc(memberIds: string[]): Promi
6767

6868
export async function getMembersForAffiliationRecalc(batchSize: number): Promise<string[]> {
6969
try {
70-
return svc.redis.sPop('queue:recalculate:members:affiliation', batchSize)
70+
return svc.redis.sPop('recalculate-member-affiliations', batchSize)
7171
} catch (error) {
7272
svc.log.error(error, 'Error getting members for affiliation recalc!')
7373
throw error

services/libs/data-access-layer/src/members/organizations.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import {
1414
import { EntityType } from '../old/apps/script_executor_worker/types'
1515
import { QueryExecutor } from '../queryExecutor'
1616

17+
import { EmailDomainMemberOrganizationActivityDate } from './types'
18+
1719
/* eslint-disable @typescript-eslint/no-explicit-any */
1820

1921
export async function fetchMemberOrganizations(
@@ -59,6 +61,71 @@ export async function fetchMemberOrganizationsBySource(
5961
)
6062
}
6163

64+
/**
 * Returns one page of distinct member ids that have at least one non-deleted
 * `email-domain` member organization with neither `dateStart` nor `dateEnd` set.
 *
 * Ids are returned in ascending order so the last id of a page can be passed back as
 * `afterMemberId` to fetch the next page.
 *
 * @param qx query executor used to run the select
 * @param limit maximum number of member ids to return
 * @param afterMemberId optional cursor; when given, only ids greater than it are returned
 * @returns the member ids of the page
 */
export async function fetchEmailDomainMemberOrganizationsWithoutDates(
  qx: QueryExecutor,
  limit: number,
  afterMemberId?: string,
): Promise<string[]> {
  // The cursor predicate is only part of the query when a cursor was supplied.
  const cursorClause = afterMemberId ? `AND "memberId" > $(afterMemberId)` : ''

  const query = `
    SELECT DISTINCT "memberId"
    FROM "memberOrganizations"
    WHERE "source" = 'email-domain'
      AND "dateStart" IS NULL
      AND "dateEnd" IS NULL
      AND "deletedAt" IS NULL
      ${cursorClause}
    ORDER BY "memberId"
    LIMIT $(limit)
    `

  const rows = await qx.select(query, { limit, afterMemberId })

  const memberIds: string[] = []
  for (const row of rows) {
    memberIds.push(row.memberId)
  }

  return memberIds
}
86+
87+
/**
 * Lists the distinct dates on which a member was active under an email address whose
 * domain belongs to one of the member's `email-domain` organizations.
 *
 * The query works in two steps:
 *  - the CTE collects the member's non-deleted `email-domain` member organizations together
 *    with the lower-cased value of each organization's verified `primary-domain` identity;
 *  - the main select joins the member's verified, non-deleted email identities whose domain
 *    part equals that domain, and then the `activityRelations` rows of the same member and
 *    platform whose username equals the email (case-insensitively) and have a timestamp.
 *
 * @param qx query executor used to run the select
 * @param memberId id of the member to look up
 * @returns rows of (`memberId`, `organizationId`, `date`), with `date` being the activity
 *   timestamp cast to a date and rendered as text, ordered by member, organization and date
 */
export async function fetchEmailDomainMemberOrganizationActivityDates(
  qx: QueryExecutor,
  memberId: string,
): Promise<EmailDomainMemberOrganizationActivityDate[]> {
  const query = `
    WITH email_domain_member_orgs AS (
      SELECT DISTINCT
        mo."memberId",
        mo."organizationId",
        lower(oi.value) AS domain
      FROM "memberOrganizations" mo
      INNER JOIN "organizationIdentities" oi
        ON oi."organizationId" = mo."organizationId"
        AND oi.type = 'primary-domain'
        AND oi.verified = true
      WHERE mo."memberId" = $(memberId)
        AND mo."source" = 'email-domain'
        AND mo."deletedAt" IS NULL
    )
    SELECT DISTINCT
      edmo."memberId",
      edmo."organizationId",
      ar."timestamp"::date::text AS date
    FROM email_domain_member_orgs edmo
    INNER JOIN "memberIdentities" mi
      ON mi."memberId" = edmo."memberId"
      AND mi.verified = true
      AND mi.type = 'email'
      AND mi."deletedAt" IS NULL
      AND lower(split_part(mi.value, '@', 2)) = edmo.domain
    INNER JOIN "activityRelations" ar
      ON ar."memberId" = mi."memberId"
      AND ar.platform = mi.platform
      AND lower(ar.username) = lower(mi.value)
      AND ar."timestamp" IS NOT NULL
    ORDER BY edmo."memberId", edmo."organizationId", date
    `
  const queryParams = { memberId }

  return qx.select(query, queryParams)
}
128+
62129
export async function fetchOrganizationMemberIds(
63130
qx: QueryExecutor,
64131
organizationId: string,

services/libs/data-access-layer/src/members/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { IAttributes, IMemberAttribute, MemberAttributeType } from '@crowd/types'
1+
import { IAttributes, IMemberAttribute, MemberAttributeType, MemberOrgDate } from '@crowd/types'
22

33
export interface IQueryNumberOfNewMembers {
44
segmentIds?: string[]
@@ -93,3 +93,7 @@ export interface IDbMemberBotSuggestionBySegment {
9393
avatarUrl: string
9494
attributes: IAttributes
9595
}
96+
97+
/**
 * A `MemberOrgDate` entry tagged with the member it belongs to.
 */
export interface EmailDomainMemberOrganizationActivityDate extends MemberOrgDate {
  /** Id of the member the entry belongs to. */
  memberId: string
}

0 commit comments

Comments
 (0)