Skip to content

Commit 6173f58

Browse files
committed
perf(redis): empty-zset fast path for BZPOPMIN signal-driven wakes
Production profile after #666 deploy showed BZPOPMIN still taking 47.76 percent of n1 leader CPU (down from 86.56 percent pre-deploy but not eliminated). Per-wake breakdown: each tryBZPopMin runs keyTypeAtExpect, the zset prefix probe misses on the empty queue, so the slow-path fallback re-issues all ~19 keyTypeAt seeks just to confirm "still empty". On a busy queue with ZADD signals firing at the producer rate, this baseline grows linearly with the wake rate. Add a fast variant for the signal-driven-wake case: - keyTypeAtExpectFast: probes only the expected type's prefixes (3 seeks for zset). On a miss, returns redisTypeNone immediately without rawKeyTypeAt's 16 follow-up seeks and without the hasHigherPriorityStringEncoding guard. - tryBZPopMinFast: tryBZPopMin variant that uses keyTypeAtExpectFast. No wrongType detection on the type-check step (the load step itself still surfaces obvious corruption). - waitForBlockedCommandUpdate now returns true iff the wake came from a Signal channel (false on fallback timer / handlerCtx cancellation). - bzpopminWaitLoop tracks the wake reason and threads it as a fast bool into bzpopminTryAllKeys / tryBZPopMin{,Fast}. First iteration is always full; signal-driven wakes flip to fast; fallback-timer wakes flip back to full. The wrongType-detection invariant is preserved because zsetWaiters.Signal only fires on ZADD / ZINCRBY (the only writes that produce zset rows visible to the fast probe). A wrongType-introducing write (HSET, SET, XADD on the same user key) does not fire Signal, so the fallback timer's full check is the safety net that detects it within ~one redisBlockWaitFallback (100 ms). Initial-iteration wrongType detection is unchanged. Behavior change: a HSET / SET / XADD that arrives mid-BLOCK on a key being BZPOPMINed surfaces WRONGTYPE within ~100 ms instead of immediately. Previous behavior surfaced WRONGTYPE within ~10 ms under the busy-poll, ~100 ms under #666's fallback (this PR keeps the same 100 ms ceiling). No correctness regression for existing contracts (initial wrongType, BLOCK-timeout-returns-nil). Test coverage: - TestRedis_BZPopMinRejectsInitialWrongType (new): pre-existing string at the BZPOPMIN target key surfaces WRONGTYPE on the very first iteration's full check, well under the BLOCK timeout. - TestRedis_BZPopMinDetectsMidBlockWrongType (new): SET arriving during a BZPOPMIN BLOCK surfaces WRONGTYPE through the fallback- timer full check before the BLOCK budget elapses. Locks down the fallback safety net. - Existing TestRedis_BZPopMinWakesOnZAdd / WakesOnZIncrBy / TimesOutOnEmptyKey continue to lock down the signal path and the deadline contract. Self-review (CLAUDE.md 5 lenses): 1. Data loss -- None. Read-only fast path; persistBZPopMinResult is unchanged. 2. Concurrency -- waitForBlockedCommandUpdate's new bool return is set under the same select that already serialised the three wake sources; no new shared state. 3. Performance -- 16 fewer Pebble seeks per signal-driven wake on empty queues. The number of wakes per BZPOPMIN is unchanged -- this is per-wake cost reduction, not wake-rate reduction. 4. Data consistency -- Signal-driven wake reads at a fresh readTS; the fast probe and the slow probe see the same MVCC snapshot. wrongType detection on the slow path is preserved on initial iteration and on every fallback wake. 5. Test coverage -- two new lockdown tests for the wrongType paths that cannot regress silently if the fast / full mode flag is wired incorrectly.
1 parent 53c687d commit 6173f58

3 files changed

Lines changed: 175 additions & 7 deletions

File tree

