Skip to content

Commit 750214d

Browse files
committed
chore(integrations): improved logging and bugfixes
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 267fb87 commit 750214d

2 files changed

Lines changed: 40 additions & 6 deletions

File tree

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ export default class ActivityService extends LoggerBase {
421421
const resultMap = new Map<string, { success: boolean; err?: any }>()
422422

423423
let relevantPayloads = payloads
424+
this.log.trace(`[ACTIVITY] Processing ${relevantPayloads.length} activities!`)
424425

425426
const prepareMemberResults = this.prepareMemberData(relevantPayloads)
426427

@@ -437,6 +438,10 @@ export default class ActivityService extends LoggerBase {
437438
return resultMap
438439
}
439440

441+
this.log.trace(
442+
`[ACTIVITY] We still have ${relevantPayloads.length} activities left to process after member preparation!`,
443+
)
444+
440445
const allMemberIdentities = relevantPayloads
441446
.flatMap((a) => a.activity.member.identities)
442447
.concat(
@@ -598,6 +603,10 @@ export default class ActivityService extends LoggerBase {
598603

599604
await Promise.all(promises)
600605

606+
this.log.trace(
607+
`[ACTIVITY] We still have ${relevantPayloads.length} activities left to process after finding segments!`,
608+
)
609+
601610
const orConditions = relevantPayloads.map((r) => {
602611
return {
603612
and: [
@@ -823,7 +832,9 @@ export default class ActivityService extends LoggerBase {
823832

824833
// we should have now all relevant dbActivity, dbMember and dbObjectMember set
825834
// we can now upsert activities and members
826-
835+
this.log.trace(
836+
`[ACTIVITY] We still have ${relevantPayloads.length} activities left after mapping db activities and members!`,
837+
)
827838
const preparedActivities: IActivityPrepareForUpsertResult[] = []
828839

829840
const memberService = new MemberService(
@@ -1028,9 +1039,12 @@ export default class ActivityService extends LoggerBase {
10281039
)
10291040
}
10301041

1042+
this.log.trace(`[ACTIVITY] We have ${preparedActivities.length} intermediate results!`)
1043+
10311044
const preparedForUpsert = preparedActivities.filter((a) => a.payload)
10321045
const toUpsert = preparedForUpsert.map((a) => a.payload)
10331046
if (toUpsert.length > 0) {
1047+
this.log.trace(`[ACTIVITY] Upserting ${toUpsert.length} activities!`)
10341048
await insertActivities(this.client, toUpsert)
10351049
}
10361050

@@ -1070,8 +1084,6 @@ export default class ActivityService extends LoggerBase {
10701084
)
10711085
}
10721086

1073-
resultMap.set(prepared.resultId, { success: true })
1074-
10751087
await this.searchSyncWorkerEmitter.triggerMemberSync(
10761088
prepared.payload.memberId,
10771089
onboarding,
@@ -1094,6 +1106,11 @@ export default class ActivityService extends LoggerBase {
10941106
}
10951107
}
10961108

1109+
for (const prepared of preparedActivities) {
1110+
resultMap.set(prepared.resultId, { success: true })
1111+
}
1112+
1113+
this.log.trace(`[ACTIVITY] We have ${resultMap.size} results to return!`)
10971114
return resultMap
10981115
}
10991116

services/apps/data_sink_worker/src/service/dataSink.service.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,14 +209,17 @@ export default class DataSinkService extends LoggerBase {
209209

210210
await Promise.all(promises)
211211

212-
this.log.info(`Prepared ${prepared.length} in memory results stored ${promises.length} in db!`)
212+
this.log.info(
213+
`[RESULTS] Prepared ${prepared.length} in memory results stored ${promises.length} in db!`,
214+
)
213215

214216
return prepared
215217
}
216218

217219
public async processResults(
218220
batch: { resultId: string; data: IResultData | undefined; created: boolean }[],
219221
): Promise<void> {
222+
this.log.trace(`[RESULTS] Processing ${batch.length} results!`)
220223
const start = performance.now()
221224

222225
const results: IResultData[] = []
@@ -233,6 +236,7 @@ export default class DataSinkService extends LoggerBase {
233236
}
234237

235238
if (toLoadById.length > 0) {
239+
this.log.info(`[RESULTS] Loading ${toLoadById.length} results from db!`)
236240
const infos = await this.repo.getResultInfos(toLoadById)
237241
results.push(...infos)
238242
}
@@ -269,10 +273,12 @@ export default class DataSinkService extends LoggerBase {
269273

270274
while (toProcess.length > 0 && retry < 5) {
271275
if (retry > 0) {
276+
this.log.trace(`[RESULTS] Retrying but sleeping first...`)
272277
await timeout(100)
273278
}
274279

275280
try {
281+
this.log.trace(`[RESULTS] Processing ${toProcess.length} activity results...`)
276282
const processResults = await service.processActivities(
277283
toProcess.map((r) => {
278284
return {
@@ -296,6 +302,10 @@ export default class DataSinkService extends LoggerBase {
296302
}
297303
}
298304

305+
this.log.trace(
306+
`[RESULTS] Processed ${processResults.size} activity results! We have total of ${resultMap.size} results for this batch and ${toProcess.length} to retry.`,
307+
)
308+
299309
// clear last error because we processed without unhandled error
300310
lastError = undefined
301311
} catch (err) {
@@ -310,6 +320,9 @@ export default class DataSinkService extends LoggerBase {
310320

311321
// if lastError is still set and we have some left to process, we set the error for them cuz they were retried but failed
312322
if (lastError && toProcess.length > 0) {
323+
this.log.trace(
324+
`[RESULTS] Setting error for ${toProcess.length} activity results because we hit a retry limit!`,
325+
)
313326
for (const leftToProcess of toProcess) {
314327
resultMap.set(leftToProcess.id, {
315328
success: false,
@@ -331,15 +344,18 @@ export default class DataSinkService extends LoggerBase {
331344
const memberData = entry.data.data as IMemberData
332345

333346
await service.processMemberUpdate(entry.integrationId, entry.platform, memberData)
347+
resultMap.set(entry.id, { success: true })
334348
} catch (err) {
335349
resultMap.set(entry.id, { success: false, err })
336350
}
337351
}
338352
} else {
339-
this.log.error(`Unknown result type: ${type}!`)
353+
this.log.error(`[RESULTS] Unknown result type: ${type}!`)
340354
}
341355
}
342356

357+
this.log.trace(`[RESULTS] Processing ${resultMap.size} process results!`)
358+
343359
// handle results
344360
let errors = 0
345361
let successes = 0
@@ -371,10 +387,11 @@ export default class DataSinkService extends LoggerBase {
371387
}
372388

373389
if (resultsToDelete.length > 0) {
390+
this.log.trace(`[RESULTS] Deleting ${resultsToDelete.length} results from db!`)
374391
await this.repo.deleteResults(resultsToDelete)
375392
}
376393

377-
this.log.info(
394+
this.log.trace(
378395
`Processed ${successes} results successfully, ${errors} with error, ${deletions} were deleted from db!`,
379396
)
380397

0 commit comments

Comments
 (0)