Skip to content

Commit 01e977b

Browse files
authored
Merge branch 'main' into fix/admin-rolling-env-forward
2 parents 3fe2c45 + 98d38b5 commit 01e977b

12 files changed

Lines changed: 966 additions & 64 deletions

adapter/redis_bzpopmin_wake_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,82 @@ func TestRedis_BZPopMinWakesOnZIncrBy(t *testing.T) {
119119
// regression in the wait-loop refactor where the new
120120
// waitForBlockedCommandUpdate timer or context-cancel branch could otherwise
121121
// leak a -ERR reply.
122+
// TestRedis_BZPopMinRejectsInitialWrongType locks down the
123+
// first-iteration full-check invariant: when BZPOPMIN is issued
124+
// against a key that already holds a wrongType encoding (e.g. a
125+
// string), the very first tryBZPopMin must surface WRONGTYPE. The
126+
// signal-driven fast path must never run before the initial full
127+
// check has confirmed the type.
128+
func TestRedis_BZPopMinRejectsInitialWrongType(t *testing.T) {
129+
t.Parallel()
130+
nodes, _, _ := createNode(t, 3)
131+
defer shutdown(nodes)
132+
133+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
134+
defer func() { _ = rdb.Close() }()
135+
ctx := context.Background()
136+
137+
// Pre-write a string at the BZPOPMIN target key.
138+
require.NoError(t, rdb.Set(ctx, "bzpop-wrongtype", "I am a string", 0).Err())
139+
140+
// BZPOPMIN with a 1 s budget — full check on the first iteration
141+
// must catch the wrongType immediately, well under the BLOCK
142+
// timeout.
143+
zwk, err := rdb.BZPopMin(ctx, time.Second, "bzpop-wrongtype").Result()
144+
require.Error(t, err, "BZPOPMIN on a string key must return WRONGTYPE")
145+
require.Contains(t, err.Error(), "WRONGTYPE")
146+
require.Nil(t, zwk)
147+
}
148+
149+
// TestRedis_BZPopMinDetectsMidBlockWrongType locks down the
150+
// fallback-timer-driven full check: when BZPOPMIN is blocked on a
151+
// non-existent key and a wrongType encoding (e.g. SET) is written
152+
// to that key during the BLOCK window, the next fallback-timer
153+
// wake must run the full check and surface WRONGTYPE within
154+
// ~redisBlockWaitFallback (100 ms). A pure signal-driven path
155+
// would miss this because SET / HSET / etc. do not fire
156+
// zsetWaiters.Signal.
157+
//
158+
// The 5 s BLOCK budget gives plenty of slack for CI scheduler
159+
// jitter; the assertion is "WRONGTYPE before BLOCK timeout", not a
160+
// tight latency gate.
161+
func TestRedis_BZPopMinDetectsMidBlockWrongType(t *testing.T) {
162+
t.Parallel()
163+
nodes, _, _ := createNode(t, 3)
164+
defer shutdown(nodes)
165+
166+
rdbReader := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
167+
defer func() { _ = rdbReader.Close() }()
168+
rdbWriter := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
169+
defer func() { _ = rdbWriter.Close() }()
170+
ctx := context.Background()
171+
172+
type popResult struct {
173+
zwk *redis.ZWithKey
174+
err error
175+
}
176+
resultCh := make(chan popResult, 1)
177+
go func() {
178+
zwk, err := rdbReader.BZPopMin(ctx, 5*time.Second, "bzpop-mid-wrongtype").Result()
179+
resultCh <- popResult{zwk: zwk, err: err}
180+
}()
181+
182+
// Let the reader enter the wait loop and exhaust its first
183+
// (full) iteration on a missing key. Then SET a string at the
184+
// same key. The next fallback-timer wake (~100 ms after the
185+
// previous one) must run the full check and surface WRONGTYPE.
186+
time.Sleep(50 * time.Millisecond)
187+
require.NoError(t, rdbWriter.Set(ctx, "bzpop-mid-wrongtype", "I am a string", 0).Err())
188+
189+
select {
190+
case res := <-resultCh:
191+
require.Error(t, res.err, "BZPOPMIN must return WRONGTYPE after mid-block SET, got zwk=%v", res.zwk)
192+
require.Contains(t, res.err.Error(), "WRONGTYPE")
193+
case <-time.After(6 * time.Second):
194+
t.Fatal("BZPOPMIN did not return WRONGTYPE within the BLOCK window after a mid-block SET")
195+
}
196+
}
197+
122198
func TestRedis_BZPopMinTimesOutOnEmptyKey(t *testing.T) {
123199
t.Parallel()
124200
nodes, _, _ := createNode(t, 3)

adapter/redis_compat_commands.go

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3651,12 +3651,35 @@ func (r *RedisServer) zremrangebyrank(conn redcon.Conn, cmd redcon.Command) {
36513651
}
36523652

36533653
func (r *RedisServer) tryBZPopMin(key []byte) (*bzpopminResult, error) {
3654+
return r.tryBZPopMinWithMode(key, false)
3655+
}
3656+
3657+
// tryBZPopMinFast is the signal-driven-wake variant of tryBZPopMin.
3658+
// It uses keyTypeAtExpectFast (no slow-path fallback, no wrongType
3659+
// detection) on the assumption that the only mutation since the
3660+
// caller's previous full check is a ZADD/ZINCRBY — the only writes
3661+
// that fire zsetWaiters.Signal. A wrongType write that arrived since
3662+
// the last full check would not have signalled, so this fast path
3663+
// would treat the key as "still empty" and return nil; the
3664+
// fallback-timer wake in bzpopminWaitLoop, which uses the full
3665+
// tryBZPopMin path, detects such cases within ~redisBlockWaitFallback.
3666+
func (r *RedisServer) tryBZPopMinFast(key []byte) (*bzpopminResult, error) {
3667+
return r.tryBZPopMinWithMode(key, true)
3668+
}
3669+
3670+
func (r *RedisServer) tryBZPopMinWithMode(key []byte, fast bool) (*bzpopminResult, error) {
36543671
ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout)
36553672
defer cancel()
36563673
var result *bzpopminResult
36573674
err := r.retryRedisWrite(ctx, func() error {
36583675
readTS := r.readTS()
3659-
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
3676+
var typ redisValueType
3677+
var err error
3678+
if fast {
3679+
typ, err = r.keyTypeAtExpectFast(ctx, key, readTS, redisTypeZSet)
3680+
} else {
3681+
typ, err = r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
3682+
}
36603683
if err != nil {
36613684
return err
36623685
}
@@ -3763,28 +3786,47 @@ func (r *RedisServer) bzpopminWaitLoop(conn redcon.Conn, keys [][]byte, deadline
37633786
handlerCtx := r.handlerContext()
37643787
w, release := r.zsetWaiters.Register(keys)
37653788
defer release()
3789+
// fast tracks whether the previous wake came from a Signal (true)
3790+
// or the fallback timer / shutdown (false). The first iteration
3791+
// always runs the full tryBZPopMin so an existing wrongType key
3792+
// surfaces an immediate WRONGTYPE; subsequent iterations after a
3793+
// signal-driven wake skip the wrongType detection because the
3794+
// only writes that fire zsetWaiters.Signal are ZADD / ZINCRBY,
3795+
// neither of which can introduce a wrongType. Fallback-timer
3796+
// wakes always re-run the full check so a HSET / SET / etc. that
3797+
// arrived between iterations is detected within ~one
3798+
// redisBlockWaitFallback (100ms).
3799+
fast := false
37663800
for {
37673801
if handlerCtx.Err() != nil {
37683802
conn.WriteNull()
37693803
return
37703804
}
3771-
if r.bzpopminTryAllKeys(conn, keys) {
3805+
if r.bzpopminTryAllKeys(conn, keys, fast) {
37723806
return
37733807
}
37743808
if !time.Now().Before(deadline) {
37753809
conn.WriteNull()
37763810
return
37773811
}
3778-
waitForBlockedCommandUpdate(handlerCtx, w.C, deadline)
3812+
fast = waitForBlockedCommandUpdate(handlerCtx, w.C, deadline)
37793813
}
37803814
}
37813815

37823816
// bzpopminTryAllKeys runs one tryBZPopMin pass across keys. Returns
37833817
// true when a result was written (success or terminal error) and the
3784-
// caller should stop the loop, false to continue waiting.
3785-
func (r *RedisServer) bzpopminTryAllKeys(conn redcon.Conn, keys [][]byte) bool {
3818+
// caller should stop the loop, false to continue waiting. The fast
3819+
// flag selects tryBZPopMinFast (signal-driven wake, skips wrongType
3820+
// detection) over tryBZPopMin (full check).
3821+
func (r *RedisServer) bzpopminTryAllKeys(conn redcon.Conn, keys [][]byte, fast bool) bool {
37863822
for _, key := range keys {
3787-
result, err := r.tryBZPopMin(key)
3823+
var result *bzpopminResult
3824+
var err error
3825+
if fast {
3826+
result, err = r.tryBZPopMinFast(key)
3827+
} else {
3828+
result, err = r.tryBZPopMin(key)
3829+
}
37883830
if err != nil {
37893831
conn.WriteError(err.Error())
37903832
return true
@@ -3812,7 +3854,16 @@ func (r *RedisServer) bzpopminTryAllKeys(conn redcon.Conn, keys [][]byte) bool {
38123854
// BRPOP / BLMOVE in follow-ups) — the keyWaiterRegistry that produces
38133855
// waiterC is per-domain (streamWaiters vs zsetWaiters), but the
38143856
// timer-and-select shape is identical.
3815-
func waitForBlockedCommandUpdate(handlerCtx context.Context, waiterC <-chan struct{}, deadline time.Time) {
3857+
//
3858+
// Returns true iff the wake came from waiterC (i.e., a producer
3859+
// Signal). False on fallback-timer fire or handlerCtx cancellation.
3860+
// Callers that have a signal-implied invariant (e.g., "only ZADD /
3861+
// ZINCRBY fires zsetWaiters.Signal") can use the return value to
3862+
// pick a faster re-check on the next iteration; fallback wakes
3863+
// always need the full check because writes that bypass Signal
3864+
// (Lua flush, follower-applied entries, wrongType-introducing
3865+
// commands) only become observable through the timer branch.
3866+
func waitForBlockedCommandUpdate(handlerCtx context.Context, waiterC <-chan struct{}, deadline time.Time) bool {
38163867
fallback := redisBlockWaitFallback
38173868
if remaining := time.Until(deadline); remaining < fallback {
38183869
fallback = remaining
@@ -3832,8 +3883,11 @@ func waitForBlockedCommandUpdate(handlerCtx context.Context, waiterC <-chan stru
38323883
}()
38333884
select {
38343885
case <-waiterC:
3886+
return true
38353887
case <-timer.C:
3888+
return false
38363889
case <-handlerCtx.Done():
3890+
return false
38373891
}
38383892
}
38393893

adapter/redis_compat_helpers.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,44 @@ func (r *RedisServer) keyTypeAtExpect(ctx context.Context, key []byte, readTS ui
363363
return r.applyTTLFilter(ctx, key, readTS, expected)
364364
}
365365

366+
// keyTypeAtExpectFast is the signal-driven-wake variant of
367+
// keyTypeAtExpect. On a fast-probe miss it returns redisTypeNone
368+
// directly (no rawKeyTypeAt slow-path fallback, no
369+
// hasHigherPriorityStringEncoding guard). Callers MUST have an
370+
// invariant that the only mutation since the last full check could
371+
// have produced a row visible to probeExpectedType — typically a
372+
// blocking-command wait loop after a Signal-driven wake, where the
373+
// only writes that fire keyWaiterRegistry.Signal are
374+
// expected-type-creating writes (ZADD/ZINCRBY for zsets,
375+
// XADD-and-friends for streams). A wrongType-introducing write
376+
// (HSET, SET, etc.) does NOT signal, so a non-zset key that
377+
// appeared between iterations is invisible to this fast path; the
378+
// blocking command's fallback-timer wake (which uses the slow
379+
// keyTypeAtExpect) is the safety net that detects it within
380+
// ~redisBlockWaitFallback (100ms).
381+
//
382+
// Compared to keyTypeAtExpect on the empty-key case
383+
// (probeExpectedType -> false -> rawKeyTypeAt slow path = ~19
384+
// seeks), the fast variant returns after the 3-seek probe. For a
385+
// BZPOPMIN waiting on an empty zset and being woken by Signal at
386+
// the ZADD rate, this turns each wake from "19 seeks just to
387+
// confirm still-empty" into "0 seeks because the probe found the
388+
// new ZADD's row" or "3 seeks to confirm the ZADD raced and the
389+
// queue is empty again".
390+
func (r *RedisServer) keyTypeAtExpectFast(ctx context.Context, key []byte, readTS uint64, expected redisValueType) (redisValueType, error) {
391+
if expected == redisTypeNone {
392+
return redisTypeNone, nil
393+
}
394+
found, err := r.probeExpectedType(ctx, key, readTS, expected)
395+
if err != nil {
396+
return redisTypeNone, err
397+
}
398+
if !found {
399+
return redisTypeNone, nil
400+
}
401+
return r.applyTTLFilter(ctx, key, readTS, expected)
402+
}
403+
366404
// probeExpectedType issues only the prefix probes for the given type.
367405
// It is intentionally conservative: returning false here means "no row
368406
// of the expected type was visible at readTS", not "the key does not

docs/admin_ui_key_visualizer_design.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,11 +319,13 @@ Because writes are recorded by Raft leaders and follower-local reads are recorde
319319
|---|---|---|
320320
| 0 | `cmd/elastickv-admin` skeleton, token-protected `Admin` gRPC service stub, empty SPA shell, CI wiring. | Binary builds, `/api/cluster/overview` returns live data from a real node only when the configured admin token is supplied. |
321321
| 1 | Overview, Routes, Raft Groups, Adapters pages. `LiveSummary` added. No sampler. | All read-only pages match `grpcurl` ground truth. |
322-
| 2 | Key Visualizer MVP: in-memory sampler with adaptive sub-sampling, leader writes, leader/follower reads, fan-out across nodes, static matrix API with virtual-bucket metadata. | Benchmark gate green; heatmap shows synthetic hotspot within 2 s of load; ±5% / 95%-CI accuracy SLO holds under synthetic bursts; fan-out returns complete view with 1 node down. |
323-
| 3 | Bytes series, drill-down, split/merge continuity, namespace-isolated persistence of compacted columns distributed **per owning Raft group**, lineage recovery, and retention GC. | Heatmap remains continuous across a live `SplitRange`; restart preserves last 7 days; expired data and stale lineage records are collected; no single Raft group sees more than its share of KeyViz writes. |
322+
| 2-A | Key Visualizer MVP server side: in-memory sampler with adaptive sub-sampling, leader writes, leader/follower reads, static matrix API with virtual-bucket metadata. | Benchmark gate green; ±5% / 95%-CI accuracy SLO holds under synthetic bursts; matrix endpoint returns the local node's view. |
323+
| 2-B | KeyViz SPA integration into `web/admin/`: heatmap page, series picker, row budget, manual + auto refresh. See `docs/design/2026_04_27_proposed_keyviz_spa_integration.md`. | Heatmap shows synthetic hotspot within ~5 s of `make client` driving traffic against `make run`; type check (`tsc -b --noEmit`) clean. |
324+
| 2-C | Cluster fan-out: admin RPC that aggregates each node's local sampler view so the SPA shows a cluster-wide heatmap rather than the local node's slice. | Fan-out returns complete view with 1 node down; SPA renders aggregate within the §10 budget. |
325+
| 3 | Drill-down, split/merge continuity, namespace-isolated persistence of compacted columns distributed **per owning Raft group**, lineage recovery, and retention GC. | Heatmap remains continuous across a live `SplitRange`; restart preserves last 7 days; expired data and stale lineage records are collected; no single Raft group sees more than its share of KeyViz writes. |
324326
| 4 (deferred) | Mutating admin operations (`SplitRange` from UI), browser login, RBAC, and identity-provider integration. Out of scope for this design; a follow-up design will cover it. ||
325327

326-
Phases 0–2 are the minimum operationally useful product; Phase 3 is the "ship-quality" target.
328+
Phases 0–2 (A/B/C) together are the minimum operationally useful product; Phase 3 is the "ship-quality" target. As of 2026-04-27, Phase 2-A is shipped (PRs #639/#645/#646/#647/#651/#660/#661/#672), Phase 2-B lands with this proposal, and Phase 2-C is open. Bytes series, originally listed under Phase 3, was rolled forward into 2-A and is already on the wire.
327329

328330
## 13. Open Questions
329331

0 commit comments

Comments
 (0)