Skip to content

Commit 06ad365

Browse files
authored
Merge pull request #486 from bootjp/feat/redis-str-prefix
feat: add !redis|str| prefix to Redis string keys and scope FLUSHALL
2 parents 5515c26 + 71323de commit 06ad365

7 files changed

Lines changed: 218 additions & 37 deletions

File tree

adapter/redis.go

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ const (
3939
cmdExpire = "EXPIRE"
4040
cmdFlushAll = "FLUSHALL"
4141
cmdFlushDB = "FLUSHDB"
42+
cmdFlushLegacy = "FLUSHLEGACY"
4243
cmdGet = "GET"
4344
cmdGetDel = "GETDEL"
4445
cmdHDel = "HDEL"
@@ -113,6 +114,7 @@ const (
113114
const (
114115
redisLatestCommitTimeout = 5 * time.Second
115116
redisDispatchTimeout = 10 * time.Second
117+
redisFlushLegacyTimeout = 10 * time.Minute
116118
redisRelayPublishTimeout = 2 * time.Second
117119
redisTraceArgLimit = 6
118120
redisTraceArgMaxLen = 96
@@ -170,6 +172,7 @@ var argsLen = map[string]int{
170172
cmdExpire: -3,
171173
cmdFlushAll: 1,
172174
cmdFlushDB: 1,
175+
cmdFlushLegacy: 1,
173176
cmdGet: 2,
174177
cmdGetDel: 2,
175178
cmdHDel: -3,
@@ -358,6 +361,7 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
358361
cmdExpire: r.expire,
359362
cmdFlushAll: r.flushall,
360363
cmdFlushDB: r.flushdb,
364+
cmdFlushLegacy: r.flushlegacy,
361365
cmdGet: r.get,
362366
cmdGetDel: r.getdel,
363367
cmdHDel: r.hdel,
@@ -692,7 +696,7 @@ func (r *RedisServer) loadRedisSetState(key []byte, readTS uint64, returnOld boo
692696
return state, nil
693697
}
694698

695-
oldValue, err := r.readValueAt(key, readTS)
699+
oldValue, err := r.readRedisStringAt(key, readTS)
696700
if err != nil && !errors.Is(err, store.ErrKeyNotFound) {
697701
return redisSetState{}, err
698702
}
@@ -709,7 +713,7 @@ func (r *RedisServer) replaceWithStringTxn(ctx context.Context, key, value []byt
709713
}
710714
elems = append(elems, delElems...)
711715
}
712-
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: key, Value: bytes.Clone(value)})
716+
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: redisStrKey(key), Value: bytes.Clone(value)})
713717
if ttl != nil {
714718
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: redisTTLKey(key), Value: encodeRedisTTL(*ttl)})
715719
} else {
@@ -955,7 +959,7 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
955959
return
956960
}
957961

