Skip to content

Commit c80ce79

Browse files
committed
feat(grouper): add Prometheus metrics
1 parent c4cc417 commit c80ce79

3 files changed

Lines changed: 103 additions & 71 deletions

File tree

lib/metrics.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import * as client from 'prom-client';
2+
import os from 'os';
3+
import { nanoid } from 'nanoid';
4+
5+
// Registry instance created for this module; it is passed to every metric-related call below.
const register = new client.Registry();
6+
7+
// Collect prom-client's default metrics into that registry.
client.collectDefaultMetrics({ register });
8+
9+
// Export the registry together with the prom-client namespace so that other modules
// (e.g. the grouper worker) can declare their own metrics against the same registry.
export { register, client };
10+
11+
/**
 * Start periodic push of the shared metrics registry to the Prometheus Pushgateway.
 *
 * Does nothing when PROMETHEUS_PUSHGATEWAY_URL is not set.
 *
 * The push period is read from PROMETHEUS_PUSHGATEWAY_INTERVAL (milliseconds).
 * A missing, non-numeric, non-positive or too large value falls back to 10000 ms:
 * Node.js silently turns such delays into 1 ms, which would flood the gateway.
 *
 * Every call creates its own gateway client, random id and timer.
 *
 * @param workerName - name of the worker for grouping
 */
