Skip to content

Commit f917a87

Browse files
committed
feat(supervisor): publish client-side dequeue API latency as a Prometheus histogram
The dequeue round-trip time was only visible in wide events and span attributes, so there was no way to query latency percentiles or error rates. Record it as queue_consumer_pool_dequeue_duration_seconds with an outcome label (success/empty/error), covering failed and timed-out calls that previously emitted no timing at all. The pool's shared ConsumerPoolMetrics instance is injected into each consumer, mirroring how BackpressureMetrics is wired into BackpressureMonitor.
1 parent 1b0f2c7 commit f917a87

4 files changed

Lines changed: 152 additions & 2 deletions

File tree

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,9 @@ export class RunQueueConsumerPool {
369369

370370
const consumer = this.consumerFactory({
371371
...this.consumerOptions,
372+
// Share the pool's single metrics instance so every consumer records onto
373+
// the same histogram (re-registering the metric name would throw).
374+
metrics: this.promMetrics,
372375
onDequeue: async (messages, timing) => {
373376
// Always update queue length, default to 0 for empty dequeues or missing value
374377
this.updateQueueLength(messages[0]?.workerQueueLength ?? 0);

packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@ export interface ConsumerPoolMetricsOptions {
55
prefix?: string;
66
}
77

8+
/**
9+
* Outcome of a single dequeue API round-trip, used as a low-cardinality label
10+
* on the dequeue latency histogram.
11+
* - `success`: the call returned at least one run
12+
* - `empty`: the call succeeded but returned no runs (the common idle case)
13+
* - `error`: the call failed (unsuccessful response, network error, or timeout)
14+
*/
15+
export type DequeueOutcome = "success" | "empty" | "error";
16+
817
export class ConsumerPoolMetrics {
918
private readonly register: Registry;
1019
private readonly prefix: string;
@@ -26,6 +35,9 @@ export class ConsumerPoolMetrics {
2635
public readonly queueLengthUpdatesTotal: Counter;
2736
public readonly batchesProcessedTotal: Counter;
2837

38+
// Dequeue API latency (client-side, measured around the dequeue HTTP call)
39+
public readonly dequeueDurationSeconds: Histogram;
40+
2941
constructor(opts: ConsumerPoolMetricsOptions = {}) {
3042
this.register = opts.register ?? new Registry();
3143
this.prefix = opts.prefix ?? "queue_consumer_pool";
@@ -102,6 +114,15 @@ export class ConsumerPoolMetrics {
102114
help: "Total number of metric batches processed",
103115
registers: [this.register],
104116
});
117+
118+
this.dequeueDurationSeconds = new Histogram({
119+
name: `${this.prefix}_dequeue_duration_seconds`,
120+
help: "Client-side latency of the dequeue API call (POST /engine/v1/worker-actions/dequeue), measured around the HTTP request",
121+
labelNames: ["outcome"],
122+
// Tuned for the dequeue endpoint: sub-10ms hits through a multi-second tail.
123+
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5],
124+
registers: [this.register],
125+
});
105126
}
106127

107128
/**
@@ -157,4 +178,13 @@ export class ConsumerPoolMetrics {
157178
recordQueueLengthUpdate() {
158179
this.queueLengthUpdatesTotal.inc();
159180
}
181+
182+
/**
183+
* Record the client-side latency of a single dequeue API round-trip.
184+
* @param seconds Wall-clock duration of the dequeue call, in seconds.
185+
* @param outcome Whether the call returned runs, was empty, or errored.
186+
*/
187+
observeDequeueLatency(seconds: number, outcome: DequeueOutcome) {
188+
this.dequeueDurationSeconds.observe({ outcome }, seconds);
189+
}
160190
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
2+
import { Registry } from "prom-client";
3+
import { RunQueueConsumer } from "./queueConsumer.js";
4+
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
5+
import type { SupervisorHttpClient } from "./http.js";
6+
import type { WorkerApiDequeueResponseBody } from "./schemas.js";
7+
8+
// Mock only the logger (same approach as consumerPool.test.ts)
9+
vi.mock("../../utils/structuredLogger.js");
10+
11+
function makeClient(dequeueImpl: () => Promise<unknown>): SupervisorHttpClient {
12+
return { dequeue: vi.fn(dequeueImpl) } as unknown as SupervisorHttpClient;
13+
}
14+
15+
describe("RunQueueConsumer dequeue latency metric", () => {
16+
let register: Registry;
17+
let metrics: ConsumerPoolMetrics;
18+
let consumer: RunQueueConsumer | undefined;
19+
20+
beforeEach(() => {
21+
vi.clearAllMocks();
22+
// Fake timers so the trailing scheduleNextDequeue() never fires during the test.
23+
vi.useFakeTimers();
24+
register = new Registry();
25+
metrics = new ConsumerPoolMetrics({ register });
26+
});
27+
28+
afterEach(() => {
29+
consumer?.stop();
30+
vi.clearAllTimers();
31+
vi.useRealTimers();
32+
});
33+
34+
/**
35+
* Runs exactly one dequeue iteration and awaits it. We set `isEnabled`
36+
* directly and invoke the private `dequeue()` rather than `start()`, so no
37+
* timer-driven loop runs - the metric is recorded before scheduleNextDequeue().
38+
*/
39+
async function runOneDequeue(opts: {
40+
dequeueImpl: () => Promise<unknown>;
41+
withMetrics?: boolean;
42+
}) {
43+
consumer = new RunQueueConsumer({
44+
client: makeClient(opts.dequeueImpl),
45+
intervalMs: 600_000,
46+
idleIntervalMs: 600_000,
47+
onDequeue: async () => {},
48+
...(opts.withMetrics === false ? {} : { metrics }),
49+
});
50+
51+
(consumer as unknown as { isEnabled: boolean }).isEnabled = true;
52+
await (consumer as unknown as { dequeue(): Promise<void> }).dequeue();
53+
}
54+
55+
it('records outcome="empty" for a successful empty dequeue', async () => {
56+
await runOneDequeue({ dequeueImpl: async () => ({ success: true, data: [] }) });
57+
58+
expect(await register.metrics()).toContain(
59+
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="empty"} 1'
60+
);
61+
});
62+
63+
it('records outcome="success" once per round-trip, regardless of message count', async () => {
64+
const messages = [{ run: {} }, { run: {} }] as unknown as WorkerApiDequeueResponseBody;
65+
await runOneDequeue({ dequeueImpl: async () => ({ success: true, data: messages }) });
66+
67+
const text = await register.metrics();
68+
// One observation for the whole batch, not one per message.
69+
expect(text).toContain('queue_consumer_pool_dequeue_duration_seconds_count{outcome="success"} 1');
70+
});
71+
72+
it('records outcome="error" when the response is unsuccessful', async () => {
73+
await runOneDequeue({
74+
dequeueImpl: async () => ({ success: false, error: new Error("boom") }),
75+
});
76+
77+
expect(await register.metrics()).toContain(
78+
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
79+
);
80+
});
81+
82+
it('records outcome="error" when the dequeue call throws (network error / timeout)', async () => {
83+
await runOneDequeue({
84+
dequeueImpl: async () => {
85+
throw new Error("network down");
86+
},
87+
});
88+
89+
expect(await register.metrics()).toContain(
90+
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
91+
);
92+
});
93+
94+
it("is a no-op (does not throw) when no metrics instance is provided", async () => {
95+
await expect(
96+
runOneDequeue({ dequeueImpl: async () => ({ success: true, data: [] }), withMetrics: false })
97+
).resolves.not.toThrow();
98+
99+
// Histogram has no observations - the labelled count line should be absent.
100+
expect(await register.metrics()).not.toContain("queue_consumer_pool_dequeue_duration_seconds_count");
101+
});
102+
});

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { SimpleStructuredLogger } from "../../utils/structuredLogger.js";
22
import { SupervisorHttpClient } from "./http.js";
33
import { WorkerApiDequeueResponseBody, WorkerQueueClass } from "./schemas.js";
44
import { PreDequeueFn, PreSkipFn } from "./types.js";
5+
import type { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
56

67
export interface QueueConsumer {
78
start(): void;
@@ -18,6 +19,8 @@ export type RunQueueConsumerOptions = {
1819
/** Which worker-queue class this consumer pulls from. Defaults to the worker's region queue. */
1920
queueClass?: WorkerQueueClass;
2021
onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
22+
/** Optional shared pool metrics. When provided, dequeue API latency is recorded as a histogram. */
23+
metrics?: ConsumerPoolMetrics;
2124
};
2225

2326
export class RunQueueConsumer implements QueueConsumer {
@@ -27,6 +30,7 @@ export class RunQueueConsumer implements QueueConsumer {
2730
private readonly maxRunCount?: number;
2831
private readonly queueClass?: WorkerQueueClass;
2932
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
33+
private readonly metrics?: ConsumerPoolMetrics;
3034

3135
private readonly logger = new SimpleStructuredLogger("queue-consumer");
3236

@@ -46,6 +50,7 @@ export class RunQueueConsumer implements QueueConsumer {
4650
this.lastScheduledIntervalMs = opts.idleIntervalMs;
4751
this.onDequeue = opts.onDequeue;
4852
this.client = opts.client;
53+
this.metrics = opts.metrics;
4954
}
5055

5156
start() {
@@ -116,18 +121,26 @@ export class RunQueueConsumer implements QueueConsumer {
116121

117122
let nextIntervalMs = this.idleIntervalMs;
118123

124+
const dequeueStart = performance.now();
125+
119126
try {
120-
const dequeueStart = performance.now();
121127
const response = await this.client.dequeue({
122128
maxResources: preDequeueResult?.maxResources,
123129
maxRunCount: this.maxRunCount,
124130
queueClass: this.queueClass,
125131
});
126-
const dequeueResponseMs = Math.round(performance.now() - dequeueStart);
132+
const dequeueDurationSeconds = (performance.now() - dequeueStart) / 1000;
133+
const dequeueResponseMs = Math.round(dequeueDurationSeconds * 1000);
127134

128135
if (!response.success) {
136+
this.metrics?.observeDequeueLatency(dequeueDurationSeconds, "error");
129137
this.logger.error("Failed to dequeue", { error: response.error });
130138
} else {
139+
this.metrics?.observeDequeueLatency(
140+
dequeueDurationSeconds,
141+
response.data.length > 0 ? "success" : "empty"
142+
);
143+
131144
try {
132145
await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs });
133146

@@ -139,6 +152,8 @@ export class RunQueueConsumer implements QueueConsumer {
139152
}
140153
}
141154
} catch (clientError) {
155+
// Captures network errors and timeouts - exactly the tail latencies we care about.
156+
this.metrics?.observeDequeueLatency((performance.now() - dequeueStart) / 1000, "error");
142157
this.logger.error("client.dequeue error", { error: clientError });
143158
}
144159

0 commit comments

Comments
 (0)