958-
v, err := r.readValueAt(key, readTS)
962+
v, err := r.readRedisStringAt(key, readTS)
959963
if err != nil {
960964
if errors.Is(err, store.ErrKeyNotFound) {
961965
conn.WriteNull()
@@ -1423,28 +1427,51 @@ func (t *txnContext) trackTypeReadKeys(key []byte) {
14231427
redisZSetKey(key),
14241428
redisStreamKey(key),
14251429
redisHLLKey(key),
1426-
key,
1430+
redisStrKey(key),
1431+
key, // legacy bare key for fallback reads
14271432
redisTTLKey(key),
14281433
} {
14291434
t.trackReadKey(readKey)
14301435
}
14311436
}
14321437

14331438
func (t *txnContext) load(key []byte) (*txnValue, error) {
1434-
k := string(key)
1439+
// If the key is already an internal key (e.g., !redis|hash|...,
1440+
// !lst|..., !txn|..., !ddb|..., !s3|..., !dist|...), use it as-is.
1441+
// Otherwise, it's a bare user key for a string value — prefix it.
1442+
storageKey := key
1443+
userKey := extractRedisInternalUserKey(key)
1444+
if !isKnownInternalKey(key) {
1445+
storageKey = redisStrKey(key)
1446+
userKey = key
1447+
}
1448+
k := string(storageKey)
14351449
if tv, ok := t.working[k]; ok {
14361450
return tv, nil
14371451
}
1438-
t.trackReadKey(key)
1439-
if userKey := extractRedisInternalUserKey(key); userKey != nil {
1452+
t.trackReadKey(storageKey)
1453+
if !isKnownInternalKey(key) {
1454+
// Track the bare key too for conflict detection on legacy fallback reads.
1455+
t.trackReadKey(key)
1456+
}
1457+
if userKey != nil {
14401458
t.trackReadKey(redisTTLKey(userKey))
1441-
} else if !bytes.HasPrefix(key, []byte(redisTTLPrefix)) {
1442-
t.trackReadKey(redisTTLKey(key))
14431459
}
14441460
tv := &txnValue{}
1445-
val, err := t.server.readValueAt(key, t.startTS)
1446-
if err != nil && !errors.Is(err, store.ErrKeyNotFound) {
1447-
return nil, errors.WithStack(err)
1461+
var val []byte
1462+
if !isKnownInternalKey(key) {
1463+
// For bare user string keys, use the fallback-aware reader.
1464+
var err error
1465+
val, err = t.server.readRedisStringAt(key, t.startTS)
1466+
if err != nil && !errors.Is(err, store.ErrKeyNotFound) {
1467+
return nil, errors.WithStack(err)
1468+
}
1469+
} else {
1470+
var err error
1471+
val, err = t.server.readValueAt(storageKey, t.startTS)
1472+
if err != nil && !errors.Is(err, store.ErrKeyNotFound) {
1473+
return nil, errors.WithStack(err)
1474+
}
14481475
}
14491476
tv.raw = val
14501477
tv.loaded = true
@@ -1565,7 +1592,7 @@ func (t *txnContext) stagedListType(key string) (redisValueType, bool) {
15651592
}
15661593

15671594
func (t *txnContext) stagedStringType(key string) (redisValueType, bool) {
1568-
tv, ok := t.working[key]
1595+
tv, ok := t.working[string(redisStrKey([]byte(key)))]
15691596
if !ok {
15701597
return redisTypeNone, false
15711598
}
@@ -1811,6 +1838,16 @@ func (t *txnContext) stageKeyDeletion(key []byte) (redisResult, error) {
18111838
iv.deleted = true
18121839
iv.dirty = true
18131840
}
1841+
// Mark legacy bare string key for deletion. We bypass load() here
1842+
// because load() auto-prefixes bare keys to !redis|str|.
1843+
// Track the bare key in the read set for conflict detection.
1844+
t.trackReadKey(key)
1845+
bareK := string(key)
1846+
if _, ok := t.working[bareK]; !ok {
1847+
t.working[bareK] = &txnValue{}
1848+
}
1849+
t.working[bareK].deleted = true
1850+
t.working[bareK].dirty = true
18141851
return redisResult{typ: resultInt, integer: 1}, nil
18151852
}
18161853

@@ -1908,12 +1945,12 @@ func (t *txnContext) buildKeyElems() []*kv.Elem[kv.OP] {
19081945
if !tv.dirty {
19091946
continue
19101947
}
1911-
key := []byte(k)
1948+
storageKey := []byte(k)
19121949
if tv.deleted {
1913-
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: key})
1950+
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: storageKey})
19141951
continue
19151952
}
1916-
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: key, Value: tv.raw})
1953+
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: storageKey, Value: tv.raw})
19171954
}
19181955
return elems
19191956
}