adapter/redis_bzpopmin_wake_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,82 @@ func TestRedis_BZPopMinWakesOnZIncrBy(t *testing.T) {
119119
// regression in the wait-loop refactor where the new
120120
// waitForBlockedCommandUpdate timer or context-cancel branch could otherwise
121121
// leak a -ERR reply.
122+
// TestRedis_BZPopMinRejectsInitialWrongType locks down the
123+
// first-iteration full-check invariant: when BZPOPMIN is issued
124+
// against a key that already holds a wrongType encoding (e.g. a
125+
// string), the very first tryBZPopMin must surface WRONGTYPE. The
126+
// signal-driven fast path must never run before the initial full
127+
// check has confirmed the type.
128+
func TestRedis_BZPopMinRejectsInitialWrongType(t *testing.T) {
129+
t.Parallel()
130+
nodes, _, _ := createNode(t, 3)
131+
defer shutdown(nodes)
132+
133+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
134+
defer func() { _ = rdb.Close() }()
135+
ctx := context.Background()
136+
137+
// Pre-write a string at the BZPOPMIN target key.
138+
require.NoError(t, rdb.Set(ctx, "bzpop-wrongtype", "I am a string", 0).Err())
139+
140+
// BZPOPMIN with a 1 s budget — full check on the first iteration
141+
// must catch the wrongType immediately, well under the BLOCK
142+
// timeout.
143+
zwk, err := rdb.BZPopMin(ctx, time.Second, "bzpop-wrongtype").Result()
144+
require.Error(t, err, "BZPOPMIN on a string key must return WRONGTYPE")
145+
require.Contains(t, err.Error(), "WRONGTYPE")
146+
require.Nil(t, zwk)
147+
}
148+
149+
// TestRedis_BZPopMinDetectsMidBlockWrongType locks down the
150+
// fallback-timer-driven full check: when BZPOPMIN is blocked on a
151+
// non-existent key and a wrongType encoding (e.g. SET) is written
152+
// to that key during the BLOCK window, the next fallback-timer
153+
// wake must run the full check and surface WRONGTYPE within
154+
// ~redisBlockWaitFallback (100 ms). A pure signal-driven path
155+
// would miss this because SET / HSET / etc. do not fire
156+
// zsetWaiters.Signal.
157+
//
158+
// The 5 s BLOCK budget gives plenty of slack for CI scheduler
159+
// jitter; the assertion is "WRONGTYPE before BLOCK timeout", not a
160+
// tight latency gate.
161+
func TestRedis_BZPopMinDetectsMidBlockWrongType(t *testing.T) {
162+
t.Parallel()
163+
nodes, _, _ := createNode(t, 3)
164+
defer shutdown(nodes)
165+
166+
rdbReader := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
167+
defer func() { _ = rdbReader.Close() }()
168+
rdbWriter := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
169+
defer func() { _ = rdbWriter.Close() }()
170+
ctx := context.Background()
171+
172+
type popResult struct {
173+
zwk *redis.ZWithKey
174+
err error
175+
}
176+
resultCh := make(chan popResult, 1)
177+
go func() {
178+
zwk, err := rdbReader.BZPopMin(ctx, 5*time.Second, "bzpop-mid-wrongtype").Result()
179+
resultCh <- popResult{zwk: zwk, err: err}
180+
}()
181+
182+
// Let the reader enter the wait loop and exhaust its first
183+
// (full) iteration on a missing key. Then SET a string at the
184+
// same key. The next fallback-timer wake (~100 ms after the
185+
// previous one) must run the full check and surface WRONGTYPE.
186+
time.Sleep(50 * time.Millisecond)
187+
require.NoError(t, rdbWriter.Set(ctx, "bzpop-mid-wrongtype", "I am a string", 0).Err())
188+
189+
select {
190+
case res := <-resultCh:
191+
require.Error(t, res.err, "BZPOPMIN must return WRONGTYPE after mid-block SET, got zwk=%v", res.zwk)
192+
require.Contains(t, res.err.Error(), "WRONGTYPE")
193+
case <-time.After(6 * time.Second):
194+
t.Fatal("BZPOPMIN did not return WRONGTYPE within the BLOCK window after a mid-block SET")
195+
}
196+
}
197+
122198
func TestRedis_BZPopMinTimesOutOnEmptyKey(t *testing.T) {
123199
t.Parallel()
124200
nodes, _, _ := createNode(t, 3)

adapter/redis_compat_commands.go

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3651,12 +3651,35 @@ func (r *RedisServer) zremrangebyrank(conn redcon.Conn, cmd redcon.Command) {
36513651
}
36523652

36533653
func (r *RedisServer) tryBZPopMin(key []byte) (*bzpopminResult, error) {
3654+
return r.tryBZPopMinWithMode(key, false)
3655+
}
3656+
3657+
// tryBZPopMinFast is the signal-driven-wake variant of tryBZPopMin.
3658+
// It uses keyTypeAtExpectFast (no slow-path fallback, no wrongType
3659+
// detection) on the assumption that the only mutation since the
3660+
// caller's previous full check is a ZADD/ZINCRBY — the only writes
3661+
// that fire zsetWaiters.Signal. A wrongType write that arrived since
3662+
// the last full check would not have signalled, so this fast path
3663+
// would treat the key as "still empty" and return nil; the
3664+
// fallback-timer wake in bzpopminWaitLoop, which uses the full
3665+
// tryBZPopMin path, detects such cases within ~redisBlockWaitFallback.
3666+
func (r *RedisServer) tryBZPopMinFast(key []byte) (*bzpopminResult, error) {
3667+
return r.tryBZPopMinWithMode(key, true)
3668+
}
3669+
3670+
func (r *RedisServer) tryBZPopMinWithMode(key []byte, fast bool) (*bzpopminResult, error) {
36543671
ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout)
36553672
defer cancel()
36563673
var result *bzpopminResult
36573674
err := r.retryRedisWrite(ctx, func() error {
36583675
readTS := r.readTS()
3659-
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
3676+
var typ redisValueType
3677+
var err error
3678+
if fast {
3679+
typ, err = r.keyTypeAtExpectFast(ctx, key, readTS, redisTypeZSet)
3680+
} else {
3681+
typ, err = r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
3682+
}
36603683
if err != nil {
36613684
return err
36623685
}
@@ -3763,28 +3786,47 @@ func (r *RedisServer) bzpopminWaitLoop(conn redcon.Conn, keys [][]byte, deadline
37633786
handlerCtx := r.handlerContext()
37643787
w, release := r.zsetWaiters.Register(keys)
37653788
defer release()
3789+
// fast tracks whether the previous wake came from a Signal (true)
3790+
// or the fallback timer / shutdown (false). The first iteration
3791+
// always runs the full tryBZPopMin so an existing wrongType key
3792+
// surfaces an immediate WRONGTYPE; subsequent iterations after a
3793+
// signal-driven wake skip the wrongType detection because the
3794+
// only writes that fire zsetWaiters.Signal are ZADD / ZINCRBY,
3795+
// neither of which can introduce a wrongType. Fallback-timer
3796+
// wakes always re-run the full check so a HSET / SET / etc. that
3797+
// arrived between iterations is detected within ~one
3798+
// redisBlockWaitFallback (100ms).
3799+
fast := false
37663800
for {
37673801
if handlerCtx.Err() != nil {
37683802
conn.WriteNull()
37693803
return
37703804
}
3771-
if r.bzpopminTryAllKeys(conn, keys) {
3805+
if r.bzpopminTryAllKeys(conn, keys, fast) {
37723806
return
37733807
}
37743808
if !time.Now().Before(deadline) {
37753809
conn.WriteNull()
37763810
return
37773811
}
3778-
waitForBlockedCommandUpdate(handlerCtx, w.C, deadline)
3812+
fast = waitForBlockedCommandUpdate(handlerCtx, w.C, deadline)
37793813
}
37803814
}
37813815

37823816
// bzpopminTryAllKeys runs one tryBZPopMin pass across keys. Returns
37833817
// true when a result was written (success or terminal error) and the
3784-
// caller should stop the loop, false to continue waiting.
3785-
func (r *RedisServer) bzpopminTryAllKeys(conn redcon.Conn, keys [][]byte) bool {
3818+
// caller should stop the loop, false to continue waiting. The fast
3819+
// flag selects tryBZPopMinFast (signal-driven wake, skips wrongType
3820+
// detection) over tryBZPopMin (full check).
3821+
func (r *RedisServer) bzpopminTryAllKeys(conn redcon.Conn, keys [][]byte, fast bool) bool {
37863822
for _, key := range keys {
3787-
result, err := r.tryBZPopMin(key)
3823+
var result *bzpopminResult
3824+
var err error
3825+
if fast {
3826+
result, err = r.tryBZPopMinFast(key)
3827+
} else {
3828+
result, err = r.tryBZPopMin(key)
3829+
}
37883830
if err != nil {
37893831
conn.WriteError(err.Error())
37903832
return true
@@ -3812,7 +3854,16 @@ func (r *RedisServer) bzpopminTryAllKeys(conn redcon.Conn, keys [][]byte) bool {
38123854
// BRPOP / BLMOVE in follow-ups) — the keyWaiterRegistry that produces
38133855
// waiterC is per-domain (streamWaiters vs zsetWaiters), but the
38143856
// timer-and-select shape is identical.
3815-
func waitForBlockedCommandUpdate(handlerCtx context.Context, waiterC <-chan struct{}, deadline time.Time) {
3857+
//
3858+
// Returns true iff the wake came from waiterC (i.e., a producer
3859+
// Signal). False on fallback-timer fire or handlerCtx cancellation.
3860+
// Callers that have a signal-implied invariant (e.g., "only ZADD /
3861+
// ZINCRBY fires zsetWaiters.Signal") can use the return value to
3862+
// pick a faster re-check on the next iteration; fallback wakes
3863+
// always need the full check because writes that bypass Signal
3864+
// (Lua flush, follower-applied entries, wrongType-introducing
3865+
// commands) only become observable through the timer branch.
3866+
func waitForBlockedCommandUpdate(handlerCtx context.Context, waiterC <-chan struct{}, deadline time.Time) bool {
38163867
fallback := redisBlockWaitFallback
38173868
if remaining := time.Until(deadline); remaining < fallback {
38183869
fallback = remaining
@@ -3832,8 +3883,11 @@ func waitForBlockedCommandUpdate(handlerCtx context.Context, waiterC <-chan stru
38323883
}()
38333884
select {
38343885
case <-waiterC:
3886+
return true
38353887
case <-timer.C:
3888+
return false
38363889
case <-handlerCtx.Done():
3890+
return false
38373891
}
38383892
}
38393893

adapter/redis_compat_helpers.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,44 @@ func (r *RedisServer) keyTypeAtExpect(ctx context.Context, key []byte, readTS ui
363363
return r.applyTTLFilter(ctx, key, readTS, expected)
364364
}
365365

366+
// keyTypeAtExpectFast is the signal-driven-wake variant of
367+
// keyTypeAtExpect. On a fast-probe miss it returns redisTypeNone
368+
// directly (no rawKeyTypeAt slow-path fallback, no
369+
// hasHigherPriorityStringEncoding guard). Callers MUST have an
370+
// invariant that the only mutation since the last full check could
371+
// have produced a row visible to probeExpectedType — typically a
372+
// blocking-command wait loop after a Signal-driven wake, where the
373+
// only writes that fire keyWaiterRegistry.Signal are
374+
// expected-type-creating writes (ZADD/ZINCRBY for zsets,
375+
// XADD-and-friends for streams). A wrongType-introducing write
376+
// (HSET, SET, etc.) does NOT signal, so a non-zset key that
377+
// appeared between iterations is invisible to this fast path; the
378+
// blocking command's fallback-timer wake (which uses the slow
379+
// keyTypeAtExpect) is the safety net that detects it within
380+
// ~redisBlockWaitFallback (100ms).
381+
//
382+
// Compared to keyTypeAtExpect on the empty-key case
383+
// (probeExpectedType -> false -> rawKeyTypeAt slow path = ~19
384+
// seeks), the fast variant returns after the 3-seek probe. For a
385+
// BZPOPMIN waiting on an empty zset and being woken by Signal at
386+
// the ZADD rate, this turns each wake from "19 seeks just to
387+
// confirm still-empty" into "0 seeks because the probe found the
388+
// new ZADD's row" or "3 seeks to confirm the ZADD raced and the
389+
// queue is empty again".
390+
func (r *RedisServer) keyTypeAtExpectFast(ctx context.Context, key []byte, readTS uint64, expected redisValueType) (redisValueType, error) {
391+
if expected == redisTypeNone {
392+
return redisTypeNone, nil
393+
}
394+
found, err := r.probeExpectedType(ctx, key, readTS, expected)
395+
if err != nil {
396+
return redisTypeNone, err
397+
}
398+
if !found {
399+
return redisTypeNone, nil
400+
}
401+
return r.applyTTLFilter(ctx, key, readTS, expected)
402+
}
403+
366404
// probeExpectedType issues only the prefix probes for the given type.
367405
// It is intentionally conservative: returning false here means "no row
368406
// of the expected type was visible at readTS", not "the key does not

0 commit comments

Comments
 (0)