Skip to content

Commit bea2469

Browse files
Kuchizu and neSpecc authored
feat(grouper): add Prometheus metrics (#520)
* feat(grouper): add Prometheus metrics * feat(grouper): OOM-debug logging Add MongoDB, payload and delta size metrics with OOM-debug logging * fix(grouper): handle undefined delta * feat(grouper): add memory leak diagnostics logs * fix(metrics): validate push interval, add push cleanup, and prevent retry double-counting in grouper * fix(grouper-metrics): (docs, log context, pushgateway options, duplicate-retry test * fix(grouper): resolve PR conflict & dupkey retry & getEvent fix * chore(grouper): remove duplicate-key retry loop and clarify memory config units * test(grouper): remove duplicate-key race regression case --------- Co-authored-by: Peter <specc.dev@gmail.com>
1 parent ab8db60 commit bea2469

9 files changed

Lines changed: 747 additions & 168 deletions

File tree

.env.sample

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ PROMETHEUS_PUSHGATEWAY_URL=
3131
# pushgateway push interval in ms
3232
PROMETHEUS_PUSHGATEWAY_INTERVAL=10000
3333

34+
# Grouper memory log controls
35+
# Number of handled tasks between memory checkpoint logs
36+
GROUPER_MEMORY_LOG_EVERY_TASKS=50
37+
# Number of handled tasks in one sustained-growth evaluation window
38+
GROUPER_MEMORY_GROWTH_WINDOW_TASKS=200
39+
# Sustained-growth warning threshold in megabytes (MB)
40+
GROUPER_MEMORY_GROWTH_WARN_MB=64
41+
# Single-handle growth warning threshold in megabytes (MB)
42+
GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB=16
43+
3444
# project token for error catching
3545
HAWK_CATCHER_TOKEN=
3646

@@ -40,4 +50,4 @@ HAWK_CATCHER_TOKEN=
4050
IS_NOTIFIER_WORKER_ENABLED=false
4151

4252
## Url for telegram notifications about workspace blocks and unblocks
43-
TELEGRAM_LIMITER_CHAT_URL=
53+
TELEGRAM_LIMITER_CHAT_URL=

lib/metrics.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import * as client from 'prom-client';
2+
import os from 'os';
3+
import { nanoid } from 'nanoid';
4+
import createLogger from './logger';
5+
6+
const register = new client.Registry();
7+
const logger = createLogger();
8+
9+
const DEFAULT_PUSH_INTERVAL_MS = 10_000;
10+
const ID_SIZE = 5;
11+
const METRICS_JOB_NAME = 'workers';
12+
13+
let pushInterval: NodeJS.Timeout | null = null;
14+
let currentWorkerName = '';
15+
16+
client.collectDefaultMetrics({ register });
17+
18+
export { register, client };
19+
20+
/**
 * Largest delay (in ms) that `setInterval` honours. Node.js coerces any delay
 * above this value, below 1, or NaN to a 1 ms delay.
 */
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/**
 * Resolve the pushgateway push interval from PROMETHEUS_PUSHGATEWAY_INTERVAL.
 *
 * The value is accepted only when it parses to a number in the range
 * [1, MAX_TIMER_DELAY_MS]. Anything else (non-numeric, empty, zero, negative,
 * sub-millisecond or too large) falls back to DEFAULT_PUSH_INTERVAL_MS and is
 * reported with a warning, because passing such a value to `setInterval`
 * would silently turn it into a 1 ms loop.
 *
 * @returns push interval in milliseconds
 */
function getPushIntervalMs(): number {
  const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL;

  // Variable is not set at all: use the default silently.
  if (rawInterval === undefined) {
    return DEFAULT_PUSH_INTERVAL_MS;
  }

  const parsedInterval = Number(rawInterval);
  const isUsableDelay = Number.isFinite(parsedInterval)
    && parsedInterval >= 1
    && parsedInterval <= MAX_TIMER_DELAY_MS;

  if (isUsableDelay) {
    return parsedInterval;
  }

  logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`);

  return DEFAULT_PUSH_INTERVAL_MS;
}
39+
40+
/**
 * Cancel the periodic push to the pushgateway, if one is currently scheduled.
 *
 * Does nothing when no push timer is active. Otherwise the timer is cleared,
 * the module state (timer handle and worker name) is reset and the stop is
 * logged together with the name of the worker that was being reported.
 */
export function stopMetricsPushing(): void {
  const activeTimer = pushInterval;

  if (activeTimer) {
    clearInterval(activeTimer);
    pushInterval = null;

    // Log before the name is wiped so the message still identifies the worker.
    logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`);
    currentWorkerName = '';
  }
}
53+
54+
/**
 * Begin pushing the shared registry to the Prometheus pushgateway on a timer.
 *
 * Pushing is skipped when PROMETHEUS_PUSHGATEWAY_URL is not set, and a second
 * start while a push timer is already active is ignored with a warning.
 * Every push is grouped by worker name, host name and a short random id
 * generated once per start.
 *
 * @param workerName - name of the worker, used as the `worker` grouping label.
 * @returns cleanup callback that stops the periodic push.
 */
