Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -249,22 +249,19 @@ export async function updateMemberUsingSquashedPayload(
return await svc.postgres.writer.transactionally(async (tx) => {
let updated = false
const qx = dbStoreQx(tx)
const promises = []

// process identities
if (squashedPayload.identities.length > 0) {
svc.log.debug({ memberId }, 'Adding to member identities!')
for (const i of squashedPayload.identities) {
updated = true
promises.push(
upsertMemberIdentity(qx, {
memberId,
platform: i.platform,
type: i.type,
value: i.value,
verified: i.verified,
}),
)
await upsertMemberIdentity(qx, {
memberId,
platform: i.platform,
type: i.type,
value: i.value,
verified: i.verified,
})
Comment thread
skwowet marked this conversation as resolved.
}
}

Expand All @@ -273,30 +270,22 @@ export async function updateMemberUsingSquashedPayload(
// it's ommited from the payload because it takes a lot of space
svc.log.debug('Processing contributions! ', { memberId, hasContributions })
if (hasContributions) {
promises.push(
findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
.then((caches) => {
if (caches.length > 0 && caches[0].data) {
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
MemberEnrichmentSource.PROGAI,
svc.log,
)
return progaiService.normalize(caches[0].data)
}

return undefined
})
.then((normalized) => {
if (normalized) {
const typed = normalized as IMemberEnrichmentDataNormalized
const caches = await findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
if (caches?.length > 0 && caches[0]?.data) {
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
MemberEnrichmentSource.PROGAI,
svc.log,
)
const normalized = progaiService.normalize(caches[0].data)
if (normalized) {
const typed = normalized as IMemberEnrichmentDataNormalized

if (typed.contributions) {
updated = true
return updateMemberContributions(qx, memberId, typed.contributions)
}
}
}),
)
if (typed.contributions) {
updated = true
await updateMemberContributions(qx, memberId, typed.contributions)
}
}
}
}

// process attributes
Expand All @@ -312,7 +301,7 @@ export async function updateMemberUsingSquashedPayload(
attributes = await setAttributesDefaultValues(attributes, priorities)
}
updated = true
promises.push(updateMemberAttributes(qx, memberId, attributes))
await updateMemberAttributes(qx, memberId, attributes)
}

// process reach
Expand All @@ -332,7 +321,7 @@ export async function updateMemberUsingSquashedPayload(
}

updated = true
promises.push(updateMemberReach(qx, memberId, reach))
await updateMemberReach(qx, memberId, reach)
}
}

Expand Down Expand Up @@ -422,7 +411,7 @@ export async function updateMemberUsingSquashedPayload(
if (results.toDelete.length > 0) {
for (const org of results.toDelete) {
updated = true
promises.push(deleteMemberOrgById(tx.transaction(), memberId, org.id))
await deleteMemberOrgById(tx.transaction(), memberId, org.id)
}
Comment thread
skwowet marked this conversation as resolved.
}

Expand All @@ -432,30 +421,26 @@ export async function updateMemberUsingSquashedPayload(
throw new Error('Organization ID is missing!')
}
updated = true
promises.push(
insertWorkExperience(
tx.transaction(),
memberId,
org.organizationId,
org.title,
org.startDate,
org.endDate,
org.source,
),
await insertWorkExperience(
tx.transaction(),
memberId,
org.organizationId,
org.title,
org.startDate,
org.endDate,
org.source,
)
}
}

if (results.toUpdate.size > 0) {
for (const [org, toUpdate] of results.toUpdate) {
updated = true
promises.push(updateMemberOrg(tx.transaction(), memberId, org, toUpdate))
await updateMemberOrg(tx.transaction(), memberId, org, toUpdate)
}
Comment thread
skwowet marked this conversation as resolved.
}
}

await Promise.all(promises)

if (updated) {
await setMemberEnrichmentUpdatedAt(tx.transaction(), memberId)
await syncMember(memberId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const scheduleMembersEnrichment = async () => {
type: 'startWorkflow',
workflowType: getMembersToEnrich,
taskQueue: 'members-enrichment',
workflowExecutionTimeout: '20 minutes',
workflowExecutionTimeout: '2 hours',
retry: {
initialInterval: '15 seconds',
backoffCoefficient: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const {
hasRemainingCredits,
getMemberById,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '5 minutes',
startToCloseTimeout: '10 minutes',
retry: {
initialInterval: '60s',
backoffCoefficient: 2.0,
Expand Down Expand Up @@ -82,7 +82,7 @@ export async function enrichMember(
workflowId: 'member-enrichment/' + input.id + '/processMemberSources',
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
workflowExecutionTimeout: '15 minutes',
workflowExecutionTimeout: '25 minutes',
retry: {
backoffCoefficient: 2,
maximumAttempts: 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { chunkArray } from '../utils/common'
import { enrichMember } from './enrichMember'

const { getEnrichableMembers, getMaxConcurrentRequests } = proxyActivities<typeof activities>({
startToCloseTimeout: '10 minutes',
startToCloseTimeout: '15 minutes',
})

export async function getMembersToEnrich(): Promise<void> {
Expand Down Expand Up @@ -46,7 +46,7 @@ export async function getMembersToEnrich(): Promise<void> {
workflowId: 'member-enrichment/' + member.id,
cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
workflowExecutionTimeout: '15 minutes',
workflowExecutionTimeout: '30 minutes',
retry: {
backoffCoefficient: 2,
maximumAttempts: 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,25 +469,28 @@ export async function deleteMemberOrg(db: DbConnOrTx, memberId: string, organiza
})
}

export async function deleteMemberOrgById(db: DbConnOrTx, memberId: string, id: string) {
await db.tx(async (tx) => {
await tx.none(
`
export async function deleteMemberOrgById(
tx: DbTransaction,
memberId: string,
id: string,
): Promise<void> {
// Execute directly on the provided transaction to avoid creating nested savepoints
await tx.none(
`
DELETE FROM "memberOrganizationAffiliationOverrides"
WHERE "memberOrganizationId" = $(id);
`,
{ id },
)
`,
{ id },
)

await tx.none(
`
await tx.none(
`
UPDATE "memberOrganizations"
SET "deletedAt" = NOW()
WHERE "memberId" = $(memberId) and id = $(id);
`,
{ memberId, id },
)
})
`,
{ memberId, id },
)
}

export async function findMemberOrgs(db: DbStore, memberId: string, orgId: string) {
Expand Down
Loading