Skip to content

Commit 5399677

Browse files
committed
feat(supervisor): split backpressure signal and add resume ramp
isEngaged() exposes the hard backpressure state (drives scale-up freeze), while shouldSkipDequeue() additionally ramps after release - skipping a linearly-decaying fraction of attempts over rampMs so the aggregate dequeue rate climbs back to full instead of snapping and re-flooding the cluster.
1 parent 9a3cf7b commit 5399677

2 files changed

Lines changed: 134 additions & 2 deletions

File tree

apps/supervisor/src/backpressure/backpressureMonitor.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,93 @@ describe("BackpressureMonitor", () => {
151151

152152
expect(reads()).toBe(readsAtStop);
153153
});
154+
155+
it("isEngaged reflects the hard engaged state (the signal for freezing scale-up)", async () => {
156+
const { source } = countingSource({ engaged: true });
157+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
158+
159+
monitor.start();
160+
await vi.advanceTimersByTimeAsync(0);
161+
162+
expect(monitor.isEngaged()).toBe(true);
163+
164+
monitor.stop();
165+
});
166+
167+
it("isEngaged is false when clear and when stale", async () => {
168+
const source: BackpressureSignalSource = {
169+
read: async () => ({ engaged: true, ts: Date.now() }),
170+
};
171+
const monitor = new BackpressureMonitor({
172+
enabled: true,
173+
source,
174+
refreshIntervalMs: 1_000_000,
175+
maxVerdictAgeMs: 15_000,
176+
});
177+
178+
monitor.start();
179+
await vi.advanceTimersByTimeAsync(0);
180+
expect(monitor.isEngaged()).toBe(true);
181+
182+
await vi.advanceTimersByTimeAsync(15_001); // stale → fail-open
183+
expect(monitor.isEngaged()).toBe(false);
184+
185+
monitor.stop();
186+
});
187+
188+
it("ramps the dequeue gate after release instead of resuming instantly", async () => {
189+
let engaged = true;
190+
let rnd = 0.5;
191+
const source: BackpressureSignalSource = { read: async () => ({ engaged }) };
192+
const monitor = new BackpressureMonitor({
193+
enabled: true,
194+
source,
195+
refreshIntervalMs: 1000,
196+
rampMs: 10_000,
197+
random: () => rnd,
198+
});
199+
200+
monitor.start();
201+
await vi.advanceTimersByTimeAsync(0);
202+
expect(monitor.shouldSkipDequeue()).toBe(true); // hard engaged
203+
204+
// Release: the next refresh observes the clear verdict and starts the ramp.
205+
engaged = false;
206+
await vi.advanceTimersByTimeAsync(1000);
207+
expect(monitor.isEngaged()).toBe(false);
208+
209+
// Just after release (progress ~0): skip probability ~1, so skip regardless.
210+
rnd = 0.99;
211+
expect(monitor.shouldSkipDequeue()).toBe(true);
212+
213+
// Halfway through the ramp (progress 0.5): skip probability 0.5.
214+
await vi.advanceTimersByTimeAsync(5000);
215+
rnd = 0.4;
216+
expect(monitor.shouldSkipDequeue()).toBe(true); // 0.4 < 0.5 → skip
217+
rnd = 0.6;
218+
expect(monitor.shouldSkipDequeue()).toBe(false); // 0.6 ≥ 0.5 → allow
219+
220+
// Past the ramp window: never skip.
221+
await vi.advanceTimersByTimeAsync(5000);
222+
rnd = 0.0;
223+
expect(monitor.shouldSkipDequeue()).toBe(false);
224+
225+
monitor.stop();
226+
});
227+
228+
it("resumes instantly when no ramp is configured", async () => {
229+
let engaged = true;
230+
const source: BackpressureSignalSource = { read: async () => ({ engaged }) };
231+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
232+
233+
monitor.start();
234+
await vi.advanceTimersByTimeAsync(0);
235+
expect(monitor.shouldSkipDequeue()).toBe(true);
236+
237+
engaged = false;
238+
await vi.advanceTimersByTimeAsync(1000);
239+
expect(monitor.shouldSkipDequeue()).toBe(false); // no ramp → instant resume
240+
241+
monitor.stop();
242+
});
154243
});

apps/supervisor/src/backpressure/backpressureMonitor.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,24 @@ export type BackpressureMonitorOptions = {
2222
* Guards against the source silently going stale (e.g. hanging reads).
2323
*/
2424
maxVerdictAgeMs?: number;
25+
/**
26+
* If set, after backpressure releases the dequeue gate stays partially engaged
27+
* for this long, skipping a linearly-decaying fraction of attempts so the
28+
* aggregate dequeue rate ramps from ~0 to full instead of snapping to full and
29+
* re-flooding a freshly-recovered cluster. 0/unset = instant resume.
30+
*/
31+
rampMs?: number;
32+
/** Injectable RNG for the resume ramp; defaults to Math.random. */
33+
random?: () => number;
2534
};
2635

2736
const DEFAULT_REFRESH_INTERVAL_MS = 1000;
2837

2938
export class BackpressureMonitor {
3039
private verdict: BackpressureVerdict | null = null;
3140
private timer?: ReturnType<typeof setInterval>;
41+
private wasEngaged = false;
42+
private releasedAt?: number;
3243

3344
constructor(private readonly opts: BackpressureMonitorOptions) {}
3445

@@ -51,8 +62,12 @@ export class BackpressureMonitor {
5162
}
5263
}
5364

54-
/** Hot-path read: synchronous, never performs I/O. */
55-
shouldSkipDequeue(): boolean {
65+
/**
66+
* Hard backpressure state: true while the (fresh) verdict says engaged. This is
67+
* the signal for freezing consumer-pool scale-up - distinct from the dequeue
68+
* gate, which additionally ramps after release. Hot-path read, no I/O.
69+
*/
70+
isEngaged(): boolean {
5671
const verdict = this.verdict;
5772
if (verdict?.engaged !== true) {
5873
return false;
@@ -66,6 +81,26 @@ export class BackpressureMonitor {
6681
return true;
6782
}
6883

84+
/** Hot-path read: synchronous, never performs I/O. */
85+
shouldSkipDequeue(): boolean {
86+
if (this.isEngaged()) {
87+
return true;
88+
}
89+
90+
// Post-release ramp: skip a linearly-decaying fraction of attempts so the
91+
// aggregate dequeue rate climbs back to full over rampMs rather than snapping.
92+
const rampMs = this.opts.rampMs;
93+
if (rampMs && this.releasedAt !== undefined) {
94+
const elapsed = Date.now() - this.releasedAt;
95+
if (elapsed < rampMs) {
96+
const skipProbability = 1 - elapsed / rampMs;
97+
return (this.opts.random ?? Math.random)() < skipProbability;
98+
}
99+
}
100+
101+
return false;
102+
}
103+
69104
private async refresh(): Promise<void> {
70105
try {
71106
this.verdict = await this.opts.source.read();
@@ -74,5 +109,13 @@ export class BackpressureMonitor {
74109
// unknown (no verdict) so dequeue resumes as if backpressure were off.
75110
this.verdict = null;
76111
}
112+
113+
// Track the engaged→released transition to anchor the resume ramp. Based on
114+
// the raw refreshed verdict, not the staleness-adjusted read.
115+
const nowEngaged = this.verdict?.engaged === true;
116+
if (this.wasEngaged && !nowEngaged) {
117+
this.releasedAt = Date.now();
118+
}
119+
this.wasEngaged = nowEngaged;
77120
}
78121
}

0 commit comments

Comments
 (0)