export function startMetricsPushing(workerName: string): void {
  const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;

  if (!url) {
    return;
  }

  const DEFAULT_INTERVAL_MS = 10000;
  /** Largest delay accepted by Node.js timers (2^31 - 1) */
  const MAX_TIMER_DELAY_MS = 2147483647;
  const DECIMAL_RADIX = 10;

  const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL;
  const parsedInterval = parseInt(rawInterval || '', DECIMAL_RADIX);
  const isIntervalValid = Number.isFinite(parsedInterval) &&
    parsedInterval > 0 &&
    parsedInterval <= MAX_TIMER_DELAY_MS;
  const interval = isIntervalValid ? parsedInterval : DEFAULT_INTERVAL_MS;

  if (rawInterval && !isIntervalValid) {
    console.warn(`Invalid PROMETHEUS_PUSHGATEWAY_INTERVAL "${rawInterval}", using ${DEFAULT_INTERVAL_MS}ms`);
  }

  const hostname = os.hostname();
  const ID_SIZE = 5;
  const id = nanoid(ID_SIZE);

  const gateway = new client.Pushgateway(url, [], register);

  console.log(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id})`);

  setInterval(() => {
    gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err?: Error) => {
      if (err) {
        console.error('Metrics push error:', err);
      }
    });
  }, interval);
}

runner.ts

Lines changed: 18 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import * as utils from './lib/utils';
99
import { Worker } from './lib/worker';
1010
import HawkCatcher from '@hawk.so/nodejs';
1111
import * as dotenv from 'dotenv';
12+
import { startMetricsPushing } from './lib/metrics';
1213

1314
dotenv.config();
1415

@@ -57,19 +58,17 @@ class WorkerRunner {
5758
.then((workerConstructors) => {
5859
this.constructWorkers(workerConstructors);
5960
})
60-
// .then(() => {
61-
// try {
62-
// this.startMetrics();
63-
// } catch (e) {
64-
// HawkCatcher.send(e);
65-
// console.error(`Metrics not started: ${e}`);
66-
// }
67-
//
68-
// return Promise.resolve();
69-
// })
7061
.then(() => {
7162
return this.startWorkers();
7263
})
64+
.then(() => {
65+
try {
66+
this.startMetrics();
67+
} catch (e) {
68+
HawkCatcher.send(e);
69+
console.error(`Metrics not started: ${e}`);
70+
}
71+
})
7372
.then(() => {
7473
this.observeProcess();
7574
})
@@ -82,67 +81,15 @@ class WorkerRunner {
8281
/**
8382
* Run metrics exporter
8483
*/
85-
// private startMetrics(): void {
86-
// if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
87-
// return;
88-
// }
89-
//
90-
// const PUSH_INTERVAL = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL);
91-
//
92-
// if (isNaN(PUSH_INTERVAL)) {
93-
// throw new Error('PROMETHEUS_PUSHGATEWAY_INTERVAL is invalid or not set');
94-
// }
95-
//
96-
// const collectDefaultMetrics = promClient.collectDefaultMetrics;
97-
// const Registry = promClient.Registry;
98-
//
99-
// const register = new Registry();
100-
// const startGcStats = gcStats(register);
101-
//
102-
// const hostname = os.hostname();
103-
//
104-
// const ID_SIZE = 5;
105-
// const id = nanoid(ID_SIZE);
106-
//
107-
// // eslint-disable-next-line node/no-deprecated-api
108-
// const instance = url.parse(process.env.PROMETHEUS_PUSHGATEWAY_URL).host;
109-
//
110-
// // Initialize metrics for workers
111-
// this.workers.forEach((worker) => {
112-
// // worker.initMetrics();
113-
// worker.getMetrics().forEach((metric: promClient.Counter<string>) => register.registerMetric(metric));
114-
// });
115-
//
116-
// collectDefaultMetrics({ register });
117-
// startGcStats();
118-
//
119-
// this.gateway = new promClient.Pushgateway(process.env.PROMETHEUS_PUSHGATEWAY_URL, null, register);
120-
//
121-
// console.log(`Start pushing metrics to ${process.env.PROMETHEUS_PUSHGATEWAY_URL}`);
122-
//
123-
// // Pushing metrics to the pushgateway every PUSH_INTERVAL
124-
// this.pushIntervalNumber = setInterval(() => {
125-
// this.workers.forEach((worker) => {
126-
// if (!this.gateway || !instance) {
127-
// return;
128-
// }
129-
// // Use pushAdd not to overwrite previous metrics
130-
// this.gateway.pushAdd({
131-
// jobName: 'workers',
132-
// groupings: {
133-
// worker: worker.type.replace('/', '_'),
134-
// host: hostname,
135-
// id,
136-
// },
137-
// }, (err?: Error) => {
138-
// if (err) {
139-
// HawkCatcher.send(err);
140-
// console.log(`Error of pushing metrics to gateway: ${err}`);
141-
// }
142-
// });
143-
// });
144-
// }, PUSH_INTERVAL);
145-
// }
84+
private startMetrics(): void {
  // Nothing to start when no pushgateway is configured
  if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
    return;
  }

  this.workers.forEach((worker) => {
    // Worker type is used as a grouping value; replace every "/" (not only the first one) with "_"
    startMetricsPushing(worker.type.replace(/\//g, '_'));
  });
}
14693

14794
/**
14895
* Dynamically loads workers through the yarn workspaces

workers/grouper/src/index.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string';
2626
import { hasValue } from '../../../lib/utils/hasValue';
2727
/* eslint-disable-next-line no-unused-vars */
2828
import { memoize } from '../../../lib/memoize';
29+
import { register, client } from '../../../lib/metrics';
2930

3031
/**
3132
* eslint does not count decorators as a variable usage
@@ -72,6 +73,28 @@ export default class GrouperWorker extends Worker {
7273
*/
7374
private redis = new RedisHelper();
7475

76+
/**
77+
* Prometheus metrics
78+
*/
79+
// Counter of events handled by the grouper; carries a single `type` label.
private metricsEventsTotal = new client.Counter({
80+
name: 'hawk_grouper_events_total',
81+
help: 'Total number of events processed by grouper',
82+
labelNames: ['type'],
83+
registers: [register],
84+
});
85+
86+
// Histogram of per-event processing time in seconds; no custom buckets are configured here.
private metricsEventDuration = new client.Histogram({
87+
name: 'hawk_grouper_event_duration_seconds',
88+
help: 'Duration of event processing in seconds',
89+
registers: [register],
90+
});
91+
92+
// Counter of errors raised while processing events; has no labels.
private metricsErrorsTotal = new client.Counter({
93+
name: 'hawk_grouper_errors_total',
94+
help: 'Total number of errors during event processing',
95+
registers: [register],
96+
});
97+
7598
/**
7699
* Start consuming messages
77100
*/
@@ -105,6 +128,24 @@ export default class GrouperWorker extends Worker {
105128
* @param task - event to handle
106129
*/
107130
public async handle(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
  // Measure the whole processing time, whether the task succeeds or fails
  const endTimer = this.metricsEventDuration.startTimer();

  try {
    await this.handleInternal(task);
  } catch (error) {
    // Count the failure and rethrow the original error to the caller
    this.metricsErrorsTotal.inc();
    throw error;
  } finally {
    // Single place where the duration is observed; runs on both paths
    endTimer();
  }
}
142+
143+
/**
144+
* Internal task handling function
145+
*
146+
* @param task - event to handle
147+
*/
148+
private async handleInternal(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
108149
let uniqueEventHash = await this.getUniqueEventHash(task);
109150

110151
// FIX RELEASE TYPE
@@ -147,6 +188,11 @@ export default class GrouperWorker extends Worker {
147188
*/
148189
const isFirstOccurrence = !existedEvent && !similarEvent;
149190

191+
/**
192+
* Increment metrics counter
193+
*/
194+
this.metricsEventsTotal.inc({ type: isFirstOccurrence ? 'new' : 'repeated' });
195+
150196
let repetitionId = null;
151197

152198
let incrementDailyAffectedUsers = false;

0 commit comments

Comments
 (0)