Skip to content

Commit 303ebe2

Browse files
authored
fix(redis): bound BZPOPMIN fast mode to redisBlockWaitFallback (#683)
Address Codex P1 / Claude bot review on 6173f58: The previous bzpopminWaitLoop set `fast` directly from waitForBlockedCommandUpdate's return value. Under sustained zsetWaiters.Signal pressure (a ZADD/ZINCRBY hot key whose signals fire faster than each bzpopminTryAllKeys round finishes) the buffered-1 waiterC stays perpetually full: every wait returns via the signal arm, fast stays true forever, the 100 ms fallback timer never fires. A wrongType write (HSET / SET / etc.) on a co- registered key (multi-key BZPOPMIN) then goes undetected for the entire BLOCK window — a regression vs #666's strict 100 ms ceiling. Fix: track the wall time of the last full check inside bzpopminWaitLoop and demote `fast` back to false once redisBlockWaitFallback has elapsed since that check. Signal-driven wakes still take the fast path during the first 100 ms after a full check; after that, the next wake (signal or timer) runs the full check and resets the budget. The 100 ms WRONGTYPE-detection ceiling is restored regardless of signal rate. Per CLAUDE.md regression-test convention, the failing scenario is locked down by TestRedis_BZPopMinDetectsWrongTypeUnderSignalLoad before the fix: - A signal-pressure goroutine drives zsetWaiters.Signal in a tight loop on bzpop-press-hot. The reader's waiter, registered on both bzpop-press-cold and bzpop-press-hot, wakes on every signal. - After the reader settles into fast mode (~200 ms), a string is SET at bzpop-press-cold. - Pre-fix: the loop never demotes fast=false, the fast-path probe on bzpop-press-cold never detects the wrongType, BZPOPMIN times out at the 3 s BLOCK deadline. - Post-fix: the wall-time gate forces a full check within ~100 ms of the SET, WRONGTYPE surfaces well before the BLOCK deadline. Verified: the test fails on parent commit 6173f58 (loop ran for the full 3 s without surfacing WRONGTYPE) and passes on this commit (returns WRONGTYPE in ~100 ms). Drive-by cleanups from the same review pass: - Collapse tryBZPopMin / tryBZPopMinFast wrappers; bzpopminTryAllKeys calls tryBZPopMinWithMode directly with the fast bool. Both wrappers had no other callers (per grep) and only added one level of indirection. - Move TestRedis_BZPopMinTimesOutOnEmptyKey's stranded doc comment back over its own function; the previous PR mistakenly prepended it to TestRedis_BZPopMinRejectsInitialWrongType. Self-review (CLAUDE.md 5 lenses): 1. Data loss -- None. Loop control flow only; persistBZPopMinResult path unchanged. 2. Concurrency -- The wall-time gate is a local variable; no shared state added. waitForBlockedCommandUpdate's signaled-bool contract is unchanged. 3. Performance -- One time.Since per loop iteration on top of the existing ScanAt cost; negligible. Worst-case fast-path utilisation drops from "always" to "first 100 ms of every full check window," but the fast-mode CPU win is still claimed for the dominant signal-driven case (most BZPOPMIN BLOCK windows complete within one full-check window because the consumer pops the first matched ZADD). 4. Data consistency -- WRONGTYPE detection ceiling is back to 100 ms regardless of signal rate. Fast-path correctness invariants from #677 (signal-driven wake guarantees no wrongType transition) are unchanged. 5. Test coverage -- regression-test-first per CLAUDE.md convention. Existing TestRedis_BZPopMinRejectsInitialWrongType, TestRedis_BZPopMinDetectsMidBlockWrongType, and TestRedis_BZPopMinTimesOutOnEmptyKey continue to lock down the other branches. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Bug Fixes** * BZPOPMIN now correctly detects wrong-type errors even under sustained signal-driven conditions, preventing timeouts in edge cases * **Tests** * Added test to verify BZPOPMIN wrong-type error detection under high signal load <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 08b2fb4 + fe5b718 commit 303ebe2

2 files changed

Lines changed: 134 additions & 46 deletions

File tree

adapter/redis_bzpopmin_wake_test.go

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package adapter
22

33
import (
44
"context"
5+
"runtime"
56
"testing"
67
"time"
78

@@ -113,12 +114,6 @@ func TestRedis_BZPopMinWakesOnZIncrBy(t *testing.T) {
113114
}
114115
}
115116

116-
// TestRedis_BZPopMinTimesOutOnEmptyKey locks down the BLOCK-timeout
117-
// contract: when no ZADD arrives within the BLOCK window, BZPOPMIN
118-
// returns redis.Nil rather than a protocol error. This guards a
119-
// regression in the wait-loop refactor where the new
120-
// waitForBlockedCommandUpdate timer or context-cancel branch could otherwise
121-
// leak a -ERR reply.
122117
// TestRedis_BZPopMinRejectsInitialWrongType locks down the
123118
// first-iteration full-check invariant: when BZPOPMIN is issued
124119
// against a key that already holds a wrongType encoding (e.g. a
@@ -195,6 +190,99 @@ func TestRedis_BZPopMinDetectsMidBlockWrongType(t *testing.T) {
195190
}
196191
}
197192

