Skip to content

Commit b7db107

Browse files
committed
feat(grouper): OOM-debug logging
Add MongoDB, payload and delta size metrics with OOM-debug logging
1 parent c80ce79 commit b7db107

1 file changed

Lines changed: 95 additions & 10 deletions

File tree

workers/grouper/src/index.ts

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ export default class GrouperWorker extends Worker {
8383
registers: [register],
8484
});
8585

86-
private metricsEventDuration = new client.Histogram({
87-
name: 'hawk_grouper_event_duration_seconds',
88-
help: 'Duration of event processing in seconds',
86+
private metricsHandleDuration = new client.Histogram({
87+
name: 'hawk_grouper_handle_duration_seconds',
88+
help: 'Duration of handle() call in seconds',
8989
registers: [register],
9090
});
9191

@@ -95,6 +95,33 @@ export default class GrouperWorker extends Worker {
9595
registers: [register],
9696
});
9797

98+
private metricsMongoDuration = new client.Histogram({
99+
name: 'hawk_grouper_mongo_duration_seconds',
100+
help: 'Duration of MongoDB operations in seconds',
101+
labelNames: ['operation'],
102+
registers: [register],
103+
});
104+
105+
private metricsDeltaSize = new client.Histogram({
106+
name: 'hawk_grouper_delta_size_bytes',
107+
help: 'Size of computed repetition delta in bytes',
108+
buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000],
109+
registers: [register],
110+
});
111+
112+
private metricsPayloadSize = new client.Histogram({
113+
name: 'hawk_grouper_payload_size_bytes',
114+
help: 'Size of incoming event payload in bytes',
115+
buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000],
116+
registers: [register],
117+
});
118+
119+
private metricsDuplicateRetries = new client.Counter({
120+
name: 'hawk_grouper_duplicate_retries_total',
121+
help: 'Number of retries due to duplicate key errors',
122+
registers: [register],
123+
});
124+
98125
/**
99126
* Start consuming messages
100127
*/
@@ -128,7 +155,7 @@ export default class GrouperWorker extends Worker {
128155
* @param task - event to handle
129156
*/
130157
public async handle(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
131-
const endTimer = this.metricsEventDuration.startTimer();
158+
const endTimer = this.metricsHandleDuration.startTimer();
132159

133160
try {
134161
await this.handleInternal(task);
@@ -146,6 +173,12 @@ export default class GrouperWorker extends Worker {
146173
* @param task - event to handle
147174
*/
148175
private async handleInternal(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
176+
const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload));
177+
178+
this.metricsPayloadSize.observe(taskPayloadSize);
179+
180+
this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 0}`);
181+
149182
let uniqueEventHash = await this.getUniqueEventHash(task);
150183

151184
// FIX RELEASE TYPE
@@ -211,6 +244,8 @@ export default class GrouperWorker extends Worker {
211244
try {
212245
const incrementAffectedUsers = !!task.payload.user;
213246

247+
this.logger.info(`[saveEvent] new event, payloadSize=${taskPayloadSize}b`);
248+
214249
/**
215250
* Insert new event
216251
*/
@@ -240,6 +275,8 @@ export default class GrouperWorker extends Worker {
240275
* and we need to process this event as repetition
241276
*/
242277
if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
278+
this.metricsDuplicateRetries.inc();
279+
this.logger.info(`[saveEvent] duplicate key, retrying as repetition`);
243280
await this.handle(task);
244281

245282
return;
@@ -266,6 +303,10 @@ export default class GrouperWorker extends Worker {
266303

267304
let delta: RepetitionDelta;
268305

306+
const existedPayloadSize = Buffer.byteLength(JSON.stringify(existedEvent.payload));
307+
308+
this.logger.info(`[computeDelta] existedPayloadSize=${existedPayloadSize}b taskPayloadSize=${taskPayloadSize}b`);
309+
269310
try {
270311
/**
271312
* Calculate delta between original event and repetition
@@ -276,9 +317,16 @@ export default class GrouperWorker extends Worker {
276317
throw new DiffCalculationError(e, existedEvent.payload, task.payload);
277318
}
278319

320+
const deltaStr = JSON.stringify(delta);
321+
const deltaSize = Buffer.byteLength(deltaStr);
322+
323+
this.metricsDeltaSize.observe(deltaSize);
324+
325+
this.logger.info(`[computeDelta] deltaSize=${deltaSize}b`);
326+
279327
const newRepetition = {
280328
groupHash: uniqueEventHash,
281-
delta: JSON.stringify(delta),
329+
delta: deltaStr,
282330
timestamp: task.timestamp,
283331
} as RepetitionDBScheme;
284332

@@ -296,6 +344,10 @@ export default class GrouperWorker extends Worker {
296344
incrementDailyAffectedUsers
297345
);
298346

347+
const mem = process.memoryUsage();
348+
349+
this.logger.info(`[handle] done, heapUsed=${Math.round(mem.heapUsed / 1024 / 1024)}MB heapTotal=${Math.round(mem.heapTotal / 1024 / 1024)}MB rss=${Math.round(mem.rss / 1024 / 1024)}MB`);
350+
299351
/**
300352
* Add task for NotifierWorker only if event is not ignored
301353
*/
@@ -394,7 +446,9 @@ export default class GrouperWorker extends Worker {
394446
try {
395447
const originalEvent = await this.findFirstEventByPattern(matchingPattern.pattern, projectId);
396448

397-
this.logger.info(`original event for pattern: ${JSON.stringify(originalEvent)}`);
449+
const originalEventSize = Buffer.byteLength(JSON.stringify(originalEvent));
450+
451+
this.logger.info(`[findSimilarEvent] found by pattern, originalEventSize=${originalEventSize}b`);
398452

399453
if (originalEvent) {
400454
return originalEvent;
@@ -564,14 +618,20 @@ export default class GrouperWorker extends Worker {
564618
const eventCacheKey = await this.getEventCacheKey(projectId, groupHash);
565619

566620
return this.cache.get(eventCacheKey, async () => {
567-
return this.eventsDb.getConnection()
621+
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'getEvent' });
622+
623+
const result = await this.eventsDb.getConnection()
568624
.collection(`events:${projectId}`)
569625
.findOne({
570626
groupHash,
571627
})
572628
.catch((err) => {
573629
throw new DatabaseReadWriteError(err);
574630
});
631+
632+
endTimer();
633+
634+
return result;
575635
});
576636
}
577637

@@ -599,12 +659,18 @@ export default class GrouperWorker extends Worker {
599659
throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
600660
}
601661

662+
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' });
663+
602664
const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);
603665

604666
encodeUnsafeFields(groupedEventData);
605667

606-
return (await collection
668+
const result = (await collection
607669
.insertOne(groupedEventData)).insertedId as mongodb.ObjectID;
670+
671+
endTimer();
672+
673+
return result;
608674
}
609675

610676
/**
@@ -618,13 +684,20 @@ export default class GrouperWorker extends Worker {
618684
throw new ValidationError('Controller.saveRepetition: Project ID is invalid or missing');
619685
}
620686

687+
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveRepetition' });
688+
621689
try {
622690
const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`);
623691

624692
encodeUnsafeFields(repetition);
625693

626-
return (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID;
694+
const result = (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID;
695+
696+
endTimer();
697+
698+
return result;
627699
} catch (err) {
700+
endTimer();
628701
throw new DatabaseReadWriteError(err, {
629702
repetition: repetition as unknown as Record<string, never>,
630703
projectId,
@@ -644,6 +717,8 @@ export default class GrouperWorker extends Worker {
644717
throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
645718
}
646719

720+
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'incrementCounter' });
721+
647722
try {
648723
const updateQuery = incrementAffected
649724
? {
@@ -658,10 +733,15 @@ export default class GrouperWorker extends Worker {
658733
},
659734
};
660735

661-
return (await this.eventsDb.getConnection()
736+
const result = (await this.eventsDb.getConnection()
662737
.collection(`events:${projectId}`)
663738
.updateOne(query, updateQuery)).modifiedCount;
739+
740+
endTimer();
741+
742+
return result;
664743
} catch (err) {
744+
endTimer();
665745
throw new DatabaseReadWriteError(err);
666746
}
667747
}
@@ -687,6 +767,8 @@ export default class GrouperWorker extends Worker {
687767
throw new ValidationError('GrouperWorker.saveDailyEvents: Project ID is invalid or missed');
688768
}
689769

770+
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveDailyEvents' });
771+
690772
try {
691773
const midnight = this.getMidnightByEventTimestamp(eventTimestamp);
692774

@@ -710,7 +792,10 @@ export default class GrouperWorker extends Worker {
710792
},
711793
},
712794
{ upsert: true });
795+
796+
endTimer();
713797
} catch (err) {
798+
endTimer();
714799
throw new DatabaseReadWriteError(err);
715800
}
716801
}

0 commit comments

Comments
 (0)