Skip to content

Commit 7fcdede

Browse files
committed
Fix review issues: Lua ZADD flags, graceful shutdown, expire overflow, key deletion
- Lua ZADD: parse NX/XX/GT/LT flags using shared zaddFlags helpers - TransactionManager: add Close() with closeCh for graceful shutdown of flushRawPending goroutine; replace time.Sleep with select - Expire: add overflow guard for time.Duration conversion of large TTLs - stageKeyDeletion/applyDel: delete hash, set, stream, HLL internal keys in addition to list, string, TTL, and zset - removeAll: use clear(sc.chans) instead of nil to avoid map access after nil assignment
1 parent 8262ce9 commit 7fcdede

4 files changed

Lines changed: 88 additions & 26 deletions

File tree

adapter/redis.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -553,10 +553,15 @@ func parseRedisSetTTL(args [][]byte, index int, opt string, now time.Time) (*tim
553553
return nil, index, errors.New("ERR invalid expire time in 'set' command")
554554
}
555555

556-
expireAt := now.Add(time.Duration(n) * time.Millisecond)
556+
unit := time.Millisecond
557557
if opt == "EX" {
558-
expireAt = now.Add(time.Duration(n) * time.Second)
558+
unit = time.Second
559559
}
560+
if n > math.MaxInt64/int64(unit) {
561+
return nil, index, errors.New("ERR invalid expire time in 'set' command")
562+
}
563+
564+
expireAt := now.Add(time.Duration(n) * unit)
560565
return &expireAt, index + 1, nil
561566
}
562567

@@ -1330,21 +1335,7 @@ func (t *txnContext) applySet(cmd redcon.Command) (redisResult, error) {
13301335
}
13311336

13321337
func (t *txnContext) applyDel(cmd redcon.Command) (redisResult, error) {
1333-
// Always stage list deletion so DEL followed by RPUSH in the same MULTI
1334-
// reliably recreates the list from an empty state.
1335-
st, err := t.loadListState(cmd.Args[1])
1336-
if err != nil {
1337-
return redisResult{}, err
1338-
}
1339-
stageListDelete(st)
1340-
1341-
tv, err := t.load(cmd.Args[1])
1342-
if err != nil {
1343-
return redisResult{}, err
1344-
}
1345-
tv.deleted = true
1346-
tv.dirty = true
1347-
return redisResult{typ: resultInt, integer: 1}, nil
1338+
return t.stageKeyDeletion(cmd.Args[1])
13481339
}
13491340

13501341
func (t *txnContext) applyGet(cmd redcon.Command) (redisResult, error) {
@@ -1514,6 +1505,20 @@ func (t *txnContext) stageKeyDeletion(key []byte) (redisResult, error) {
15141505
}
15151506
zs.members = nil
15161507
zs.dirty = true
1508+
// Mark hash, set, stream, and HLL internal keys for deletion.
1509+
for _, internalKey := range [][]byte{
1510+
redisHashKey(key),
1511+
redisSetKey(key),
1512+
redisStreamKey(key),
1513+
redisHLLKey(key),
1514+
} {
1515+
iv, err := t.load(internalKey)
1516+
if err != nil {
1517+
return redisResult{}, err
1518+
}
1519+
iv.deleted = true
1520+
iv.dirty = true
1521+
}
15171522
return redisResult{typ: resultInt, integer: 1}, nil
15181523
}
15191524

adapter/redis_lua_context.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,9 +1654,29 @@ func (c *luaScriptContext) cmdSIsMember(args []string) (luaReply, error) {
16541654
return luaIntReply(0), nil
16551655
}
16561656

