Skip to content

Commit 79a6989

Browse files
committed
perf(redis): keyTypeAt fast path for stream/hash/zset commands
keyTypeAt was the leader's #2 CPU consumer in production (74.8 percent of the 30s pprof, on top of XREAD busy-poll which Phase A addressed). Each call issues up to ~19 Pebble seeks across every collection family before returning. Most callers know which type they expect: XADD wants a stream, HSET wants a hash, ZADD wants a zset. Probing every other type is wasted work in the steady state. Add keyTypeAtExpect(ctx, key, readTS, expected) that probes only the prefixes for the expected type (typically 2-3 seeks): - redisTypeStream: StreamMetaKey + legacy redisStreamKey (2 seeks) - redisTypeHash: HashFieldScanPrefix + HashMetaKey + delta (3 seeks) - redisTypeSet/ZSet: same shape (3 seeks) - redisTypeList: ListMetaKey + delta (2 seeks) - redisTypeString: 3 ExistsAt across str/HLL/legacy (3 seeks) On hit, return after the TTL filter -- ~6-9x reduction over the slow path's 19 seeks. On miss, fall through to the full keyTypeAt slow path so wrongType detection still surfaces the actual key type for WRONGTYPE replies. Strict semantics preserved on every branch. Converted call sites: - Stream (6): xaddTxn, streamTypeForWrite, resolveXReadDollarID, xreadOnce (per-iteration, hottest call), xlen, rangeStream. - Hash (8): buildHashFieldElems, hgetSlow, hexistsSlow, hmget, hdelTxn, hlen, hincrbyWithMigration, hgetall. - ZSet (7): zsetRangeEmptyFastResult, zaddTxn, zincrbyTxn, zrangeRead, zrem, zremrangebyrank, tryBZPopMin. Set / list / string / mixed-kind callers (validateExactSetKind, smembers, pfcount, getdel, incr, ltrim, lindex, etc.) are left for a follow-up. Same pattern, but their throughput is lower in the production traffic mix that motivated this work. Test coverage: - TestRedis_StreamCommandsRejectWrongType (new) -- locks down the fall-through path for streams: XADD/XLEN/XRANGE/XREAD on a string key all return WRONGTYPE, proving keyTypeAtExpect's slow-path delegation when the expected probe comes back empty. - Existing TestRedis_HGET_WRONGTYPE / TestRedis_HEXISTS_WRONGTYPE / TestRedis_SISMEMBER_WRONGTYPE etc. continue to lock down the same property for hash/set commands. - Existing collection round-trip tests (TestRedis_Stream*, TestRedis_Hash*, TestRedis_Z*, TestRedis_BullMQ*) cover the fast path's hit case end-to-end -- XADD-then-XREAD, HSET-then-HGET, ZADD-then-ZRANGE all run through keyTypeAtExpect now. Self-review (CLAUDE.md 5 lenses): 1. Data loss -- None. Read-only probe path; the slow-path fallback is the unchanged keyTypeAt, so any branch that previously detected the right type still does. 2. Concurrency -- No new shared state. Each call is read-only against the existing MVCC snapshot at readTS. 3. Performance -- 6-9x seek reduction on the hit case (the steady state for live keys), no extra seeks on the miss case (caller pays the same ~19 seeks they would have without this PR). 4. Data consistency -- TTL filter is applied identically to the slow path; the fast path returns "found at readTS" only after the same applyTTLFilter call. WrongType detection is preserved by the slow- path delegation when expected probes come back empty. 5. Test coverage -- new wrongType lockdown for stream commands; the existing collection-family wrongType tests lock down hash/set/zset. The hit-path is exercised by every existing collection round-trip test which now run through keyTypeAtExpect.
1 parent bf67fa5 commit 79a6989

3 files changed

Lines changed: 145 additions & 21 deletions

File tree

