Skip to content

Commit db796e4

Browse files
authored
feat: add recalculate affiliations script (CM-1118) (#4023)
Signed-off-by: Umberto Sgueglia <usgueglia@contractor.linuxfoundation.org>
1 parent f40f137 commit db796e4

3 files changed

Lines changed: 322 additions & 0 deletions

File tree

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/apps/script_executor_worker/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
99
"cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts",
1010
"cleanup-member-aggregates": "npx tsx src/bin/cleanup-member-aggregates.ts",
11+
"recalculate-enrichment-affiliations": "npx tsx src/bin/recalculate-enrichment-affiliations.ts",
1112
"add-lf-projects-to-collection": "npx tsx src/bin/add-lf-projects-to-collection.ts",
1213
"lint": "npx eslint --ext .ts src --max-warnings=0",
1314
"format": "npx prettier --write \"src/**/*.ts\"",
@@ -24,6 +25,7 @@
2425
"@crowd/redis": "workspace:*",
2526
"@crowd/snowflake": "workspace:*",
2627
"@crowd/types": "workspace:*",
28+
"@crowd/temporal": "workspace:*",
2729
"@temporalio/client": "~1.11.8",
2830
"@temporalio/workflow": "~1.11.8",
2931
"axios": "^1.6.8",
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
/**
 * Recalculate Enrichment Affiliations Script
 *
 * PROBLEM:
 * Before fix CM-1118, the members_enrichment_worker modified work experiences
 * (memberOrganizations) without triggering affiliation recalculation. As a result,
 * activityRelations.organizationId may be stale for members whose work experiences
 * were created, updated, or deleted by enrichment.
 *
 * SOLUTION:
 * This script finds all members with enrichment-sourced work experiences and triggers
 * a memberUpdate Temporal workflow for each, which recalculates affiliations and syncs
 * to OpenSearch.
 *
 * Usage:
 *   # Via package.json script (recommended):
 *   pnpm run recalculate-enrichment-affiliations -- [options]
 *
 *   # Or directly with tsx:
 *   npx tsx src/bin/recalculate-enrichment-affiliations.ts [options]
 *
 * Options:
 *   --page-size <n>        Number of members to fetch per DB page (default: 1000)
 *   --concurrency <n>      Max concurrent Temporal workflow starts per page (default: 20)
 *   --page-delay <ms>      Milliseconds to wait between pages (default: 5000)
 *   --start-after <id>     Resume from a specific memberId (exclusive, for restarts)
 *   --dry-run              Log what would be processed without starting workflows
 *   --limit <n>            Stop after processing at most N members total (for testing)
 *   --workflow-delay <ms>  Milliseconds to wait after each workflow start, to avoid overwhelming Temporal (default: 0)
 *
 * Environment Variables Required:
 *   CROWD_DB_WRITE_HOST       - Postgres write host
 *   CROWD_DB_PORT             - Postgres port
 *   CROWD_DB_USERNAME         - Postgres username
 *   CROWD_DB_PASSWORD         - Postgres password
 *   CROWD_DB_DATABASE         - Postgres database name
 *   CROWD_TEMPORAL_SERVER_URL - Temporal server URL
 *   CROWD_TEMPORAL_NAMESPACE  - Temporal namespace
 *   SERVICE                   - Service identifier (used by Temporal client)
 */
41+
import { WorkflowIdReusePolicy } from '@temporalio/client'
42+
43+
import { DEFAULT_TENANT_ID } from '@crowd/common'
44+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
45+
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
46+
import { getServiceChildLogger } from '@crowd/logging'
47+
import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal'
48+
49+
/**
 * Values of `memberOrganizations.source` that mark a work experience as
 * enrichment-sourced; used to select the members this script processes.
 */
const ENRICHMENT_SOURCES = ['enrichment-progai', 'enrichment-clearbit', 'enrichment-crustdata']

/** Logger shared by every function in this script. */
const log = getServiceChildLogger('recalculate-enrichment-affiliations')
52+
53+
/** One row of the paging query: a member plus a summary of its enrichment-sourced work experiences. */
interface MemberWithOrgs {
  /** Member identifier; also the keyset-pagination cursor. */
  memberId: string
  /** Distinct organization ids of the member's non-deleted enrichment-sourced rows (empty when none). */
  activeOrgIds: string[]
  /** Number of enrichment-sourced rows with `deletedAt` set (used for logging only). */
  deletedOrgCount: number
  /** Number of enrichment-sourced rows with `deletedAt` unset (used for logging only). */
  activeOrgCount: number
}
59+
60+
/** Command-line options after parsing and validation. */
interface ScriptOptions {
  /** `--page-size`: members fetched per DB page (positive). */
  pageSize: number
  /** `--concurrency`: maximum concurrent workflow starts per page (positive). */
  concurrency: number
  /** `--page-delay`: milliseconds to wait between pages (non-negative). */
  pageDelayMs: number
  /** `--workflow-delay`: milliseconds to wait after each workflow start (non-negative). */
  workflowDelayMs: number
  /** `--start-after`: exclusive memberId cursor to resume from, or null to start at the beginning. */
  startAfter: string | null
  /** `--dry-run`: log what would be processed without starting workflows. */
  dryRun: boolean
  /** `--limit`: maximum number of members to process in total, or null for no limit. */
  limit: number | null
}
69+
70+
/**
 * Parses and validates the command-line flags in `process.argv`.
 *
 * Validation is strict on purpose, because a mistyped flag on a bulk script is costly:
 *  - numeric values must be plain decimal integers (`10abc`, `1.5`, `-3` are rejected);
 *  - a value-taking flag must be followed by a value, not by another `--flag` or the end
 *    of the argument list (so `--start-after --dry-run` no longer uses `--dry-run` as the
 *    cursor, and a trailing `--limit` no longer silently means "no limit").
 *
 * On any validation error the problem is logged and the process exits with code 1.
 *
 * @returns the validated options, with defaults applied for omitted flags
 */
function parseArgs(): ScriptOptions {
  const args = process.argv.slice(2)

  // Logs the message and terminates; typed `never` so callers can `return fail(...)`.
  const fail = (message: string): never => {
    log.error(message)
    return process.exit(1)
  }

  // Returns the token following `flag`, or undefined when the flag is absent.
  const getArg = (flag: string): string | undefined => {
    const idx = args.indexOf(flag)
    if (idx === -1) return undefined
    const value = args[idx + 1]
    if (value === undefined || value.startsWith('--')) {
      return fail(`${flag} requires a value`)
    }
    return value
  }

  // Reads an integer flag; `min` is 1 for "positive" flags and 0 for "non-negative" ones.
  const getInt = (flag: string, min: 0 | 1): number | undefined => {
    const raw = getArg(flag)
    if (raw === undefined) return undefined
    // parseInt would accept trailing garbage ("10abc" -> 10), so require digits only.
    const value = /^\d+$/.test(raw) ? Number(raw) : NaN
    if (!Number.isSafeInteger(value) || value < min) {
      return fail(`${flag} must be a ${min === 1 ? 'positive' : 'non-negative'} integer`)
    }
    return value
  }

  const pageSize = getInt('--page-size', 1) ?? 1000
  const concurrency = getInt('--concurrency', 1) ?? 20
  const pageDelayMs = getInt('--page-delay', 0) ?? 5000
  const workflowDelayMs = getInt('--workflow-delay', 0) ?? 0
  const limit = getInt('--limit', 1) ?? null
  const startAfter = getArg('--start-after') ?? null
  const dryRun = args.includes('--dry-run')

  return { pageSize, concurrency, pageDelayMs, workflowDelayMs, startAfter, dryRun, limit }
}
111+
112+
/**
 * Fetches one page of members that have at least one enrichment-sourced work experience
 * (`memberOrganizations.source` in ENRICHMENT_SOURCES), ordered by memberId.
 *
 * Uses keyset pagination: when `afterMemberId` is set, only members with a strictly
 * greater memberId are returned.
 *
 * @param qx            query executor used to run the SELECT
 * @param afterMemberId exclusive cursor (last memberId of the previous page), or null for the first page
 * @param pageSize      maximum number of members to return
 * @returns one entry per member; `activeOrgIds` is an empty array when the member has no
 *          non-deleted enrichment-sourced rows, and the counts are converted to numbers
 */
async function fetchPage(
  qx: ReturnType<typeof pgpQx>,
  afterMemberId: string | null,
  pageSize: number,
): Promise<MemberWithOrgs[]> {
  // The clause is a fixed string; the cursor value itself is passed as a named parameter.
  const cursorClause = afterMemberId ? `AND "memberId" > $(afterMemberId)` : ''

  // Selects only members that have at least one work experience created by enrichment
  // (source IN enrichment-progai, enrichment-clearbit, enrichment-crustdata).
  // activeOrgIds contains only the active enrichment-sourced org IDs for that member —
  // used by the memberUpdate workflow to determine which orgs to sync to OpenSearch.
  // deletedOrgCount and activeOrgCount are used for impact logging only.
  const rows = await qx.select(
    `
    SELECT
      "memberId",
      array_agg(DISTINCT "organizationId") FILTER (WHERE "deletedAt" IS NULL) AS "activeOrgIds",
      COUNT(*) FILTER (WHERE "deletedAt" IS NOT NULL) AS "deletedOrgCount",
      COUNT(*) FILTER (WHERE "deletedAt" IS NULL) AS "activeOrgCount"
    FROM "memberOrganizations"
    WHERE source = ANY($(sources))
      ${cursorClause}
    GROUP BY "memberId"
    ORDER BY "memberId"
    LIMIT $(pageSize)
    `,
    { sources: ENRICHMENT_SOURCES, afterMemberId, pageSize },
  )

  // Build each result explicitly instead of casting the whole array, so every field
  // is checked against MemberWithOrgs.
  return rows.map(
    (r: Record<string, unknown>): MemberWithOrgs => ({
      memberId: String(r.memberId),
      // array_agg yields NULL (not an empty array) when the FILTER matches no rows.
      activeOrgIds: Array.isArray(r.activeOrgIds) ? (r.activeOrgIds as string[]) : [],
      // The counts are converted explicitly in case the driver returns them as strings.
      deletedOrgCount: Number(r.deletedOrgCount),
      activeOrgCount: Number(r.activeOrgCount),
    }),
  )
}
148+
149+
/**
 * Runs `fn` over `items` with at most `concurrency` calls in flight at once.
 *
 * A fixed set of workers pulls items from a shared index until the list is exhausted.
 * A rejection from `fn` is logged together with the failing item (so it can be
 * identified and retried) and counted as a failure; it never aborts the other items.
 *
 * @param items       items to process
 * @param concurrency maximum number of concurrent `fn` calls; values below 1 (or NaN)
 *                    are treated as 1 so a bad value cannot silently skip every item
 * @param fn          async handler invoked once per item
 * @returns how many invocations resolved (`succeeded`) and how many rejected (`failed`)
 */
async function runWithConcurrency<T>(
  items: T[],
  concurrency: number,
  fn: (item: T) => Promise<void>,
): Promise<{ succeeded: number; failed: number }> {
  let succeeded = 0
  let failed = 0
  let nextIndex = 0

  const worker = async (): Promise<void> => {
    // Reading and incrementing nextIndex happens with no await in between,
    // so two workers never claim the same item.
    while (nextIndex < items.length) {
      const item = items[nextIndex++]
      try {
        await fn(item)
        succeeded++
      } catch (err) {
        failed++
        log.error({ err, item }, 'Failed to process member')
      }
    }
  }

  const requested = Number.isNaN(concurrency) ? 1 : Math.floor(concurrency)
  const workerCount = Math.min(Math.max(requested, 1), items.length)
  await Promise.all(Array.from({ length: workerCount }, () => worker()))

  return { succeeded, failed }
}
176+
177+
/**
 * Script entry point.
 *
 * Pages through every member with enrichment-sourced work experiences (keyset pagination
 * on memberId) and, unless `--dry-run` is set, starts a `memberUpdate` Temporal workflow
 * for each member. A `--start-after` hint is logged after every page so an interrupted
 * run can be resumed.
 *
 * Exits the process with code 1 when at least one workflow start failed, otherwise 0.
 */
async function main(): Promise<void> {
  const opts = parseArgs()
  const banner = '='.repeat(80)
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

  log.info(banner)
  log.info('Recalculate Enrichment Affiliations Script')
  log.info(banner)
  log.info(`Page size: ${opts.pageSize}`)
  log.info(`Concurrency: ${opts.concurrency}`)
  log.info(`Page delay: ${opts.pageDelayMs}ms`)
  log.info(`Workflow delay: ${opts.workflowDelayMs}ms`)
  log.info(`Start after: ${opts.startAfter ?? '(beginning)'}`)
  log.info(`Mode: ${opts.dryRun ? 'DRY RUN' : 'LIVE'}`)
  log.info(`Limit: ${opts.limit ?? '(none)'}`)
  log.info(banner)

  // Init DB
  const dbConnection = await getDbConnection(WRITE_DB_CONFIG())
  const qx = pgpQx(dbConnection)

  // Init Temporal
  const temporal = await getTemporalClient(TEMPORAL_CONFIG())

  let cursor: string | null = opts.startAfter
  let pageNum = 0
  let totalSucceeded = 0
  let totalFailed = 0
  let totalProcessed = 0
  let totalActiveOrgs = 0
  let totalDeletedOrgs = 0
  let totalMembersWithDeletedOrgs = 0

  let hasMore = true
  while (hasMore) {
    // Never fetch more members than the --limit still allows.
    const fetchSize =
      opts.limit === null ? opts.pageSize : Math.min(opts.pageSize, opts.limit - totalProcessed)
    const membersPage = await fetchPage(qx, cursor, fetchSize)

    if (membersPage.length === 0) {
      log.info('No more members to process.')
      break
    }

    // Count only pages that actually contained members.
    pageNum++

    const lastMemberId = membersPage[membersPage.length - 1].memberId
    const pageActiveOrgs = membersPage.reduce((sum, m) => sum + m.activeOrgCount, 0)
    const pageDeletedOrgs = membersPage.reduce((sum, m) => sum + m.deletedOrgCount, 0)
    const membersWithDeletedOrgs = membersPage.filter((m) => m.deletedOrgCount > 0).length
    totalActiveOrgs += pageActiveOrgs
    totalDeletedOrgs += pageDeletedOrgs
    totalMembersWithDeletedOrgs += membersWithDeletedOrgs
    log.info(
      `Page ${pageNum}: ${membersPage.length} members | active orgs: ${pageActiveOrgs} | deleted orgs: ${pageDeletedOrgs} (${membersWithDeletedOrgs} members affected) | cursor: ${lastMemberId}`,
    )

    if (opts.dryRun) {
      log.info(`[DRY RUN] Would trigger ${membersPage.length} workflows`)
      for (const { memberId, activeOrgIds, deletedOrgCount } of membersPage) {
        log.info(
          `[DRY RUN] memberUpdate | memberId: ${memberId} | activeOrgs: ${activeOrgIds.length} | deletedOrgs: ${deletedOrgCount}`,
        )
      }
      totalProcessed += membersPage.length
    } else {
      const { succeeded, failed } = await runWithConcurrency(
        membersPage,
        opts.concurrency,
        async ({ memberId, activeOrgIds, deletedOrgCount }) => {
          log.debug(
            { memberId, activeOrgs: activeOrgIds.length, deletedOrgs: deletedOrgCount },
            'Triggering memberUpdate workflow',
          )
          await temporal.workflow.start('memberUpdate', {
            taskQueue: 'profiles',
            workflowId: `member-update/${DEFAULT_TENANT_ID}/${memberId}`,
            workflowIdReusePolicy:
              WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
            retry: { maximumAttempts: 10 },
            args: [
              {
                member: { id: memberId },
                memberOrganizationIds: activeOrgIds,
                syncToOpensearch: true,
              },
            ],
            searchAttributes: { TenantId: [DEFAULT_TENANT_ID] },
          })
          if (opts.workflowDelayMs > 0) {
            await sleep(opts.workflowDelayMs)
          }
        },
      )

      totalSucceeded += succeeded
      totalFailed += failed
      totalProcessed += succeeded + failed
      log.info(`Page ${pageNum} done: ${succeeded} ok, ${failed} failed`)
      if (failed > 0) {
        // The cursor still advances past this page, so a resumed run will not retry them.
        log.warn(
          `${failed} member(s) on page ${pageNum} failed; resuming from the cursor below will skip them`,
        )
      }
    }

    log.info(`Resume with: --start-after ${lastMemberId}`)
    cursor = lastMemberId

    const limitReached = opts.limit !== null && totalProcessed >= opts.limit
    if (limitReached) {
      log.info(`Limit of ${opts.limit} members reached.`)
    }

    // Fewer results than requested means there is no more data.
    hasMore = !limitReached && membersPage.length === fetchSize

    // Only pause when another page will actually be fetched.
    if (hasMore && opts.pageDelayMs > 0) {
      await sleep(opts.pageDelayMs)
    }
  }

  log.info(banner)
  log.info('Summary')
  log.info(banner)
  log.info(`Pages processed: ${pageNum}`)
  log.info(`Members processed: ${totalProcessed} (active enrichment orgs: ${totalActiveOrgs})`)
  log.info(
    `Members with deleted orgs: ${totalMembersWithDeletedOrgs} (deleted enrichment orgs: ${totalDeletedOrgs})`,
  )
  if (!opts.dryRun) {
    log.info(`Workflows succeeded: ${totalSucceeded}`)
    log.info(`Workflows failed: ${totalFailed}`)
  }

  process.exit(totalFailed > 0 ? 1 : 0)
}
313+
314+
// Run the script. Any rejection that escapes main() (for example a failed DB or
// Temporal connection) is logged and turned into a non-zero exit code.
main().catch((err) => {
  log.error({ err }, 'Unexpected error')
  process.exit(1)
})

0 commit comments

Comments
 (0)