adapter/redis_compat_commands.go

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (r *RedisServer) getdel(conn redcon.Conn, cmd redcon.Command) {
112112
if typ != redisTypeString {
113113
return wrongTypeError()
114114
}
115-
raw, err := r.readValueAt(key, readTS)
115+
raw, err := r.readRedisStringAt(key, readTS)
116116
if err != nil {
117117
// Key may have expired or been deleted between type check and read.
118118
v = nil
@@ -458,6 +458,76 @@ func (r *RedisServer) flushall(conn redcon.Conn, _ redcon.Command) {
458458
r.flushDatabase(conn, true)
459459
}
460460

461+
// deleteLegacyKeys scans the full keyspace and deletes keys that do not belong
462+
// to any known internal prefix. Returns the number of user-visible legacy keys
463+
// deleted. TTL keys are intentionally NOT deleted because the !redis|ttl|
464+
// namespace is shared across all Redis types — deleting them could strip
465+
// expiration from already-migrated or newly-created keys.
466+
func (r *RedisServer) deleteLegacyKeys(ctx context.Context, readTS uint64) (int, error) {
467+
const batchSize = 1000
468+
var totalDeleted int
469+
cursor := make([]byte, 0, batchSize)
470+
for {
471+
kvs, err := r.store.ScanAt(ctx, cursor, nil, batchSize, readTS)
472+
if err != nil {
473+
return totalDeleted, fmt.Errorf("scan: %w", err)
474+
}
475+
if len(kvs) == 0 {
476+
break
477+
}
478+
479+
elems := make([]*kv.Elem[kv.OP], 0, len(kvs))
480+
legacyCount := 0
481+
for _, pair := range kvs {
482+
if !isKnownInternalKey(pair.Key) {
483+
legacyCount++
484+
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: pair.Key})
485+
}
486+
}
487+
488+
if len(elems) > 0 {
489+
if err := r.dispatchElems(ctx, false, readTS, elems); err != nil {
490+
return totalDeleted, err
491+
}
492+
totalDeleted += legacyCount
493+
}
494+
495+
// Advance cursor past the last key in this batch.
496+
lastKey := kvs[len(kvs)-1].Key
497+
cursor = make([]byte, len(lastKey)+1)
498+
copy(cursor, lastKey)
499+
500+
// Yield briefly between batches to avoid saturating the Raft log.
501+
time.Sleep(time.Millisecond)
502+
}
503+
return totalDeleted, nil
504+
}
505+
506+
// flushlegacy deletes old unprefixed Redis string keys that were written before
507+
// the !redis|str| prefix migration. It scans all keys and deletes those that
508+
// do not match any known internal prefix. This is a one-time migration operation.
509+
func (r *RedisServer) flushlegacy(conn redcon.Conn, _ redcon.Command) {
510+
if !r.coordinator.IsLeader() {
511+
n, err := r.proxyFlushLegacy()
512+
if err != nil {
513+
conn.WriteError(err.Error())
514+
return
515+
}
516+
conn.WriteInt(n)
517+
return
518+
}
519+
520+
ctx, cancel := context.WithTimeout(context.Background(), redisFlushLegacyTimeout)
521+
defer cancel()
522+
523+
totalDeleted, err := r.deleteLegacyKeys(ctx, r.readTS())
524+
if err != nil {
525+
conn.WriteError(err.Error())
526+
return
527+
}
528+
conn.WriteInt(totalDeleted)
529+
}
530+
461531
func (r *RedisServer) flushDatabase(conn redcon.Conn, all bool) {
462532
if !r.coordinator.IsLeader() {
463533
if err := r.proxyFlushDatabase(all); err != nil {
@@ -476,12 +546,17 @@ func (r *RedisServer) flushDatabase(conn redcon.Conn, all bool) {
476546
return fmt.Errorf("verify leader: %w", err)
477547
}
478548

479-
// Dispatch a single DEL_PREFIX operation. The FSM on each node will
480-
// scan locally and write tombstones, avoiding the need to enumerate
481-
// all keys here and send them through the Raft log.
549+
// Delete only Redis-related keys. Three DEL_PREFIX operations cover
550+
// all Redis namespaces: "!redis|" (str, hash, set, zset, hll,
551+
// stream, ttl), "!lst|" (list meta + items), and "!zs|" (zset
552+
// wide-column meta/member/score).
553+
// Legacy bare keys are NOT deleted here to avoid a full keyspace
554+
// scan. Run FLUSHLEGACY first to clean up legacy data.
482555
_, err := r.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
483556
Elems: []*kv.Elem[kv.OP]{
484-
{Op: kv.DelPrefix, Key: nil},
557+
{Op: kv.DelPrefix, Key: []byte("!redis|")},
558+
{Op: kv.DelPrefix, Key: []byte("!lst|")},
559+
{Op: kv.DelPrefix, Key: []byte("!zs|")},
485560
},
486561
})
487562
if err != nil {
@@ -1169,7 +1244,7 @@ func (r *RedisServer) incr(conn redcon.Conn, cmd redcon.Command) {
11691244

11701245
current = 0
11711246
if typ == redisTypeString {
1172-
raw, err := r.readValueAt(cmd.Args[1], readTS)
1247+
raw, err := r.readRedisStringAt(cmd.Args[1], readTS)
11731248
if err != nil {
11741249
return err
11751250
}
@@ -1181,7 +1256,7 @@ func (r *RedisServer) incr(conn redcon.Conn, cmd redcon.Command) {
11811256
current++
11821257

11831258
return r.dispatchElems(ctx, true, readTS, []*kv.Elem[kv.OP]{
1184-
{Op: kv.Put, Key: cmd.Args[1], Value: []byte(strconv.FormatInt(current, 10))},
1259+
{Op: kv.Put, Key: redisStrKey(cmd.Args[1]), Value: []byte(strconv.FormatInt(current, 10))},
11851260
{Op: kv.Del, Key: redisTTLKey(cmd.Args[1])},
11861261
})
11871262
}); err != nil {

adapter/redis_compat_helpers.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ func (r *RedisServer) rawKeyTypeAt(ctx context.Context, key []byte, readTS uint6
2929
{typ: redisTypeStream, key: redisStreamKey(key)},
3030
// HyperLogLog is a Redis string subtype. Treat it as "string" for TYPE.
3131
{typ: redisTypeString, key: redisHLLKey(key)},
32+
{typ: redisTypeString, key: redisStrKey(key)},
33+
// Fallback: check bare key for legacy data written before the
34+
// !redis|str| prefix migration.
3235
{typ: redisTypeString, key: key},
3336
}
3437
for _, check := range checks {
@@ -137,9 +140,23 @@ func (r *RedisServer) dispatchElems(ctx context.Context, isTxn bool, startTS uin
137140
return errors.WithStack(err)
138141
}
139142

143+
// readRedisStringAt reads a Redis string value, trying the prefixed key first
144+
// and falling back to the bare key for legacy data written before the
145+
// !redis|str| prefix migration.
146+
func (r *RedisServer) readRedisStringAt(key []byte, readTS uint64) ([]byte, error) {
147+
v, err := r.readValueAt(redisStrKey(key), readTS)
148+
if err == nil {
149+
return v, nil
150+
}
151+
if !errors.Is(err, store.ErrKeyNotFound) {
152+
return nil, err
153+
}
154+
return r.readValueAt(key, readTS)
155+
}
156+
140157
func (r *RedisServer) saveString(ctx context.Context, key []byte, value []byte, ttl *time.Time) error {
141158
elems := []*kv.Elem[kv.OP]{
142-
{Op: kv.Put, Key: key, Value: bytes.Clone(value)},
159+
{Op: kv.Put, Key: redisStrKey(key), Value: bytes.Clone(value)},
143160
}
144161
if ttl == nil {
145162
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: redisTTLKey(key)})
@@ -157,15 +174,9 @@ func (r *RedisServer) deleteLogicalKeyElems(ctx context.Context, key []byte, rea
157174

158175
elems := []*kv.Elem[kv.OP]{}
159176

160-
stringExists, err := r.store.ExistsAt(ctx, key, readTS)
161-
if err != nil {
162-
return nil, false, errors.WithStack(err)
163-
}
164-
if stringExists {
165-
elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: key})
166-
}
167-
168177
for _, internalKey := range [][]byte{
178+
redisStrKey(key),
179+
key, // legacy bare string key
169180
redisHashKey(key),
170181
redisSetKey(key),
171182
redisHLLKey(key),

0 commit comments

Comments
 (0)