Skip to content

Commit adc29fc

Browse files
committed
test(mollifier): pin no-starvation property for light env behind heavy envs
Adds a regression test that proves a light env (single buffered entry) is drained within (envs.length - sliceSize + 1) ticks regardless of how many entries the heavy envs have queued. The test uses a stub buffer whose listEnvs/pop pair mirrors the production atomic-Lua semantic: an env disappears from listEnvs the moment its queue empties, so the light env exits the rotation as soon as it's popped — while the heavy envs stay in the rotation until their thousands of entries are drained. Together with the head-of-line fairness test this pins both fairness properties: (1) every env touches every slice slot per cycle (no within-slice bias), and (2) no env's drainage latency depends on the queue depth of other envs (no across-slice starvation).
1 parent 24407fa commit adc29fc

1 file changed

Lines changed: 70 additions & 0 deletions

File tree

packages/redis-worker/src/mollifier/drainer.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,76 @@ describe("MollifierDrainer per-tick env cap", () => {
497497
expect(popsPerTick[0][0]).not.toEqual(popsPerTick[1][0]);
498498
expect(popsPerTick[1][0]).not.toEqual(popsPerTick[2][0]);
499499
});
500+
501+
it("a light env is not starved behind heavy envs", async () => {
502+
// The buffer's atomic Lua removes an env from `mollifier:envs` the
503+
// moment its queue becomes empty, so a heavy env with thousands of
504+
// pending entries stays in listEnvs and a light env with a single
505+
// entry only stays until that one entry pops. Combined with the
506+
// advance-by-1 cursor, this means the light env can't be parked
507+
// behind heavy envs indefinitely — it gets popped within at most
508+
// `envs.length - sliceSize + 1` ticks regardless of how many
509+
// entries the heavy envs have queued.
510+
const heavy = Array.from({ length: 6 }, (_, i) => `env_heavy_${i}`);
511+
const light = "env_light";
512+
const queues = new Map<string, string[]>();
513+
for (const h of heavy) {
514+
queues.set(
515+
h,
516+
Array.from({ length: 100 }, (_, i) => `${h}_run_${i}`),
517+
);
518+
}
519+
queues.set(light, [`${light}_run_0`]);
520+
521+
const buffer = makeStubBuffer({
522+
listEnvs: async () =>
523+
[...queues.keys()].filter((k) => (queues.get(k)?.length ?? 0) > 0),
524+
pop: async (envId: string) => {
525+
const q = queues.get(envId);
526+
if (!q || q.length === 0) return null;
527+
const runId = q.shift()!;
528+
return {
529+
runId,
530+
envId,
531+
orgId: "org_1",
532+
payload: "{}",
533+
status: "DRAINING",
534+
attempts: 0,
535+
createdAt: new Date(),
536+
} as any;
537+
},
538+
});
539+
540+
const drainer = new MollifierDrainer({
541+
buffer,
542+
handler: async () => {},
543+
concurrency: 4,
544+
maxAttempts: 3,
545+
isRetryable: () => false,
546+
maxEnvsPerTick: 4, // < 7 envs so we exercise slicing
547+
logger: new Logger("test-drainer", "log"),
548+
});
549+
550+
// 7 envs, sliceSize=4 → worst-case wait for env_light is 4 ticks
551+
// (it appears in the slice in exactly 4 of every 7 ticks). Run 7 to
552+
// give the upper bound a wide margin.
553+
const ticksUntilLightDrained = await (async () => {
554+
for (let tick = 1; tick <= 7; tick++) {
555+
await drainer.runOnce();
556+
if ((queues.get(light)?.length ?? 0) === 0) return tick;
557+
}
558+
return Infinity;
559+
})();
560+
561+
expect(ticksUntilLightDrained).toBeLessThanOrEqual(4);
562+
// Sanity: heavy envs are being worked on (not starved themselves) but
563+
// are far from drained — confirms we measured the right property.
564+
for (const h of heavy) {
565+
const remaining = queues.get(h)!.length;
566+
expect(remaining).toBeGreaterThan(0);
567+
expect(remaining).toBeLessThan(100);
568+
}
569+
});
500570
});
501571

502572
describe("MollifierDrainer.start/stop", () => {

0 commit comments

Comments
 (0)