From a5a077746d06df380b5175f76bb40966b33c64ee Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:59:06 +0530 Subject: [PATCH 1/5] fix: prevent premature workflow cancellation in members-enrichment --- .../src/schedules/getMembersToEnrich.ts | 2 +- .../members_enrichment_worker/src/workflows/enrichMember.ts | 4 ++-- .../src/workflows/getMembersToEnrich.ts | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index 9d6e4e4991..dfd983670c 100644 --- a/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -24,7 +24,7 @@ export const scheduleMembersEnrichment = async () => { type: 'startWorkflow', workflowType: getMembersToEnrich, taskQueue: 'members-enrichment', - workflowExecutionTimeout: '20 minutes', + workflowExecutionTimeout: '45 minutes', retry: { initialInterval: '15 seconds', backoffCoefficient: 2, diff --git a/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts b/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts index ac3cdef434..71d88192e2 100644 --- a/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts +++ b/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts @@ -24,7 +24,7 @@ const { hasRemainingCredits, getMemberById, } = proxyActivities({ - startToCloseTimeout: '5 minutes', + startToCloseTimeout: '10 minutes', retry: { initialInterval: '60s', backoffCoefficient: 2.0, @@ -82,7 +82,7 @@ export async function enrichMember( workflowId: 'member-enrichment/' + input.id + '/processMemberSources', cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - workflowExecutionTimeout: '15 minutes', + workflowExecutionTimeout: '25 minutes', retry: { backoffCoefficient: 2, maximumAttempts: 10, diff --git a/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts b/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts index e6c1a897d2..b749682792 100644 --- a/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts +++ b/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts @@ -14,7 +14,7 @@ import { chunkArray } from '../utils/common' import { enrichMember } from './enrichMember' const { getEnrichableMembers, getMaxConcurrentRequests } = proxyActivities({ - startToCloseTimeout: '10 minutes', + startToCloseTimeout: '15 minutes', }) export async function getMembersToEnrich(): Promise { @@ -46,7 +46,7 @@ export async function getMembersToEnrich(): Promise { workflowId: 'member-enrichment/' + member.id, cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - workflowExecutionTimeout: '15 minutes', + workflowExecutionTimeout: '30 minutes', retry: { backoffCoefficient: 2, maximumAttempts: 10, From 87d60d5752efa8517dac55940051813d24b05414 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Mon, 10 Nov 2025 14:56:37 +0530 Subject: [PATCH 2/5] fix: stop starting a nested transaction when a transaction is provided --- .../apps/members_enrichment_worker/index.ts | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts index 86e7056d04..20c2989cc8 100644 --- a/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts @@ -469,25 +469,28 @@ export async function deleteMemberOrg(db: DbConnOrTx, memberId: string, organiza }) } -export async function deleteMemberOrgById(db: DbConnOrTx, memberId: string, id: string) { - await db.tx(async (tx) => { - await tx.none( - ` +export async function deleteMemberOrgById( + tx: DbTransaction, + memberId: string, + id: string, +): Promise { + // Execute directly on the provided transaction to avoid creating nested savepoints + await tx.none( + ` DELETE FROM "memberOrganizationAffiliationOverrides" WHERE "memberOrganizationId" = $(id); - `, - { id }, - ) + `, + { id }, + ) - await tx.none( - ` + await tx.none( + ` UPDATE "memberOrganizations" SET "deletedAt" = NOW() WHERE "memberId" = $(memberId) and id = $(id); - `, - { memberId, id }, - ) - }) + `, + { memberId, id }, + ) } export async function findMemberOrgs(db: DbStore, memberId: string, orgId: string) { From 34fdd7e672d6d796f27698966bb15359cc520552 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Mon, 10 Nov 2025 15:00:38 +0530 Subject: [PATCH 3/5] refactor: updateMemberUsingSquashedPayload to use sequential db calls --- .../src/activities/enrichment.ts | 83 ++++++++----------- 1 file changed, 34 insertions(+), 49 deletions(-) diff --git a/services/apps/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/members_enrichment_worker/src/activities/enrichment.ts index 7fa2a32c2e..015c3b7642 100644 --- a/services/apps/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/members_enrichment_worker/src/activities/enrichment.ts @@ -249,22 +249,19 @@ export async function updateMemberUsingSquashedPayload( return await svc.postgres.writer.transactionally(async (tx) => { let updated = false const qx = dbStoreQx(tx) - const promises = [] // process identities if (squashedPayload.identities.length > 0) { svc.log.debug({ memberId }, 'Adding to member identities!') for (const i of squashedPayload.identities) { updated = true - promises.push( - upsertMemberIdentity(qx, { - memberId, - platform: i.platform, - type: i.type, - value: i.value, - verified: i.verified, - }), - ) + await upsertMemberIdentity(qx, { + memberId, + platform: i.platform, + type: i.type, + value: i.value, + verified: i.verified, + }) } } @@ -273,30 +270,22 @@ export async function updateMemberUsingSquashedPayload( // it's ommited from the payload because it takes a lot of space svc.log.debug('Processing contributions! ', { memberId, hasContributions }) if (hasContributions) { - promises.push( - findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId) - .then((caches) => { - if (caches.length > 0 && caches[0].data) { - const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService( - MemberEnrichmentSource.PROGAI, - svc.log, - ) - return progaiService.normalize(caches[0].data) - } - - return undefined - }) - .then((normalized) => { - if (normalized) { - const typed = normalized as IMemberEnrichmentDataNormalized + const caches = await findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId) + if (caches?.length > 0 && caches[0]?.data) { + const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService( + MemberEnrichmentSource.PROGAI, + svc.log, + ) + const normalized = await progaiService.normalize(caches[0].data) + if (normalized) { + const typed = normalized as IMemberEnrichmentDataNormalized - if (typed.contributions) { - updated = true - return updateMemberContributions(qx, memberId, typed.contributions) - } - } - }), - ) + if (typed.contributions) { + updated = true + await updateMemberContributions(qx, memberId, typed.contributions) + } + } + } } // process attributes @@ -312,7 +301,7 @@ export async function updateMemberUsingSquashedPayload( attributes = await setAttributesDefaultValues(attributes, priorities) } updated = true - promises.push(updateMemberAttributes(qx, memberId, attributes)) + await updateMemberAttributes(qx, memberId, attributes) } // process reach @@ -332,7 +321,7 @@ export async function updateMemberUsingSquashedPayload( } updated = true - promises.push(updateMemberReach(qx, memberId, reach)) + await updateMemberReach(qx, memberId, reach) } } @@ -422,7 +411,7 @@ export async function updateMemberUsingSquashedPayload( if (results.toDelete.length > 0) { for (const org of results.toDelete) { updated = true - promises.push(deleteMemberOrgById(tx.transaction(), memberId, org.id)) + await deleteMemberOrgById(tx.transaction(), memberId, org.orgId) } } @@ -432,16 +421,14 @@ export async function updateMemberUsingSquashedPayload( throw new Error('Organization ID is missing!') } updated = true - promises.push( - insertWorkExperience( - tx.transaction(), - memberId, - org.organizationId, - org.title, - org.startDate, - org.endDate, - org.source, - ), + await insertWorkExperience( + tx.transaction(), + memberId, + org.organizationId, + org.title, + org.startDate, + org.endDate, + org.source, ) } } @@ -449,13 +436,11 @@ export async function updateMemberUsingSquashedPayload( if (results.toUpdate.size > 0) { for (const [org, toUpdate] of results.toUpdate) { updated = true - promises.push(updateMemberOrg(tx.transaction(), memberId, org, toUpdate)) + await updateMemberOrg(tx.transaction(), memberId, org, toUpdate) } } } - await Promise.all(promises) - if (updated) { await setMemberEnrichmentUpdatedAt(tx.transaction(), memberId) await syncMember(memberId) From 74b8ffd875facf0087e4f41f1da0066385d861a1 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Mon, 10 Nov 2025 17:28:07 +0530 Subject: [PATCH 4/5] chore: bump workflowExecutionTimeout in scheduleMembersEnrichment --- .../src/schedules/getMembersToEnrich.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index dfd983670c..ea1884a817 100644 --- a/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -24,7 +24,7 @@ export const scheduleMembersEnrichment = async () => { type: 'startWorkflow', workflowType: getMembersToEnrich, taskQueue: 'members-enrichment', - workflowExecutionTimeout: '45 minutes', + workflowExecutionTimeout: '2 hours', retry: { initialInterval: '15 seconds', backoffCoefficient: 2, From d3f1ebdc4bebe5641ba47f240a4f62d9b7ee47d6 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Mon, 10 Nov 2025 18:00:31 +0530 Subject: [PATCH 5/5] refactor: delete logic in updateMemberUsingSquashedPayload --- .../members_enrichment_worker/src/activities/enrichment.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/members_enrichment_worker/src/activities/enrichment.ts index 015c3b7642..606ea7f2c7 100644 --- a/services/apps/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/members_enrichment_worker/src/activities/enrichment.ts @@ -276,7 +276,7 @@ export async function updateMemberUsingSquashedPayload( MemberEnrichmentSource.PROGAI, svc.log, ) - const normalized = await progaiService.normalize(caches[0].data) + const normalized = progaiService.normalize(caches[0].data) if (normalized) { const typed = normalized as IMemberEnrichmentDataNormalized @@ -411,7 +411,7 @@ export async function updateMemberUsingSquashedPayload( if (results.toDelete.length > 0) { for (const org of results.toDelete) { updated = true - await deleteMemberOrgById(tx.transaction(), memberId, org.orgId) + await deleteMemberOrgById(tx.transaction(), memberId, org.id) } }