Skip to content

Commit 34fdd7e

Browse files
committed
refactor: updateMemberUsingSquashedPayload to use sequential db calls
1 parent 87d60d5 commit 34fdd7e

1 file changed

Lines changed: 34 additions & 49 deletions

File tree

  • services/apps/members_enrichment_worker/src/activities

services/apps/members_enrichment_worker/src/activities/enrichment.ts

Lines changed: 34 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -249,22 +249,19 @@ export async function updateMemberUsingSquashedPayload(
249249
return await svc.postgres.writer.transactionally(async (tx) => {
250250
let updated = false
251251
const qx = dbStoreQx(tx)
252-
const promises = []
253252

254253
// process identities
255254
if (squashedPayload.identities.length > 0) {
256255
svc.log.debug({ memberId }, 'Adding to member identities!')
257256
for (const i of squashedPayload.identities) {
258257
updated = true
259-
promises.push(
260-
upsertMemberIdentity(qx, {
261-
memberId,
262-
platform: i.platform,
263-
type: i.type,
264-
value: i.value,
265-
verified: i.verified,
266-
}),
267-
)
258+
await upsertMemberIdentity(qx, {
259+
memberId,
260+
platform: i.platform,
261+
type: i.type,
262+
value: i.value,
263+
verified: i.verified,
264+
})
268265
}
269266
}
270267

@@ -273,30 +270,22 @@ export async function updateMemberUsingSquashedPayload(
273270
// it's ommited from the payload because it takes a lot of space
274271
svc.log.debug('Processing contributions! ', { memberId, hasContributions })
275272
if (hasContributions) {
276-
promises.push(
277-
findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
278-
.then((caches) => {
279-
if (caches.length > 0 && caches[0].data) {
280-
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
281-
MemberEnrichmentSource.PROGAI,
282-
svc.log,
283-
)
284-
return progaiService.normalize(caches[0].data)
285-
}
286-
287-
return undefined
288-
})
289-
.then((normalized) => {
290-
if (normalized) {
291-
const typed = normalized as IMemberEnrichmentDataNormalized
273+
const caches = await findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
274+
if (caches?.length > 0 && caches[0]?.data) {
275+
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
276+
MemberEnrichmentSource.PROGAI,
277+
svc.log,
278+
)
279+
const normalized = await progaiService.normalize(caches[0].data)
280+
if (normalized) {
281+
const typed = normalized as IMemberEnrichmentDataNormalized
292282

293-
if (typed.contributions) {
294-
updated = true
295-
return updateMemberContributions(qx, memberId, typed.contributions)
296-
}
297-
}
298-
}),
299-
)
283+
if (typed.contributions) {
284+
updated = true
285+
await updateMemberContributions(qx, memberId, typed.contributions)
286+
}
287+
}
288+
}
300289
}
301290

302291
// process attributes
@@ -312,7 +301,7 @@ export async function updateMemberUsingSquashedPayload(
312301
attributes = await setAttributesDefaultValues(attributes, priorities)
313302
}
314303
updated = true
315-
promises.push(updateMemberAttributes(qx, memberId, attributes))
304+
await updateMemberAttributes(qx, memberId, attributes)
316305
}
317306

318307
// process reach
@@ -332,7 +321,7 @@ export async function updateMemberUsingSquashedPayload(
332321
}
333322

334323
updated = true
335-
promises.push(updateMemberReach(qx, memberId, reach))
324+
await updateMemberReach(qx, memberId, reach)
336325
}
337326
}
338327

@@ -422,7 +411,7 @@ export async function updateMemberUsingSquashedPayload(
422411
if (results.toDelete.length > 0) {
423412
for (const org of results.toDelete) {
424413
updated = true
425-
promises.push(deleteMemberOrgById(tx.transaction(), memberId, org.id))
414+
await deleteMemberOrgById(tx.transaction(), memberId, org.orgId)
426415
}
427416
}
428417

@@ -432,30 +421,26 @@ export async function updateMemberUsingSquashedPayload(
432421
throw new Error('Organization ID is missing!')
433422
}
434423
updated = true
435-
promises.push(
436-
insertWorkExperience(
437-
tx.transaction(),
438-
memberId,
439-
org.organizationId,
440-
org.title,
441-
org.startDate,
442-
org.endDate,
443-
org.source,
444-
),
424+
await insertWorkExperience(
425+
tx.transaction(),
426+
memberId,
427+
org.organizationId,
428+
org.title,
429+
org.startDate,
430+
org.endDate,
431+
org.source,
445432
)
446433
}
447434
}
448435

449436
if (results.toUpdate.size > 0) {
450437
for (const [org, toUpdate] of results.toUpdate) {
451438
updated = true
452-
promises.push(updateMemberOrg(tx.transaction(), memberId, org, toUpdate))
439+
await updateMemberOrg(tx.transaction(), memberId, org, toUpdate)
453440
}
454441
}
455442
}
456443

457-
await Promise.all(promises)
458-
459444
if (updated) {
460445
await setMemberEnrichmentUpdatedAt(tx.transaction(), memberId)
461446
await syncMember(memberId)

0 commit comments

Comments
 (0)