diff --git a/services/apps/members_enrichment_worker/src/activities/enrichment.ts b/services/apps/members_enrichment_worker/src/activities/enrichment.ts index 7fa2a32c2e..606ea7f2c7 100644 --- a/services/apps/members_enrichment_worker/src/activities/enrichment.ts +++ b/services/apps/members_enrichment_worker/src/activities/enrichment.ts @@ -249,22 +249,19 @@ export async function updateMemberUsingSquashedPayload( return await svc.postgres.writer.transactionally(async (tx) => { let updated = false const qx = dbStoreQx(tx) - const promises = [] // process identities if (squashedPayload.identities.length > 0) { svc.log.debug({ memberId }, 'Adding to member identities!') for (const i of squashedPayload.identities) { updated = true - promises.push( - upsertMemberIdentity(qx, { - memberId, - platform: i.platform, - type: i.type, - value: i.value, - verified: i.verified, - }), - ) + await upsertMemberIdentity(qx, { + memberId, + platform: i.platform, + type: i.type, + value: i.value, + verified: i.verified, + }) } } @@ -273,30 +270,22 @@ export async function updateMemberUsingSquashedPayload( // it's ommited from the payload because it takes a lot of space svc.log.debug('Processing contributions! ', { memberId, hasContributions }) if (hasContributions) { - promises.push( - findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId) - .then((caches) => { - if (caches.length > 0 && caches[0].data) { - const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService( - MemberEnrichmentSource.PROGAI, - svc.log, - ) - return progaiService.normalize(caches[0].data) - } - - return undefined - }) - .then((normalized) => { - if (normalized) { - const typed = normalized as IMemberEnrichmentDataNormalized + const caches = await findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId) + if (caches?.length > 0 && caches[0]?.data) { + const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService( + MemberEnrichmentSource.PROGAI, + svc.log, + ) + const normalized = progaiService.normalize(caches[0].data) + if (normalized) { + const typed = normalized as IMemberEnrichmentDataNormalized - if (typed.contributions) { - updated = true - return updateMemberContributions(qx, memberId, typed.contributions) - } - } - }), - ) + if (typed.contributions) { + updated = true + await updateMemberContributions(qx, memberId, typed.contributions) + } + } + } } // process attributes @@ -312,7 +301,7 @@ export async function updateMemberUsingSquashedPayload( attributes = await setAttributesDefaultValues(attributes, priorities) } updated = true - promises.push(updateMemberAttributes(qx, memberId, attributes)) + await updateMemberAttributes(qx, memberId, attributes) } // process reach @@ -332,7 +321,7 @@ export async function updateMemberUsingSquashedPayload( } updated = true - promises.push(updateMemberReach(qx, memberId, reach)) + await updateMemberReach(qx, memberId, reach) } } @@ -422,7 +411,7 @@ export async function updateMemberUsingSquashedPayload( if (results.toDelete.length > 0) { for (const org of results.toDelete) { updated = true - promises.push(deleteMemberOrgById(tx.transaction(), memberId, org.id)) + await deleteMemberOrgById(tx.transaction(), memberId, org.id) } } @@ -432,16 +421,14 @@ export async function updateMemberUsingSquashedPayload( throw new Error('Organization ID is missing!') } updated = true - promises.push( - insertWorkExperience( - tx.transaction(), - memberId, - org.organizationId, - org.title, - org.startDate, - org.endDate, - org.source, - ), + await insertWorkExperience( + tx.transaction(), + memberId, + org.organizationId, + org.title, + org.startDate, + org.endDate, + org.source, ) } } @@ -449,13 +436,11 @@ export async function updateMemberUsingSquashedPayload( if (results.toUpdate.size > 0) { for (const [org, toUpdate] of results.toUpdate) { updated = true - promises.push(updateMemberOrg(tx.transaction(), memberId, org, toUpdate)) + await updateMemberOrg(tx.transaction(), memberId, org, toUpdate) } } } - await Promise.all(promises) - if (updated) { await setMemberEnrichmentUpdatedAt(tx.transaction(), memberId) await syncMember(memberId) diff --git a/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts b/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts index 9d6e4e4991..ea1884a817 100644 --- a/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts +++ b/services/apps/members_enrichment_worker/src/schedules/getMembersToEnrich.ts @@ -24,7 +24,7 @@ export const scheduleMembersEnrichment = async () => { type: 'startWorkflow', workflowType: getMembersToEnrich, taskQueue: 'members-enrichment', - workflowExecutionTimeout: '20 minutes', + workflowExecutionTimeout: '2 hours', retry: { initialInterval: '15 seconds', backoffCoefficient: 2, diff --git a/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts b/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts index ac3cdef434..71d88192e2 100644 --- a/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts +++ b/services/apps/members_enrichment_worker/src/workflows/enrichMember.ts @@ -24,7 +24,7 @@ const { hasRemainingCredits, getMemberById, } = proxyActivities({ - startToCloseTimeout: '5 minutes', + startToCloseTimeout: '10 minutes', retry: { initialInterval: '60s', backoffCoefficient: 2.0, @@ -82,7 +82,7 @@ export async function enrichMember( workflowId: 'member-enrichment/' + input.id + '/processMemberSources', cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - workflowExecutionTimeout: '15 minutes', + workflowExecutionTimeout: '25 minutes', retry: { backoffCoefficient: 2, maximumAttempts: 10, diff --git a/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts b/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts index e6c1a897d2..b749682792 100644 --- a/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts +++ b/services/apps/members_enrichment_worker/src/workflows/getMembersToEnrich.ts @@ -14,7 +14,7 @@ import { chunkArray } from '../utils/common' import { enrichMember } from './enrichMember' const { getEnrichableMembers, getMaxConcurrentRequests } = proxyActivities({ - startToCloseTimeout: '10 minutes', + startToCloseTimeout: '15 minutes', }) export async function getMembersToEnrich(): Promise { @@ -46,7 +46,7 @@ export async function getMembersToEnrich(): Promise { workflowId: 'member-enrichment/' + member.id, cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - workflowExecutionTimeout: '15 minutes', + workflowExecutionTimeout: '30 minutes', retry: { backoffCoefficient: 2, maximumAttempts: 10, diff --git a/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts b/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts index 86e7056d04..20c2989cc8 100644 --- a/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts +++ b/services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts @@ -469,25 +469,28 @@ export async function deleteMemberOrg(db: DbConnOrTx, memberId: string, organiza }) } -export async function deleteMemberOrgById(db: DbConnOrTx, memberId: string, id: string) { - await db.tx(async (tx) => { - await tx.none( - ` +export async function deleteMemberOrgById( + tx: DbTransaction, + memberId: string, + id: string, +): Promise { + // Execute directly on the provided transaction to avoid creating nested savepoints + await tx.none( + ` DELETE FROM "memberOrganizationAffiliationOverrides" WHERE "memberOrganizationId" = $(id); - `, - { id }, - ) + `, + { id }, + ) - await tx.none( - ` + await tx.none( + ` UPDATE "memberOrganizations" SET "deletedAt" = NOW() WHERE "memberId" = $(memberId) and id = $(id); - `, - { memberId, id }, - ) - }) + `, + { memberId, id }, + ) } export async function findMemberOrgs(db: DbStore, memberId: string, orgId: string) {