Skip to content

Commit 69c0e64

Browse files
committed
fix: pgsql savepoint error (3B001)
1 parent a16c387 commit 69c0e64

1 file changed

Lines changed: 34 additions & 50 deletions

File tree

  • services/apps/members_enrichment_worker/src/activities

services/apps/members_enrichment_worker/src/activities/enrichment.ts

Lines changed: 34 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -249,22 +249,19 @@ export async function updateMemberUsingSquashedPayload(
249249
return await svc.postgres.writer.transactionally(async (tx) => {
250250
let updated = false
251251
const qx = dbStoreQx(tx)
252-
const promises = []
253252

254253
// process identities
255254
if (squashedPayload.identities.length > 0) {
256255
svc.log.debug({ memberId }, 'Adding to member identities!')
257256
for (const i of squashedPayload.identities) {
258257
updated = true
259-
promises.push(
260-
upsertMemberIdentity(qx, {
261-
memberId,
262-
platform: i.platform,
263-
type: i.type,
264-
value: i.value,
265-
verified: i.verified,
266-
}),
267-
)
258+
await upsertMemberIdentity(qx, {
259+
memberId,
260+
platform: i.platform,
261+
type: i.type,
262+
value: i.value,
263+
verified: i.verified,
264+
})
268265
}
269266
}
270267

@@ -273,30 +270,21 @@ export async function updateMemberUsingSquashedPayload(
273270
// it's ommited from the payload because it takes a lot of space
274271
svc.log.debug('Processing contributions! ', { memberId, hasContributions })
275272
if (hasContributions) {
276-
promises.push(
277-
findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
278-
.then((caches) => {
279-
if (caches.length > 0 && caches[0].data) {
280-
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
281-
MemberEnrichmentSource.PROGAI,
282-
svc.log,
283-
)
284-
return progaiService.normalize(caches[0].data)
285-
}
286-
287-
return undefined
288-
})
289-
.then((normalized) => {
290-
if (normalized) {
291-
const typed = normalized as IMemberEnrichmentDataNormalized
292-
293-
if (typed.contributions) {
294-
updated = true
295-
return updateMemberContributions(qx, memberId, typed.contributions)
296-
}
297-
}
298-
}),
299-
)
273+
const caches = await findMemberEnrichmentCache([MemberEnrichmentSource.PROGAI], memberId)
274+
if (caches?.length > 0 && caches[0]?.data) {
275+
const progaiService = EnrichmentSourceServiceFactory.getEnrichmentSourceService(
276+
MemberEnrichmentSource.PROGAI,
277+
svc.log,
278+
)
279+
const normalized = progaiService.normalize(caches[0]?.data)
280+
if (normalized) {
281+
const typed = normalized as IMemberEnrichmentDataNormalized
282+
if (typed.contributions) {
283+
updated = true
284+
await updateMemberContributions(qx, memberId, typed.contributions)
285+
}
286+
}
287+
}
300288
}
301289

302290
// process attributes
@@ -312,7 +300,7 @@ export async function updateMemberUsingSquashedPayload(
312300
attributes = await setAttributesDefaultValues(attributes, priorities)
313301
}
314302
updated = true
315-
promises.push(updateMemberAttributes(qx, memberId, attributes))
303+
await updateMemberAttributes(qx, memberId, attributes)
316304
}
317305

318306
// process reach
@@ -332,7 +320,7 @@ export async function updateMemberUsingSquashedPayload(
332320
}
333321

334322
updated = true
335-
promises.push(updateMemberReach(qx, memberId, reach))
323+
await updateMemberReach(qx, memberId, reach)
336324
}
337325
}
338326

@@ -422,7 +410,7 @@ export async function updateMemberUsingSquashedPayload(
422410
if (results.toDelete.length > 0) {
423411
for (const org of results.toDelete) {
424412
updated = true
425-
promises.push(deleteMemberOrgById(tx.transaction(), memberId, org.id))
413+
await deleteMemberOrgById(tx.transaction(), memberId, org.id)
426414
}
427415
}
428416

@@ -432,30 +420,26 @@ export async function updateMemberUsingSquashedPayload(
432420
throw new Error('Organization ID is missing!')
433421
}
434422
updated = true
435-
promises.push(
436-
insertWorkExperience(
437-
tx.transaction(),
438-
memberId,
439-
org.organizationId,
440-
org.title,
441-
org.startDate,
442-
org.endDate,
443-
org.source,
444-
),
423+
await insertWorkExperience(
424+
tx.transaction(),
425+
memberId,
426+
org.organizationId,
427+
org.title,
428+
org.startDate,
429+
org.endDate,
430+
org.source,
445431
)
446432
}
447433
}
448434

449435
if (results.toUpdate.size > 0) {
450436
for (const [org, toUpdate] of results.toUpdate) {
451437
updated = true
452-
promises.push(updateMemberOrg(tx.transaction(), memberId, org, toUpdate))
438+
await updateMemberOrg(tx.transaction(), memberId, org, toUpdate)
453439
}
454440
}
455441
}
456442

457-
await Promise.all(promises)
458-
459443
if (updated) {
460444
await setMemberEnrichmentUpdatedAt(tx.transaction(), memberId)
461445
await syncMember(memberId)

0 commit comments

Comments
 (0)