@@ -243,9 +243,7 @@ type RedisServer struct {
243243 store store.MVCCStore
244244 coordinator kv.Coordinator
245245 redisTranscoder * redisTranscoder
246- pubsub redcon.PubSub
247- pubsubMu sync.RWMutex
248- pubsubChannels map [string ]int
246+ pubsub * redisPubSub
249247 scriptMu sync.RWMutex
250248 scriptCache map [string ]string
251249 traceCommands bool
@@ -260,9 +258,8 @@ type RedisServer struct {
260258}
261259
262260type connState struct {
263- inTxn bool
264- queue []redcon.Command
265- subscriptions map [string ]struct {}
261+ inTxn bool
262+ queue []redcon.Command
266263}
267264
268265type resultType int
@@ -297,7 +294,7 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
297294 redisAddr : redisAddr ,
298295 relay : relay ,
299296 leaderRedis : leaderRedis ,
300- pubsubChannels : map [ string ] int {} ,
297+ pubsub : newRedisPubSub () ,
301298 scriptCache : map [string ]string {},
302299 traceCommands : os .Getenv ("ELASTICKV_REDIS_TRACE" ) == "1" ,
303300 }
@@ -433,17 +430,18 @@ func (r *RedisServer) Run() error {
433430
434431 traceID , traceStart := r .traceCommandStart (conn , name , cmd .Args [1 :])
435432 f (conn , cmd )
436- r .traceCommandFinish (traceID , conn , name , time .Since (traceStart ))
433+ if r .traceCommands {
434+ r .traceCommandFinish (traceID , conn , name , time .Since (traceStart ))
435+ }
437436 },
438437 func (conn redcon.Conn ) bool {
439438 // Use this function to accept or deny the connection.
440439 // log.Printf("accept: %s", conn.RemoteAddr())
441440 return true
442441 },
443442 func (conn redcon.Conn , err error ) {
444- // This is called when the connection has been closed
445- // log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
446- r .cleanupConnSubscriptions (conn )
443+ // This is called when the connection has been closed.
444+ // PubSub connections clean up their own subscriptions via bgrunner.
447445 })
448446
449447 return errors .WithStack (err )
@@ -609,7 +607,7 @@ func (r *RedisServer) replaceWithString(ctx context.Context, key, value []byte,
609607 } else {
610608 elems = append (elems , & kv.Elem [kv.OP ]{Op : kv .Del , Key : redisTTLKey (key )})
611609 }
612- return r .dispatchElems (ctx , true , elems )
610+ return r .dispatchElems (ctx , true , 0 , elems )
613611}
614612
615613func (r * RedisServer ) executeSet (ctx context.Context , key , value []byte , opts redisSetOptions ) (redisSetExecution , error ) {
@@ -632,40 +630,6 @@ func (r *RedisServer) executeSet(ctx context.Context, key, value []byte, opts re
632630 return redisSetExecution {state : state , wroteOldBulk : opts .returnOld }, nil
633631}
634632
635- func (r * RedisServer ) trackSubscription (conn redcon.Conn , channel string ) {
636- state := getConnState (conn )
637- if state .subscriptions == nil {
638- state .subscriptions = map [string ]struct {}{}
639- }
640- if _ , ok := state .subscriptions [channel ]; ok {
641- return
642- }
643- state .subscriptions [channel ] = struct {}{}
644-
645- r .pubsubMu .Lock ()
646- defer r .pubsubMu .Unlock ()
647- r .pubsubChannels [channel ]++
648- }
649-
650- func (r * RedisServer ) cleanupConnSubscriptions (conn redcon.Conn ) {
651- ctx := conn .Context ()
652- state , ok := ctx .(* connState )
653- if ! ok || len (state .subscriptions ) == 0 {
654- return
655- }
656-
657- r .pubsubMu .Lock ()
658- defer r .pubsubMu .Unlock ()
659- for channel := range state .subscriptions {
660- if n := r .pubsubChannels [channel ]; n <= 1 {
661- delete (r .pubsubChannels , channel )
662- } else {
663- r .pubsubChannels [channel ] = n - 1
664- }
665- }
666- state .subscriptions = nil
667- }
668-
669633func (r * RedisServer ) Stop () {
670634 _ = r .relayConnCache .Close ()
671635 _ = r .listen .Close ()
@@ -838,7 +802,7 @@ func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) {
838802 }
839803 elems = append (elems , keyElems ... )
840804 }
841- if err := r .dispatchElems (ctx , true , elems ); err != nil {
805+ if err := r .dispatchElems (ctx , true , readTS , elems ); err != nil {
842806 return err
843807 }
844808 removed = nextRemoved
@@ -1471,12 +1435,31 @@ func (t *txnContext) applyExpire(cmd redcon.Command, unit time.Duration) (redisR
14711435 return redisResult {typ : resultInt , integer : 0 }, nil
14721436 }
14731437
1438+ if ttl <= 0 {
1439+ return t .stageKeyDeletion (cmd .Args [1 ])
1440+ }
1441+
14741442 expireAt := time .Now ().Add (time .Duration (ttl ) * unit )
14751443 state .value = & expireAt
14761444 state .dirty = true
14771445 return redisResult {typ : resultInt , integer : 1 }, nil
14781446}
14791447
1448+ func (t * txnContext ) stageKeyDeletion (key []byte ) (redisResult , error ) {
1449+ st , err := t .loadListState (key )
1450+ if err != nil {
1451+ return redisResult {}, err
1452+ }
1453+ stageListDelete (st )
1454+ tv , err := t .load (key )
1455+ if err != nil {
1456+ return redisResult {}, err
1457+ }
1458+ tv .deleted = true
1459+ tv .dirty = true
1460+ return redisResult {typ : resultInt , integer : 1 }, nil
1461+ }
1462+
14801463func parseRangeBounds (startRaw , endRaw []byte , total int ) (int , int , error ) {
14811464 start , err := parseInt (startRaw )
14821465 if err != nil {
0 commit comments