Skip to content

Commit e679eba

Browse files
feat(grouper): add slow handle diagnostics (#549) (#560)
* feat(grouper): add slow handle diagnostics * refactor(grouper): extract slow handle diagnostics into session * fix(grouper): use monotonic time and exclusive timings in slow handle diagnostics Co-authored-by: Kuchizu <70284260+Kuchizu@users.noreply.github.com>
1 parent 10c302a commit e679eba

4 files changed

Lines changed: 374 additions & 75 deletions

File tree

workers/grouper/src/index.ts

Lines changed: 147 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import { rightTrim } from '../../../lib/utils/string';
2828
import { hasValue } from '../../../lib/utils/hasValue';
2929
import GrouperMetrics from './metrics/grouperMetrics';
3030
import GrouperMemoryMonitor from './metrics/memoryMonitor';
31-
import { grouperMemoryConfig } from './metrics/config';
31+
import SlowHandleDiagnostics, { SlowHandleSession } from './metrics/slowHandleDiagnostics';
32+
import { grouperDiagnosticsConfig, grouperMemoryConfig } from './metrics/config';
3233

3334
/**
3435
* eslint does not count decorators as a variable usage
@@ -101,6 +102,11 @@ export default class GrouperWorker extends Worker {
101102
*/
102103
private memoryMonitor = new GrouperMemoryMonitor(this.logger, grouperMemoryConfig);
103104

105+
/**
106+
* Slow handle diagnostics helper.
107+
*/
108+
private slowHandleDiagnostics = new SlowHandleDiagnostics(this.logger, this.grouperMetrics, grouperDiagnosticsConfig);
109+
104110
/**
105111
* Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth
106112
*/
@@ -180,9 +186,14 @@ export default class GrouperWorker extends Worker {
180186
* @param task - event to handle
181187
*/
182188
private async handleInternal(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
183-
const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload));
189+
const session = this.slowHandleDiagnostics.startSession();
190+
const taskPayloadSize = await session.measureStep('payloadSize', () => {
191+
return Buffer.byteLength(JSON.stringify(task.payload));
192+
});
184193
const handledTasksCount = ++this.handledTasksCount;
185194
const memoryBeforeHandle = process.memoryUsage();
195+
let deltaSize = 0;
196+
let eventType: 'new' | 'repeated' = 'new';
186197

187198
this.grouperMetrics.observePayloadSize(taskPayloadSize);
188199
this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize, task.projectId);
@@ -198,25 +209,29 @@ export default class GrouperWorker extends Worker {
198209
};
199210
}
200211

201-
let uniqueEventHash = await this.getUniqueEventHash(task);
212+
let uniqueEventHash = await session.measureStep('hash', () => this.getUniqueEventHash(task));
202213
let existedEvent: GroupedEventDBScheme;
203214
let repetitionId = null;
204215
let incrementDailyAffectedUsers = false;
205216

206-
/**
207-
* Trim source code lines to prevent memory leaks
208-
*/
209-
this.trimSourceCodeLines(task.payload);
217+
await session.measureStep('preprocess', () => {
218+
/**
219+
* Trim source code lines to prevent memory leaks
220+
*/
221+
this.trimSourceCodeLines(task.payload);
210222

211-
/**
212-
* Filter sensitive information
213-
*/
214-
this.dataFilter.processEvent(task.payload);
223+
/**
224+
* Filter sensitive information
225+
*/
226+
this.dataFilter.processEvent(task.payload);
227+
});
215228

216229
/**
217230
* Find similar events by grouping pattern
218231
*/
219-
const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title);
232+
const similarEvent = await session.measureStep('findSimilarEvent', () => {
233+
return this.findSimilarEvent(task.projectId, task.payload.title);
234+
});
220235

