Skip to content

Commit 375a48e

Browse files
authored
fix: paginate organization member sync with heartbeats to prevent timeout loop (#4042)
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
1 parent 46cd642 commit 375a48e

10 files changed

Lines changed: 123 additions & 76 deletions

File tree

backend/src/services/searchSyncService.ts

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -66,11 +66,13 @@ export default class SearchSyncService extends LoggerBase {
6666
async triggerOrganizationMembersSync(organizationId: string) {
6767
const client = await this.getSearchSyncClient()
6868

69-
if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) {
69+
if (client instanceof SearchSyncApiClient) {
7070
await this.logExecutionTime(
71-
() => client.triggerOrganizationMembersSync(organizationId, false),
71+
() => client.syncOrganizationMembers(organizationId),
7272
`triggerOrganizationMembersSync: organization:${organizationId}`,
7373
)
74+
} else if (client instanceof SearchSyncWorkerEmitter) {
75+
await client.triggerOrganizationMembersSync(organizationId, false)
7476
} else {
7577
throw new Error('Unexpected search client type!')
7678
}

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/apps/entity_merging_worker/package.json

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
"@crowd/queue": "workspace:*",
2121
"@crowd/redis": "workspace:*",
2222
"@crowd/types": "workspace:*",
23+
"@temporalio/activity": "~1.11.8",
2324
"@temporalio/workflow": "~1.11.8",
2425
"tsx": "^4.7.1",
2526
"typescript": "^5.6.3"

services/apps/entity_merging_worker/src/activities/organizations.ts

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import { Context, heartbeat } from '@temporalio/activity'
12
import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow'
23

34
import { DEFAULT_TENANT_ID } from '@crowd/common'
@@ -96,7 +97,19 @@ export async function syncOrganization(organizationId: string, syncStart: Date):
9697
})
9798

9899
await syncApi.triggerOrganizationSync(organizationId)
99-
await syncApi.triggerOrganizationMembersSync(organizationId, null, syncStart)
100+
101+
const cursor = Context.current().info.heartbeatDetails as string | undefined
102+
if (cursor) {
103+
svc.log.info({ organizationId, cursor }, 'Resuming organization member sync from heartbeat')
104+
}
105+
106+
const { totalSynced } = await syncApi.syncOrganizationMembers(organizationId, {
107+
syncFrom: syncStart,
108+
cursor,
109+
onPageComplete: (c) => heartbeat(c),
110+
})
111+
112+
svc.log.info({ organizationId, totalSynced }, 'Finished syncing organization members')
100113
}
101114

