Skip to content

Commit 83e38bd

Browse files
committed
fix: prevent enrichment timeouts and activityRelations write churn
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
1 parent 15d59ff commit 83e38bd

10 files changed

Lines changed: 174 additions & 87 deletions

File tree

services/apps/members_enrichment_worker/src/activities/enrichment.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ export async function updateMemberUsingSquashedPayload(
297297
): Promise<boolean> {
298298
const affectedOrgIds: string[] = []
299299
const orgIdsToSync: string[] = []
300+
let affiliationNeedsRefresh = false
300301

301302
const wasUpdated = await svc.postgres.writer.transactionally(async (tx) => {
302303
let didUpdate = false
@@ -581,6 +582,10 @@ export async function updateMemberUsingSquashedPayload(
581582
isHighConfidenceSourceSelectedForWorkExperiences,
582583
)
583584

585+
affiliationNeedsRefresh =
586+
results.toUpdate.size > 0 ||
587+
hasMemberOrganizationTimelineChange(results.toDelete, results.toCreate)
588+
584589
if (results.toDelete.length > 0) {
585590
for (const org of results.toDelete) {
586591
didUpdate = true
@@ -676,7 +681,7 @@ export async function updateMemberUsingSquashedPayload(
676681
)
677682
}
678683

679-
if (affectedOrgIds.length > 0) {
684+
if (affiliationNeedsRefresh && affectedOrgIds.length > 0) {
680685
const commonMemberService = new CommonMemberService(
681686
pgpQx(svc.postgres.writer.connection()),
682687
svc.temporal,
@@ -774,6 +779,23 @@ function sanitizeWorkExperienceDateRanges(
774779
})
775780
}
776781

782+
function hasMemberOrganizationTimelineChange(
783+
toDelete: IMemberOrganizationData[],
784+
toCreate: IMemberEnrichmentDataNormalizedOrganization[],
785+
): boolean {
786+
const toKey = (orgId: string, start: string | null | undefined, end: string | null | undefined) =>
787+
`${orgId}|${start ? start.substring(0, 10) : ''}|${end ? end.substring(0, 10) : ''}`
788+
789+
const deletedKeys = new Set(toDelete.map((d) => toKey(d.orgId, d.dateStart, d.dateEnd)))
790+
const createdKeys = new Set(toCreate.map((c) => toKey(c.organizationId, c.startDate, c.endDate)))
791+
792+
if (deletedKeys.size !== createdKeys.size) return true
793+
for (const key of deletedKeys) {
794+
if (!createdKeys.has(key)) return true
795+
}
796+
return false
797+
}
798+
777799
function prepareWorkExperiences(
778800
oldVersion: IMemberOrganizationData[],
779801
newVersion: IMemberEnrichmentDataNormalizedOrganization[],

services/apps/members_enrichment_worker/src/schedules/membersEnrichment.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export const scheduleMembersEnrichment = async () => {
5757
type: 'startWorkflow',
5858
workflowType: triggerMembersEnrichment,
5959
taskQueue: 'members-enrichment',
60-
workflowExecutionTimeout: '2 hours',
60+
workflowExecutionTimeout: '3 hours',
6161
retry: {
6262
initialInterval: '15 seconds',
6363
backoffCoefficient: 2,

services/apps/members_enrichment_worker/src/workflows/enrichMember.ts

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -43,44 +43,45 @@ export async function enrichMember(
4343
// skip enrichment if member no longer exists
4444
if (!member) return
4545

46-
let changeInEnrichmentSourceData = false
46+
// Run all sources in parallel
47+
const sourceResults = await Promise.all(
48+
sources.map(async (source): Promise<boolean> => {
49+
const caches = await findMemberEnrichmentCache([source], input.id)
50+
const cache = caches.find((c) => c.source === source)
4751

48-
for (const source of sources) {
49-
// find if there's already saved enrichment data in source
50-
const caches = await findMemberEnrichmentCache([source], input.id)
51-
const cache = caches.find((c) => c.source === source)
52+
// cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
53+
if (await isCacheObsolete(source, cache)) {
54+
const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)
5255

53-
// cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
54-
if (await isCacheObsolete(source, cache)) {
55-
const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)
56-
57-
if (!(await hasRemainingCredits(source))) {
58-
// no credits remaining, throw error to fail the workflow
59-
throw ApplicationFailure.create({
60-
message: `No credits remaining for source ${source}`,
61-
type: 'MEMBER_ENRICHMENT_NO_CREDITS',
62-
nonRetryable: true,
63-
})
64-
}
56+
if (!(await hasRemainingCredits(source))) {
57+
// no credits remaining, throw error to fail the workflow
58+
throw ApplicationFailure.create({
59+
message: `No credits remaining for source ${source}`,
60+
type: 'MEMBER_ENRICHMENT_NO_CREDITS',
61+
nonRetryable: true,
62+
})
63+
}
6564

66-
const data = await getEnrichmentData(source, enrichmentInput)
65+
const data = await getEnrichmentData(source, enrichmentInput)
6766

68-
if (!cache) {
69-
await insertMemberEnrichmentCache(source, input.id, data)
70-
if (data) {
71-
changeInEnrichmentSourceData = true
67+
if (!cache) {
68+
await insertMemberEnrichmentCache(source, input.id, data)
69+
return !!data
70+
} else if (data === null && cache.data !== null) {
71+
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
72+
} else if (sourceHasDifferentDataComparedToCache(cache, data)) {
73+
await updateMemberEnrichmentCache(source, input.id, data)
74+
return true
75+
} else {
76+
// data is same as cache, only update cache.updatedAt
77+
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
7278
}
73-
} else if (data === null && cache.data !== null) {
74-
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
75-
} else if (sourceHasDifferentDataComparedToCache(cache, data)) {
76-
await updateMemberEnrichmentCache(source, input.id, data)
77-
changeInEnrichmentSourceData = true
78-
} else {
79-
// data is same as cache, only update cache.updatedAt
80-
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
8179
}
82-
}
83-
}
80+
return false
81+
}),
82+
)
83+
84+
const changeInEnrichmentSourceData = sourceResults.some(Boolean)
8485

8586
if (changeInEnrichmentSourceData && input.activityCount > 100) {
8687
// Member enrichment data has been updated, use squasher again!

services/apps/profiles_worker/src/activities/member/memberUpdate.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@ import { SearchSyncApiClient } from '@crowd/opensearch'
44

55
import { svc } from '../../main'
66

7-
/*
8-
updateMemberAffiliations is a Temporal activity that updates all affiliations for
9-
a given member.
10-
*/
117
export async function updateMemberAffiliations(memberId: string): Promise<void> {
128
const qx = pgpQx(svc.postgres.writer.connection())
139
await refreshMemberOrganizationAffiliations(qx, memberId)
Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
import { proxyActivities } from '@temporalio/workflow'
1+
import {
2+
condition,
3+
continueAsNew,
4+
defineSignal,
5+
proxyActivities,
6+
setHandler,
7+
workflowInfo,
8+
} from '@temporalio/workflow'
29

310
import * as activities from '../../activities'
411
import { MemberUpdateInput } from '../../types/member'
@@ -10,25 +17,59 @@ const { updateMemberAffiliations, syncOrganization, syncMember } = proxyActiviti
1017
startToCloseTimeout: '60 minutes',
1118
})
1219

20+
export const refreshAffiliationsSignal = defineSignal<[MemberUpdateInput]>('refreshAffiliations')
21+
1322
/*
1423
memberUpdate is a Temporal workflow that:
15-
- [Activity]: Update all affiliations for a given member in the database.
24+
- [Signal]: accepts 'refreshAffiliations' signals to queue affiliation refresh requests
25+
- [Activity]: Refresh all affiliations for a given member in the database.
1626
- [Activity]: Sync member and memberOrganizations to OpenSearch if specified.
27+
28+
Signals are coalesced: if N requests arrive while a refresh is in progress,
29+
they are merged into one follow-up pass. This eliminates the TERMINATE_IF_RUNNING
30+
race where concurrent callers killed mid-flight refreshes.
1731
*/
18-
export async function memberUpdate(input: MemberUpdateInput): Promise<void> {
19-
const memberId = input.member.id
20-
try {
32+
export async function memberUpdate(input?: MemberUpdateInput): Promise<void> {
33+
let queued: MemberUpdateInput | null = input ?? null
34+
35+
setHandler(refreshAffiliationsSignal, (next: MemberUpdateInput) => {
36+
if (!queued) {
37+
queued = next
38+
} else {
39+
queued = {
40+
member: queued.member,
41+
memberOrganizationIds: [
42+
...new Set([
43+
...(queued.memberOrganizationIds || []),
44+
...(next.memberOrganizationIds || []),
45+
]),
46+
],
47+
syncToOpensearch: queued.syncToOpensearch || next.syncToOpensearch,
48+
}
49+
}
50+
})
51+
52+
if (!queued) {
53+
const received = await condition(() => queued !== null, '5 minutes')
54+
if (!received) return
55+
}
56+
57+
while (queued) {
58+
const pending = queued
59+
queued = null
60+
61+
const memberId = pending.member.id
2162
await updateMemberAffiliations(memberId)
22-
if (input.syncToOpensearch) {
23-
// sync member
63+
if (pending.syncToOpensearch) {
2464
await syncMember(memberId)
25-
// sync all member organizations
26-
const organizationIds = input.memberOrganizationIds || []
27-
for (const orgId of organizationIds) {
65+
for (const orgId of pending.memberOrganizationIds || []) {
2866
await syncOrganization(orgId)
2967
}
3068
}
31-
} catch (err) {
32-
throw new Error(err)
69+
70+
if (workflowInfo().continueAsNewSuggested && queued) {
71+
await continueAsNew<typeof memberUpdate>(queued)
72+
return
73+
}
3374
}
3475
}

services/apps/profiles_worker/src/workflows/organization/organizationUpdate.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
1-
import { continueAsNew, proxyActivities } from '@temporalio/workflow'
1+
import {
2+
ParentClosePolicy,
3+
WorkflowIdConflictPolicy,
4+
continueAsNew,
5+
proxyActivities,
6+
startChild,
7+
} from '@temporalio/workflow'
8+
9+
import { DEFAULT_TENANT_ID } from '@crowd/common'
10+
import { TemporalWorkflowId } from '@crowd/types'
211

312
import * as activities from '../../activities'
13+
import { MemberUpdateInput } from '../../types/member'
414
import { IOrganizationProfileSyncInput } from '../../types/organization'
515

616
// Configure timeouts and retry policies to update a member in the database.
7-
const { updateMemberAffiliations, syncOrganization, findMembersInOrganization } = proxyActivities<
8-
typeof activities
9-
>({
17+
const { syncOrganization, findMembersInOrganization } = proxyActivities<typeof activities>({
1018
startToCloseTimeout: '5 minutes',
1119
})
1220

@@ -43,7 +51,20 @@ export async function organizationUpdate(input: IOrganizationProfileSyncInput):
4351
}
4452

4553
for (const memberId of memberIds) {
46-
await updateMemberAffiliations(memberId)
54+
const memberInput: MemberUpdateInput = {
55+
member: { id: memberId },
56+
memberOrganizationIds: [],
57+
syncToOpensearch: false,
58+
}
59+
const handle = await startChild('memberUpdate', {
60+
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${DEFAULT_TENANT_ID}/${memberId}`,
61+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
62+
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
63+
taskQueue: 'profiles',
64+
args: [],
65+
searchAttributes: { TenantId: [DEFAULT_TENANT_ID] },
66+
})
67+
await handle.signal('refreshAffiliations', memberInput)
4768
}
4869

4970
await continueAsNew<typeof organizationUpdate>({

services/apps/script_executor_worker/src/activities/common.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import axios from 'axios'
22

3-
import { pgpQx } from '@crowd/data-access-layer'
4-
import { refreshMemberOrganizationAffiliations } from '@crowd/data-access-layer/src/member-organization-affiliation'
3+
import { DEFAULT_TENANT_ID } from '@crowd/common'
54
import { findOrganizationSegments } from '@crowd/data-access-layer/src/old/apps/entity_merging_worker'
5+
import { WorkflowIdConflictPolicy } from '@crowd/temporal'
66
import {
77
IMemberIdentity,
88
IMemberUnmergeBackup,
99
IMemberUnmergePreviewResult,
1010
IUnmergeBackup,
1111
IUnmergePreviewResult,
12+
TemporalWorkflowId,
1213
} from '@crowd/types'
1314

1415
import { svc } from '../main'
@@ -159,10 +160,14 @@ export async function getWorkflowsCount(workflowType: string, status: string): P
159160
}
160161

161162
export async function calculateMemberAffiliations(memberId: string): Promise<void> {
162-
try {
163-
const qx = pgpQx(svc.postgres.writer.connection())
164-
await refreshMemberOrganizationAffiliations(qx, memberId)
165-
} catch (err) {
166-
throw new Error(err)
167-
}
163+
await svc.temporal.workflow.signalWithStart('memberUpdate', {
164+
taskQueue: 'profiles',
165+
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${DEFAULT_TENANT_ID}/${memberId}`,
166+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
167+
signal: 'refreshAffiliations',
168+
signalArgs: [{ member: { id: memberId }, memberOrganizationIds: [], syncToOpensearch: false }],
169+
retry: { maximumAttempts: 10 },
170+
args: [],
171+
searchAttributes: { TenantId: [DEFAULT_TENANT_ID] },
172+
})
168173
}

services/apps/script_executor_worker/src/bin/recalculate-all-affiliations.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { WorkflowIdReusePolicy } from '@temporalio/client'
1+
import { WorkflowIdConflictPolicy } from '@temporalio/client'
22

33
import { DEFAULT_TENANT_ID } from '@crowd/common'
44
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
@@ -262,19 +262,20 @@ async function main() {
262262
log.info(
263263
`Triggering memberUpdate: ${memberId} | stale orgs: [${staleOrgIds.join(', ')}] | active orgs: ${activeOrgIds.length}`,
264264
)
265-
await temporal.workflow.start('memberUpdate', {
265+
await temporal.workflow.signalWithStart('memberUpdate', {
266266
taskQueue: 'profiles',
267267
workflowId: `member-update/${DEFAULT_TENANT_ID}/${memberId}`,
268-
workflowIdReusePolicy:
269-
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
270-
retry: { maximumAttempts: 10 },
271-
args: [
268+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
269+
signal: 'refreshAffiliations',
270+
signalArgs: [
272271
{
273272
member: { id: memberId },
274273
memberOrganizationIds: activeOrgIds,
275274
syncToOpensearch: true,
276275
},
277276
],
277+
retry: { maximumAttempts: 10 },
278+
args: [],
278279
searchAttributes: { TenantId: [DEFAULT_TENANT_ID] },
279280
})
280281
if (opts.workflowDelayMs > 0) {

services/apps/script_executor_worker/src/bin/recalculate-enrichment-affiliations.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* CROWD_TEMPORAL_NAMESPACE - Temporal namespace
3939
* SERVICE - Service identifier (used by Temporal client)
4040
*/
41-
import { WorkflowIdReusePolicy } from '@temporalio/client'
41+
import { WorkflowIdConflictPolicy } from '@temporalio/client'
4242

4343
import { DEFAULT_TENANT_ID } from '@crowd/common'
4444
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
@@ -253,19 +253,20 @@ async function main() {
253253
{ memberId, activeOrgs: activeOrgIds.length, deletedOrgs: deletedOrgCount },
254254
'Triggering memberUpdate workflow',
255255
)
256-
await temporal.workflow.start('memberUpdate', {
256+
await temporal.workflow.signalWithStart('memberUpdate', {
257257
taskQueue: 'profiles',
258258
workflowId: `member-update/${DEFAULT_TENANT_ID}/${memberId}`,
259-
workflowIdReusePolicy:
260-
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
261-
retry: { maximumAttempts: 10 },
262-
args: [
259+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
260+
signal: 'refreshAffiliations',
261+
signalArgs: [
263262
{
264263
member: { id: memberId },
265264
memberOrganizationIds: activeOrgIds,
266265
syncToOpensearch: true,
267266
},
268267
],
268+
retry: { maximumAttempts: 10 },
269+
args: [],
269270
searchAttributes: { TenantId: [DEFAULT_TENANT_ID] },
270271
})
271272
if (opts.workflowDelayMs > 0) {

0 commit comments

Comments
 (0)