Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/batch-rate-limiter-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Move the batch queue global rate limiter from the FairQueue claim phase to the BatchQueue worker queue consumer for accurate per-item rate limiting. Add a worker queue depth cap to prevent unbounded growth that could cause visibility timeouts.
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,9 @@ const EnvironmentSchema = z
// Global rate limit: max items processed per second across all consumers
// If not set, no global rate limiting is applied
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
// Max items in the worker queue before claiming pauses (protects visibility timeouts)
// If not set, no depth limit is applied
BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH: z.coerce.number().int().positive().optional(),

ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ function createRunEngine() {
globalRateLimiter: env.BATCH_QUEUE_GLOBAL_RATE_LIMIT
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)
: undefined,
// Worker queue depth cap - prevents unbounded growth, protecting visibility timeouts
workerQueueMaxDepth: env.BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH,
retry: {
maxAttempts: 6,
minTimeoutInMs: 1_000,
Expand Down
50 changes: 48 additions & 2 deletions internal-packages/run-engine/src/batch-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
isAbortError,
WorkerQueueManager,
type FairQueueOptions,
type GlobalRateLimiter,
} from "@trigger.dev/redis-worker";
import { BatchCompletionTracker } from "./completionTracker.js";
import type {
Expand Down Expand Up @@ -76,6 +77,7 @@ export class BatchQueue {
private abortController: AbortController;
private workerQueueConsumerLoops: Promise<void>[] = [];
private workerQueueBlockingTimeoutSeconds: number;
private globalRateLimiter?: GlobalRateLimiter;
private batchedSpanManager: BatchedSpanManager;

// Metrics
Expand All @@ -87,6 +89,7 @@ export class BatchQueue {
private batchProcessingDurationHistogram?: Histogram;
private itemQueueTimeHistogram?: Histogram;
private workerQueueLengthGauge?: ObservableGauge;
private rateLimitDeniedCounter?: Counter;

constructor(private options: BatchQueueOptions) {
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
Expand All @@ -95,6 +98,7 @@ export class BatchQueue {
this.maxAttempts = options.retry?.maxAttempts ?? 1;
this.abortController = new AbortController();
this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10;
this.globalRateLimiter = options.globalRateLimiter;

// Initialize metrics if meter is provided
if (options.meter) {
Expand Down Expand Up @@ -174,8 +178,9 @@ export class BatchQueue {
},
},
],
// Optional global rate limiter to limit max items/sec across all consumers
globalRateLimiter: options.globalRateLimiter,
// Worker queue depth cap to prevent unbounded growth (protects visibility timeouts)
workerQueueMaxDepth: options.workerQueueMaxDepth,
workerQueueDepthCheckId: BATCH_WORKER_QUEUE_ID,
// Enable retry with DLQ disabled when retry config is provided.
// BatchQueue handles the "final failure" in its own processing loop,
// so we don't need the DLQ - we just need the retry scheduling.
Expand Down Expand Up @@ -608,6 +613,11 @@ export class BatchQueue {
unit: "ms",
});

this.rateLimitDeniedCounter = meter.createCounter("batch_queue.rate_limit_denied", {
description: "Number of times the global rate limiter denied processing",
unit: "denials",
});

this.workerQueueLengthGauge = meter.createObservableGauge("batch_queue.worker_queue.length", {
description: "Number of items waiting in the batch worker queue",
unit: "items",
Expand Down Expand Up @@ -641,6 +651,42 @@ export class BatchQueue {
}

try {
// Rate limit per-item at the processing level (1 token per message).
// Loop until allowed so multiple consumers don't all rush through after one sleep.
if (this.globalRateLimiter) {
while (this.isRunning) {
const result = await this.globalRateLimiter.limit();
if (result.allowed) {
break;
}
this.rateLimitDeniedCounter?.add(1);
const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now());
if (waitMs > 0) {
await new Promise<void>((resolve, reject) => {
const onAbort = () => {
clearTimeout(timer);
reject(this.abortController.signal.reason);
};
const timer = setTimeout(() => {
// Must remove listener when timeout fires, otherwise listeners accumulate
// (the { once: true } option only removes on abort, not on timeout)
this.abortController.signal.removeEventListener("abort", onAbort);
resolve();
}, waitMs);
if (this.abortController.signal.aborted) {
clearTimeout(timer);
reject(this.abortController.signal.reason);
return;
}
this.abortController.signal.addEventListener("abort", onAbort, { once: true });
});
}
}
if (!this.isRunning) {
break;
}
}

await this.batchedSpanManager.withBatchedSpan(
loopId,
async (span) => {
Expand Down
118 changes: 118 additions & 0 deletions internal-packages/run-engine/src/batch-queue/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { redisTest } from "@internal/testcontainers";
import { describe, expect, vi } from "vitest";
import { BatchQueue } from "../index.js";
import type { GlobalRateLimiter } from "@trigger.dev/redis-worker";
import type { CompleteBatchResult, InitializeBatchOptions, BatchItem } from "../types.js";

vi.setConfig({ testTimeout: 60_000 });
Expand Down Expand Up @@ -658,4 +659,121 @@ describe("BatchQueue", () => {
}
);
});

describe("global rate limiter at worker queue consumer level", () => {
  redisTest(
    "should call rate limiter before each processing attempt",
    async ({ redisContainer }) => {
      // Limiter that never throttles; we only count how often it is consulted.
      let limiterInvocations = 0;
      const permissiveLimiter: GlobalRateLimiter = {
        async limit() {
          limiterInvocations += 1;
          return { allowed: true };
        },
      };

      const queue = new BatchQueue({
        redis: {
          host: redisContainer.getHost(),
          port: redisContainer.getPort(),
          keyPrefix: "test:",
        },
        drr: { quantum: 5, maxDeficit: 50 },
        consumerCount: 1,
        consumerIntervalMs: 50,
        startConsumers: true,
        globalRateLimiter: permissiveLimiter,
      });

      let finished: CompleteBatchResult | null = null;

      try {
        queue.onProcessItem(async ({ itemIndex }) => ({
          success: true,
          runId: `run_${itemIndex}`,
        }));

        queue.onBatchComplete(async (result) => {
          finished = result;
        });

        const total = 5;
        await queue.initializeBatch(createInitOptions("batch1", "env1", total));
        await enqueueItems(queue, "batch1", "env1", createBatchItems(total));

        await vi.waitFor(
          () => {
            expect(finished).not.toBeNull();
          },
          { timeout: 10000 }
        );

        expect(finished!.successfulRunCount).toBe(total);
        // Consumers also consult the limiter on iterations where no message is
        // available, so the invocation count is at least the item count.
        expect(limiterInvocations).toBeGreaterThanOrEqual(total);
      } finally {
        await queue.close();
      }
    }
  );

  redisTest(
    "should delay processing when rate limited",
    async ({ redisContainer }) => {
      // Limiter that denies the first three calls (resetAt 100ms out), then
      // allows everything. Processing should stall briefly but still finish.
      let limiterInvocations = 0;
      const throttlingLimiter: GlobalRateLimiter = {
        async limit() {
          limiterInvocations += 1;
          return limiterInvocations <= 3
            ? { allowed: false, resetAt: Date.now() + 100 }
            : { allowed: true };
        },
      };

      const queue = new BatchQueue({
        redis: {
          host: redisContainer.getHost(),
          port: redisContainer.getPort(),
          keyPrefix: "test:",
        },
        drr: { quantum: 5, maxDeficit: 50 },
        consumerCount: 1,
        consumerIntervalMs: 50,
        startConsumers: true,
        globalRateLimiter: throttlingLimiter,
      });

      let finished: CompleteBatchResult | null = null;

      try {
        queue.onProcessItem(async ({ itemIndex }) => ({
          success: true,
          runId: `run_${itemIndex}`,
        }));

        queue.onBatchComplete(async (result) => {
          finished = result;
        });

        await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
        await enqueueItems(queue, "batch1", "env1", createBatchItems(3));

        // The batch must still complete despite the initial denials.
        await vi.waitFor(
          () => {
            expect(finished).not.toBeNull();
          },
          { timeout: 10000 }
        );

        expect(finished!.successfulRunCount).toBe(3);
        // Denied attempts are retried, so the limiter is consulted more times
        // than there are items.
        expect(limiterInvocations).toBeGreaterThan(3);
      } finally {
        await queue.close();
      }
    }
  );
});
});
7 changes: 7 additions & 0 deletions internal-packages/run-engine/src/batch-queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,15 @@ export type BatchQueueOptions = {
/**
* Optional global rate limiter to limit processing across all consumers.
* When configured, limits the max items/second processed globally.
* Rate limiting happens at the worker queue consumer level (1 token per item).
*/
globalRateLimiter?: GlobalRateLimiter;
/**
* Maximum number of items allowed in the worker queue before claiming pauses.
* Prevents unbounded worker queue growth which could cause visibility timeouts.
* Disabled by default (undefined = no limit).
*/
workerQueueMaxDepth?: number;
/** Logger instance */
logger?: Logger;
logLevel?: LogLevel;
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ export class RunEngine {
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
globalRateLimiter: options.batchQueue?.globalRateLimiter,
workerQueueMaxDepth: options.batchQueue?.workerQueueMaxDepth,
startConsumers: startBatchQueueConsumers,
retry: options.batchQueue?.retry,
tracer: options.tracer,
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ export type RunEngineOptions = {
defaultConcurrency?: number;
/** Optional global rate limiter to limit processing across all consumers */
globalRateLimiter?: GlobalRateLimiter;
/** Maximum worker queue depth before claiming pauses (protects visibility timeouts) */
workerQueueMaxDepth?: number;
/** Retry configuration for failed batch items */
retry?: {
/** Maximum number of attempts (including the first). Default: 1 (no retries) */
Expand Down
30 changes: 16 additions & 14 deletions packages/redis-worker/src/fair-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import type {
FairQueueKeyProducer,
FairQueueOptions,
FairScheduler,
GlobalRateLimiter,
QueueCooloffState,
QueueDescriptor,
SchedulerContext,
Expand Down Expand Up @@ -97,8 +96,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
private maxCooloffStatesSize: number;
private queueCooloffStates = new Map<string, QueueCooloffState>();

// Global rate limiter
private globalRateLimiter?: GlobalRateLimiter;
// Worker queue backpressure
private workerQueueMaxDepth: number;
private workerQueueDepthCheckId?: string;

// Consumer tracing
private consumerTraceMaxIterations: number;
Expand Down Expand Up @@ -152,8 +152,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000;
this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000;

// Global rate limiter
this.globalRateLimiter = options.globalRateLimiter;
// Worker queue backpressure
this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0;
this.workerQueueDepthCheckId = options.workerQueueDepthCheckId;

// Consumer tracing
this.consumerTraceMaxIterations = options.consumerTraceMaxIterations ?? 500;
Expand Down Expand Up @@ -1110,16 +1111,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
maxClaimCount = Math.min(maxClaimCount, availableCapacity);
}

// Check global rate limit - wait if rate limited
if (this.globalRateLimiter) {
const result = await this.globalRateLimiter.limit();
if (!result.allowed && result.resetAt) {
const waitMs = Math.max(0, result.resetAt - Date.now());
if (waitMs > 0) {
this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId });
await new Promise((resolve) => setTimeout(resolve, waitMs));
}
// Check worker queue depth to prevent unbounded growth.
// Messages in the worker queue are already in-flight with a visibility timeout.
// If the queue is too deep, consumers can't keep up, and messages risk timing out.
if (this.workerQueueMaxDepth > 0 && this.workerQueueDepthCheckId) {
const depth = await this.workerQueueManager.getLength(this.workerQueueDepthCheckId);
if (depth >= this.workerQueueMaxDepth) {
return 0;
}
// Cap claim size to remaining capacity so we don't overshoot the depth limit
const remainingCapacity = this.workerQueueMaxDepth - depth;
maxClaimCount = Math.min(maxClaimCount, remainingCapacity);
}

// Claim batch of messages with visibility timeout
Expand Down
Loading