102115
export async function notifyFrontendOrganizationUnmergeSuccessful(

services/apps/entity_merging_worker/src/workflows/all.ts

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,14 +14,18 @@ const {
1414
recalculateActivityAffiliationsOfOrganizationAsync,
1515
setMergeAction,
1616
syncMember,
17-
syncOrganization,
1817
notifyFrontendMemberMergeSuccessful,
1918
notifyFrontendMemberUnmergeSuccessful,
2019
syncRemoveMember,
2120
finishMemberMergingUpdateActivities,
2221
finishMemberUnmergingUpdateActivities,
2322
} = proxyActivities<typeof activities>({
24-
startToCloseTimeout: '60 minutes',
23+
startToCloseTimeout: '2 hours',
24+
})
25+
26+
const { syncOrganization } = proxyActivities<typeof activities>({
27+
startToCloseTimeout: '2 hours',
28+
heartbeatTimeout: '5 minutes',
2529
})
2630

2731
export async function deleteOrphanMember(memberId: string): Promise<void> {

services/apps/search_sync_api/src/routes/member.ts

Lines changed: 5 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -30,17 +30,14 @@ router.post(
3030
router.post(
3131
'/sync/organization/members',
3232
asyncWrap(async (req: ApiRequest, res) => {
33-
const memberSyncService = syncService(req)
34-
35-
const { organizationId, syncFrom } = req.body
33+
const { organizationId, lastId, batchSize, syncFrom } = req.body
3634
try {
37-
req.log.trace(
38-
`Calling memberSyncService.syncOrganizationMembers for organization ${organizationId}`,
39-
)
40-
await memberSyncService.syncOrganizationMembers(organizationId, {
35+
const result = await syncService(req).syncOrganizationMembers(organizationId, {
36+
lastId: lastId ?? undefined,
37+
batchSize: batchSize ?? undefined,
4138
syncFrom: syncFrom ? new Date(syncFrom) : null,
4239
})
43-
res.sendStatus(200)
40+
res.json(result)
4441
} catch (error) {
4542
req.log.error(error)
4643
res.status(500).send(error.message)

services/apps/search_sync_worker/src/queue/index.ts

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -96,9 +96,15 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
9696
break
9797
case SearchSyncWorkerQueueMessageType.SYNC_ORGANIZATION_MEMBERS:
9898
if (data.organizationId) {
99-
this.initMemberService()
100-
.syncOrganizationMembers(data.organizationId)
101-
.catch((err) => this.log.error(err, 'Error while syncing organization members!'))
99+
const memberService = this.initMemberService()
100+
const orgId = data.organizationId as string
101+
;(async () => {
102+
let lastId: string | undefined
103+
do {
104+
const result = await memberService.syncOrganizationMembers(orgId, { lastId })
105+
lastId = result.lastId ?? undefined
106+
} while (lastId)
107+
})().catch((err) => this.log.error(err, 'Error while syncing organization members!'))
102108
}
103109

104110
break

services/libs/opensearch/src/apiClient.ts

Lines changed: 40 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,14 @@ export interface ISearchSyncApiConfig {
44
baseUrl: string
55
}
66

7+
export interface ISyncOrganizationMembersOptions {
8+
syncFrom?: Date | null
9+
cursor?: string
10+
pageSize?: number
11+
maxPages?: number
12+
onPageComplete?: (cursor: string, membersSynced: number) => void | Promise<void>
13+
}
14+
715
export class SearchSyncApiClient {
816
private searchSyncApi
917

@@ -23,19 +31,43 @@ export class SearchSyncApiClient {
2331
})
2432
}
2533

26-
public async triggerOrganizationMembersSync(
34+
public async syncOrganizationMembers(
2735
organizationId: string,
28-
onboarding?: boolean,
29-
syncFrom: Date | null = null,
30-
): Promise<void> {
36+
opts: ISyncOrganizationMembersOptions = {},
37+
): Promise<{ totalSynced: number }> {
3138
if (!organizationId) {
3239
throw new Error('organizationId is required!')
3340
}
3441

35-
await this.searchSyncApi.post('/sync/organization/members', {
36-
organizationId,
37-
syncFrom,
38-
})
42+
const pageSize = opts.pageSize ?? 500
43+
const maxPages = opts.maxPages ?? 2000
44+
45+
let cursor = opts.cursor
46+
let totalSynced = 0
47+
48+
for (let i = 0; i < maxPages; i++) {
49+
const { data } = await this.searchSyncApi.post('/sync/organization/members', {
50+
organizationId,
51+
lastId: cursor ?? null,
52+
batchSize: pageSize,
53+
syncFrom: opts.syncFrom ?? null,
54+
})
55+
56+
const result: { lastId: string | null; membersSynced: number } = data
57+
totalSynced += result.membersSynced
58+
59+
if (result.lastId === null || result.membersSynced < pageSize) {
60+
return { totalSynced }
61+
}
62+
63+
cursor = result.lastId
64+
await opts.onPageComplete?.(cursor, result.membersSynced)
65+
}
66+
67+
throw new Error(
68+
`syncOrganizationMembers exceeded maxPages (${maxPages}) for organization ${organizationId}. ` +
69+
`Synced ${totalSynced} members before aborting.`,
70+
)
3971
}
4072

4173
public async triggerRemoveMember(memberId: string): Promise<void> {

services/libs/opensearch/src/repo/member.repo.ts

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
2222
return JSON.parse(cachedString)
2323
}
2424

25-
const results = await this.db().one(
25+
const results = await this.db().any(
2626
`select type, "canDelete", show, label, name, options from "memberAttributeSettings"`,
2727
)
2828

@@ -37,6 +37,10 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
3737
lastId?: string,
3838
syncFrom?: Date,
3939
): Promise<string[]> {
40+
if (!Number.isInteger(perPage) || perPage <= 0) {
41+
throw new Error('perPage must be a positive integer')
42+
}
43+
4044
const rows = await this.db().any(
4145
`
4246
SELECT
@@ -51,12 +55,13 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
5155
m."deletedAt" is null AND
5256
exists (select 1 from "memberIdentities" where "memberId" = mo."memberId" and "deletedAt" is null)
5357
ORDER BY mo."memberId"
54-
LIMIT ${perPage};
58+
LIMIT $(perPage);
5559
`,
5660
{
5761
organizationId,
5862
lastId,
5963
syncFrom,
64+
perPage,
6065
},
6166
)
6267

services/libs/opensearch/src/service/member.sync.service.ts

Lines changed: 34 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -207,64 +207,48 @@ export class MemberSyncService {
207207

208208
public async syncOrganizationMembers(
209209
organizationId: string,
210-
opts: { syncFrom: Date | null } = { syncFrom: null },
211-
): Promise<void> {
212-
this.log.debug({ organizationId }, 'Syncing all organization members!')
213-
const batchSize = 500
214-
let docCount = 0
215-
const memberCount = 0
216-
217-
const now = new Date()
218-
219-
const loadNextPage = async (lastId?: string): Promise<string[]> => {
220-
this.log.info('Loading next page of organization members!', { organizationId, lastId })
221-
const memberIds = await logExecutionTimeV2(
222-
() =>
223-
this.memberRepo.getOrganizationMembersForSync(
224-
organizationId,
225-
batchSize,
226-
lastId,
227-
opts.syncFrom,
228-
),
229-
this.log,
230-
`getOrganizationMembersForSync`,
231-
)
232-
233-
if (memberIds.length === 0) {
234-
return []
235-
}
210+
opts: {
211+
lastId?: string
212+
batchSize?: number
213+
syncFrom?: Date | null
214+
} = {},
215+
): Promise<{ lastId: string | null; membersSynced: number }> {
216+
const batchSize = opts.batchSize ?? 500
217+
218+
const memberIds = await logExecutionTimeV2(
219+
() =>
220+
this.memberRepo.getOrganizationMembersForSync(
221+
organizationId,
222+
batchSize,
223+
opts.lastId,
224+
opts.syncFrom ?? undefined,
225+
),
226+
this.log,
227+
'getOrganizationMembersForSync',
228+
)
236229

237-
return memberIds
230+
if (memberIds.length === 0) {
231+
return { lastId: null, membersSynced: 0 }
238232
}
239233

240-
let memberIds: string[] = await loadNextPage()
241-
242-
while (memberIds.length > 0) {
243-
for (let i = 0; i < memberIds.length; i++) {
244-
const memberId = memberIds[i]
245-
const { documentsIndexed } = await logExecutionTimeV2(
246-
() => this.syncMembers(memberId),
247-
this.log,
248-
`syncMembers (${i}/${memberIds.length})`,
249-
)
250-
251-
docCount += documentsIndexed
252-
}
253-
254-
const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000
255-
this.log.info(
256-
{ organizationId },
257-
`Synced ${memberCount} members! Speed: ${Math.round(
258-
memberCount / diffInSeconds,
259-
)} members/second!`,
234+
let docCount = 0
235+
for (let i = 0; i < memberIds.length; i++) {
236+
const { documentsIndexed } = await logExecutionTimeV2(
237+
() => this.syncMembers(memberIds[i]),
238+
this.log,
239+
`syncMembers (${i + 1}/${memberIds.length})`,
260240
)
261-
memberIds = await loadNextPage(memberIds[memberIds.length - 1])
241+
docCount += documentsIndexed
262242
}
263243

244+
const lastId = memberIds[memberIds.length - 1]
245+
264246
this.log.info(
265-
{ organizationId },
266-
`Synced total of ${memberCount} members with ${docCount} documents!`,
247+
{ organizationId, membersSynced: memberIds.length, docCount, lastId },
248+
`Synced page of ${memberIds.length} members (${docCount} docs indexed)`,
267249
)
250+
251+
return { lastId, membersSynced: memberIds.length }
268252
}
269253

270254
public async syncMembers(memberId: string): Promise<IMemberSyncResult> {

0 commit comments

Comments
 (0)