Skip to content

Commit ebab86d

Browse files
committed
chore: add code comments for readability
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
1 parent 83e38bd commit ebab86d

4 files changed

Lines changed: 18 additions & 11 deletions

File tree

services/apps/members_enrichment_worker/src/activities/enrichment.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,8 @@ export async function updateMemberUsingSquashedPayload(
582582
isHighConfidenceSourceSelectedForWorkExperiences,
583583
)
584584

585+
// Enrichment often deletes and recreates the same orgs with identical dates.
586+
// Skip the refresh when the timeline that drives activityRelations hasn't changed.
585587
affiliationNeedsRefresh =
586588
results.toUpdate.size > 0 ||
587589
hasMemberOrganizationTimelineChange(results.toDelete, results.toCreate)
@@ -779,6 +781,11 @@ function sanitizeWorkExperienceDateRanges(
779781
})
780782
}
781783

784+
/**
785+
* Returns true when the set of (orgId, startDate, endDate) tuples differs
786+
* between deletes and creates. Fields like title or source don't affect
787+
* the affiliation timeline, so they're intentionally ignored.
788+
*/
782789
function hasMemberOrganizationTimelineChange(
783790
toDelete: IMemberOrganizationData[],
784791
toCreate: IMemberEnrichmentDataNormalizedOrganization[],

services/apps/members_enrichment_worker/src/workflows/enrichMember.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ export async function enrichMember(
4343
// skip enrichment if member no longer exists
4444
if (!member) return
4545

46-
// Run all sources in parallel
46+
// Fetch all sources in parallel so one slow or rate-limited source doesn't block the rest.
47+
// If any source fails, the member's enrichment fails immediately.
4748
const sourceResults = await Promise.all(
4849
sources.map(async (source): Promise<boolean> => {
4950
const caches = await findMemberEnrichmentCache([source], input.id)

services/apps/profiles_worker/src/workflows/member/memberUpdate.ts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,10 @@ const { updateMemberAffiliations, syncOrganization, syncMember } = proxyActiviti
1919

2020
export const refreshAffiliationsSignal = defineSignal<[MemberUpdateInput]>('refreshAffiliations')
2121

22-
/*
23-
memberUpdate is a Temporal workflow that:
24-
- [Signal]: accepts 'refreshAffiliations' signals to queue affiliation refresh requests
25-
- [Activity]: Refresh all affiliations for a given member in the database.
26-
- [Activity]: Sync member and memberOrganizations to OpenSearch if specified.
27-
28-
Signals are coalesced: if N requests arrive while a refresh is in progress,
29-
they are merged into one follow-up pass. This eliminates the TERMINATE_IF_RUNNING
30-
race where concurrent callers killed mid-flight refreshes.
31-
*/
22+
/**
23+
* Per-member workflow that serializes async member operations (affiliations, sync, etc).
24+
* Concurrent signals are coalesced into one follow-up pass instead of racing.
25+
*/
3226
export async function memberUpdate(input?: MemberUpdateInput): Promise<void> {
3327
let queued: MemberUpdateInput | null = input ?? null
3428

@@ -50,12 +44,14 @@ export async function memberUpdate(input?: MemberUpdateInput): Promise<void> {
5044
})
5145

5246
if (!queued) {
47+
// signalWithStart starts with no args, so wait for the first signal to arrive
5348
const received = await condition(() => queued !== null, '5 minutes')
5449
if (!received) return
5550
}
5651

5752
while (queued) {
5853
const pending = queued
54+
// Clear before awaiting so new signals accumulate into the next pass
5955
queued = null
6056

6157
const memberId = pending.member.id

services/apps/profiles_worker/src/workflows/organization/organizationUpdate.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ export async function organizationUpdate(input: IOrganizationProfileSyncInput):
5656
memberOrganizationIds: [],
5757
syncToOpensearch: false,
5858
}
59+
// Routes through the per-member workflow so concurrent org updates coalesce.
60+
// ABANDON is needed because continueAsNew closes this execution, and without it
61+
// Temporal would terminate the child refreshes we just kicked off.
5962
const handle = await startChild('memberUpdate', {
6063
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${DEFAULT_TENANT_ID}/${memberId}`,
6164
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,

0 commit comments

Comments
 (0)