Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dequeue-latency-histogram.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Record client-side dequeue API latency in the supervisor consumer pool as a Prometheus histogram (`queue_consumer_pool_dequeue_duration_seconds`, labelled by `outcome`: success/empty/error).
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
import { SupervisorHttpClient } from "./http.js";
import type { WorkerApiDequeueResponseBody } from "./schemas.js";
import type { QueueConsumer } from "./queueConsumer.js";
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
import { Registry } from "prom-client";

// Mock only the logger
vi.mock("../../utils/structuredLogger.js");
Expand All @@ -16,9 +18,11 @@ class TestQueueConsumer implements QueueConsumer {
public started = false;
public stopped = false;
public onDequeue?: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
public metrics?: ConsumerPoolMetrics;

constructor(opts: any) {
this.onDequeue = opts.onDequeue;
this.metrics = opts.metrics;
}

start(): void {
Expand Down Expand Up @@ -719,6 +723,38 @@ describe("RunQueueConsumerPool", () => {
});
});

describe("Metrics wiring", () => {
it("injects the pool's shared ConsumerPoolMetrics into every consumer when a registry is provided", async () => {
pool = new RunQueueConsumerPool({
...defaultOptions,
metricsRegistry: new Registry(),
scaling: { strategy: "none", maxConsumerCount: 3 },
});

await pool.start();

expect(testConsumers.length).toBe(3);
const poolMetrics = pool["promMetrics"];
expect(poolMetrics).toBeInstanceOf(ConsumerPoolMetrics);
testConsumers.forEach((consumer) => {
expect(consumer.metrics).toBe(poolMetrics);
});
});

it("preserves a caller-supplied consumer metrics instance when no registry is provided", async () => {
const callerMetrics = new ConsumerPoolMetrics({ register: new Registry() });
pool = new RunQueueConsumerPool({
...defaultOptions,
consumer: { ...defaultOptions.consumer, metrics: callerMetrics },
scaling: { strategy: "none", maxConsumerCount: 1 },
});

await pool.start();

expect(testConsumers[0]?.metrics).toBe(callerMetrics);
});
});

