fix: prevent enrichment timeouts and activityRelations write churn (CM-1179) #4098
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
Pull request overview
This PR refactors the member enrichment + affiliation refresh flow to reduce Temporal timeouts and prevent churn/partial-write races in activityRelations by routing affiliation refresh triggers through a per-member, signal-driven memberUpdate workflow and skipping refreshes when the affiliation timeline hasn’t changed.
Changes:
- Parallelizes per-member enrichment source fetching via `Promise.all` and increases the members-enrichment schedule execution timeout to 3 hours.
- Introduces a signal-driven, coalescing `memberUpdate` workflow and updates multiple callers to use `signalWithStart`/`USE_EXISTING` instead of `TERMINATE_IF_RUNNING`.
- Avoids unnecessary affiliation refreshes after enrichment by detecting whether `(organizationId, startDate, endDate)` timeline tuples actually changed.
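The timeline check in the last bullet could look roughly like the sketch below. It is an illustration only: the `WorkExperience` shape and both function names are assumptions, not code from this PR, and it compares the tuples as sorted lists so that reordering alone does not count as a change.

```typescript
// Hypothetical shape: only the three fields named in the review are compared.
interface WorkExperience {
  organizationId: string
  startDate: string | null
  endDate: string | null
}

// Serialize one entry into a stable key; JSON keeps null distinct from an empty string.
function timelineKey(w: WorkExperience): string {
  return JSON.stringify([w.organizationId, w.startDate, w.endDate])
}

// True when the two timelines differ once order is ignored.
function timelineChanged(before: WorkExperience[], after: WorkExperience[]): boolean {
  const a = before.map(timelineKey).sort()
  const b = after.map(timelineKey).sort()
  return a.length !== b.length || a.some((key, i) => key !== b[i])
}
```

A caller would trigger the affiliation refresh only when `timelineChanged(previous, enriched)` returns `true`.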
Reviewed changes
Copilot reviewed 8 out of 10 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| services/libs/common_services/src/services/common.member.service.ts | Switches affiliation refresh trigger to signalWithStart using WorkflowIdConflictPolicy.USE_EXISTING and tenant-scoped workflowId. |
| services/apps/script_executor_worker/src/bin/recalculate-enrichment-affiliations.ts | Updates recalc script to use signalWithStart and USE_EXISTING to avoid terminating/racing refreshes. |
| services/apps/script_executor_worker/src/bin/recalculate-all-affiliations.ts | Same signalWithStart/coalescing behavior for full affiliation recalculation script. |
| services/apps/script_executor_worker/src/activities/common.ts | Routes “calculateMemberAffiliations” through the per-member Temporal workflow instead of direct DB writes. |
| services/apps/profiles_worker/src/workflows/organization/organizationUpdate.ts | Signals per-member memberUpdate child workflows and uses PARENT_CLOSE_POLICY_ABANDON to survive continueAsNew. |
| services/apps/profiles_worker/src/workflows/member/memberUpdate.ts | Reworks memberUpdate into a per-member signal-driven workflow that coalesces concurrent refresh requests. |
| services/apps/profiles_worker/src/activities/member/memberUpdate.ts | Removes outdated comment; activity still performs the DB affiliation refresh. |
| services/apps/members_enrichment_worker/src/workflows/enrichMember.ts | Runs enrichment sources in parallel and returns a boolean “did any source change” aggregation. |
| services/apps/members_enrichment_worker/src/schedules/membersEnrichment.ts | Increases schedule workflow execution timeout from 2h to 3h. |
| services/apps/members_enrichment_worker/src/activities/enrichment.ts | Skips post-enrichment affiliation refresh when the org/date timeline is unchanged. |
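As a rough illustration of the `enrichMember.ts` row, the sketch below runs sources with `Promise.all` and aggregates a "did any source change" boolean. `EnrichSource` and `enrichAllSources` are hypothetical names, not the repository's code.

```typescript
// Hypothetical per-source step; resolves to true when it changed the member.
type EnrichSource = (memberId: string) => Promise<boolean>

// Run all sources concurrently. Promise.all rejects as soon as any source rejects
// (fail-fast); otherwise the result is true if at least one source reported a change.
async function enrichAllSources(memberId: string, sources: EnrichSource[]): Promise<boolean> {
  const results = await Promise.all(sources.map((enrich) => enrich(memberId)))
  return results.some((changed) => changed)
}
```

Note that `Promise.all` surfaces the first rejection but does not cancel the other in-flight calls, so fail-fast here means "fail the aggregate early", not "stop all work".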
Pull request overview
Copilot reviewed 12 out of 16 changed files in this pull request and generated 1 comment.
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported
Comments suppressed due to low confidence (1)
services/apps/profiles_worker/src/workflows/organization/organizationUpdate.ts:56
- PR description mentions adding `parentClosePolicy: ABANDON` to an `organizationUpdate` “child fan-out”, but `organizationUpdate` doesn’t start child workflows (it signals `memberUpdate` via an activity) and there’s no `parentClosePolicy` to update here. Please either update the PR description to reflect the actual mechanism, or add the intended `parentClosePolicy` change in the relevant workflow if something else is supposed to be using child workflows.
await triggerMemberAffiliationsRefresh(memberId, [], false, true)
}
await continueAsNew<typeof organizationUpdate>({
...input,
afterMemberId: memberIds[memberIds.length - 1],
})
}
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit c8ee3e9.
const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)
// cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
if (await isCacheObsolete(source, cache)) {
  const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)
Redundant getEnrichmentInput activity calls per source
Low Severity
The PR specifically hoisted findMemberEnrichmentCache outside the per-source loop to avoid redundant DB round-trips (line 47), yet getEnrichmentInput(input) is still called inside the Promise.all mapper — once per source with an obsolete cache. Since getEnrichmentInput depends only on input (not source), every call schedules an identical Temporal activity that returns the same result. With Promise.all, these N identical activities now run in parallel, wasting worker resources and inflating the workflow history.
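One possible shape of the fix, sketched with stand-in types: fetch the shared input once, and only when at least one source actually needs it, then reuse it inside the `Promise.all` mapper. `SourceCache`, `refreshObsoleteSources`, the call counter, and the simplified `getEnrichmentInput` signature are all assumptions for illustration.

```typescript
// All names below are stand-ins for illustration, not the repository's real signatures.
interface EnrichmentInput {
  memberId: string
}
interface SourceCache {
  source: string
  obsolete: boolean
}

let inputCalls = 0 // counts how often the simulated activity is invoked

async function getEnrichmentInput(memberId: string): Promise<EnrichmentInput> {
  inputCalls += 1
  return { memberId }
}

// Fetch the shared input at most once, then fan out over the sources whose cache is obsolete.
async function refreshObsoleteSources(memberId: string, caches: SourceCache[]): Promise<string[]> {
  const stale = caches.filter((cache) => cache.obsolete)
  if (stale.length === 0) return [] // nothing to refresh, so skip the input fetch entirely

  const input = await getEnrichmentInput(memberId)
  return Promise.all(stale.map(async (cache) => `${cache.source}:${input.memberId}`))
}
```

With N obsolete sources this schedules one input fetch instead of N, at the cost of fetching before the parallel fan-out starts.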
  queued = next
} else {
  queued = {
    member: queued.member,
Signal coalescing drops member from later signals
Low Severity
When merging two queued signals, the handler always keeps queued.member and discards next.member. Today member only contains { id } and the workflow ID is keyed by member, so the values are identical. But the MemberUpdateInput type defines member as { id: string }, and if it ever gains additional fields (e.g. displayName), the fields from later signals would be silently lost. Using next.member (or merging both) would be more defensive.
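A more defensive merge might look like the sketch below. Only `member: { id }` is confirmed by this comment; the other two fields and the `mergeQueued` name are assumptions chosen to mirror the "merge org IDs, optionally sync to OpenSearch" behaviour described in the PR.

```typescript
// Hypothetical payload; field names other than member.id are assumptions.
interface MemberUpdateSignal {
  member: { id: string }
  memberOrganizationIds: string[]
  syncToOpensearch: boolean
}

// Fold a newly received signal into whatever is already queued.
function mergeQueued(queued: MemberUpdateSignal | null, next: MemberUpdateSignal): MemberUpdateSignal {
  if (queued === null) return next
  const orgIds = queued.memberOrganizationIds.concat(next.memberOrganizationIds)
  return {
    // Later signals win field by field, so fields added to member in future are not dropped.
    member: { ...queued.member, ...next.member },
    // Union of org IDs, keeping first-seen order.
    memberOrganizationIds: orgIds.filter((id, i) => orgIds.indexOf(id) === i),
    // Sync if any of the coalesced requests asked for it.
    syncToOpensearch: queued.syncToOpensearch || next.syncToOpensearch,
  }
}
```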


Summary
`triggerMembersEnrichment` was timing out on scheduled runs, and `activityRelations` was seeing an unusually high write rate. Both came from the enrichment pipeline doing unnecessary work and then racing on that work.
Enrichment sources ran sequentially per member, so one slow source could stall the rest and blow the workflow budget. Separately, enrichment-sourced work experiences were rebuilt on every run, which triggered affiliation refreshes even when the effective timeline was unchanged. Concurrent refresh requests then used start/terminate behavior, so in-flight refreshes could be killed after partial `activityRelations` writes had already committed.
Changes
- Run enrichment sources in parallel via `Promise.all` while preserving fail-fast behavior.
- Increase the members-enrichment schedule execution timeout from `2 hours` to `3 hours`.
- Skip the post-enrichment affiliation refresh when the work-experience timeline is unchanged, compared by `(orgId, startDate, endDate)`.
- Rework `memberUpdate` into a signal-driven per-member workflow that coalesces concurrent `refreshAffiliations` requests into one follow-up pass.
- Route refresh triggers through `signalMemberUpdate` with `WorkflowIdConflictPolicy.USE_EXISTING`, so refreshes coordinate on one workflow slot per member.
- Move `signalMemberUpdate` calls to after DB transaction commits so `memberUpdate` reads persisted member/org data.
- Update `organizationUpdate` to queue member affiliation refreshes through `triggerMemberAffiliationsRefresh` instead of calling `updateMemberAffiliations` directly.
- Rename `calculateMemberAffiliations` to `triggerMemberAffiliationsRefresh`, since it now queues a refresh instead of doing the work inline.
Why This Helps
All refresh requests for a member now flow through the same `memberUpdate` workflow. If a refresh is already running, new requests are merged into one follow-up pass instead of terminating the current walk or starting parallel refreshes.
This reduces unnecessary `activityRelations` writes, avoids duplicate partial refresh work, and keeps refresh behavior deterministic under bursts from enrichment, organization updates, manual recalc scripts, or merge flows.
Note
Medium Risk
Changes the Temporal `memberUpdate` execution model and reroutes many callers to a new signal-driven path, which could affect affiliation refresh timing and OpenSearch sync behavior under load.
Overview
This PR replaces direct/transaction-scoped affiliation recalculation kicks with a centralized `signalMemberUpdate` helper that `signalWithStart`s the per-member `memberUpdate` workflow and coalesces concurrent refresh requests.
It updates API endpoints, services, workers, and scripts to signal after DB commits (so refresh reads persisted data), and adjusts `memberUpdate` to be signal-driven (merge org IDs, optionally sync member/orgs to OpenSearch, and `continueAsNew` when suggested).
It also reduces enrichment-driven churn by running enrichment sources in parallel, fetching caches once per member, increasing the scheduled enrichment timeout to 3 hours, and only triggering affiliation refresh when the effective work-experience timeline (orgId/start/end) or affiliation overrides change.
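The "one slot per member, coalesce into one follow-up pass" behaviour can be modelled in memory, without the Temporal SDK, roughly as follows. This is a toy analogue of signal-with-start plus use-existing semantics under assumed names (`MemberUpdateSlots`, `Refresh`), not the workflow code from this PR.

```typescript
// In-memory model only: no Temporal SDK involved. All names are hypothetical.
type Refresh = (memberId: string, orgIds: string[]) => Promise<void>

class MemberUpdateSlots {
  // A key exists while a run is active for that member; the value holds
  // org IDs queued during the current pass (null when nothing is queued).
  private pending = new Map<string, string[] | null>()
  private refresh: Refresh

  constructor(refresh: Refresh) {
    this.refresh = refresh
  }

  // Start a run if none exists for this member; otherwise queue into the existing one.
  // Returns the run's promise only for the call that started it.
  signal(memberId: string, orgIds: string[]): Promise<void> | undefined {
    if (this.pending.has(memberId)) {
      const queued = this.pending.get(memberId) ?? []
      this.pending.set(memberId, queued.concat(orgIds))
      return undefined
    }
    this.pending.set(memberId, null)
    return this.run(memberId, orgIds)
  }

  private async run(memberId: string, orgIds: string[]): Promise<void> {
    let current: string[] | null = orgIds
    while (current !== null) {
      await this.refresh(memberId, current)
      // Take everything that arrived during the pass as one follow-up batch.
      current = this.pending.get(memberId) ?? null
      this.pending.set(memberId, null)
    }
    this.pending.delete(memberId) // release the slot
  }
}
```

In this model a burst of requests during an in-flight refresh produces a single follow-up pass, and the in-flight pass is never terminated.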