
Commit 21cf2c9

perf(redis): stream entry-per-key layout for O(new) XREAD (#620)
## Summary

- Store each Redis stream entry under its own key (`!stream|entry|<userKey><StreamID(16B)>`) with a separate meta key (`!stream|meta|<userKey>` -> `Length | LastMs | LastSeq`). XREAD / XRANGE become bounded range scans that unmarshal only the selected entries, XADD is two small writes, and XLEN is a single meta read.
- Migration is dual-read with write-time rewrite: reads bump `elastickv_stream_legacy_format_reads_total` when they fall through to the legacy blob; writes convert the legacy blob to the new layout in the same transaction and delete the blob, so XLEN never double-counts.
- The StreamID suffix is binary big-endian `ms || seq`, so lex order over entry keys matches the numeric `(ms, seq)` order the client sees. The "ms-seq" string form for range bounds is parsed once up front to compute the scan bounds.

## Motivation

Incident 2026-04-24: one client doing 11 XREAD/s on a large stream consumed 14 CPU cores on the leader, starving the raft and Lua paths. Per the CPU profile, `proto.Unmarshal` alone took 59% of those 14 cores, because `loadStreamAt` read the entire stream as a single protobuf blob and `unmarshalStreamValue` re-parsed every entry on every read.

## Design

- Layout: the meta key holds a fixed 24-byte binary record (`Length(8) | LastMs(8) | LastSeq(8)`); entry keys embed the StreamID in big-endian binary, so a prefix range scan is enough to serve XRANGE / XREAD without loading the whole stream. `LastMs` / `LastSeq` track the highest ID ever assigned, so XADD `*` stays monotonic even after XTRIM removes the current tail.
- Commands: XADD / XREAD / XRANGE / XREVRANGE / XLEN / XTRIM are all rewritten to use the new layout. XADD with MAXLEN trims the head by scanning exactly `count` entry keys and emitting Dels in the same txn as the new Put.
- Migration: reads try the new meta first; on a miss, they fall back to the legacy blob and increment the counter.
  Writes that observe the legacy blob re-emit every legacy entry as a `!stream|entry|…` put, emit the new meta, and delete the legacy blob, all in a single coordinator-dispatched transaction. The legacy blob and the new layout never coexist in a committed state. `redisStreamKey` is marked Deprecated and kept only for the dual-read fallback.

## Commands touched

`XADD`, `XREAD`, `XRANGE`, `XREVRANGE`, `XLEN`, `XTRIM`. `deleteLogicalKeyElems` also learns to clean up the new meta plus every entry key, so `DEL` and overwrite paths stay correct.

## Test plan

New tests in `adapter/redis_compat_commands_stream_test.go`:

- [x] `TestRedis_StreamXAddXReadRoundTrip`: XADD then XREAD `0` returns all entries.
- [x] `TestRedis_StreamXReadLatencyIsConstant`: 10 000 XADDs, then 100 XREADs from `$`; latency must not grow beyond `2 * first + 10ms`. On the old blob path this failed catastrophically.
- [x] `TestRedis_StreamXTrimMaxLen`: XADD 100 entries, XTRIM MAXLEN=10, XLEN == 10, XRANGE returns the last 10.
- [x] `TestRedis_StreamXRangeBounds`: inclusive / exclusive (`(`) bounds on both sides, plus XREVRANGE.
- [x] `TestRedis_StreamMigrationFromLegacyBlob`: seed a legacy blob directly; XREAD bumps the counter; XADD migrates and deletes the blob; a subsequent XREAD hits the new layout and does NOT bump the counter; XLEN is original + new (no double count); XADD `*` is greater than the pre-migration last ID.
- [x] `TestRedis_StreamAutoIDMonotonicAfterTrim`: an XTRIM that empties the stream must not rewind XADD `*`.

`go build ./... && go vet ./... && go test -short ./adapter/... ./store/...` all pass locally.

## Follow-ups (not in this PR)

- Migrate the hash / set / zset single-blob fallbacks to entry-per-key layouts the same way (the Layer 4 design doc already lists these).
- XINFO, XACK, and consumer groups: none exist today.
- Delete the `redisStreamKey` helper + legacy blob fallback once `elastickv_stream_legacy_format_reads_total` stays at zero across production nodes for long enough to guarantee no surviving legacy streams.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **New Features**
  * Improved stream add semantics with stricter ID validation and guaranteed monotonic auto-IDs.
  * Multikey stream deletions now reliably remove all stream data within transactions.
* **Bug Fixes**
  * Corrected XRANGE/XREVRANGE inclusive/exclusive boundaries and COUNT clamping.
  * Ensured XLEN/XTRIM reflect actual entries and legacy-format remnants are cleaned on write.
* **Performance**
  * Reduced blocked read latency and stabilized BLOCK timeout behavior to return null on timeout.
* **Tests**
  * Added extensive end-to-end and unit tests covering XADD/XREAD/XTRIM/XRANGE, deletion, and edge cases.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents ae36349 + a31080f commit 21cf2c9

10 files changed

Lines changed: 2572 additions & 132 deletions

adapter/redis.go

Lines changed: 124 additions & 13 deletions
```diff
@@ -1496,6 +1496,15 @@ func (r *RedisServer) mergeInternalNamespaces(start []byte, pattern []byte, merg
 		return err
 	}
 	for _, prefix := range redisInternalPrefixes {
+		// !stream|meta| keys are length-prefixed (see store.StreamMetaKey):
+		// a pattern-bound scan over the raw prefix would mask out every
+		// migrated stream because the user-key bytes do not start at
+		// prefix[len(prefix):]. Delegate to the wide-column scan below,
+		// which uses streamMetaScanStart(start) to place the user-key
+		// lower bound past the length field.
+		if prefix == store.StreamMetaPrefix {
+			continue
+		}
 		internalStart, internalEnd := listPatternScanBounds(prefix, pattern)
 		if err := mergeScannedKeys(internalStart, internalEnd); err != nil {
 			return err
@@ -1518,7 +1527,27 @@
 	}
 	zsetMemberStart := store.ZSetMemberScanPrefix(start)
 	zsetMemberEnd := prefixScanEnd([]byte(store.ZSetMemberPrefix))
-	return mergeScannedKeys(zsetMemberStart, zsetMemberEnd)
+	if err := mergeScannedKeys(zsetMemberStart, zsetMemberEnd); err != nil {
+		return err
+	}
+	// Post-migration streams live under !stream|meta|<len><userKey>.
+	// The meta record is enough to expose the logical key via KEYS;
+	// entry rows are filtered out by redisVisibleUserKey / collectUserKeys
+	// so the result stays one-line-per-stream regardless of entry count.
+	streamMetaStart := streamMetaScanStart(start)
+	streamMetaEnd := prefixScanEnd([]byte(store.StreamMetaPrefix))
+	return mergeScannedKeys(streamMetaStart, streamMetaEnd)
+}
+
+// streamMetaScanStart returns the lower bound for scanning stream meta
+// keys that begin with the given user-key prefix. The store helper
+// already returns StreamMetaPrefix + len(userKey) + userKey, so callers
+// only need to supply the bounded pattern prefix.
+func streamMetaScanStart(userPrefix []byte) []byte {
+	if len(userPrefix) == 0 {
+		return []byte(store.StreamMetaPrefix)
+	}
+	return store.StreamMetaKey(userPrefix)
 }
 
 func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
@@ -1664,9 +1693,25 @@ func wideColumnVisibleUserKey(key []byte) (userKey []byte, isWide bool) {
 	if store.IsSetMemberKey(key) {
 		return store.ExtractSetUserKeyFromMember(key), true
 	}
+	if userKey, ok := streamWideColumnVisibleUserKey(key); ok {
+		return userKey, true
+	}
 	return zsetWideColumnVisibleUserKey(key)
 }
 
+// streamWideColumnVisibleUserKey maps a wide-column stream key to its
+// visible user key. Meta keys expose the stream exactly once; entry keys
+// are internal-only so KEYS / SCAN don't leak one result per entry.
+func streamWideColumnVisibleUserKey(key []byte) ([]byte, bool) {
+	if store.IsStreamMetaKey(key) {
+		return store.ExtractStreamUserKeyFromMeta(key), true
+	}
+	if store.IsStreamEntryKey(key) {
+		return nil, true
+	}
+	return nil, false
+}
+
 func redisVisibleUserKey(key []byte) []byte {
 	if bytes.HasPrefix(key, redisTxnKeyPrefix) || isRedisTTLKey(key) {
 		return nil
@@ -1863,7 +1908,13 @@ type txnContext struct {
 	zsetStates map[string]*zsetTxnState
 	ttlStates  map[string]*ttlTxnState
 	readKeys   map[string][]byte
-	startTS    uint64
+	// streamDeletions tracks user keys whose stream wide-column layout must
+	// be tombstoned on commit: the !stream|meta|<key> record plus every
+	// !stream|entry|<key><ID> row. stageKeyDeletion seeds this (MULTI/EXEC
+	// DEL / EXPIRE 0) so migrated streams are properly removed rather than
+	// leaking entry keys past the DEL's apparent success.
+	streamDeletions map[string][]byte
+	startTS         uint64
 }
 
 type listTxnState struct {
@@ -1918,7 +1969,8 @@ func (t *txnContext) trackTypeReadKeys(key []byte) {
 		redisHashKey(key),
 		redisSetKey(key),
 		redisZSetKey(key),
-		redisStreamKey(key),
+		redisStreamKey(key),      // legacy single-blob stream key
+		store.StreamMetaKey(key), // post-migration wide-column stream meta
 		redisHLLKey(key),
 		redisStrKey(key),
 		key, // legacy bare key for fallback reads
@@ -2459,7 +2511,7 @@ func (t *txnContext) stageKeyDeletion(key []byte) (redisResult, error) {
 	zs.members = map[string]float64{}
 	zs.exists = false
 	zs.dirty = true
-	// Mark hash, set, stream, and HLL internal keys for deletion.
+	// Mark hash, set, stream (legacy blob), and HLL internal keys for deletion.
 	for _, internalKey := range [][]byte{
 		redisHashKey(key),
 		redisSetKey(key),
@@ -2473,6 +2525,19 @@
 		iv.deleted = true
 		iv.dirty = true
 	}
+	// Stage the wide-column stream cleanup: the !stream|meta| record and
+	// every !stream|entry| row must also be tombstoned when the user deletes
+	// a migrated stream via MULTI/EXEC DEL or EXPIRE 0. Without this step
+	// the command would report success but leave rows behind, and a later
+	// XLEN / XREAD would "resurrect" the stream. commit() expands this
+	// entry into concrete Del elems by scanning the entry-key prefix.
+	// The map is lazy-initialised so test fixtures that build a minimal
+	// txnContext literal without this field still work.
+	if t.streamDeletions == nil {
+		t.streamDeletions = map[string][]byte{}
+	}
+	t.streamDeletions[string(key)] = bytes.Clone(key)
+	t.trackReadKey(store.StreamMetaKey(key))
 	// Mark legacy bare string key for deletion. We bypass load() here
 	// because load() auto-prefixes bare keys to !redis|str|.
 	// Track the bare key in the read set for conflict detection.
@@ -2553,9 +2618,23 @@ func (t *txnContext) commit() error {
 	// non-string keys get a !redis|ttl| element written in the same transaction.
 	ttlElems := t.buildTTLElems()
 
+	// Derive a single redisDispatchTimeout-bounded context covering both the
+	// stream-deletion scans (paginated ScanAt/ExistsAt over StreamEntryScanPrefix)
+	// and the final Dispatch. Without this bound, buildStreamDeletionElems would
+	// run on the server-lifetime handlerContext, leaving its scans uncancellable
+	// from the request side on a slow disk or hot-key pathological commit.
+	ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout)
+	defer cancel()
+
+	streamElems, err := t.buildStreamDeletionElems(ctx)
+	if err != nil {
+		return err
+	}
+
 	elems = append(elems, listElems...)
 	elems = append(elems, zsetElems...)
 	elems = append(elems, ttlElems...)
+	elems = append(elems, streamElems...)
 	if len(elems) == 0 {
 		return nil
 	}
@@ -2571,8 +2650,6 @@
 		CommitTS: commitTS,
 		ReadKeys: readKeys,
 	}
-	ctx, cancel := context.WithTimeout(t.server.handlerContext(), redisDispatchTimeout)
-	defer cancel()
 	if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil {
 		return errors.WithStack(err)
 	}
@@ -2795,6 +2872,39 @@ func buildZSetWideElems(key []byte, st *zsetTxnState) ([]*kv.Elem[kv.OP], int64)
 	return elems, lenDelta
 }
 
+// buildStreamDeletionElems expands every user key queued in streamDeletions
+// into the Del operations that actually tombstone a migrated stream:
+// !stream|meta|<key> and every !stream|entry|<key><ID> row. Called from
+// commit() so that MULTI/EXEC DEL / EXPIRE 0 on a migrated stream leaves
+// the store in a consistent state instead of only dropping the legacy blob.
+// Each scan runs at t.startTS so the delete honours the transaction's
+// snapshot view.
+//
+// ctx is the redisDispatchTimeout-bounded context derived in commit(); it
+// caps the paginated ExistsAt + scanAllDeltaElems inside
+// deleteStreamWideColumnElems so a pathological staged-stream count cannot
+// hold the EXEC handler open past the per-request budget.
+func (t *txnContext) buildStreamDeletionElems(ctx context.Context) ([]*kv.Elem[kv.OP], error) {
+	if len(t.streamDeletions) == 0 {
+		return nil, nil
+	}
+	keys := make([]string, 0, len(t.streamDeletions))
+	for k := range t.streamDeletions {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	var elems []*kv.Elem[kv.OP]
+	for _, k := range keys {
+		userKey := t.streamDeletions[k]
+		streamElems, err := t.server.deleteStreamWideColumnElems(ctx, userKey, t.startTS)
+		if err != nil {
+			return nil, err
+		}
+		elems = append(elems, streamElems...)
+	}
+	return elems, nil
+}
+
 // buildTTLElems returns !redis|ttl| Raft elements for non-string keys with dirty TTL state.
 // String keys have TTL embedded in the value; they are handled by buildKeyElems.
 func (t *txnContext) buildTTLElems() []*kv.Elem[kv.OP] {
@@ -2827,13 +2937,14 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err
 	defer readPin.Release()
 
 	txn := &txnContext{
-		server:     r,
-		working:    map[string]*txnValue{},
-		listStates: map[string]*listTxnState{},
-		zsetStates: map[string]*zsetTxnState{},
-		ttlStates:  map[string]*ttlTxnState{},
-		readKeys:   map[string][]byte{},
-		startTS:    startTS,
+		server:          r,
+		working:         map[string]*txnValue{},
+		listStates:      map[string]*listTxnState{},
+		zsetStates:      map[string]*zsetTxnState{},
+		ttlStates:       map[string]*ttlTxnState{},
+		readKeys:        map[string][]byte{},
+		streamDeletions: map[string][]byte{},
+		startTS:         startTS,
 	}
 
 	nextResults := make([]redisResult, 0, len(queue))
```