Skip to content

Commit 06abd0a

Browse files
authored
Merge branch 'main' into run-engine-transactional-completion
2 parents fc20dc2 + 081b6ba commit 06abd0a

6 files changed

Lines changed: 206 additions & 2 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
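Record client-side dequeue API latency in the supervisor consumer pool as a Prometheus histogram (`queue_consumer_pool_dequeue_duration_seconds`, labelled by `outcome`: `success`, `empty`, or `error`). Each observation covers one dequeue call as seen by the consumer, including the HTTP client's internal retries and backoff.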

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
import { SupervisorHttpClient } from "./http.js";
88
import type { WorkerApiDequeueResponseBody } from "./schemas.js";
99
import type { QueueConsumer } from "./queueConsumer.js";
10+
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
11+
import { Registry } from "prom-client";
1012

1113
// Mock only the logger
1214
vi.mock("../../utils/structuredLogger.js");
@@ -16,9 +18,11 @@ class TestQueueConsumer implements QueueConsumer {
1618
public started = false;
1719
public stopped = false;
1820
public onDequeue?: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
21+
public metrics?: ConsumerPoolMetrics;
1922

2023
constructor(opts: any) {
2124
this.onDequeue = opts.onDequeue;
25+
this.metrics = opts.metrics;
2226
}
2327

2428
start(): void {
@@ -719,6 +723,38 @@ describe("RunQueueConsumerPool", () => {
719723
});
720724
});
721725

726+
describe("Metrics wiring", () => {
727+
it("injects the pool's shared ConsumerPoolMetrics into every consumer when a registry is provided", async () => {
728+
pool = new RunQueueConsumerPool({
729+
...defaultOptions,
730+
metricsRegistry: new Registry(),
731+
scaling: { strategy: "none", maxConsumerCount: 3 },
732+
});
733+
734+
await pool.start();
735+
736+
expect(testConsumers.length).toBe(3);
737+
const poolMetrics = pool["promMetrics"];
738+
expect(poolMetrics).toBeInstanceOf(ConsumerPoolMetrics);
739+
testConsumers.forEach((consumer) => {
740+
expect(consumer.metrics).toBe(poolMetrics);
741+
});
742+
});
743+
744+
it("preserves a caller-supplied consumer metrics instance when no registry is provided", async () => {
745+
const callerMetrics = new ConsumerPoolMetrics({ register: new Registry() });
746+
pool = new RunQueueConsumerPool({
747+
...defaultOptions,
748+
consumer: { ...defaultOptions.consumer, metrics: callerMetrics },
749+
scaling: { strategy: "none", maxConsumerCount: 1 },
750+
});
751+
752+
await pool.start();
753+
754+
expect(testConsumers[0]?.metrics).toBe(callerMetrics);
755+
});
756+
});
757+
722758
describe("Backpressure scale-up freeze", () => {
723759
it("freezes scale-up while shouldPauseScaling returns true, then resumes", async () => {
724760
let paused = true;

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,10 @@ export class RunQueueConsumerPool {
369369

370370
const consumer = this.consumerFactory({
371371
...this.consumerOptions,
372+
// Share the pool's single metrics instance so every consumer records onto
373+
// the same histogram (re-registering the metric name would throw). The pool's
374+
// instance takes precedence; a caller-supplied one is used only when the pool has none.
375+
metrics: this.promMetrics ?? this.consumerOptions.metrics,
372376
onDequeue: async (messages, timing) => {
373377
// Always update queue length, default to 0 for empty dequeues or missing value
374378
this.updateQueueLength(messages[0]?.workerQueueLength ?? 0);

packages/core/src/v3/runEngineWorker/supervisor/consumerPoolMetrics.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@ export interface ConsumerPoolMetricsOptions {
55
prefix?: string;
66
}
77

8+
/**
9+
* Outcome of a single dequeue API round-trip, used as a low-cardinality label
10+
* on the dequeue latency histogram.
11+
* - `success`: the call returned at least one run
12+
* - `empty`: the call succeeded but returned no runs (the common idle case)
13+
* - `error`: the call failed (unsuccessful response, network error, or timeout)
14+
*/
15+
export type DequeueOutcome = "success" | "empty" | "error";
16+
817
export class ConsumerPoolMetrics {
918
private readonly register: Registry;
1019
private readonly prefix: string;
@@ -26,6 +35,9 @@ export class ConsumerPoolMetrics {
2635
public readonly queueLengthUpdatesTotal: Counter;
2736
public readonly batchesProcessedTotal: Counter;
2837

38+
// Dequeue API latency (client-side, measured around the dequeue HTTP call)
39+
public readonly dequeueDurationSeconds: Histogram;
40+
2941
constructor(opts: ConsumerPoolMetricsOptions = {}) {
3042
this.register = opts.register ?? new Registry();
3143
this.prefix = opts.prefix ?? "queue_consumer_pool";
@@ -102,6 +114,23 @@ export class ConsumerPoolMetrics {
102114
help: "Total number of metric batches processed",
103115
registers: [this.register],
104116
});
117+
118+
this.dequeueDurationSeconds = new Histogram({
119+
name: `${this.prefix}_dequeue_duration_seconds`,
120+
help: "Client-side duration of the dequeue API call (POST /engine/v1/worker-actions/dequeue), including the HTTP client's internal retries and backoff",
121+
labelNames: ["outcome"],
122+
// The HTTP client retries internally (up to 5 attempts with 0.5-5s backoff),
123+
// so one observation can span multiple requests plus sleeps. A retryable
124+
// failure surfaces as `error` only after >=7.5s of backoff - the 10-30s
125+
// buckets exist so that mode doesn't collapse into +Inf. The server also
126+
// long-polls (RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS, default 10s),
127+
// parking empty dequeues at ~10s - the 11/12.5/15/20 buckets give the
128+
// quantiles resolution just above that boundary, where the mass sits.
129+
// 60s brackets the worst-case error envelope (5 attempts that each hit
130+
// the ~10s hold, plus backoff); beyond that the connection is hung.
131+
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 11, 12.5, 15, 20, 30, 60],
132+
registers: [this.register],
133+
});
105134
}
106135

107136
/**
@@ -157,4 +186,13 @@ export class ConsumerPoolMetrics {
157186
recordQueueLengthUpdate() {
158187
this.queueLengthUpdatesTotal.inc();
159188
}
189+
190+
/**
191+
* Record the client-side latency of a single dequeue API round-trip.
192+
* @param seconds Wall-clock duration of the dequeue call, in seconds.
193+
* @param outcome Whether the call returned runs, was empty, or errored.
194+
*/
195+
observeDequeueLatency(seconds: number, outcome: DequeueOutcome) {
196+
this.dequeueDurationSeconds.observe({ outcome }, seconds);
197+
}
160198
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
2+
import { Registry } from "prom-client";
3+
import { RunQueueConsumer } from "./queueConsumer.js";
4+
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
5+
import type { SupervisorHttpClient } from "./http.js";
6+
import type { WorkerApiDequeueResponseBody } from "./schemas.js";
7+
8+
// Mock only the logger (same approach as consumerPool.test.ts)
9+
vi.mock("../../utils/structuredLogger.js");
10+
11+
function makeClient(dequeueImpl: () => Promise<unknown>): SupervisorHttpClient {
12+
return { dequeue: vi.fn(dequeueImpl) } as unknown as SupervisorHttpClient;
13+
}
14+
15+
describe("RunQueueConsumer dequeue latency metric", () => {
16+
let register: Registry;
17+
let metrics: ConsumerPoolMetrics;
18+
let consumer: RunQueueConsumer | undefined;
19+
20+
beforeEach(() => {
21+
vi.clearAllMocks();
22+
// Fake timers so the trailing scheduleNextDequeue() never fires during the test.
23+
vi.useFakeTimers();
24+
register = new Registry();
25+
metrics = new ConsumerPoolMetrics({ register });
26+
});
27+
28+
afterEach(() => {
29+
consumer?.stop();
30+
vi.clearAllTimers();
31+
vi.useRealTimers();
32+
});
33+
34+
/**
35+
* Runs exactly one dequeue iteration and awaits it. We set `isEnabled`
36+
* directly and invoke the private `dequeue()` rather than `start()`, so no
37+
* timer-driven loop runs - the metric is recorded before scheduleNextDequeue().
38+
*/
39+
async function runOneDequeue(opts: {
40+
dequeueImpl: () => Promise<unknown>;
41+
withMetrics?: boolean;
42+
}) {
43+
consumer = new RunQueueConsumer({
44+
client: makeClient(opts.dequeueImpl),
45+
intervalMs: 600_000,
46+
idleIntervalMs: 600_000,
47+
onDequeue: async () => {},
48+
...(opts.withMetrics === false ? {} : { metrics }),
49+
});
50+
51+
(consumer as unknown as { isEnabled: boolean }).isEnabled = true;
52+
await (consumer as unknown as { dequeue(): Promise<void> }).dequeue();
53+
}
54+
55+
it('records outcome="empty" for a successful empty dequeue', async () => {
56+
await runOneDequeue({ dequeueImpl: async () => ({ success: true, data: [] }) });
57+
58+
expect(await register.metrics()).toContain(
59+
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="empty"} 1'
60+
);
61+
});
62+
63+
it('records outcome="success" once per round-trip, regardless of message count', async () => {
64+
const messages = [{ run: {} }, { run: {} }] as unknown as WorkerApiDequeueResponseBody;
65+
await runOneDequeue({ dequeueImpl: async () => ({ success: true, data: messages }) });
66+
67+
const text = await register.metrics();
68+
// One observation for the whole batch, not one per message.
69+
expect(text).toContain('queue_consumer_pool_dequeue_duration_seconds_count{outcome="success"} 1');
70+
});
71+
72+
it('records outcome="error" when the response is unsuccessful', async () => {
73+
await runOneDequeue({
74+
dequeueImpl: async () => ({ success: false, error: new Error("boom") }),
75+
});
76+
77+
expect(await register.metrics()).toContain(
78+
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
79+
);
80+
});
81+
82+
// Defensive path: wrapZodFetch traps all errors today, so the real client
83+
// never throws - this guards against a future client that does.
84+
it('records outcome="error" when the dequeue call throws', async () => {
85+
await runOneDequeue({
86+
dequeueImpl: async () => {
87+
throw new Error("network down");
88+
},
89+
});
90+
91+
expect(await register.metrics()).toContain(
92+
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
93+
);
94+
});
95+
96+
it("is a no-op (does not throw) when no metrics instance is provided", async () => {
97+
await expect(
98+
runOneDequeue({ dequeueImpl: async () => ({ success: true, data: [] }), withMetrics: false })
99+
).resolves.not.toThrow();
100+
101+
// Histogram has no observations - the labelled count line should be absent.
102+
expect(await register.metrics()).not.toContain("queue_consumer_pool_dequeue_duration_seconds_count");
103+
});
104+
});

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { SimpleStructuredLogger } from "../../utils/structuredLogger.js";
22
import { SupervisorHttpClient } from "./http.js";
33
import { WorkerApiDequeueResponseBody, WorkerQueueClass } from "./schemas.js";
44
import { PreDequeueFn, PreSkipFn } from "./types.js";
5+
import type { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
56

67
export interface QueueConsumer {
78
start(): void;
@@ -18,6 +19,8 @@ export type RunQueueConsumerOptions = {
1819
/** Which worker-queue class this consumer pulls from. Defaults to the worker's region queue. */
1920
queueClass?: WorkerQueueClass;
2021
onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
22+
/** Optional shared pool metrics. When provided, dequeue API latency is recorded as a histogram. */
23+
metrics?: ConsumerPoolMetrics;
2124
};
2225

2326
export class RunQueueConsumer implements QueueConsumer {
@@ -27,6 +30,7 @@ export class RunQueueConsumer implements QueueConsumer {
2730
private readonly maxRunCount?: number;
2831
private readonly queueClass?: WorkerQueueClass;
2932
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
33+
private readonly metrics?: ConsumerPoolMetrics;
3034

3135
private readonly logger = new SimpleStructuredLogger("queue-consumer");
3236

@@ -46,6 +50,7 @@ export class RunQueueConsumer implements QueueConsumer {
4650
this.lastScheduledIntervalMs = opts.idleIntervalMs;
4751
this.onDequeue = opts.onDequeue;
4852
this.client = opts.client;
53+
this.metrics = opts.metrics;
4954
}
5055

5156
start() {
@@ -116,18 +121,26 @@ export class RunQueueConsumer implements QueueConsumer {
116121

117122
let nextIntervalMs = this.idleIntervalMs;
118123

124+
const dequeueStart = performance.now();
125+
119126
try {
120-
const dequeueStart = performance.now();
121127
const response = await this.client.dequeue({
122128
maxResources: preDequeueResult?.maxResources,
123129
maxRunCount: this.maxRunCount,
124130
queueClass: this.queueClass,
125131
});
126-
const dequeueResponseMs = Math.round(performance.now() - dequeueStart);
132+
const dequeueDurationSeconds = (performance.now() - dequeueStart) / 1000;
133+
const dequeueResponseMs = Math.round(dequeueDurationSeconds * 1000);
127134

128135
if (!response.success) {
136+
this.metrics?.observeDequeueLatency(dequeueDurationSeconds, "error");
129137
this.logger.error("Failed to dequeue", { error: response.error });
130138
} else {
139+
this.metrics?.observeDequeueLatency(
140+
dequeueDurationSeconds,
141+
response.data.length > 0 ? "success" : "empty"
142+
);
143+
131144
try {
132145
await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs });
133146

@@ -139,6 +152,10 @@ export class RunQueueConsumer implements QueueConsumer {
139152
}
140153
}
141154
} catch (clientError) {
155+
// wrapZodFetch traps all errors into { success: false }, so this branch is
156+
// unreachable with the real client today. Record defensively so a future
157+
// client that throws can't silently lose error samples.
158+
this.metrics?.observeDequeueLatency((performance.now() - dequeueStart) / 1000, "error");
142159
this.logger.error("client.dequeue error", { error: clientError });
143160
}
144161

0 commit comments

Comments
 (0)