193+
// TestRedis_BZPopMinDetectsWrongTypeUnderSignalLoad locks down the
194+
// periodic full-check invariant under sustained signal pressure
195+
// (Codex P1 / Claude bot review on 6173f584).
196+
//
197+
// The fast-path optimisation skips the wrongType slow probe on
198+
// signal-driven wakes. If `fast` is set directly from
199+
// waitForBlockedCommandUpdate's return value, sustained ZADD/ZINCRBY
200+
// signals can keep `fast=true` indefinitely — the 100 ms fallback
201+
// timer never fires because waiterC is constantly refilled. A
202+
// mid-block wrongType write on one of the registered keys then goes
203+
// undetected for the entire BLOCK window.
204+
//
205+
// Trigger: directly drive zsetWaiters.Signal in a tight loop on
206+
// bzpop-press-hot while the reader BZPOPMINs on
207+
// [bzpop-press-cold, bzpop-press-hot]. The reader's waiter is
208+
// registered on both keys, so each signal wakes it; the empty hot
209+
// key fast-probe returns nil, the loop continues with fast=true.
210+
// Driving Signal directly (rather than through a real producer)
211+
// removes the OCC-race non-determinism a stealer goroutine would
212+
// introduce. After the reader settles into fast mode, SET a string
213+
// at bzpop-press-cold. Without a wall-time-based forced full check,
214+
// the reader's fast probe on bzpop-press-cold never detects the
215+
// wrongType and the command times out at the BLOCK deadline
216+
// instead of returning WRONGTYPE.
217+
//
218+
// The fix maintains a lastFullCheck wall time inside bzpopminWaitLoop
219+
// and demotes `fast` back to false when more than redisBlockWaitFallback
220+
// has elapsed since the last full check, restoring the #666 ceiling.
221+
func TestRedis_BZPopMinDetectsWrongTypeUnderSignalLoad(t *testing.T) {
222+
t.Parallel()
223+
nodes, _, _ := createNode(t, 3)
224+
defer shutdown(nodes)
225+
226+
rdbReader := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
227+
defer func() { _ = rdbReader.Close() }()
228+
rdbWriter := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
229+
defer func() { _ = rdbWriter.Close() }()
230+
231+
stealerCtx, stealerCancel := context.WithCancel(context.Background())
232+
defer stealerCancel()
233+
234+
// Synthetic signal generator: tight loop calling
235+
// zsetWaiters.Signal on bzpop-press-hot. The reader's waiter is
236+
// registered on both bzpop-press-cold and bzpop-press-hot, so
237+
// any signal on either key wakes it. No actual zset rows are
238+
// written — the fast-path probe on bzpop-press-hot finds
239+
// nothing and the reader stays blocked. runtime.Gosched lets
240+
// the reader's goroutine make forward progress between
241+
// signals.
242+
signalServer := nodes[0].redisServer
243+
go func() {
244+
key := []byte("bzpop-press-hot")
245+
for stealerCtx.Err() == nil {
246+
signalServer.zsetWaiters.Signal(key)
247+
runtime.Gosched()
248+
}
249+
}()
250+
251+
type popResult struct {
252+
zwk *redis.ZWithKey
253+
err error
254+
}
255+
resultCh := make(chan popResult, 1)
256+
go func() {
257+
zwk, err := rdbReader.BZPopMin(context.Background(), 3*time.Second,
258+
"bzpop-press-cold", "bzpop-press-hot").Result()
259+
resultCh <- popResult{zwk: zwk, err: err}
260+
}()
261+
262+
// Let the reader complete several signal-driven wakes before we
263+
// introduce the wrongType. 200 ms is well above the 100 ms
264+
// fallback budget; with the fix in place a forced full check
265+
// has run during this window.
266+
time.Sleep(200 * time.Millisecond)
267+
268+
require.NoError(t, rdbWriter.Set(context.Background(), "bzpop-press-cold",
269+
"I am a string", 0).Err())
270+
271+
select {
272+
case res := <-resultCh:
273+
require.Error(t, res.err, "BZPOPMIN must surface WRONGTYPE on bzpop-press-cold under signal load (zwk=%v)", res.zwk)
274+
require.Contains(t, res.err.Error(), "WRONGTYPE")
275+
case <-time.After(3500 * time.Millisecond):
276+
t.Fatal("BZPOPMIN did not return WRONGTYPE under sustained signal load — fast mode likely sticky")
277+
}
278+
}
279+
280+
// TestRedis_BZPopMinTimesOutOnEmptyKey locks down the BLOCK-timeout
281+
// contract: when no ZADD arrives within the BLOCK window, BZPOPMIN
282+
// returns redis.Nil rather than a protocol error. This guards a
283+
// regression in the wait-loop refactor where the new
284+
// waitForBlockedCommandUpdate timer or context-cancel branch could otherwise
285+
// leak a -ERR reply.
198286
func TestRedis_BZPopMinTimesOutOnEmptyKey(t *testing.T) {
199287
t.Parallel()
200288
nodes, _, _ := createNode(t, 3)

adapter/redis_compat_commands.go

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3650,23 +3650,13 @@ func (r *RedisServer) zremrangebyrank(conn redcon.Conn, cmd redcon.Command) {
36503650
conn.WriteInt(removed)
36513651
}
36523652

3653-
func (r *RedisServer) tryBZPopMin(key []byte) (*bzpopminResult, error) {
3654-
return r.tryBZPopMinWithMode(key, false)
3655-
}
3656-
3657-
// tryBZPopMinFast is the signal-driven-wake variant of tryBZPopMin.
3658-
// It uses keyTypeAtExpectFast (no slow-path fallback, no wrongType
3659-
// detection) on the assumption that the only mutation since the
3660-
// caller's previous full check is a ZADD/ZINCRBY — the only writes
3661-
// that fire zsetWaiters.Signal. A wrongType write that arrived since
3662-
// the last full check would not have signalled, so this fast path
3663-
// would treat the key as "still empty" and return nil; the
3664-
// fallback-timer wake in bzpopminWaitLoop, which uses the full
3665-
// tryBZPopMin path, detects such cases within ~redisBlockWaitFallback.
3666-
func (r *RedisServer) tryBZPopMinFast(key []byte) (*bzpopminResult, error) {
3667-
return r.tryBZPopMinWithMode(key, true)
3668-
}
3669-
3653+
// tryBZPopMinWithMode runs one BZPOPMIN attempt against key. The
3654+
// fast flag selects keyTypeAtExpectFast (no slow-path fallback, no
3655+
// wrongType detection) when true; the caller MUST guarantee that the
3656+
// only mutations since the previous full check are signalling writes
3657+
// (ZADD/ZINCRBY for zsetWaiters). bzpopminWaitLoop enforces this by
3658+
// running fast=false on the first iteration and after every
3659+
// fallback-timer wake or wall-time-bounded re-arm.
36703660
func (r *RedisServer) tryBZPopMinWithMode(key []byte, fast bool) (*bzpopminResult, error) {
36713661
ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout)
36723662
defer cancel()
@@ -3786,17 +3776,28 @@ func (r *RedisServer) bzpopminWaitLoop(conn redcon.Conn, keys [][]byte, deadline
37863776
handlerCtx := r.handlerContext()
37873777
w, release := r.zsetWaiters.Register(keys)
37883778
defer release()
3789-
// fast tracks whether the previous wake came from a Signal (true)
3790-
// or the fallback timer / shutdown (false). The first iteration
3791-
// always runs the full tryBZPopMin so an existing wrongType key
3792-
// surfaces an immediate WRONGTYPE; subsequent iterations after a
3793-
// signal-driven wake skip the wrongType detection because the
3794-
// only writes that fire zsetWaiters.Signal are ZADD / ZINCRBY,
3795-
// neither of which can introduce a wrongType. Fallback-timer
3796-
// wakes always re-run the full check so a HSET / SET / etc. that
3797-
// arrived between iterations is detected within ~one
3798-
// redisBlockWaitFallback (100ms).
3779+
// fast tracks whether the next iteration may skip the wrongType
3780+
// slow probe. The first iteration is always full so an existing
3781+
// wrongType key surfaces an immediate WRONGTYPE; subsequent
3782+
// iterations after a signal-driven wake skip the wrongType
3783+
// detection because zsetWaiters.Signal only fires for ZADD /
3784+
// ZINCRBY (neither of which can introduce a wrongType).
3785+
//
3786+
// lastFullCheck wall-time-bounds how long the fast mode can stay
3787+
// active under sustained signal pressure. Without this gate, a
3788+
// hot key whose zsetWaiters.Signal fires faster than each
3789+
// bzpopminTryAllKeys round finishes can keep waiterC perpetually
3790+
// full, starving the fallback timer and letting a wrongType
3791+
// write on a co-registered key (multi-key BZPOPMIN) go
3792+
// undetected for the entire BLOCK window. Demoting `fast` back
3793+
// to false after redisBlockWaitFallback elapses since the last
3794+
// full check restores the #666 ceiling: WRONGTYPE on any
3795+
// registered key surfaces within ~one fallback interval (100 ms)
3796+
// regardless of signal rate. See
3797+
// TestRedis_BZPopMinDetectsWrongTypeUnderSignalLoad for the
3798+
// regression scenario.
37993799
fast := false
3800+
lastFullCheck := time.Now()
38003801
for {
38013802
if handlerCtx.Err() != nil {
38023803
conn.WriteNull()
@@ -3805,28 +3806,27 @@ func (r *RedisServer) bzpopminWaitLoop(conn redcon.Conn, keys [][]byte, deadline
38053806
if r.bzpopminTryAllKeys(conn, keys, fast) {
38063807
return
38073808
}
3809+
if !fast {
3810+
lastFullCheck = time.Now()
3811+
}
38083812
if !time.Now().Before(deadline) {
38093813
conn.WriteNull()
38103814
return
38113815
}
3812-
fast = waitForBlockedCommandUpdate(handlerCtx, w.C, deadline)
3816+
signaled := waitForBlockedCommandUpdate(handlerCtx, w.C, deadline)
3817+
fast = signaled && time.Since(lastFullCheck) < redisBlockWaitFallback
38133818
}
38143819
}
38153820

3816-
// bzpopminTryAllKeys runs one tryBZPopMin pass across keys. Returns
3817-
// true when a result was written (success or terminal error) and the
3818-
// caller should stop the loop, false to continue waiting. The fast
3819-
// flag selects tryBZPopMinFast (signal-driven wake, skips wrongType
3820-
// detection) over tryBZPopMin (full check).
3821+
// bzpopminTryAllKeys runs one tryBZPopMinWithMode pass across keys.
3822+
// Returns true when a result was written (success or terminal error)
3823+
// and the caller should stop the loop, false to continue waiting.
3824+
// The fast flag is forwarded to tryBZPopMinWithMode: true selects
3825+
// the signal-driven-wake path (skips wrongType detection); false
3826+
// selects the full check.
38213827
func (r *RedisServer) bzpopminTryAllKeys(conn redcon.Conn, keys [][]byte, fast bool) bool {
38223828
for _, key := range keys {
3823-
var result *bzpopminResult
3824-
var err error
3825-
if fast {
3826-
result, err = r.tryBZPopMinFast(key)
3827-
} else {
3828-
result, err = r.tryBZPopMin(key)
3829-
}
3829+
result, err := r.tryBZPopMinWithMode(key, fast)
38303830
if err != nil {
38313831
conn.WriteError(err.Error())
38323832
return true

0 commit comments

Comments
 (0)