Skip to content

Commit ef9b8e2

Browse files
authored
perf(redis): keyTypeAt fast path for stream/hash/zset commands (#665)
## Summary - `keyTypeAt` was the leader's #2 CPU consumer in production (74.8% of the 30s pprof, on top of XREAD busy-poll which #663 addresses). Each call issues up to ~19 Pebble seeks across every collection family before returning. - Add `keyTypeAtExpect(ctx, key, readTS, expected)` — probes only the prefixes for the expected type (2–3 seeks). On hit, applies the TTL filter and returns; on miss, falls through to the full `keyTypeAt` slow path so wrongType detection is preserved. - Convert 21 stream / hash / zset call sites to the fast path. Set / list / string / mixed-kind callers are left for a follow-up. ## CPU savings | Branch | Slow path (before) | Fast path (this PR, hit) | Reduction | |---|---:|---:|---:| | Stream (XADD/XREAD/XLEN/XRANGE) | ~19 seeks | 2 seeks | 9.5× | | Hash (HSET/HGET/HDEL/HLEN/HMGET/HEXISTS/HGETALL/HINCRBY) | ~19 | 3 | 6.3× | | ZSet (ZADD/ZRANGE/ZINCRBY/ZREM/ZREMRANGEBYRANK/BZPOPMIN) | ~19 | 3 | 6.3× | Steady-state: ZADD on an existing zset, XADD on an existing stream, HSET on an existing hash all hit the fast path. First-write and wrongType cases pay the same as before. ## Self-review (CLAUDE.md 5 lenses) 1. **Data loss** — None. Read-only probe path; the slow-path fallback is the unchanged `keyTypeAt`, so any branch that previously detected the right type still does. 2. **Concurrency** — No new shared state. Each call is read-only against the existing MVCC snapshot at `readTS`. 3. **Performance** — 6–9× seek reduction on the hit case (the steady state for live keys), no extra seeks on the miss case (caller pays the same ~19 seeks they would have without this PR). 4. **Data consistency** — TTL filter is applied identically to the slow path; the fast path returns "found at readTS" only after the same `applyTTLFilter` call. WrongType detection is preserved by slow-path delegation when expected probes come back empty. 5. **Test coverage** — new `TestRedis_StreamCommandsRejectWrongType` locks down the stream wrongType fall-through. Existing collection-family wrongType tests (`TestRedis_HGET_WRONGTYPE`, etc.) lock down hash/set. Existing collection round-trip tests cover the hit path; all now run through `keyTypeAtExpect`. ## Test plan - [x] `go test -race -run "TestRedis_Stream|TestRedis_Hash|TestRedis_Z|TestRedis_HGET|TestRedis_HEXISTS|TestRedis_HMGET|TestRedis_HDEL|TestRedis_HLEN|TestRedis_HGETALL|TestRedis_BullMQ|TestRedis_BZPOPMIN|TestNextXAddID|TestXAddEnforce" ./adapter/...` — passes (146s) - [x] `golangci-lint run ./adapter/...` — clean - [ ] CI: full adapter test suite under -race - [ ] CI: jepsen redis suite ## Follow-up - Set / list / string / mixed-kind (validateExactSetKind, smembers, pfcount, getdel, incr, ltrim, lindex, etc.) — same pattern, lower production traffic, deferred. @claude review
2 parents 26f0141 + 0617c43 commit ef9b8e2

3 files changed

Lines changed: 165 additions & 21 deletions

File tree

adapter/redis_compat_commands.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2051,7 +2051,7 @@ func (r *RedisServer) applyHashFieldPairs(key []byte, args [][]byte) (int, error
20512051
var added int
20522052
err := r.retryRedisWrite(ctx, func() error {
20532053
readTS := r.readTS()
2054-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
2054+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeHash)
20552055
if err != nil {
20562056
return err
20572057
}
@@ -2506,7 +2506,7 @@ func (r *RedisServer) zsetRangeEmptyFastResult(ctx context.Context, key []byte,
25062506
// return hit=true with an empty result -- that is the correct
25072507
// Redis answer and saves the slow-path round-trip. Otherwise
25082508
// fall back so the slow path can produce WRONGTYPE.
2509-
typ, typErr := r.keyTypeAt(ctx, key, readTS)
2509+
typ, typErr := r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
25102510
if typErr != nil {
25112511
return false, monitoring.LuaFastPathFallbackOther, cockerrors.WithStack(typErr)
25122512
}
@@ -2533,7 +2533,7 @@ func (r *RedisServer) zsetRangeEmptyFastResult(ctx context.Context, key []byte,
25332533
// hgetSlow falls back to the type-probing path when hashFieldFastLookup
25342534
// misses. Handles legacy-blob hashes and nil / WRONGTYPE disambiguation.
25352535
func (r *RedisServer) hgetSlow(conn redcon.Conn, ctx context.Context, key, field []byte, readTS uint64) {
2536-
typ, err := r.keyTypeAt(ctx, key, readTS)
2536+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeHash)
25372537
if err != nil {
25382538
conn.WriteError(err.Error())
25392539
return
@@ -2564,7 +2564,7 @@ func (r *RedisServer) hmget(conn redcon.Conn, cmd redcon.Command) {
25642564
return
25652565
}
25662566
readTS := r.readTS()
2567-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
2567+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeHash)
25682568
if err != nil {
25692569
conn.WriteError(err.Error())
25702570
return
@@ -2678,7 +2678,7 @@ func (r *RedisServer) resolveHashFieldDelElems(ctx context.Context, key []byte,
26782678

26792679
func (r *RedisServer) hdelTxn(ctx context.Context, key []byte, fields [][]byte) (int, error) {
26802680
readTS := r.readTS()
2681-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
2681+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeHash)
26822682
if err != nil {
26832683
return 0, err
26842684
}
@@ -2801,7 +2801,7 @@ func (r *RedisServer) hashFieldFastExists(ctx context.Context, key, field []byte
28012801
}
28022802

28032803
func (r *RedisServer) hexistsSlow(conn redcon.Conn, ctx context.Context, key, field []byte, readTS uint64) {
2804-
typ, err := r.keyTypeAt(ctx, key, readTS)
2804+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeHash)
28052805
if err != nil {
28062806
conn.WriteError(err.Error())
28072807
return
@@ -2831,7 +2831,7 @@ func (r *RedisServer) hlen(conn redcon.Conn, cmd redcon.Command) {
28312831
return
28322832
}
28332833
readTS := r.readTS()
2834-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
2834+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeHash)
28352835
if err != nil {
28362836
conn.WriteError(err.Error())
28372837
return
@@ -2950,7 +2950,7 @@ func (r *RedisServer) hincrbyWithMigration(ctx context.Context, key, fieldKey []
29502950

29512951
func (r *RedisServer) hincrbyTxn(ctx context.Context, key, field []byte, increment int64) (int64, error) {
29522952
readTS := r.readTS()
2953-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
2953+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeHash)
29542954
if err != nil {
29552955
return 0, err
29562956
}
@@ -3049,7 +3049,7 @@ func (r *RedisServer) hgetall(conn redcon.Conn, cmd redcon.Command) {
30493049
return
30503050
}
30513051
readTS := r.readTS()
3052-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
3052+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeHash)
30533053
if err != nil {
30543054
conn.WriteError(err.Error())
30553055
return
@@ -3271,7 +3271,7 @@ func (r *RedisServer) applyZAddPair(ctx context.Context, key []byte, p zaddPair,
32713271

32723272
func (r *RedisServer) zaddTxn(ctx context.Context, key []byte, flags zaddFlags, pairs []zaddPair) (int, error) {
32733273
readTS := r.readTS()
3274-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3274+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
32753275
if err != nil {
32763276
return 0, err
32773277
}
@@ -3341,7 +3341,7 @@ func (r *RedisServer) zaddTxn(ctx context.Context, key []byte, flags zaddFlags,
33413341
// Returns the new score after applying increment.
33423342
func (r *RedisServer) zincrbyTxn(ctx context.Context, key []byte, member string, increment float64) (float64, error) {
33433343
readTS := r.readTS()
3344-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3344+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
33453345
if err != nil {
33463346
return 0, err
33473347
}
@@ -3511,7 +3511,7 @@ func (r *RedisServer) zrange(conn redcon.Conn, cmd redcon.Command) {
35113511

35123512
func (r *RedisServer) zrangeRead(conn redcon.Conn, key []byte, start, stop int, opts zrangeOptions) {
35133513
readTS := r.readTS()
3514-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3514+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeZSet)
35153515
if err != nil {
35163516
conn.WriteError(err.Error())
35173517
return
@@ -3551,7 +3551,7 @@ func (r *RedisServer) zrem(conn redcon.Conn, cmd redcon.Command) {
35513551
var removed int
35523552
if err := r.retryRedisWrite(ctx, func() error {
35533553
readTS := r.readTS()
3554-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
3554+
typ, err := r.keyTypeAtExpect(ctx, cmd.Args[1], readTS, redisTypeZSet)
35553555
if err != nil {
35563556
return err
35573557
}
@@ -3599,7 +3599,7 @@ func (r *RedisServer) zremrangebyrank(conn redcon.Conn, cmd redcon.Command) {
35993599
var removed int
36003600
if err := r.retryRedisWrite(ctx, func() error {
36013601
readTS := r.readTS()
3602-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
3602+
typ, err := r.keyTypeAtExpect(ctx, cmd.Args[1], readTS, redisTypeZSet)
36033603
if err != nil {
36043604
return err
36053605
}
@@ -3636,7 +3636,7 @@ func (r *RedisServer) tryBZPopMin(key []byte) (*bzpopminResult, error) {
36363636
var result *bzpopminResult
36373637
err := r.retryRedisWrite(ctx, func() error {
36383638
readTS := r.readTS()
3639-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3639+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
36403640
if err != nil {
36413641
return err
36423642
}
@@ -4022,7 +4022,7 @@ func (r *RedisServer) xadd(conn redcon.Conn, cmd redcon.Command) {
40224022

40234023
func (r *RedisServer) xaddTxn(ctx context.Context, key []byte, req xaddRequest) (string, error) {
40244024
readTS := r.readTS()
4025-
typ, err := r.keyTypeAt(ctx, key, readTS)
4025+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
40264026
if err != nil {
40274027
return "", err
40284028
}
@@ -4358,7 +4358,7 @@ func (r *RedisServer) xtrim(conn redcon.Conn, cmd redcon.Command) {
43584358
// store errors. Extracted from xtrimTxn so the outer function stays
43594359
// within the cyclop budget.
43604360
func (r *RedisServer) streamTypeForWrite(ctx context.Context, key []byte, readTS uint64) (bool, error) {
4361-
typ, err := r.keyTypeAt(ctx, key, readTS)
4361+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
43624362
if err != nil {
43634363
return false, err
43644364
}
@@ -4584,7 +4584,7 @@ func (r *RedisServer) resolveXReadAfterIDs(ctx context.Context, req *xreadReques
45844584
// past a BLOCK-window cancel.
45854585
func (r *RedisServer) resolveXReadDollarID(ctx context.Context, key []byte) (string, error) {
45864586
readTS := r.readTS()
4587-
typ, err := r.keyTypeAt(ctx, key, readTS)
4587+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
45884588
if err != nil {
45894589
return "", err
45904590
}
@@ -4636,7 +4636,7 @@ func (r *RedisServer) xreadOnce(ctx context.Context, req xreadRequest) ([]xreadR
46364636
results := make([]xreadResult, 0, len(req.keys))
46374637
for i, key := range req.keys {
46384638
readTS := r.readTS()
4639-
typ, err := r.keyTypeAt(ctx, key, readTS)
4639+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
46404640
if err != nil {
46414641
return nil, err
46424642
}
@@ -4972,7 +4972,7 @@ func (r *RedisServer) xlen(conn redcon.Conn, cmd redcon.Command) {
49724972
return
49734973
}
49744974
readTS := r.readTS()
4975-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
4975+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeStream)
49764976
if err != nil {
49774977
conn.WriteError(err.Error())
49784978
return
@@ -5076,7 +5076,7 @@ func (r *RedisServer) rangeStream(conn redcon.Conn, cmd redcon.Command, reverse
50765076
}
50775077

50785078
readTS := r.readTS()
5079-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
5079+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeStream)
50805080
if err != nil {
50815081
conn.WriteError(err.Error())
50825082
return

adapter/redis_compat_commands_stream_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,55 @@ func TestRedis_StreamXReadShortBlockReturnsNullNotError(t *testing.T) {
184184
}
185185
}
186186

187+
// TestRedis_StreamCommandsRejectWrongType locks down the wrongType
188+
// detection on the stream fast path: keyTypeAtExpect short-circuits to
189+
// the slow path when the expected (stream) prefixes return empty, so
190+
// the actual key type is reported and XADD/XREAD/XLEN/XRANGE all
191+
// surface WRONGTYPE rather than treating the key as missing.
192+
func TestRedis_StreamCommandsRejectWrongType(t *testing.T) {
193+
t.Parallel()
194+
nodes, _, _ := createNode(t, 3)
195+
defer shutdown(nodes)
196+
197+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
198+
defer func() { _ = rdb.Close() }()
199+
ctx := context.Background()
200+
201+
// Seed the key as a plain string.
202+
require.NoError(t, rdb.Set(ctx, "stream-wrongtype", "I am a string", 0).Err())
203+
204+
// XADD must reject with WRONGTYPE — the fast-path stream probe
205+
// returns empty (the key has no stream-meta or legacy-stream
206+
// row), so we fall through to the full keyTypeAt slow path which
207+
// detects the string and the caller raises WRONGTYPE.
208+
_, err := rdb.XAdd(ctx, &redis.XAddArgs{
209+
Stream: "stream-wrongtype",
210+
ID: "1-0",
211+
Values: []string{"k", "v"},
212+
}).Result()
213+
require.Error(t, err, "XADD on a string key must return WRONGTYPE")
214+
require.Contains(t, err.Error(), "WRONGTYPE")
215+
216+
// XLEN: same fall-through path — string key surfaces WRONGTYPE.
217+
_, err = rdb.XLen(ctx, "stream-wrongtype").Result()
218+
require.Error(t, err, "XLEN on a string key must return WRONGTYPE")
219+
require.Contains(t, err.Error(), "WRONGTYPE")
220+
221+
// XRANGE: same expectation.
222+
_, err = rdb.XRange(ctx, "stream-wrongtype", "-", "+").Result()
223+
require.Error(t, err, "XRANGE on a string key must return WRONGTYPE")
224+
require.Contains(t, err.Error(), "WRONGTYPE")
225+
226+
// XREAD with a missing stream returns nil (the legacy "no rows"
227+
// path). XREAD with a wrongType key, however, must surface the
228+
// error so the BLOCK loop does not spin forever on a string.
229+
_, err = rdb.XRead(ctx, &redis.XReadArgs{
230+
Streams: []string{"stream-wrongtype", "0"},
231+
}).Result()
232+
require.Error(t, err, "XREAD on a string key must return WRONGTYPE")
233+
require.Contains(t, err.Error(), "WRONGTYPE")
234+
}
235+
187236
func TestRedis_StreamXAddXReadRoundTrip(t *testing.T) {
188237
t.Parallel()
189238
nodes, _, _ := createNode(t, 3)

adapter/redis_compat_helpers.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,101 @@ func (r *RedisServer) keyTypeAt(ctx context.Context, key []byte, readTS uint64)
314314
return r.applyTTLFilter(ctx, key, readTS, typ)
315315
}
316316

317+
// keyTypeAtExpect is a fast-path replacement for keyTypeAt callers that
318+
// know the type they expect to find. The slow path probes ~19 Pebble
319+
// seeks across every collection family before returning. The fast path:
320+
//
321+
// 1. Probe only the prefixes for `expected` (typically 2-3 seeks).
322+
// 2. On hit, run the same string-priority guard the wide-column
323+
// fast-path callers use (hashFieldFastLookup, zsetMemberFastScore,
324+
// setMemberFastExists, hashFieldFastExists). When a redisStrKey
325+
// row also exists at the same user key, fall back to the slow
326+
// path so the rawKeyTypeAt "string wins" tiebreaker fires and
327+
// the caller gets WRONGTYPE / nil instead of the
328+
// collection-specific answer. The guard is the narrow form (see
329+
// hasHigherPriorityStringEncoding's doc comment): only redisStrKey
330+
// is checked, the rarer HLL / legacy-bare-key dual-encoding cases
331+
// remain a known residual risk shared with the other fast-path
332+
// callers.
333+
// 3. On miss, fall back to the full keyTypeAt slow path so that
334+
// wrongType collisions (the key exists under a different type)
335+
// still surface as the correct redisValueType.
336+
//
337+
// Steady-state production: most XADD/XREAD/HSET/etc. calls are on a key
338+
// of the expected type, so step 1 hits and the slow-path 19 seeks shrink
339+
// to 2-3 (plus the priority-guard ExistsAt). The slow path stays in
340+
// place for first-write and wrongType cases, which keep their existing
341+
// semantics — wrongTypeError detection is preserved by the
342+
// fall-through.
343+
func (r *RedisServer) keyTypeAtExpect(ctx context.Context, key []byte, readTS uint64, expected redisValueType) (redisValueType, error) {
344+
if expected == redisTypeNone {
345+
return r.keyTypeAt(ctx, key, readTS)
346+
}
347+
found, err := r.probeExpectedType(ctx, key, readTS, expected)
348+
if err != nil {
349+
return redisTypeNone, err
350+
}
351+
if !found {
352+
return r.keyTypeAt(ctx, key, readTS)
353+
}
354+
if expected != redisTypeString {
355+
higher, hErr := r.hasHigherPriorityStringEncoding(ctx, key, readTS)
356+
if hErr != nil {
357+
return redisTypeNone, hErr
358+
}
359+
if higher {
360+
return r.keyTypeAt(ctx, key, readTS)
361+
}
362+
}
363+
return r.applyTTLFilter(ctx, key, readTS, expected)
364+
}
365+
366+
// probeExpectedType issues only the prefix probes for the given type.
367+
// It is intentionally conservative: returning false here means "no row
368+
// of the expected type was visible at readTS", not "the key does not
369+
// exist". Callers that need strict "does any value type exist for this
370+
// key" semantics must take the keyTypeAt slow path; keyTypeAtExpect
371+
// composes both.
372+
func (r *RedisServer) probeExpectedType(ctx context.Context, key []byte, readTS uint64, expected redisValueType) (bool, error) {
373+
switch expected {
374+
case redisTypeString:
375+
_, found, err := r.probeStringTypes(ctx, key, readTS)
376+
return found, err
377+
case redisTypeList:
378+
_, found, err := r.probeListType(ctx, key, readTS)
379+
return found, err
380+
case redisTypeHash:
381+
return r.wideColumnTypeExists(ctx, key, readTS, store.HashFieldScanPrefix, store.HashMetaKey, store.HashMetaDeltaScanPrefix)
382+
case redisTypeSet:
383+
return r.wideColumnTypeExists(ctx, key, readTS, store.SetMemberScanPrefix, store.SetMetaKey, store.SetMetaDeltaScanPrefix)
384+
case redisTypeZSet:
385+
return r.wideColumnTypeExists(ctx, key, readTS, store.ZSetMemberScanPrefix, store.ZSetMetaKey, store.ZSetMetaDeltaScanPrefix)
386+
case redisTypeStream:
387+
return r.probeStreamExists(ctx, key, readTS)
388+
case redisTypeNone:
389+
// Caller already short-circuited.
390+
return false, nil
391+
}
392+
return false, nil
393+
}
394+
395+
// probeStreamExists checks whether a stream is present at readTS in
396+
// either the new entry-per-key meta layout or the legacy single-blob
397+
// encoding. Two ExistsAt seeks worst-case; one when the new layout is
398+
// present (the common case post-#620 migration).
399+
func (r *RedisServer) probeStreamExists(ctx context.Context, key []byte, readTS uint64) (bool, error) {
400+
if exists, err := r.store.ExistsAt(ctx, store.StreamMetaKey(key), readTS); err != nil {
401+
return false, errors.WithStack(err)
402+
} else if exists {
403+
return true, nil
404+
}
405+
exists, err := r.store.ExistsAt(ctx, redisStreamKey(key), readTS)
406+
if err != nil {
407+
return false, errors.WithStack(err)
408+
}
409+
return exists, nil
410+
}
411+
317412
// applyTTLFilter takes a raw (TTL-unaware) type and returns the
318413
// TTL-filtered equivalent. Callers that need BOTH the raw and filtered
319414
// types (SET NX/XX/GET against a possibly-expired key) can reuse a

0 commit comments

Comments
 (0)