export function startMetricsPushing(workerName: string): () => void {
  const gatewayUrl = process.env.PROMETHEUS_PUSHGATEWAY_URL;

  // Metrics pushing is opt-in: no URL means nothing to do.
  if (!gatewayUrl) {
    return stopMetricsPushing;
  }

  // Only one push timer per process is supported.
  if (pushInterval) {
    logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`);

    return stopMetricsPushing;
  }

  const intervalMs = getPushIntervalMs();
  const host = os.hostname();
  const instanceId = nanoid(ID_SIZE);
  const gateway = new client.Pushgateway(gatewayUrl, undefined, register);

  /**
   * Push the current registry state once; failures are logged, not thrown.
   */
  const pushOnce = (): void => {
    gateway.pushAdd({
      jobName: METRICS_JOB_NAME,
      groupings: {
        worker: workerName,
        host,
        id: instanceId,
      },
    }, (err) => {
      if (err) {
        logger.error(`Metrics push error: ${err.message || err}`);
      }
    });
  };

  currentWorkerName = workerName;

  logger.info(`Start pushing metrics to ${gatewayUrl} every ${intervalMs}ms (host: ${host}, id: ${instanceId}, worker: ${workerName})`);

  pushInterval = setInterval(pushOnce, intervalMs);

  return stopMetricsPushing;
}

runner.ts

Lines changed: 34 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import * as utils from './lib/utils';
99
import { Worker } from './lib/worker';
1010
import HawkCatcher from '@hawk.so/nodejs';
1111
import * as dotenv from 'dotenv';
12+
import { startMetricsPushing } from './lib/metrics';
1213

1314
dotenv.config();
1415

@@ -48,9 +49,9 @@ class WorkerRunner {
4849
// private gateway?: promClient.Pushgateway;
4950

5051
/**
51-
* number returned by setInterval() of metrics push function
52+
* Metrics push cleanup callback.
5253
*/
53-
private pushIntervalNumber?: ReturnType<typeof setInterval>;
54+
private stopMetricsPushing?: () => void;
5455

5556
/**
5657
* Create runner instance
@@ -65,19 +66,17 @@ class WorkerRunner {
6566
.then((workerConstructors) => {
6667
this.constructWorkers(workerConstructors);
6768
})
68-
// .then(() => {
69-
// try {
70-
// this.startMetrics();
71-
// } catch (e) {
72-
// HawkCatcher.send(e);
73-
// console.error(`Metrics not started: ${e}`);
74-
// }
75-
//
76-
// return Promise.resolve();
77-
// })
7869
.then(() => {
7970
return this.startWorkers();
8071
})
72+
.then(() => {
73+
try {
74+
this.startMetrics();
75+
} catch (e) {
76+
HawkCatcher.send(e);
77+
console.error(`Metrics not started: ${e}`);
78+
}
79+
})
8180
.then(() => {
8281
this.observeProcess();
8382
})
@@ -90,67 +89,27 @@ class WorkerRunner {
9089
/**
9190
* Run metrics exporter
9291
*/
93-
// private startMetrics(): void {
94-
// if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
95-
// return;
96-
// }
97-
//
98-
// const PUSH_INTERVAL = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL);
99-
//
100-
// if (isNaN(PUSH_INTERVAL)) {
101-
// throw new Error('PROMETHEUS_PUSHGATEWAY_INTERVAL is invalid or not set');
102-
// }
103-
//
104-
// const collectDefaultMetrics = promClient.collectDefaultMetrics;
105-
// const Registry = promClient.Registry;
106-
//
107-
// const register = new Registry();
108-
// const startGcStats = gcStats(register);
109-
//
110-
// const hostname = os.hostname();
111-
//
112-
// const ID_SIZE = 5;
113-
// const id = nanoid(ID_SIZE);
114-
//
115-
// // eslint-disable-next-line node/no-deprecated-api
116-
// const instance = url.parse(process.env.PROMETHEUS_PUSHGATEWAY_URL).host;
117-
//
118-
// // Initialize metrics for workers
119-
// this.workers.forEach((worker) => {
120-
// // worker.initMetrics();
121-
// worker.getMetrics().forEach((metric: promClient.Counter<string>) => register.registerMetric(metric));
122-
// });
123-
//
124-
// collectDefaultMetrics({ register });
125-
// startGcStats();
126-
//
127-
// this.gateway = new promClient.Pushgateway(process.env.PROMETHEUS_PUSHGATEWAY_URL, null, register);
128-
//
129-
// console.log(`Start pushing metrics to ${process.env.PROMETHEUS_PUSHGATEWAY_URL}`);
130-
//
131-
// // Pushing metrics to the pushgateway every PUSH_INTERVAL
132-
// this.pushIntervalNumber = setInterval(() => {
133-
// this.workers.forEach((worker) => {
134-
// if (!this.gateway || !instance) {
135-
// return;
136-
// }
137-
// // Use pushAdd not to overwrite previous metrics
138-
// this.gateway.pushAdd({
139-
// jobName: 'workers',
140-
// groupings: {
141-
// worker: worker.type.replace('/', '_'),
142-
// host: hostname,
143-
// id,
144-
// },
145-
// }, (err?: Error) => {
146-
// if (err) {
147-
// HawkCatcher.send(err);
148-
// console.log(`Error of pushing metrics to gateway: ${err}`);
149-
// }
150-
// });
151-
// });
152-
// }, PUSH_INTERVAL);
153-
// }
92+
/**
 * Start pushing process metrics to the pushgateway and remember the cleanup
 * callback in `this.stopMetricsPushing`.
 *
 * Does nothing when PROMETHEUS_PUSHGATEWAY_URL is not set or when no workers
 * were constructed. A process running a single worker type is reported under
 * that type; a process running several types is reported once under
 * "multi_worker_process".
 */
private startMetrics(): void {
  if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
    return;
  }

  if (this.workers.length === 0) {
    return;
  }

  /**
   * Replace every "/" in the worker type (not only the first one), so nested
   * types never leave a slash in the value used as the pushgateway grouping label.
   */
  const normalizedTypes = this.workers.map((worker) => worker.type.replace(/\//g, '_'));
  const workerTypes = Array.from(new Set(normalizedTypes));

  const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process';

  if (workerTypes.length > 1) {
    console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`);
  }

  this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics);
}
154113

155114
/**
156115
* Dynamically loads workers through the yarn workspaces
@@ -285,7 +244,8 @@ class WorkerRunner {
285244
private async stopWorker(worker: Worker): Promise<void> {
286245
try {
287246
// stop pushing metrics
288-
clearInterval(this.pushIntervalNumber);
247+
this.stopMetricsPushing?.();
248+
this.stopMetricsPushing = undefined;
289249
await worker.finish();
290250

291251
console.log(

0 commit comments

Comments
 (0)