Skip to content

Commit 2d4904c

Browse files
feat(metrics): expose worker metrics over HTTP (#547) (#548)
Co-authored-by: Kuchizu <70284260+Kuchizu@users.noreply.github.com>
1 parent bea2469 commit 2d4904c

4 files changed

Lines changed: 162 additions & 86 deletions

File tree

.env.sample

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ EVENT_SECRET=hell
2525
# @codex_bot webhook for reports
2626
CODEX_BOT_WEBHOOK=
2727

28-
# address of prometheus pushgateway
29-
PROMETHEUS_PUSHGATEWAY_URL=
28+
# Port for the Prometheus-format metrics endpoint scraped by VictoriaMetrics. Leave empty to disable the endpoint.
29+
PROMETHEUS_METRICS_PORT=
3030

31-
# pushgateway push interval in ms
32-
PROMETHEUS_PUSHGATEWAY_INTERVAL=10000
31+
# Host for metrics endpoint binding.
32+
PROMETHEUS_METRICS_HOST=0.0.0.0
33+
34+
# Path for metrics endpoint.
35+
PROMETHEUS_METRICS_PATH=/metrics
3336

3437
# Grouper memory log controls
3538
# Number of handled tasks between memory checkpoint logs

.env.test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ EVENT_SECRET=hell
2222
# @codex_bot webhook for reports
2323
CODEX_BOT_WEBHOOK=
2424

25-
# address of prometheus pushgateway
26-
PROMETHEUS_PUSHGATEWAY=
25+
# Port for the Prometheus-format metrics endpoint scraped by VictoriaMetrics. Leave empty to disable the endpoint.
26+
PROMETHEUS_METRICS_PORT=
2727

2828
# Feature flags
2929

lib/metrics.ts

Lines changed: 141 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,180 @@
11
import * as client from 'prom-client';
2-
import os from 'os';
3-
import { nanoid } from 'nanoid';
2+
import * as http from 'http';
43
import createLogger from './logger';
54

65
const register = new client.Registry();
76
const logger = createLogger();
87

9-
const DEFAULT_PUSH_INTERVAL_MS = 10_000;
10-
const ID_SIZE = 5;
11-
const METRICS_JOB_NAME = 'workers';
8+
const DEFAULT_METRICS_HOST = '0.0.0.0';
9+
const DEFAULT_METRICS_PATH = '/metrics';
10+
const MIN_PORT = 1;
11+
const MAX_PORT = 65535;
12+
const HTTP_OK = 200;
13+
const HTTP_NOT_FOUND = 404;
14+
const HTTP_INTERNAL_SERVER_ERROR = 500;
1215

13-
let pushInterval: NodeJS.Timeout | null = null;
16+
let metricsServer: http.Server | null = null;
1417
let currentWorkerName = '';
1518

1619
client.collectDefaultMetrics({ register });
1720

1821
export { register, client };
1922

2023
/**
21-
* Parse push interval from environment.
24+
* Parse metrics endpoint port from environment.
2225
*/
23-
function getPushIntervalMs(): number {
24-
const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL;
25-
const parsedInterval = rawInterval === undefined
26-
? DEFAULT_PUSH_INTERVAL_MS
27-
: Number(rawInterval);
28-
29-
const interval = Number.isFinite(parsedInterval) && parsedInterval > 0
30-
? parsedInterval
31-
: DEFAULT_PUSH_INTERVAL_MS;
32-
33-
if (rawInterval !== undefined && interval !== parsedInterval) {
34-
logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`);
26+
function getMetricsPort(): number | null {
27+
const rawPort = process.env.PROMETHEUS_METRICS_PORT;
28+
29+
if (!rawPort) {
30+
return null;
31+
}
32+
33+
const port = Number(rawPort);
34+
35+
if (!Number.isInteger(port) || port < MIN_PORT || port > MAX_PORT) {
36+
logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PORT="${rawPort}"; expected an integer between ${MIN_PORT} and ${MAX_PORT}`);
37+
38+
return null;
39+
}
40+
41+
return port;
42+
}
43+
44+
/**
45+
* Read metrics endpoint path from environment.
46+
*/
47+
function getMetricsPath(): string {
48+
const rawPath = process.env.PROMETHEUS_METRICS_PATH;
49+
50+
if (!rawPath) {
51+
return DEFAULT_METRICS_PATH;
52+
}
53+
54+
const path = rawPath.trim();
55+
56+
if (!path) {
57+
logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PATH="${rawPath}", fallback to ${DEFAULT_METRICS_PATH}`);
58+
59+
return DEFAULT_METRICS_PATH;
60+
}
61+
62+
if (!path.startsWith('/')) {
63+
const normalizedPath = `/${path}`;
64+
65+
logger.warn(`[metrics] normalized PROMETHEUS_METRICS_PATH from "${rawPath}" to "${normalizedPath}"`);
66+
67+
return normalizedPath;
3568
}
3669

37-
return interval;
70+
return path;
3871
}
3972

4073
/**
41-
* Stop periodic push to pushgateway.
74+
* Stop HTTP metrics endpoint.
4275
*/
43-
export function stopMetricsPushing(): void {
44-
if (!pushInterval) {
76+
export function stopMetricsServer(): void {
77+
if (!metricsServer) {
4578
return;
4679
}
4780

48-
clearInterval(pushInterval);
49-
pushInterval = null;
50-
logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`);
51-
currentWorkerName = '';
81+
const serverToStop = metricsServer;
82+
const stoppedWorkerName = currentWorkerName;
83+
84+
if (!serverToStop.listening) {
85+
logger.info(`[metrics] endpoint already stopped for worker=${stoppedWorkerName}`);
86+
87+
if (metricsServer === serverToStop) {
88+
metricsServer = null;
89+
currentWorkerName = '';
90+
}
91+
92+
return;
93+
}
94+
95+
serverToStop.close((error) => {
96+
if (error) {
97+
logger.error(`[metrics] failed to stop endpoint for worker=${stoppedWorkerName}: ${error.message}`);
98+
99+
return;
100+
}
101+
102+
if (metricsServer === serverToStop) {
103+
metricsServer = null;
104+
currentWorkerName = '';
105+
}
106+
107+
logger.info(`[metrics] stopped endpoint for worker=${stoppedWorkerName}`);
108+
});
52109
}
53110