describe("Backpressure scale-up freeze", () => {
it("freezes scale-up while shouldPauseScaling returns true, then resumes", async () => {
let paused = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ export class RunQueueConsumerPool {

const consumer = this.consumerFactory({
...this.consumerOptions,
// Share the pool's single metrics instance so every consumer records onto
// the same histogram (re-registering the metric name would throw). Fall
// back to a caller-supplied instance rather than clobbering it.
metrics: this.promMetrics ?? this.consumerOptions.metrics,
onDequeue: async (messages, timing) => {
// Always update queue length, default to 0 for empty dequeues or missing value
this.updateQueueLength(messages[0]?.workerQueueLength ?? 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ export interface ConsumerPoolMetricsOptions {
prefix?: string;
}

/**
* Outcome of a single dequeue API round-trip, used as a low-cardinality label
* on the dequeue latency histogram.
* - `success`: the call returned at least one run
* - `empty`: the call succeeded but returned no runs (the common idle case)
* - `error`: the call failed (unsuccessful response, network error, or timeout)
*/
export type DequeueOutcome = "success" | "empty" | "error";

export class ConsumerPoolMetrics {
private readonly register: Registry;
private readonly prefix: string;
Expand All @@ -26,6 +35,9 @@ export class ConsumerPoolMetrics {
public readonly queueLengthUpdatesTotal: Counter;
public readonly batchesProcessedTotal: Counter;

// Dequeue API latency (client-side, measured around the dequeue HTTP call)
public readonly dequeueDurationSeconds: Histogram;

constructor(opts: ConsumerPoolMetricsOptions = {}) {
this.register = opts.register ?? new Registry();
this.prefix = opts.prefix ?? "queue_consumer_pool";
Expand Down Expand Up @@ -102,6 +114,18 @@ export class ConsumerPoolMetrics {
help: "Total number of metric batches processed",
registers: [this.register],
});

this.dequeueDurationSeconds = new Histogram({
name: `${this.prefix}_dequeue_duration_seconds`,
help: "Client-side duration of the dequeue API call (POST /engine/v1/worker-actions/dequeue), including the HTTP client's internal retries and backoff",
labelNames: ["outcome"],
// The HTTP client retries internally (up to 5 attempts with 0.5-5s backoff),
// so one observation can span multiple requests plus sleeps. A retryable
// failure surfaces as `error` only after >=7.5s of backoff - the 10/30s
// buckets exist so that mode doesn't collapse into +Inf.
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30],
registers: [this.register],
});
}

/**
Expand Down Expand Up @@ -157,4 +181,13 @@ export class ConsumerPoolMetrics {
recordQueueLengthUpdate() {
this.queueLengthUpdatesTotal.inc();
}

/**
* Record the client-side latency of a single dequeue API round-trip.
* @param seconds Wall-clock duration of the dequeue call, in seconds.
* @param outcome Whether the call returned runs, was empty, or errored.
*/
observeDequeueLatency(seconds: number, outcome: DequeueOutcome) {
this.dequeueDurationSeconds.observe({ outcome }, seconds);
}
}
104 changes: 104 additions & 0 deletions packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { Registry } from "prom-client";
import { RunQueueConsumer } from "./queueConsumer.js";
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
import type { SupervisorHttpClient } from "./http.js";
import type { WorkerApiDequeueResponseBody } from "./schemas.js";

// Mock only the logger (same approach as consumerPool.test.ts)
vi.mock("../../utils/structuredLogger.js");

function makeClient(dequeueImpl: () => Promise<unknown>): SupervisorHttpClient {
return { dequeue: vi.fn(dequeueImpl) } as unknown as SupervisorHttpClient;
}

describe("RunQueueConsumer dequeue latency metric", () => {
let register: Registry;
let metrics: ConsumerPoolMetrics;
let consumer: RunQueueConsumer | undefined;

beforeEach(() => {
vi.clearAllMocks();
// Fake timers so the trailing scheduleNextDequeue() never fires during the test.
vi.useFakeTimers();
register = new Registry();
metrics = new ConsumerPoolMetrics({ register });
});

afterEach(() => {
consumer?.stop();
vi.clearAllTimers();
vi.useRealTimers();
});

/**
* Runs exactly one dequeue iteration and awaits it. We set `isEnabled`
* directly and invoke the private `dequeue()` rather than `start()`, so no
* timer-driven loop runs - the metric is recorded before scheduleNextDequeue().
*/
async function runOneDequeue(opts: {
dequeueImpl: () => Promise<unknown>;
withMetrics?: boolean;
}) {
consumer = new RunQueueConsumer({
client: makeClient(opts.dequeueImpl),
intervalMs: 600_000,
idleIntervalMs: 600_000,
onDequeue: async () => {},
...(opts.withMetrics === false ? {} : { metrics }),
});

(consumer as unknown as { isEnabled: boolean }).isEnabled = true;
await (consumer as unknown as { dequeue(): Promise<void> }).dequeue();
}

it('records outcome="empty" for a successful empty dequeue', async () => {
await runOneDequeue({ dequeueImpl: async () => ({ success: true, data: [] }) });

expect(await register.metrics()).toContain(
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="empty"} 1'
);
});

it('records outcome="success" once per round-trip, regardless of message count', async () => {
const messages = [{ run: {} }, { run: {} }] as unknown as WorkerApiDequeueResponseBody;
await runOneDequeue({ dequeueImpl: async () => ({ success: true, data: messages }) });

const text = await register.metrics();
// One observation for the whole batch, not one per message.
expect(text).toContain('queue_consumer_pool_dequeue_duration_seconds_count{outcome="success"} 1');
});

it('records outcome="error" when the response is unsuccessful', async () => {
await runOneDequeue({
dequeueImpl: async () => ({ success: false, error: new Error("boom") }),
});

expect(await register.metrics()).toContain(
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
);
});

// Defensive path: wrapZodFetch traps all errors today, so the real client
// never throws - this guards against a future client that does.
it('records outcome="error" when the dequeue call throws', async () => {
await runOneDequeue({
dequeueImpl: async () => {
throw new Error("network down");
},
});

expect(await register.metrics()).toContain(
'queue_consumer_pool_dequeue_duration_seconds_count{outcome="error"} 1'
);
});

it("is a no-op (does not throw) when no metrics instance is provided", async () => {
await expect(
runOneDequeue({ dequeueImpl: async () => ({ success: true, data: [] }), withMetrics: false })
).resolves.not.toThrow();

// Histogram has no observations - the labelled count line should be absent.
expect(await register.metrics()).not.toContain("queue_consumer_pool_dequeue_duration_seconds_count");
});
});
21 changes: 19 additions & 2 deletions packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { SimpleStructuredLogger } from "../../utils/structuredLogger.js";
import { SupervisorHttpClient } from "./http.js";
import { WorkerApiDequeueResponseBody, WorkerQueueClass } from "./schemas.js";
import { PreDequeueFn, PreSkipFn } from "./types.js";
import type { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";

export interface QueueConsumer {
start(): void;
Expand All @@ -18,6 +19,8 @@ export type RunQueueConsumerOptions = {
/** Which worker-queue class this consumer pulls from. Defaults to the worker's region queue. */
queueClass?: WorkerQueueClass;
onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
/** Optional shared pool metrics. When provided, dequeue API latency is recorded as a histogram. */
metrics?: ConsumerPoolMetrics;
};

export class RunQueueConsumer implements QueueConsumer {
Expand All @@ -27,6 +30,7 @@ export class RunQueueConsumer implements QueueConsumer {
private readonly maxRunCount?: number;
private readonly queueClass?: WorkerQueueClass;
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
private readonly metrics?: ConsumerPoolMetrics;

private readonly logger = new SimpleStructuredLogger("queue-consumer");

Expand All @@ -46,6 +50,7 @@ export class RunQueueConsumer implements QueueConsumer {
this.lastScheduledIntervalMs = opts.idleIntervalMs;
this.onDequeue = opts.onDequeue;
this.client = opts.client;
this.metrics = opts.metrics;
}

start() {
Expand Down Expand Up @@ -116,18 +121,26 @@ export class RunQueueConsumer implements QueueConsumer {

let nextIntervalMs = this.idleIntervalMs;

const dequeueStart = performance.now();

try {
const dequeueStart = performance.now();
const response = await this.client.dequeue({
maxResources: preDequeueResult?.maxResources,
maxRunCount: this.maxRunCount,
queueClass: this.queueClass,
});
const dequeueResponseMs = Math.round(performance.now() - dequeueStart);
const dequeueDurationSeconds = (performance.now() - dequeueStart) / 1000;
const dequeueResponseMs = Math.round(dequeueDurationSeconds * 1000);

if (!response.success) {
this.metrics?.observeDequeueLatency(dequeueDurationSeconds, "error");
this.logger.error("Failed to dequeue", { error: response.error });
} else {
this.metrics?.observeDequeueLatency(
dequeueDurationSeconds,
response.data.length > 0 ? "success" : "empty"
);

try {
await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs });

Expand All @@ -139,6 +152,10 @@ export class RunQueueConsumer implements QueueConsumer {
}
}
} catch (clientError) {
// wrapZodFetch traps all errors into { success: false }, so this branch is
// unreachable with the real client today. Record defensively so a future
// client that throws can't silently lose error samples.
this.metrics?.observeDequeueLatency((performance.now() - dequeueStart) / 1000, "error");
this.logger.error("client.dequeue error", { error: clientError });
}

Expand Down