Skip to content

Commit e12aad1

Browse files
committed
feat: add recalculate all affiliation
Signed-off-by: Umberto Sgueglia <usgueglia@contractor.linuxfoundation.org>
1 parent 58ee92a commit e12aad1

2 files changed

Lines changed: 354 additions & 0 deletions

File tree

services/apps/script_executor_worker/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts",
1111
"cleanup-member-aggregates": "npx tsx src/bin/cleanup-member-aggregates.ts",
1212
"recalculate-enrichment-affiliations": "npx tsx src/bin/recalculate-enrichment-affiliations.ts",
13+
"recalculate-all-affiliations": "npx tsx src/bin/recalculate-all-affiliations.ts",
1314
"add-lf-projects-to-collection": "npx tsx src/bin/add-lf-projects-to-collection.ts",
1415
"lint": "npx eslint --ext .ts src --max-warnings=0",
1516
"format": "npx prettier --write \"src/**/*.ts\"",
Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
/**
2+
* Recalculate All Affiliations Script
3+
*
4+
* PROBLEM:
5+
* activityRelations.organizationId may be stale for members who have activities
6+
* attributed to an organization they no longer have an active work experience for
7+
* (no matching memberOrganizations row with deletedAt IS NULL). This can happen
8+
* regardless of the source of the work experience change (enrichment, manual, etc.).
9+
*
10+
* SOLUTION:
11+
* Paginates over distinct memberIds from memberOrganizations. For each batch,
12+
* detects members with stale org attributions by checking activityRelations via
13+
* ix_activityRelations_memberId (no full table scan). Triggers a memberUpdate
14+
* Temporal workflow only for affected members.
15+
*
16+
* Usage:
17+
* pnpm run recalculate-all-affiliations -- [options]
18+
* npx tsx src/bin/recalculate-all-affiliations.ts [options]
19+
*
20+
* Options:
21+
* --page-size <n> Number of memberIds per page (default: 100)
22+
* Kept small intentionally: each page triggers an
23+
* activityRelations lookup per member via memberId index.
24+
* --concurrency <n> Max concurrent Temporal workflow starts per page (default: 20)
25+
* --page-delay <ms> Milliseconds to wait between pages (default: 5000)
26+
* --start-after <id> Resume from a specific memberId (exclusive, for restarts)
27+
* --dry-run Log what would be processed without starting workflows
28+
* --limit <n> Stop after triggering at most N workflows (for testing)
29+
* --workflow-delay <ms> Milliseconds to wait after each workflow start (default: 0)
30+
*
31+
* Environment Variables Required:
32+
* CROWD_DB_WRITE_HOST - Postgres write host
33+
* CROWD_DB_PORT - Postgres port
34+
* CROWD_DB_USERNAME - Postgres username
35+
* CROWD_DB_PASSWORD - Postgres password
36+
* CROWD_DB_DATABASE - Postgres database name
37+
* CROWD_TEMPORAL_SERVER_URL - Temporal server URL
38+
* CROWD_TEMPORAL_NAMESPACE - Temporal namespace
39+
* SERVICE - Service identifier (used by Temporal client)
40+
*/
41+
import { WorkflowIdReusePolicy } from '@temporalio/client'
42+
43+
import { DEFAULT_TENANT_ID } from '@crowd/common'
44+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
45+
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
46+
import { getServiceChildLogger } from '@crowd/logging'
47+
import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal'
48+
49+
const log = getServiceChildLogger('recalculate-all-affiliations')
50+
51+
interface MemberWithActiveOrgs {
52+
memberId: string
53+
activeOrgIds: string[]
54+
}
55+
56+
interface ScriptOptions {
57+
pageSize: number
58+
concurrency: number
59+
pageDelayMs: number
60+
workflowDelayMs: number
61+
startAfter: string | null
62+
dryRun: boolean
63+
limit: number | null
64+
}
65+
66+
function parseArgs(): ScriptOptions {
67+
const args = process.argv.slice(2)
68+
69+
const getArg = (flag: string): string | undefined => {
70+
const idx = args.indexOf(flag)
71+
if (idx !== -1 && idx + 1 < args.length) return args[idx + 1]
72+
return undefined
73+
}
74+
75+
const pageSize = parseInt(getArg('--page-size') ?? '100', 10)
76+
const concurrency = parseInt(getArg('--concurrency') ?? '20', 10)
77+
const pageDelayMs = parseInt(getArg('--page-delay') ?? '5000', 10)
78+
const workflowDelayMs = parseInt(getArg('--workflow-delay') ?? '0', 10)
79+
const startAfter = getArg('--start-after') ?? null
80+
const dryRun = args.includes('--dry-run')
81+
const limitRaw = getArg('--limit')
82+
const limit = limitRaw !== undefined ? parseInt(limitRaw, 10) : null
83+
84+
if (isNaN(pageSize) || pageSize <= 0) {
85+
log.error('--page-size must be a positive integer')
86+
process.exit(1)
87+
}
88+
if (isNaN(concurrency) || concurrency <= 0) {
89+
log.error('--concurrency must be a positive integer')
90+
process.exit(1)
91+
}
92+
if (isNaN(pageDelayMs) || pageDelayMs < 0) {
93+
log.error('--page-delay must be a non-negative integer')
94+
process.exit(1)
95+
}
96+
if (isNaN(workflowDelayMs) || workflowDelayMs < 0) {
97+
log.error('--workflow-delay must be a non-negative integer')
98+
process.exit(1)
99+
}
100+
if (limit !== null && (isNaN(limit) || limit <= 0)) {
101+
log.error('--limit must be a positive integer')
102+
process.exit(1)
103+
}
104+
105+
return { pageSize, concurrency, pageDelayMs, workflowDelayMs, startAfter, dryRun, limit }
106+
}
107+
108+
// Returns a page of distinct memberIds from memberOrganizations, cursor-based.
109+
async function fetchMemberIdPage(
110+
qx: ReturnType<typeof pgpQx>,
111+
afterMemberId: string | null,
112+
pageSize: number,
113+
): Promise<string[]> {
114+
const cursorClause = afterMemberId ? `AND "memberId" > $(afterMemberId)` : ''
115+
116+
const rows = await qx.select(
117+
`
118+
SELECT "memberId"
119+
FROM "memberOrganizations"
120+
WHERE TRUE ${cursorClause}
121+
GROUP BY "memberId"
122+
ORDER BY "memberId"
123+
LIMIT $(pageSize)
124+
`,
125+
{ afterMemberId, pageSize },
126+
)
127+
128+
return rows.map((r: Record<string, unknown>) => r.memberId as string)
129+
}
130+
131+
// Finds members in the batch whose activityRelations.organizationId is stale —
132+
// i.e. attributed to an org they no longer have an active memberOrganization for.
133+
// Uses ix_activityRelations_memberId for the activityRelations lookup (no seq scan).
134+
async function findBrokenMembers(
135+
qx: ReturnType<typeof pgpQx>,
136+
memberIds: string[],
137+
): Promise<MemberWithActiveOrgs[]> {
138+
// First reduce to distinct (memberId, organizationId) pairs — a member may have
139+
// thousands of activities but only a handful of distinct org attributions.
140+
// The NOT EXISTS then runs on the small deduplicated set, not on every activity row.
141+
const brokenRows = await qx.select(
142+
`
143+
WITH pairs AS (
144+
SELECT DISTINCT "memberId", "organizationId"
145+
FROM "activityRelations"
146+
WHERE "memberId" = ANY($(memberIds))
147+
AND "organizationId" IS NOT NULL
148+
)
149+
SELECT DISTINCT p."memberId"
150+
FROM pairs p
151+
WHERE NOT EXISTS (
152+
SELECT 1 FROM "memberOrganizations" mo
153+
WHERE mo."memberId" = p."memberId"
154+
AND mo."organizationId" = p."organizationId"
155+
AND mo."deletedAt" IS NULL
156+
)
157+
`,
158+
{ memberIds },
159+
)
160+
161+
if (brokenRows.length === 0) {
162+
return []
163+
}
164+
165+
const brokenMemberIds = brokenRows.map((r: Record<string, unknown>) => r.memberId as string)
166+
167+
// Fetch currently active org IDs to pass to memberUpdate
168+
const orgRows = await qx.select(
169+
`
170+
SELECT "memberId", array_agg(DISTINCT "organizationId") AS "activeOrgIds"
171+
FROM "memberOrganizations"
172+
WHERE "memberId" = ANY($(brokenMemberIds))
173+
AND "deletedAt" IS NULL
174+
GROUP BY "memberId"
175+
`,
176+
{ brokenMemberIds },
177+
)
178+
179+
const orgMap = new Map<string, string[]>(
180+
orgRows.map((r: Record<string, unknown>) => [
181+
r.memberId as string,
182+
(r.activeOrgIds as string[] | null) ?? [],
183+
]),
184+
)
185+
186+
return brokenMemberIds.map((memberId: string) => ({
187+
memberId,
188+
activeOrgIds: orgMap.get(memberId) ?? [],
189+
}))
190+
}
191+
192+
async function runWithConcurrency<T>(
193+
items: T[],
194+
concurrency: number,
195+
fn: (item: T) => Promise<void>,
196+
): Promise<{ succeeded: number; failed: number }> {
197+
let succeeded = 0
198+
let failed = 0
199+
let index = 0
200+
201+
async function worker() {
202+
while (index < items.length) {
203+
const item = items[index++]
204+
try {
205+
await fn(item)
206+
succeeded++
207+
} catch (err) {
208+
failed++
209+
log.error({ err }, 'Failed to process member')
210+
}
211+
}
212+
}
213+
214+
const workers = Array.from({ length: Math.min(concurrency, items.length) }, worker)
215+
await Promise.all(workers)
216+
217+
return { succeeded, failed }
218+
}
219+
220+
async function main() {
221+
const opts = parseArgs()
222+
223+
log.info('='.repeat(80))
224+
log.info('Recalculate All Affiliations Script')
225+
log.info('='.repeat(80))
226+
log.info(`Page size: ${opts.pageSize}`)
227+
log.info(`Concurrency: ${opts.concurrency}`)
228+
log.info(`Page delay: ${opts.pageDelayMs}ms`)
229+
log.info(`Workflow delay: ${opts.workflowDelayMs}ms`)
230+
log.info(`Start after: ${opts.startAfter ?? '(beginning)'}`)
231+
log.info(`Mode: ${opts.dryRun ? 'DRY RUN' : 'LIVE'}`)
232+
log.info(`Limit: ${opts.limit ?? '(none)'}`)
233+
log.info('='.repeat(80))
234+
235+
const dbConnection = await getDbConnection(WRITE_DB_CONFIG())
236+
const qx = pgpQx(dbConnection)
237+
const temporal = await getTemporalClient(TEMPORAL_CONFIG())
238+
239+
let cursor: string | null = opts.startAfter
240+
let pageNum = 0
241+
let totalScanned = 0
242+
let totalBroken = 0
243+
let totalSucceeded = 0
244+
let totalFailed = 0
245+
246+
let hasMore = true
247+
while (hasMore) {
248+
pageNum++
249+
250+
const memberIds = await fetchMemberIdPage(qx, cursor, opts.pageSize)
251+
252+
if (memberIds.length === 0) {
253+
log.info('No more members to process.')
254+
hasMore = false
255+
continue
256+
}
257+
258+
const lastMemberId = memberIds[memberIds.length - 1]
259+
totalScanned += memberIds.length
260+
261+
const brokenMembers = await findBrokenMembers(qx, memberIds)
262+
totalBroken += brokenMembers.length
263+
264+
log.info(
265+
`Page ${pageNum}: scanned ${memberIds.length} | broken: ${brokenMembers.length} | cursor: ${lastMemberId}`,
266+
)
267+
268+
if (brokenMembers.length > 0) {
269+
if (opts.dryRun) {
270+
for (const { memberId, activeOrgIds } of brokenMembers) {
271+
log.info(
272+
`[DRY RUN] memberUpdate | memberId: ${memberId} | activeOrgs: ${activeOrgIds.length}`,
273+
)
274+
}
275+
} else {
276+
const triggeredSoFar = totalSucceeded + totalFailed
277+
const remaining = opts.limit !== null ? opts.limit - triggeredSoFar : brokenMembers.length
278+
if (remaining <= 0) {
279+
log.info(`Limit of ${opts.limit} workflows reached.`)
280+
hasMore = false
281+
continue
282+
}
283+
284+
const toProcess = brokenMembers.slice(0, remaining)
285+
const { succeeded, failed } = await runWithConcurrency(
286+
toProcess,
287+
opts.concurrency,
288+
async ({ memberId, activeOrgIds }) => {
289+
log.info({ memberId, activeOrgs: activeOrgIds.length }, 'Triggering memberUpdate workflow')
290+
await temporal.workflow.start('memberUpdate', {
291+
taskQueue: 'profiles',
292+
workflowId: `member-update/${DEFAULT_TENANT_ID}/${memberId}`,
293+
workflowIdReusePolicy:
294+
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
295+
retry: { maximumAttempts: 10 },
296+
args: [
297+
{
298+
member: { id: memberId },
299+
memberOrganizationIds: activeOrgIds,
300+
syncToOpensearch: true,
301+
},
302+
],
303+
searchAttributes: { TenantId: [DEFAULT_TENANT_ID] },
304+
})
305+
if (opts.workflowDelayMs > 0) {
306+
await new Promise((resolve) => setTimeout(resolve, opts.workflowDelayMs))
307+
}
308+
},
309+
)
310+
311+
totalSucceeded += succeeded
312+
totalFailed += failed
313+
log.info(`Page ${pageNum} done: ${succeeded} ok, ${failed} failed`)
314+
315+
if (opts.limit !== null && totalSucceeded + totalFailed >= opts.limit) {
316+
log.info(`Limit of ${opts.limit} workflows reached.`)
317+
hasMore = false
318+
continue
319+
}
320+
}
321+
}
322+
323+
log.info(`Resume with: --start-after ${lastMemberId}`)
324+
cursor = lastMemberId
325+
326+
if (memberIds.length < opts.pageSize) {
327+
hasMore = false
328+
}
329+
330+
if (opts.pageDelayMs > 0) {
331+
await new Promise((resolve) => setTimeout(resolve, opts.pageDelayMs))
332+
}
333+
}
334+
335+
log.info('='.repeat(80))
336+
log.info('Summary')
337+
log.info('='.repeat(80))
338+
const brokenPct = totalScanned > 0 ? ((totalBroken / totalScanned) * 100).toFixed(2) : '0.00'
339+
log.info(`Pages processed: ${pageNum}`)
340+
log.info(`Members scanned: ${totalScanned}`)
341+
log.info(`Members broken: ${totalBroken} (${brokenPct}%)`)
342+
if (!opts.dryRun) {
343+
log.info(`Workflows succeeded: ${totalSucceeded}`)
344+
log.info(`Workflows failed: ${totalFailed}`)
345+
}
346+
347+
process.exit(totalFailed > 0 ? 1 : 0)
348+
}
349+
350+
main().catch((err) => {
351+
log.error({ err }, 'Unexpected error')
352+
process.exit(1)
353+
})

0 commit comments

Comments
 (0)