221236
if (similarEvent) {
222237
this.logger.info(`[handle] project=${task.projectId} title="${task.payload.title}" similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`);
@@ -234,14 +249,18 @@ export default class GrouperWorker extends Worker {
234249
/**
235250
* Find event by group hash.
236251
*/
237-
existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
252+
existedEvent = await session.measureStep('getEvent', () => {
253+
return this.getEvent(task.projectId, uniqueEventHash);
254+
});
238255
}
239256

240257
/**
241258
* Event happened for the first time
242259
*/
243260
const isFirstOccurrence = !existedEvent && !similarEvent;
244261

262+
eventType = isFirstOccurrence ? 'new' : 'repeated';
263+
245264
if (isFirstOccurrence) {
246265
try {
247266
const incrementAffectedUsers = !!task.payload.user;
@@ -251,14 +270,16 @@ export default class GrouperWorker extends Worker {
251270
/**
252271
* Insert new event
253272
*/
254-
await this.saveEvent(task.projectId, {
255-
groupHash: uniqueEventHash,
256-
totalCount: 1,
257-
catcherType: task.catcherType,
258-
payload: task.payload,
259-
timestamp: task.timestamp,
260-
usersAffected: incrementAffectedUsers ? 1 : 0,
261-
} as GroupedEventDBScheme);
273+
await session.measureStep('saveNewEvent', () => {
274+
return this.saveEvent(task.projectId, {
275+
groupHash: uniqueEventHash,
276+
totalCount: 1,
277+
catcherType: task.catcherType,
278+
payload: task.payload,
279+
timestamp: task.timestamp,
280+
usersAffected: incrementAffectedUsers ? 1 : 0,
281+
} as GroupedEventDBScheme);
282+
});
262283

263284
const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);
264285

@@ -288,21 +309,27 @@ export default class GrouperWorker extends Worker {
288309
throw e;
289310
}
290311
} else {
291-
const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent);
312+
const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await session.measureStep('affectedUsers', () => {
313+
return this.shouldIncrementAffectedUsers(task, existedEvent, session);
314+
});
292315

293316
incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers;
294317

295318
/**
296319
* Increment existed task's counter
297320
*/
298-
await this.incrementEventCounterAndAffectedUsers(task.projectId, {
299-
groupHash: uniqueEventHash,
300-
}, incrementAffectedUsers);
321+
await session.measureStep('incrementCounter', () => {
322+
return this.incrementEventCounterAndAffectedUsers(task.projectId, {
323+
groupHash: uniqueEventHash,
324+
}, incrementAffectedUsers);
325+
});
301326

302327
/**
303328
* Decode existed event to calculate diffs correctly
304329
*/
305-
decodeUnsafeFields(existedEvent);
330+
await session.measureStep('decodeEvent', () => {
331+
decodeUnsafeFields(existedEvent);
332+
});
306333

307334
let delta: RepetitionDelta;
308335

@@ -314,14 +341,17 @@ export default class GrouperWorker extends Worker {
314341
/**
315342
* Calculate delta between original event and repetition
316343
*/
317-
delta = computeDelta(existedEvent.payload, task.payload);
344+
delta = await session.measureStep('computeDelta', () => {
345+
return computeDelta(existedEvent.payload, task.payload);
346+
});
318347
} catch (e) {
319348
console.error(e);
320349
throw new DiffCalculationError(e, existedEvent.payload, task.payload);
321350
}
322351

323352
const deltaStr = JSON.stringify(delta);
324-
const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0;
353+
354+
deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0;
325355

326356
this.grouperMetrics.observeDeltaSize(deltaSize);
327357

@@ -333,7 +363,9 @@ export default class GrouperWorker extends Worker {
333363
timestamp: task.timestamp,
334364
} as RepetitionDBScheme;
335365

336-
repetitionId = await this.saveRepetition(task.projectId, newRepetition);
366+
repetitionId = await session.measureStep('saveRepetition', () => {
367+
return this.saveRepetition(task.projectId, newRepetition);
368+
});
337369

338370
/**
339371
* Clear the large event payload references to allow garbage collection
@@ -350,13 +382,15 @@ export default class GrouperWorker extends Worker {
350382
/**
351383
* Store events counter by days
352384
*/
353-
await this.saveDailyEvents(
354-
task.projectId,
355-
uniqueEventHash,
356-
task.timestamp,
357-
repetitionId,
358-
incrementDailyAffectedUsers
359-
);
385+
await session.measureStep('saveDailyEvents', () => {
386+
return this.saveDailyEvents(
387+
task.projectId,
388+
uniqueEventHash,
389+
task.timestamp,
390+
repetitionId,
391+
incrementDailyAffectedUsers
392+
);
393+
});
360394

361395
this.memoryMonitor.logHandleCompletion(
362396
memoryBeforeHandle,
@@ -373,19 +407,31 @@ export default class GrouperWorker extends Worker {
373407
const isIgnored = isFirstOccurrence ? false : !!existedEvent?.marks?.ignored;
374408

375409
if (!isIgnored) {
376-
await this.addTask(WorkerNames.NOTIFIER, {
377-
projectId: task.projectId,
378-
event: {
379-
title: task.payload.title,
380-
groupHash: uniqueEventHash,
381-
isNew: isFirstOccurrence,
382-
repetitionId: repetitionId ? repetitionId.toString() : null,
383-
},
410+
await session.measureStep('enqueueNotifier', () => {
411+
return this.addTask(WorkerNames.NOTIFIER, {
412+
projectId: task.projectId,
413+
event: {
414+
title: task.payload.title,
415+
groupHash: uniqueEventHash,
416+
isNew: isFirstOccurrence,
417+
repetitionId: repetitionId ? repetitionId.toString() : null,
418+
},
419+
});
384420
});
385421
}
386422
}
387423

388-
await this.recordProjectMetrics(task.projectId, 'events-accepted');
424+
await session.measureStep('recordProjectMetrics', () => {
425+
return this.recordProjectMetrics(task.projectId, 'events-accepted');
426+
});
427+
428+
session.logIfSlow({
429+
projectId: task.projectId,
430+
title: task.payload.title,
431+
type: eventType,
432+
payloadSize: taskPayloadSize,
433+
deltaSize,
434+
});
389435
}
390436

391437
/**
@@ -421,8 +467,18 @@ export default class GrouperWorker extends Worker {
421467
};
422468

423469
const series = [
424-
{ key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY, timestampMs: bucketTimestampMs('minutely') },
425-
{ key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK, timestampMs: bucketTimestampMs('hourly') },
470+
{
471+
key: minutelyKey,
472+
label: 'minutely',
473+
retentionMs: TimeMs.DAY,
474+
timestampMs: bucketTimestampMs('minutely'),
475+
},
476+
{
477+
key: hourlyKey,
478+
label: 'hourly',
479+
retentionMs: TimeMs.WEEK,
480+
timestampMs: bucketTimestampMs('hourly'),
481+
},
426482
{
427483
key: dailyKey,
428484
label: 'daily',
@@ -491,11 +547,13 @@ export default class GrouperWorker extends Worker {
491547
* @param projectId - id of the project to find event in
492548
*/
493549
private async findFirstEventByPattern(pattern: string, projectId: string): Promise<GroupedEventDBScheme> {
494-
return await this.eventsDb.getConnection()
495-
.collection(`events:${projectId}`)
496-
.findOne(
497-
{ 'payload.title': { $regex: pattern } }
498-
);
550+
return this.grouperMetrics.observeMongoDuration('findFirstEventByPattern', async () => {
551+
return await this.eventsDb.getConnection()
552+
.collection(`events:${projectId}`)
553+
.findOne(
554+
{ 'payload.title': { $regex: pattern } }
555+
);
556+
});
499557
}
500558

501559
/**
@@ -561,11 +619,13 @@ export default class GrouperWorker extends Worker {
561619
* @returns {ProjectEventGroupingPatternsDBScheme[]} EventPatterns object with projectId and list of patterns
562620
*/
563621
private async getProjectPatterns(projectId: string): Promise<ProjectEventGroupingPatternsDBScheme[]> {
564-
const project = await this.accountsDb.getConnection()
565-
.collection('projects')
566-
.findOne({
567-
_id: new mongodb.ObjectId(projectId),
568-
});
622+
const project = await this.grouperMetrics.observeMongoDuration('getProjectPatterns', async () => {
623+
return this.accountsDb.getConnection()
624+
.collection('projects')
625+
.findOne({
626+
_id: new mongodb.ObjectId(projectId),
627+
});
628+
});
569629

570630
return project?.eventGroupingPatterns || [];
571631
}
@@ -575,9 +635,14 @@ export default class GrouperWorker extends Worker {
575635
*
576636
* @param task - worker task to process
577637
* @param existedEvent - original event to get its user
638+
* @param session - current slow handle diagnostics session.
578639
* @returns {[boolean, boolean]} - whether to increment affected users for the repetition and the daily aggregation
579640
*/
580-
private async shouldIncrementAffectedUsers<Type extends ErrorsCatcherType>(task: GroupWorkerTask<Type>, existedEvent: GroupedEventDBScheme): Promise<[boolean, boolean]> {
641+
private async shouldIncrementAffectedUsers<Type extends ErrorsCatcherType>(
642+
task: GroupWorkerTask<Type>,
643+
existedEvent: GroupedEventDBScheme,
644+
session: SlowHandleSession
645+
): Promise<[boolean, boolean]> {
581646
const eventUser = task.payload.user;
582647

583648
/**
@@ -609,11 +674,13 @@ export default class GrouperWorker extends Worker {
609674
*/
610675
const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`;
611676
const repetition = await this.cache.get(repetitionCacheKey, async () => {
612-
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
613-
.findOne({
614-
groupHash: existedEvent.groupHash,
615-
'payload.user.id': eventUser.id,
616-
});
677+
return this.grouperMetrics.observeMongoDuration('findUserRepetition', async () => {
678+
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
679+
.findOne({
680+
groupHash: existedEvent.groupHash,
681+
'payload.user.id': eventUser.id,
682+
});
683+
});
617684
});
618685

619686
if (repetition) {
@@ -643,15 +710,17 @@ export default class GrouperWorker extends Worker {
643710
*/
644711
const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`;
645712
const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => {
646-
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
647-
.findOne({
648-
groupHash: existedEvent.groupHash,
649-
'payload.user.id': eventUser.id,
650-
timestamp: {
651-
$gte: eventMidnight,
652-
$lt: eventNextMidnight,
653-
},
654-
});
713+
return this.grouperMetrics.observeMongoDuration('findDailyUserRepetition', async () => {
714+
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
715+
.findOne({
716+
groupHash: existedEvent.groupHash,
717+
'payload.user.id': eventUser.id,
718+
timestamp: {
719+
$gte: eventMidnight,
720+
$lt: eventNextMidnight,
721+
},
722+
});
723+
});
655724
});
656725

657726
/**
@@ -665,8 +734,12 @@ export default class GrouperWorker extends Worker {
665734
/**
666735
* Check Redis lock - if locked, don't increment either counter
667736
*/
668-
const isEventLocked = await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id);
669-
const isDailyEventLocked = await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight);
737+
const [isEventLocked, isDailyEventLocked] = await session.measureStep('affectedUsersRedisLocks', async () => {
738+
return [
739+
await this.redis.checkOrSetlockEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id),
740+
await this.redis.checkOrSetlockDailyEventForAffectedUsersIncrement(existedEvent.groupHash, eventUser.id, eventMidnight),
741+
];
742+
});
670743

671744
shouldIncrementRepetitionAffectedUsers = isEventLocked ? false : shouldIncrementRepetitionAffectedUsers;
672745
shouldIncrementDailyAffectedUsers = isDailyEventLocked ? false : shouldIncrementDailyAffectedUsers;

0 commit comments

Comments
 (0)