Skip to content

Commit 8190215

Browse files
committed
feat(core): freeze consumer-pool scale-up under backpressure
Add optional shouldPauseScaling to ScalingOptions; when it returns true the pool stops scaling up (scale-down still allowed), so it won't add consumers to drain a queue backpressure is deliberately holding.
1 parent 5399677 commit 8190215

3 files changed

Lines changed: 84 additions & 0 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Add optional `shouldPauseScaling` to the supervisor consumer pool scaling options to freeze scale-up while it returns true (scale-down stays allowed).

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,4 +718,65 @@ describe("RunQueueConsumerPool", () => {
718718
expect(pool.size).toBe(1);
719719
});
720720
});
721+
722+
describe("Backpressure scale-up freeze", () => {
723+
it("freezes scale-up while shouldPauseScaling returns true, then resumes", async () => {
724+
let paused = true;
725+
pool = new RunQueueConsumerPool({
726+
...defaultOptions,
727+
scaling: {
728+
strategy: "smooth",
729+
minConsumerCount: 1,
730+
maxConsumerCount: 10,
731+
scaleUpCooldownMs: 0,
732+
disableJitter: true,
733+
shouldPauseScaling: () => paused,
734+
},
735+
});
736+
await pool.start();
737+
expect(pool.size).toBe(1);
738+
739+
// A high queue would normally scale up, but backpressure freezes it.
740+
pool.updateQueueLength(10);
741+
advanceTimeAndProcessMetrics(1100);
742+
expect(pool.size).toBe(1);
743+
744+
// Once backpressure releases, scaling resumes.
745+
paused = false;
746+
pool.updateQueueLength(10);
747+
advanceTimeAndProcessMetrics(1100);
748+
expect(pool.size).toBeGreaterThan(1);
749+
});
750+
751+
it("still allows scale-down while paused", async () => {
752+
let paused = false;
753+
pool = new RunQueueConsumerPool({
754+
...defaultOptions,
755+
scaling: {
756+
strategy: "smooth",
757+
minConsumerCount: 1,
758+
maxConsumerCount: 10,
759+
scaleUpCooldownMs: 0,
760+
scaleDownCooldownMs: 0,
761+
disableJitter: true,
762+
shouldPauseScaling: () => paused,
763+
},
764+
});
765+
await pool.start();
766+
767+
pool.updateQueueLength(10);
768+
advanceTimeAndProcessMetrics(1100);
769+
const scaledUp = pool.size;
770+
expect(scaledUp).toBeGreaterThan(1);
771+
772+
// Pausing must not block shrinking - we want to drain down, just not grow.
773+
// Loop to let the EWMA-smoothed queue length fall (one batch isn't enough).
774+
paused = true;
775+
for (let i = 0; i < 5; i++) {
776+
pool.updateQueueLength(0);
777+
advanceTimeAndProcessMetrics(1100);
778+
}
779+
expect(pool.size).toBeLessThan(scaledUp);
780+
});
781+
});
721782
});

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ export type ScalingOptions = {
2222
batchWindowMs?: number;
2323
disableJitter?: boolean;
2424
dampingFactor?: number;
25+
/**
26+
* When this returns true, scale-up is frozen (scale-down still allowed). Used to
27+
* stop the pool from adding consumers to drain a queue that backpressure is
28+
* deliberately holding. Synchronous and hot-path-safe.
29+
*/
30+
shouldPauseScaling?: () => boolean;
2531
};
2632

2733
export type ConsumerPoolOptions = {
@@ -49,6 +55,7 @@ export class RunQueueConsumerPool {
4955
private readonly maxConsumerCount: number;
5056
private readonly scalingStrategy: ScalingStrategy;
5157
private readonly disableJitter: boolean;
58+
private readonly shouldPauseScaling?: () => boolean;
5259

5360
private consumers: Map<string, QueueConsumer> = new Map();
5461
private readonly consumerFactory: QueueConsumerFactory;
@@ -79,6 +86,7 @@ export class RunQueueConsumerPool {
7986
this.scaleUpCooldownMs = opts.scaling.scaleUpCooldownMs ?? 10000; // 10 seconds default
8087
this.scaleDownCooldownMs = opts.scaling.scaleDownCooldownMs ?? 60000; // 60 seconds default
8188
this.disableJitter = opts.scaling.disableJitter ?? false;
89+
this.shouldPauseScaling = opts.scaling.shouldPauseScaling;
8290

8391
// Configure EWMA parameters from options
8492
this.ewmaAlpha = opts.scaling.ewmaAlpha ?? 0.3;
@@ -259,6 +267,16 @@ export class RunQueueConsumerPool {
259267

260268
// Check cooldown periods with jitter
261269
if (targetCount > this.consumers.size) {
270+
// Freeze scale-up while backpressure is engaged - don't add consumers to
271+
// drain a queue we're deliberately holding. Scale-down stays allowed.
272+
if (this.shouldPauseScaling?.()) {
273+
this.logger.debug("Scale up frozen by backpressure", {
274+
currentCount: this.consumers.size,
275+
targetCount,
276+
});
277+
return;
278+
}
279+
262280
// Scale up
263281
const effectiveCooldown = this.scaleUpCooldownMs + jitterMs;
264282
if (timeSinceLastScale < effectiveCooldown) {

0 commit comments

Comments
 (0)