54111
/**
55-
* Start periodic push to pushgateway.
112+
* Start HTTP metrics endpoint for scraper-based monitoring.
56113
*
57-
* @param workerName - name of the worker for grouping.
114+
* @param workerName - name of the worker for default metric labels.
58115
*/
59-
export function startMetricsPushing(workerName: string): () => void {
60-
const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;
116+
export function startMetricsServer(workerName: string): () => void {
117+
const port = getMetricsPort();
61118

62-
if (!url) {
63-
return stopMetricsPushing;
119+
if (!port) {
120+
return stopMetricsServer;
64121
}
65122

66-
if (pushInterval) {
67-
logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`);
123+
if (metricsServer) {
124+
logger.warn(`[metrics] endpoint is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`);
68125

69-
return stopMetricsPushing;
126+
return stopMetricsServer;
70127
}
71128

72-
const interval = getPushIntervalMs();
73-
const hostname = os.hostname();
74-
const id = nanoid(ID_SIZE);
75-
const gateway = new client.Pushgateway(url, undefined, register);
129+
const host = process.env.PROMETHEUS_METRICS_HOST || DEFAULT_METRICS_HOST;
130+
const path = getMetricsPath();
131+
132+
register.setDefaultLabels({ worker: workerName });
76133

134+
const server = http.createServer(async (request, response) => {
135+
const requestPath = request.url?.split('?')[0];
136+
137+
if (requestPath === '/-/healthy') {
138+
response.writeHead(HTTP_OK, { 'Content-Type': 'text/plain' });
139+
response.end('ok');
140+
141+
return;
142+
}
143+
144+
if (request.method !== 'GET' || requestPath !== path) {
145+
response.writeHead(HTTP_NOT_FOUND, { 'Content-Type': 'text/plain' });
146+
response.end('not found');
147+
148+
return;
149+
}
150+
151+
try {
152+
response.writeHead(HTTP_OK, { 'Content-Type': register.contentType });
153+
response.end(await register.metrics());
154+
} catch (error) {
155+
const message = error instanceof Error ? error.message : String(error);
156+
157+
logger.error(`[metrics] failed to render metrics: ${message}`);
158+
response.writeHead(HTTP_INTERNAL_SERVER_ERROR, { 'Content-Type': 'text/plain' });
159+
response.end('metrics error');
160+
}
161+
});
162+
163+
server.on('error', (error) => {
164+
logger.error(`[metrics] endpoint error for worker=${workerName}: ${error.message}`);
165+
166+
if (metricsServer === server) {
167+
metricsServer = null;
168+
currentWorkerName = '';
169+
}
170+
});
171+
172+
metricsServer = server;
77173
currentWorkerName = workerName;
78174

79-
logger.info(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id}, worker: ${workerName})`);
80-
81-
pushInterval = setInterval(() => {
82-
gateway.pushAdd({
83-
jobName: METRICS_JOB_NAME,
84-
groupings: {
85-
worker: workerName,
86-
host: hostname,
87-
id,
88-
},
89-
}, (err) => {
90-
if (err) {
91-
logger.error(`Metrics push error: ${err.message || err}`);
92-
}
93-
});
94-
}, interval);
95-
96-
return stopMetricsPushing;
175+
server.listen(port, host, () => {
176+
logger.info(`[metrics] endpoint started for worker=${workerName} at http://${host}:${port}${path}`);
177+
});
178+
179+
return stopMetricsServer;
97180
}

runner.ts

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,8 @@
11
import * as utils from './lib/utils';
2-
3-
/* Prometheus client for pushing metrics to the pushgateway */
4-
// import os from 'os';
5-
// import * as promClient from 'prom-client';
6-
// import gcStats from 'prometheus-gc-stats';
7-
// import { nanoid } from 'nanoid';
8-
// import * as url from 'url';
92
import { Worker } from './lib/worker';
103
import HawkCatcher from '@hawk.so/nodejs';
114
import * as dotenv from 'dotenv';
12-
import { startMetricsPushing } from './lib/metrics';
5+
import { startMetricsServer } from './lib/metrics';
136

147
dotenv.config();
158

@@ -24,15 +17,15 @@ const BEGINNING_OF_ARGS = 2;
2417
*/
2518
const workerNames = process.argv.slice(BEGINNING_OF_ARGS);
2619

27-
/**
20+
/**
2821
* Initialize HawkCatcher
29-
*/
22+
*/
3023
if (process.env.HAWK_CATCHER_TOKEN) {
3124
HawkCatcher.init({
3225
token: process.env.HAWK_CATCHER_TOKEN,
3326
context: {
34-
workerTypes: workerNames.join(","),
35-
}
27+
workerTypes: workerNames.join(','),
28+
},
3629
});
3730
}
3831

@@ -46,12 +39,10 @@ class WorkerRunner {
4639
*/
4740
private workers: Worker[] = [];
4841

49-
// private gateway?: promClient.Pushgateway;
50-
5142
/**
52-
* Metrics push cleanup callback.
43+
* Metrics endpoint cleanup callback.
5344
*/
54-
private stopMetricsPushing?: () => void;
45+
private stopMetricsServer?: () => void;
5546

5647
/**
5748
* Create runner instance
@@ -90,7 +81,7 @@ class WorkerRunner {
9081
* Run metrics exporter
9182
*/
9283
private startMetrics(): void {
93-
if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
84+
if (!process.env.PROMETHEUS_METRICS_PORT && !process.env.PROMETHEUS_PUSHGATEWAY_URL) {
9485
return;
9586
}
9687

@@ -105,10 +96,10 @@ class WorkerRunner {
10596
const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process';
10697

10798
if (workerTypes.length > 1) {
108-
console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`);
99+
console.warn(`[metrics] ${workerTypes.length} workers are running in one process; exposing metrics as "${workerTypeForMetrics}"`);
109100
}
110101

111-
this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics);
102+
this.stopMetricsServer = startMetricsServer(workerTypeForMetrics);
112103
}
113104

114105
/**
@@ -243,9 +234,8 @@ class WorkerRunner {
243234
*/
244235
private async stopWorker(worker: Worker): Promise<void> {
245236
try {
246-
// stop pushing metrics
247-
this.stopMetricsPushing?.();
248-
this.stopMetricsPushing = undefined;
237+
this.stopMetricsServer?.();
238+
this.stopMetricsServer = undefined;
249239
await worker.finish();
250240

251241
console.log(

0 commit comments

Comments
 (0)