@@ -50,9 +50,10 @@ import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal'
5050
5151const log = getServiceChildLogger ( 'recalculate-all-affiliations' )
5252
53- interface MemberWithActiveOrgs {
53+ interface BrokenMember {
5454 memberId : string
5555 activeOrgIds : string [ ]
56+ staleOrgIds : string [ ]
5657}
5758
5859interface ScriptOptions {
@@ -151,19 +152,19 @@ async function fetchMemberIdPage(
151152async function findBrokenMembers (
152153 qx : ReturnType < typeof pgpQx > ,
153154 memberIds : string [ ] ,
154- ) : Promise < MemberWithActiveOrgs [ ] > {
155+ ) : Promise < BrokenMember [ ] > {
155156 // First reduce to distinct (memberId, organizationId) pairs — a member may have
156157 // thousands of activities but only a handful of distinct org attributions.
157158 // The NOT EXISTS then runs on the small deduplicated set, not on every activity row.
158- const brokenRows = await qx . select (
159+ const staleRows = await qx . select (
159160 `
160161 WITH pairs AS (
161162 SELECT DISTINCT "memberId", "organizationId"
162163 FROM "activityRelations"
163164 WHERE "memberId" = ANY($(memberIds)::uuid[])
164165 AND "organizationId" IS NOT NULL
165166 )
166- SELECT DISTINCT p."memberId"
167+ SELECT p."memberId", p."organizationId" AS "staleOrgId "
167168 FROM pairs p
168169 WHERE NOT EXISTS (
169170 SELECT 1 FROM "memberOrganizations" mo
@@ -175,11 +176,18 @@ async function findBrokenMembers(
175176 { memberIds } ,
176177 )
177178
178- if ( brokenRows . length === 0 ) {
179+ if ( staleRows . length === 0 ) {
179180 return [ ]
180181 }
181182
182- const brokenMemberIds = brokenRows . map ( ( r : Record < string , unknown > ) => r . memberId as string )
183+ const staleMap = new Map < string , string [ ] > ( )
184+ for ( const r of staleRows as Record < string , string > [ ] ) {
185+ const existing = staleMap . get ( r . memberId ) ?? [ ]
186+ existing . push ( r . staleOrgId )
187+ staleMap . set ( r . memberId , existing )
188+ }
189+
190+ const brokenMemberIds = [ ...staleMap . keys ( ) ]
183191
184192 // Fetch currently active org IDs to pass to memberUpdate
185193 const orgRows = await qx . select (
@@ -193,7 +201,7 @@ async function findBrokenMembers(
193201 { brokenMemberIds } ,
194202 )
195203
196- const orgMap = new Map < string , string [ ] > (
204+ const activeOrgMap = new Map < string , string [ ] > (
197205 orgRows . map ( ( r : Record < string , unknown > ) => [
198206 r . memberId as string ,
199207 ( r . activeOrgIds as string [ ] | null ) ?? [ ] ,
@@ -202,7 +210,8 @@ async function findBrokenMembers(
202210
203211 return brokenMemberIds . map ( ( memberId : string ) => ( {
204212 memberId,
205- activeOrgIds : orgMap . get ( memberId ) ?? [ ] ,
213+ activeOrgIds : activeOrgMap . get ( memberId ) ?? [ ] ,
214+ staleOrgIds : staleMap . get ( memberId ) ?? [ ] ,
206215 } ) )
207216}
208217
@@ -289,9 +298,9 @@ async function main() {
289298 const loggedSoFar = totalBroken - brokenMembers . length
290299 const remaining = opts . limit !== null ? opts . limit - loggedSoFar : brokenMembers . length
291300 const toLog = brokenMembers . slice ( 0 , remaining )
292- for ( const { memberId, activeOrgIds } of toLog ) {
301+ for ( const { memberId, activeOrgIds, staleOrgIds } of toLog ) {
293302 log . info (
294- `[DRY RUN] memberUpdate | memberId: ${ memberId } | activeOrgs : ${ activeOrgIds . length } ` ,
303+ `[DRY RUN] broken member: ${ memberId } | stale orgs: [ ${ staleOrgIds . join ( ', ' ) } ] | active orgs : ${ activeOrgIds . length } ` ,
295304 )
296305 }
297306 if ( opts . limit !== null && loggedSoFar + toLog . length >= opts . limit ) {
@@ -313,8 +322,10 @@ async function main() {
313322 const { succeeded, failed } = await runWithConcurrency (
314323 toProcess ,
315324 opts . concurrency ,
316- async ( { memberId, activeOrgIds } ) => {
317- log . info ( { memberId, activeOrgs : activeOrgIds . length } , 'Triggering memberUpdate workflow' )
325+ async ( { memberId, activeOrgIds, staleOrgIds } ) => {
326+ log . info (
327+ `Triggering memberUpdate for broken member: ${ memberId } | stale orgs: [${ staleOrgIds . join ( ', ' ) } ] | active orgs: ${ activeOrgIds . length } ` ,
328+ )
318329 await temporal . workflow . start ( 'memberUpdate' , {
319330 taskQueue : 'profiles' ,
320331 workflowId : `member-update/${ DEFAULT_TENANT_ID } /${ memberId } ` ,
0 commit comments