Skip to content

Commit b3f298a

Browse files
committed
fix(supervisor): make dequeue latency buckets survive retry backoff; review fixes
The HTTP client retries internally (5 attempts, >=7.5s of backoff before a retryable error surfaces), so the 5s bucket ceiling would have pushed nearly every retried error into +Inf. Extend buckets to 30s and state in the help text that one observation spans the whole logical call including retries. Also: stop clobbering a caller-supplied consumer metrics instance, correct the catch-branch comment (defensive only - wrapZodFetch never throws), and cover the pool-to-consumer metrics injection with tests.
1 parent f917a87 commit b3f298a

5 files changed

Lines changed: 51 additions & 7 deletions

File tree

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
import { SupervisorHttpClient } from "./http.js";
88
import type { WorkerApiDequeueResponseBody } from "./schemas.js";
99
import type { QueueConsumer } from "./queueConsumer.js";
10+
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
11+
import { Registry } from "prom-client";
1012

1113
// Mock only the logger
1214
vi.mock("../../utils/structuredLogger.js");
@@ -16,9 +18,11 @@ class TestQueueConsumer implements QueueConsumer {
1618
public started = false;
1719
public stopped = false;
1820
public onDequeue?: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
21+
public metrics?: ConsumerPoolMetrics;
1922

2023
constructor(opts: any) {
2124
this.onDequeue = opts.onDequeue;
25+
this.metrics = opts.metrics;
2226
}
2327

2428
start(): void {
@@ -719,6 +723,38 @@ describe("RunQueueConsumerPool", () => {
719723
});
720724
});
721725

726+
describe("Metrics wiring", () => {
727+
it("injects the pool's shared ConsumerPoolMetrics into every consumer when a registry is provided", async () => {
728+
pool = new RunQueueConsumerPool({
729+
...defaultOptions,
730+
metricsRegistry: new Registry(),
731+
scaling: { strategy: "none", maxConsumerCount: 3 },
732+
});
733+
734+
await pool.start();
735+
736+
expect(testConsumers.length).toBe(3);
737+
const poolMetrics = pool["promMetrics"];
738+
expect(poolMetrics).toBeInstanceOf(ConsumerPoolMetrics);
739+
testConsumers.forEach((consumer) => {
740+
expect(consumer.metrics).toBe(poolMetrics);
741+
});
742+
});
743+
744+
it("preserves a caller-supplied consumer metrics instance when no registry is provided", async () => {
745+
const callerMetrics = new ConsumerPoolMetrics({ register: new Registry() });
746+
pool = new RunQueueConsumerPool({
747+
...defaultOptions,
748+
consumer: { ...defaultOptions.consumer, metrics: callerMetrics },
749+
scaling: { strategy: "none", maxConsumerCount: 1 },
750+
});
751+
752+
await pool.start();
753+
754+
expect(testConsumers[0]?.metrics).toBe(callerMetrics);
755+
});
756+
});
757+
722758
describe("Backpressure scale-up freeze", () => {
723759
it("freezes scale-up while shouldPauseScaling returns true, then resumes", async () => {
724760
let paused = true;

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,9 @@ export class RunQueueConsumerPool {
370370
const consumer = this.consumerFactory({
371371
...this.consumerOptions,
372372
// Share the pool's single metrics instance so every consumer records onto
373-
// the same histogram (re-registering the metric name would throw).
374-
metrics: this.promMetrics,
373+
// the same histogram (re-registering the metric name would throw). Fall
374+
// back to a caller-supplied instance rather than clobbering it.
375+
metrics: this.promMetrics ?? this.consumerOptions.metrics,
375376
onDequeue: async (messages, timing) => {
376377
// Always update queue length, default to 0 for empty dequeues or missing value
377378
this.updateQueueLength(messages[0]?.workerQueueLength ?? 0);

packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,13 @@ export class ConsumerPoolMetrics {
117117

118118
this.dequeueDurationSeconds = new Histogram({
119119
name: `${this.prefix}_dequeue_duration_seconds`,
120-
help: "Client-side latency of the dequeue API call (POST /engine/v1/worker-actions/dequeue), measured around the HTTP request",
120+
help: "Client-side duration of the dequeue API call (POST /engine/v1/worker-actions/dequeue), including the HTTP client's internal retries and backoff",
121121
labelNames: ["outcome"],
122-
// Tuned for the dequeue endpoint: sub-10ms hits through a multi-second tail.
123-
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5],
122+
// The HTTP client retries internally (up to 5 attempts with 0.5-5s backoff),
123+
// so one observation can span multiple requests plus sleeps. A retryable
124+
// failure surfaces as `error` only after >=7.5s of backoff - the 10/30s
125+
// buckets exist so that mode doesn't collapse into +Inf.
126+
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30],
124127
registers: [this.register],
125128
});
126129
}

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ describe("RunQueueConsumer dequeue latency metric", () => {
7979
);
8080
});
8181

82-
it('records outcome="error" when the dequeue call throws (network error / timeout)', async () => {
82+
// Defensive path: wrapZodFetch traps all errors today, so the real client
83+
// never throws - this guards against a future client that does.
84+
it('records outcome="error" when the dequeue call throws', async () => {
8385
await runOneDequeue({
8486
dequeueImpl: async () => {
8587
throw new Error("network down");

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ export class RunQueueConsumer implements QueueConsumer {
152152
}
153153
}
154154
} catch (clientError) {
155-
// Captures network errors and timeouts - exactly the tail latencies we care about.
155+
// wrapZodFetch traps all errors into { success: false }, so this branch is
156+
// unreachable with the real client today. Record defensively so a future
157+
// client that throws can't silently lose error samples.
156158
this.metrics?.observeDequeueLatency((performance.now() - dequeueStart) / 1000, "error");
157159
this.logger.error("client.dequeue error", { error: clientError });
158160
}

0 commit comments

Comments
 (0)