1657+
func parseLuaZAddArgs(args []string) (zaddFlags, []string, error) {
1658+
var flags zaddFlags
1659+
i := 1
1660+
for i < len(args) {
1661+
if !flags.applyFlag(strings.ToUpper(args[i])) {
1662+
break
1663+
}
1664+
i++
1665+
}
1666+
if err := flags.validate(); err != nil {
1667+
return zaddFlags{}, nil, err
1668+
}
1669+
remaining := args[i:]
1670+
if len(remaining)%2 != 0 || len(remaining) == 0 {
1671+
return zaddFlags{}, nil, errors.New("ERR syntax error")
1672+
}
1673+
return flags, remaining, nil
1674+
}
1675+
16571676
func (c *luaScriptContext) cmdZAdd(args []string) (luaReply, error) {
1658-
if (len(args)-1)%2 != 0 {
1659-
return luaReply{}, errors.New("ERR syntax error")
1677+
flags, pairs, err := parseLuaZAddArgs(args)
1678+
if err != nil {
1679+
return luaReply{}, err
16601680
}
16611681
st, err := c.zsetState([]byte(args[0]))
16621682
if err != nil {
@@ -1667,15 +1687,20 @@ func (c *luaScriptContext) cmdZAdd(args []string) (luaReply, error) {
16671687
st.members = map[string]float64{}
16681688
}
16691689
added := 0
1670-
for i := 1; i < len(args); i += 2 {
1671-
score, err := strconv.ParseFloat(args[i], 64)
1690+
for j := 0; j < len(pairs); j += 2 {
1691+
score, err := strconv.ParseFloat(pairs[j], 64)
16721692
if err != nil {
16731693
return luaReply{}, errors.WithStack(err)
16741694
}
1675-
if _, ok := st.members[args[i+1]]; !ok {
1695+
member := pairs[j+1]
1696+
oldScore, exists := st.members[member]
1697+
if !flags.allows(exists, oldScore, score) {
1698+
continue
1699+
}
1700+
if !exists {
16761701
added++
16771702
}
1678-
st.members[args[i+1]] = score
1703+
st.members[member] = score
16791704
}
16801705
st.loaded = true
16811706
st.dirty = true

adapter/redis_pubsub.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (ps *redisPubSub) removeAll(sc *pubsubConn) {
145145
}
146146
}
147147
}
148-
sc.chans = nil
148+
clear(sc.chans)
149149
}
150150

151151
func (ps *redisPubSub) writeSubscribeReply(sc *pubsubConn, channel string, count int) {

kv/transaction.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type TransactionManager struct {
1616
mu sync.Mutex
1717
rawPending []*rawCommitItem
1818
rawFlushing bool
19+
closeCh chan struct{}
1920
}
2021

2122
type rawCommitItem struct {
@@ -34,7 +35,26 @@ var rawBatchWindow = 500 * time.Microsecond
3435

3536
func NewTransaction(raft *raft.Raft) *TransactionManager {
3637
return &TransactionManager{
37-
raft: raft,
38+
raft: raft,
39+
closeCh: make(chan struct{}),
40+
}
41+
}
42+
43+
var errShuttingDown = errors.New("transaction manager is shutting down")
44+
45+
// Close signals the TransactionManager to stop and drains any pending raw
46+
// commit items, sending each an error so callers are not blocked forever.
47+
func (t *TransactionManager) Close() {
48+
close(t.closeCh)
49+
50+
t.mu.Lock()
51+
pending := t.rawPending
52+
t.rawPending = nil
53+
t.rawFlushing = false
54+
t.mu.Unlock()
55+
56+
for _, item := range pending {
57+
item.done <- rawCommitResult{err: errShuttingDown}
3858
}
3959
}
4060

@@ -175,9 +195,21 @@ func (t *TransactionManager) commitRaw(reqs []*pb.Request) (*TransactionResponse
175195
}
176196

177197
func (t *TransactionManager) flushRawPending() {
178-
time.Sleep(rawBatchWindow)
198+
timer := time.NewTimer(rawBatchWindow)
199+
select {
200+
case <-timer.C:
201+
case <-t.closeCh:
202+
timer.Stop()
203+
return
204+
}
179205

180206
for {
207+
select {
208+
case <-t.closeCh:
209+
return
210+
default:
211+
}
212+
181213
batch := t.takeRawBatch()
182214
if len(batch) == 0 {
183215
t.mu.Lock()

0 commit comments

Comments
 (0)