adapter/redis_compat_commands.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2042,7 +2042,7 @@ func (r *RedisServer) applyHashFieldPairs(key []byte, args [][]byte) (int, error
20422042
var added int
20432043
err := r.retryRedisWrite(ctx, func() error {
20442044
readTS := r.readTS()
2045-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
2045+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeHash)
20462046
if err != nil {
20472047
return err
20482048
}
@@ -2497,7 +2497,7 @@ func (r *RedisServer) zsetRangeEmptyFastResult(ctx context.Context, key []byte,
24972497
// return hit=true with an empty result -- that is the correct
24982498
// Redis answer and saves the slow-path round-trip. Otherwise
24992499
// fall back so the slow path can produce WRONGTYPE.
2500-
typ, typErr := r.keyTypeAt(ctx, key, readTS)
2500+
typ, typErr := r.keyTypeAtExpect(ctx, key, readTS, redisTypeZSet)
25012501
if typErr != nil {
25022502
return false, monitoring.LuaFastPathFallbackOther, cockerrors.WithStack(typErr)
25032503
}
@@ -2524,7 +2524,7 @@ func (r *RedisServer) zsetRangeEmptyFastResult(ctx context.Context, key []byte,
25242524
// hgetSlow falls back to the type-probing path when hashFieldFastLookup
25252525
// misses. Handles legacy-blob hashes and nil / WRONGTYPE disambiguation.
25262526
func (r *RedisServer) hgetSlow(conn redcon.Conn, ctx context.Context, key, field []byte, readTS uint64) {
2527-
typ, err := r.keyTypeAt(ctx, key, readTS)
2527+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeHash)
25282528
if err != nil {
25292529
conn.WriteError(err.Error())
25302530
return
@@ -2555,7 +2555,7 @@ func (r *RedisServer) hmget(conn redcon.Conn, cmd redcon.Command) {
25552555
return
25562556
}
25572557
readTS := r.readTS()
2558-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
2558+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeHash)
25592559
if err != nil {
25602560
conn.WriteError(err.Error())
25612561
return
@@ -2669,7 +2669,7 @@ func (r *RedisServer) resolveHashFieldDelElems(ctx context.Context, key []byte,
26692669

26702670
func (r *RedisServer) hdelTxn(ctx context.Context, key []byte, fields [][]byte) (int, error) {
26712671
readTS := r.readTS()
2672-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
2672+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeHash)
26732673
if err != nil {
26742674
return 0, err
26752675
}
@@ -2792,7 +2792,7 @@ func (r *RedisServer) hashFieldFastExists(ctx context.Context, key, field []byte
27922792
}
27932793

27942794
func (r *RedisServer) hexistsSlow(conn redcon.Conn, ctx context.Context, key, field []byte, readTS uint64) {
2795-
typ, err := r.keyTypeAt(ctx, key, readTS)
2795+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeHash)
27962796
if err != nil {
27972797
conn.WriteError(err.Error())
27982798
return
@@ -2822,7 +2822,7 @@ func (r *RedisServer) hlen(conn redcon.Conn, cmd redcon.Command) {
28222822
return
28232823
}
28242824
readTS := r.readTS()
2825-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
2825+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeHash)
28262826
if err != nil {
28272827
conn.WriteError(err.Error())
28282828
return
@@ -2941,7 +2941,7 @@ func (r *RedisServer) hincrbyWithMigration(ctx context.Context, key, fieldKey []
29412941

29422942
func (r *RedisServer) hincrbyTxn(ctx context.Context, key, field []byte, increment int64) (int64, error) {
29432943
readTS := r.readTS()
2944-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
2944+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeHash)
29452945
if err != nil {
29462946
return 0, err
29472947
}
@@ -3040,7 +3040,7 @@ func (r *RedisServer) hgetall(conn redcon.Conn, cmd redcon.Command) {
30403040
return
30413041
}
30423042
readTS := r.readTS()
3043-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
3043+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeHash)
30443044
if err != nil {
30453045
conn.WriteError(err.Error())
30463046
return
@@ -3262,7 +3262,7 @@ func (r *RedisServer) applyZAddPair(ctx context.Context, key []byte, p zaddPair,
32623262

32633263
func (r *RedisServer) zaddTxn(ctx context.Context, key []byte, flags zaddFlags, pairs []zaddPair) (int, error) {
32643264
readTS := r.readTS()
3265-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3265+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeZSet)
32663266
if err != nil {
32673267
return 0, err
32683268
}
@@ -3332,7 +3332,7 @@ func (r *RedisServer) zaddTxn(ctx context.Context, key []byte, flags zaddFlags,
33323332
// Returns the new score after applying increment.
33333333
func (r *RedisServer) zincrbyTxn(ctx context.Context, key []byte, member string, increment float64) (float64, error) {
33343334
readTS := r.readTS()
3335-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3335+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeZSet)
33363336
if err != nil {
33373337
return 0, err
33383338
}
@@ -3502,7 +3502,7 @@ func (r *RedisServer) zrange(conn redcon.Conn, cmd redcon.Command) {
35023502

35033503
func (r *RedisServer) zrangeRead(conn redcon.Conn, key []byte, start, stop int, opts zrangeOptions) {
35043504
readTS := r.readTS()
3505-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3505+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeZSet)
35063506
if err != nil {
35073507
conn.WriteError(err.Error())
35083508
return
@@ -3542,7 +3542,7 @@ func (r *RedisServer) zrem(conn redcon.Conn, cmd redcon.Command) {
35423542
var removed int
35433543
if err := r.retryRedisWrite(ctx, func() error {
35443544
readTS := r.readTS()
3545-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
3545+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeZSet)
35463546
if err != nil {
35473547
return err
35483548
}
@@ -3590,7 +3590,7 @@ func (r *RedisServer) zremrangebyrank(conn redcon.Conn, cmd redcon.Command) {
35903590
var removed int
35913591
if err := r.retryRedisWrite(ctx, func() error {
35923592
readTS := r.readTS()
3593-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
3593+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeZSet)
35943594
if err != nil {
35953595
return err
35963596
}
@@ -3627,7 +3627,7 @@ func (r *RedisServer) tryBZPopMin(key []byte) (*bzpopminResult, error) {
36273627
var result *bzpopminResult
36283628
err := r.retryRedisWrite(ctx, func() error {
36293629
readTS := r.readTS()
3630-
typ, err := r.keyTypeAt(context.Background(), key, readTS)
3630+
typ, err := r.keyTypeAtExpect(context.Background(), key, readTS, redisTypeZSet)
36313631
if err != nil {
36323632
return err
36333633
}
@@ -4013,7 +4013,7 @@ func (r *RedisServer) xadd(conn redcon.Conn, cmd redcon.Command) {
40134013

40144014
func (r *RedisServer) xaddTxn(ctx context.Context, key []byte, req xaddRequest) (string, error) {
40154015
readTS := r.readTS()
4016-
typ, err := r.keyTypeAt(ctx, key, readTS)
4016+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
40174017
if err != nil {
40184018
return "", err
40194019
}
@@ -4328,7 +4328,7 @@ func (r *RedisServer) xtrim(conn redcon.Conn, cmd redcon.Command) {
43284328
// store errors. Extracted from xtrimTxn so the outer function stays
43294329
// within the cyclop budget.
43304330
func (r *RedisServer) streamTypeForWrite(ctx context.Context, key []byte, readTS uint64) (bool, error) {
4331-
typ, err := r.keyTypeAt(ctx, key, readTS)
4331+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
43324332
if err != nil {
43334333
return false, err
43344334
}
@@ -4554,7 +4554,7 @@ func (r *RedisServer) resolveXReadAfterIDs(ctx context.Context, req *xreadReques
45544554
// past a BLOCK-window cancel.
45554555
func (r *RedisServer) resolveXReadDollarID(ctx context.Context, key []byte) (string, error) {
45564556
readTS := r.readTS()
4557-
typ, err := r.keyTypeAt(ctx, key, readTS)
4557+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
45584558
if err != nil {
45594559
return "", err
45604560
}
@@ -4606,7 +4606,7 @@ func (r *RedisServer) xreadOnce(ctx context.Context, req xreadRequest) ([]xreadR
46064606
results := make([]xreadResult, 0, len(req.keys))
46074607
for i, key := range req.keys {
46084608
readTS := r.readTS()
4609-
typ, err := r.keyTypeAt(ctx, key, readTS)
4609+
typ, err := r.keyTypeAtExpect(ctx, key, readTS, redisTypeStream)
46104610
if err != nil {
46114611
return nil, err
46124612
}
@@ -4897,7 +4897,7 @@ func (r *RedisServer) xlen(conn redcon.Conn, cmd redcon.Command) {
48974897
return
48984898
}
48994899
readTS := r.readTS()
4900-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
4900+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeStream)
49014901
if err != nil {
49024902
conn.WriteError(err.Error())
49034903
return
@@ -5001,7 +5001,7 @@ func (r *RedisServer) rangeStream(conn redcon.Conn, cmd redcon.Command, reverse
50015001
}
50025002

50035003
readTS := r.readTS()
5004-
typ, err := r.keyTypeAt(context.Background(), cmd.Args[1], readTS)
5004+
typ, err := r.keyTypeAtExpect(context.Background(), cmd.Args[1], readTS, redisTypeStream)
50055005
if err != nil {
50065006
conn.WriteError(err.Error())
50075007
return

adapter/redis_compat_commands_stream_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,55 @@ func TestRedis_StreamXReadShortBlockReturnsNullNotError(t *testing.T) {
101101
}
102102
}
103103

104+
// TestRedis_StreamCommandsRejectWrongType locks down the wrongType
105+
// detection on the stream fast path: keyTypeAtExpect short-circuits to
106+
// the slow path when the expected (stream) prefixes return empty, so
107+
// the actual key type is reported and XADD/XREAD/XLEN/XRANGE all
108+
// surface WRONGTYPE rather than treating the key as missing.
109+
func TestRedis_StreamCommandsRejectWrongType(t *testing.T) {
110+
t.Parallel()
111+
nodes, _, _ := createNode(t, 3)
112+
defer shutdown(nodes)
113+
114+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
115+
defer func() { _ = rdb.Close() }()
116+
ctx := context.Background()
117+
118+
// Seed the key as a plain string.
119+
require.NoError(t, rdb.Set(ctx, "stream-wrongtype", "I am a string", 0).Err())
120+
121+
// XADD must reject with WRONGTYPE — the fast-path stream probe
122+
// returns empty (the key has no stream-meta or legacy-stream
123+
// row), so we fall through to the full keyTypeAt slow path which
124+
// detects the string and the caller raises WRONGTYPE.
125+
_, err := rdb.XAdd(ctx, &redis.XAddArgs{
126+
Stream: "stream-wrongtype",
127+
ID: "1-0",
128+
Values: []string{"k", "v"},
129+
}).Result()
130+
require.Error(t, err, "XADD on a string key must return WRONGTYPE")
131+
require.Contains(t, err.Error(), "WRONGTYPE")
132+
133+
// XLEN: same fall-through path — string key surfaces WRONGTYPE.
134+
_, err = rdb.XLen(ctx, "stream-wrongtype").Result()
135+
require.Error(t, err, "XLEN on a string key must return WRONGTYPE")
136+
require.Contains(t, err.Error(), "WRONGTYPE")
137+
138+
// XRANGE: same expectation.
139+
_, err = rdb.XRange(ctx, "stream-wrongtype", "-", "+").Result()
140+
require.Error(t, err, "XRANGE on a string key must return WRONGTYPE")
141+
require.Contains(t, err.Error(), "WRONGTYPE")
142+
143+
// XREAD with a missing stream returns nil (the legacy "no rows"
144+
// path). XREAD with a wrongType key, however, must surface the
145+
// error so the BLOCK loop does not spin forever on a string.
146+
_, err = rdb.XRead(ctx, &redis.XReadArgs{
147+
Streams: []string{"stream-wrongtype", "0"},
148+
}).Result()
149+
require.Error(t, err, "XREAD on a string key must return WRONGTYPE")
150+
require.Contains(t, err.Error(), "WRONGTYPE")
151+
}
152+
104153
func TestRedis_StreamXAddXReadRoundTrip(t *testing.T) {
105154
t.Parallel()
106155
nodes, _, _ := createNode(t, 3)

adapter/redis_compat_helpers.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,81 @@ func (r *RedisServer) keyTypeAt(ctx context.Context, key []byte, readTS uint64)
314314
return r.applyTTLFilter(ctx, key, readTS, typ)
315315
}
316316

317+
// keyTypeAtExpect is a fast-path replacement for keyTypeAt callers that
318+
// know the type they expect to find. The slow path probes ~19 Pebble
319+
// seeks across every collection family before returning. The fast path:
320+
//
321+
// 1. Probe only the prefixes for `expected` (typically 2-3 seeks).
322+
// 2. On hit, apply the TTL filter and return `expected`.
323+
// 3. On miss, fall back to the full keyTypeAt slow path so that
324+
// wrongType collisions (the key exists under a different type) still
325+
// surface as the correct redisValueType.
326+
//
327+
// Steady-state production: most XADD/XREAD/HSET/etc. calls are on a key
328+
// of the expected type, so step 1 hits and the slow-path 19 seeks shrink
329+
// to 2-3. The slow path stays in place for first-write and wrongType
330+
// cases, which keep their existing semantics — wrongTypeError detection
331+
// is preserved by the fall-through.
332+
func (r *RedisServer) keyTypeAtExpect(ctx context.Context, key []byte, readTS uint64, expected redisValueType) (redisValueType, error) {
333+
if expected == redisTypeNone {
334+
return r.keyTypeAt(ctx, key, readTS)
335+
}
336+
found, err := r.probeExpectedType(ctx, key, readTS, expected)
337+
if err != nil {
338+
return redisTypeNone, err
339+
}
340+
if found {
341+
return r.applyTTLFilter(ctx, key, readTS, expected)
342+
}
343+
return r.keyTypeAt(ctx, key, readTS)
344+
}
345+
346+
// probeExpectedType issues only the prefix probes for the given type.
347+
// It is intentionally conservative: returning false here means "no row
348+
// of the expected type was visible at readTS", not "the key does not
349+
// exist". Callers that need strict "does any value type exist for this
350+
// key" semantics must take the keyTypeAt slow path; keyTypeAtExpect
351+
// composes both.
352+
func (r *RedisServer) probeExpectedType(ctx context.Context, key []byte, readTS uint64, expected redisValueType) (bool, error) {
353+
switch expected {
354+
case redisTypeString:
355+
_, found, err := r.probeStringTypes(ctx, key, readTS)
356+
return found, err
357+
case redisTypeList:
358+
_, found, err := r.probeListType(ctx, key, readTS)
359+
return found, err
360+
case redisTypeHash:
361+
return r.wideColumnTypeExists(ctx, key, readTS, store.HashFieldScanPrefix, store.HashMetaKey, store.HashMetaDeltaScanPrefix)
362+
case redisTypeSet:
363+
return r.wideColumnTypeExists(ctx, key, readTS, store.SetMemberScanPrefix, store.SetMetaKey, store.SetMetaDeltaScanPrefix)
364+
case redisTypeZSet:
365+
return r.wideColumnTypeExists(ctx, key, readTS, store.ZSetMemberScanPrefix, store.ZSetMetaKey, store.ZSetMetaDeltaScanPrefix)
366+
case redisTypeStream:
367+
return r.probeStreamExists(ctx, key, readTS)
368+
case redisTypeNone:
369+
// Caller already short-circuited.
370+
return false, nil
371+
}
372+
return false, nil
373+
}
374+
375+
// probeStreamExists checks whether a stream is present at readTS in
376+
// either the new entry-per-key meta layout or the legacy single-blob
377+
// encoding. Two ExistsAt seeks worst-case; one when the new layout is
378+
// present (the common case post-#620 migration).
379+
func (r *RedisServer) probeStreamExists(ctx context.Context, key []byte, readTS uint64) (bool, error) {
380+
if exists, err := r.store.ExistsAt(ctx, store.StreamMetaKey(key), readTS); err != nil {
381+
return false, errors.WithStack(err)
382+
} else if exists {
383+
return true, nil
384+
}
385+
exists, err := r.store.ExistsAt(ctx, redisStreamKey(key), readTS)
386+
if err != nil {
387+
return false, errors.WithStack(err)
388+
}
389+
return exists, nil
390+
}
391+
317392
// applyTTLFilter takes a raw (TTL-unaware) type and returns the
318393
// TTL-filtered equivalent. Callers that need BOTH the raw and filtered
319394
// types (SET NX/XX/GET against a possibly-expired key) can reuse a

0 commit comments

Comments
 (0)