Commit 18e64f6

d-cs and claude committed
test(redis-worker): restore real concurrency in batch tests with race-tolerant assertions
The two batch tests that exercise pop-failure / queue-empty behaviour were temporarily set to concurrency=1 to dodge the worker pool's parallel-pick race. That collapsed the worker pool and stopped the tests from validating their semantics under genuine concurrency.

Restored concurrency=5 (matching the rest of the suite) and switched the non-deterministic counts to bounded-range assertions:

- mid-batch pop failure: actual drained entries are deterministic (the two bad pops + one good pop); failure count is in [1, concurrency] because workers that loop after a sibling's empty/null pop can re-pop the broken env before skip.add propagates. envBadPops is bounded by drainBatchSize + concurrency; the property is "bounded retry", not "exactly one".
- stops popping early: popCalls in [3, concurrency + 2] and strictly less than drainBatchSize; the property is "we don't pop all the way to the batch ceiling once the queue empties".

These bounds are tight enough to catch a regression to unbounded pops while tolerating the legitimate race between worker iterations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 3bceb54 commit 18e64f6
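
The bounded-range checks described in the commit message can be sketched as plain predicates. This is an illustrative sketch, not code from the commit: `inRange`, `failedCountOk`, `envBadPopsOk`, and `popCallsOk` are hypothetical names, while the numeric bounds are the ones stated in the commit message and asserted in the diff.

```typescript
// Hypothetical helper (not in the commit): true when lo <= value <= hi.
function inRange(value: number, lo: number, hi: number): boolean {
  return value >= lo && value <= hi;
}

// Both tests in this commit run with five workers.
const concurrency = 5;

// "mid-batch pop failure" test (drainBatchSize = 5):
// at least one failure, at most one per worker.
function failedCountOk(failed: number): boolean {
  return inRange(failed, 1, concurrency);
}

// Same test: at least the three original pops, and strictly fewer
// than drainBatchSize + concurrency.
function envBadPopsOk(pops: number, drainBatchSize = 5): boolean {
  return pops >= 3 && pops < drainBatchSize + concurrency;
}

// "stops popping early" test (drainBatchSize = 10):
// 2 successful pops + at least 1 null pop, at most one null pop per worker,
// and never as many pops as the batch ceiling.
function popCallsOk(popCalls: number, drainBatchSize = 10): boolean {
  return inRange(popCalls, 3, concurrency + 2) && popCalls < drainBatchSize;
}
```

With these values, `concurrency + 2` is 7, so the `< drainBatchSize` check is implied by the range check; keeping it separate states the safety property independently of the worker count.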

1 file changed

Lines changed: 32 additions & 24 deletions

packages/redis-worker/src/mollifier/drainer.test.ts

@@ -271,32 +271,38 @@ describe("MollifierDrainer.drainBatchSize", () => {
       },
     });

+    const concurrency = 5;
+    const drainBatchSize = 5;
     const drainer = new MollifierDrainer({
       buffer,
       handler: async (input) => {
         handled.push(input.runId);
       },
-      // Concurrency=1 so the worker pool runs sequentially and pop calls
-      // can't race past the `skip.add(envId)` that fires after a pop
-      // failure. The semantic this test pins (one env's pop blowup
-      // aborts its batch and counts as one failure) is the deterministic
-      // case; multi-worker race semantics are exercised by the safety
-      // property test below.
-      concurrency: 1,
+      concurrency,
       maxAttempts: 3,
       isRetryable: () => false,
-      drainBatchSize: 5,
+      drainBatchSize,
       logger: new Logger("test-drainer", "log"),
     });

     const result = await drainer.runOnce();
-    // env_bad: 2 successful pops processed (drained) + 1 pop failure (failed).
-    // env_good: 1 successful pop processed (drained).
+    // The actual ENTRIES drained are deterministic regardless of races:
+    // env_bad's pop returns bad_1 then bad_2 (the only two snapshots it
+    // ever produces) and env_good's pop returns good_1 (its only entry).
     expect(result.drained).toBe(3);
-    expect(result.failed).toBe(1);
     expect(new Set(handled)).toEqual(new Set(["bad_1", "bad_2", "good_1"]));
-    // We stopped popping env_bad on the failure — no fourth attempt.
-    expect(envBadPops).toBe(3);
+    // At least one failure is recorded (env_bad's throwing pop). With
+    // concurrency > 1 the race between "worker loops after empty/null"
+    // and "skip.add(envBad) propagates" can re-pop the broken env, so
+    // the upper bound is concurrency. The property we're pinning is
+    // bounded retry, not "exactly one".
+    expect(result.failed).toBeGreaterThanOrEqual(1);
+    expect(result.failed).toBeLessThanOrEqual(concurrency);
+    // env_bad's pop call count is bounded too — at most concurrency
+    // retries after the first throw — definitely never reaches the
+    // drainBatchSize ceiling.
+    expect(envBadPops).toBeGreaterThanOrEqual(3);
+    expect(envBadPops).toBeLessThan(drainBatchSize + concurrency);
   });

   it("fans batched pops out across multiple envs in a single tick", async () => {
@@ -575,29 +581,31 @@ describe("MollifierDrainer.drainBatchSize", () => {
       },
     });

+    const concurrency = 5;
+    const drainBatchSize = 10;
     const drainer = new MollifierDrainer({
       buffer,
       handler: async (input) => {
         handled.push(input.runId);
       },
-      // Concurrency=1 isolates the "stop on empty" semantic from the
-      // worker pool's parallel-pick race: with multiple workers, several
-      // can pick env_a simultaneously and pop in parallel before any of
-      // them can `skip.add(envId)`, so the empty-pop count would be
-      // >1 nondeterministically.
-      concurrency: 1,
+      concurrency,
       maxAttempts: 3,
       isRetryable: () => false,
-      drainBatchSize: 10,
+      drainBatchSize,
       logger: new Logger("test-drainer", "log"),
     });

     const r = await drainer.runOnce();
     expect(r.drained).toBe(2);
-    expect(handled).toEqual(["only_1", "only_2"]);
-    // 2 successful pops + 1 sentinel pop that returned null and ended
-    // the batch loop — 3 calls, not 10. Bounding stops the Lua spam.
-    expect(popCalls).toBe(3);
+    expect(new Set(handled)).toEqual(new Set(["only_1", "only_2"]));
+    // The property we're pinning: pop calls are bounded by concurrency
+    // (plus the original two successes) once the queue empties — they
+    // never run all the way up to drainBatchSize. With concurrency > 1
+    // multiple workers can race to pop env_a before `skip.add` lands,
+    // so the upper bound is the worker count, not a tight "3".
+    expect(popCalls).toBeGreaterThanOrEqual(3); // 2 success + ≥1 sentinel null
+    expect(popCalls).toBeLessThanOrEqual(concurrency + 2);
+    expect(popCalls).toBeLessThan(drainBatchSize); // bounded — the actual safety property
   });
 });
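
The race that the second test tolerates can be illustrated with a small synchronous model. This is a sketch under stated assumptions, not the real `MollifierDrainer`: `simulateTick` is a hypothetical function, and it assumes that every active worker checks the skip flag and issues its pop before any result from the same round lands. The real worker pool may interleave differently, which is why the tests assert a range rather than a single value.

```typescript
interface SimResult {
  popCalls: number;
  handled: string[];
}

// Hypothetical round-based model of one drain tick for a single env.
function simulateTick(
  concurrency: number,
  entries: string[],
  drainBatchSize: number,
): SimResult {
  const queue = [...entries];
  const handled: string[] = [];
  let skipped = false; // stands in for skip.has(envId)
  let popCalls = 0;
  let active = concurrency; // workers that have not yet seen a null pop

  while (!skipped && active > 0 && popCalls < drainBatchSize) {
    // Phase 1: every active worker pops before any sibling's result is visible.
    const results: (string | null)[] = [];
    for (let w = 0; w < active && popCalls < drainBatchSize; w++) {
      popCalls++;
      results.push(queue.shift() ?? null);
    }
    // Phase 2: results land; a null pop marks the env skipped and retires the worker.
    for (const r of results) {
      if (r === null) {
        skipped = true;
        active--;
      } else {
        handled.push(r);
      }
    }
  }
  return { popCalls, handled };
}

// One worker: two successful pops plus one null pop, the old exact "3".
const sequential = simulateTick(1, ["only_1", "only_2"], 10);
// Five workers: all five pop in the first round, so popCalls is 5 in this model.
const parallel = simulateTick(5, ["only_1", "only_2"], 10);
console.log(sequential.popCalls, parallel.popCalls);
```

In this model the pop count never exceeds the number of entries plus `concurrency`, because each worker retires after its first null pop; that is the same shape as the `concurrency + 2` bound asserted above.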