88 "regexp"
99 "strconv"
1010 "strings"
11+ "sync"
1112
1213 "RedisShake/internal/client"
1314 "RedisShake/internal/client/proto"
@@ -51,6 +52,7 @@ type scanStandaloneReader struct {
5152 needDumpQueue * utils.UniqueQueue
5253 needRestoreChan chan * needRestoreItem
5354 dumpClient * client.Redis
55+ subWG sync.WaitGroup
5456
5557 stat struct {
5658 Name string `json:"name"`
@@ -68,28 +70,30 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade
6870 r .opts = opts
6971 r .ch = make (chan * entry.Entry , 1024 )
7072 r .stat .Name = "reader_" + strings .Replace (opts .Address , ":" , "_" , - 1 )
71- r .needDumpQueue = utils .NewUniqueQueue (100000 ) // cache 100000 keys
73+ r .needDumpQueue = utils .NewUniqueQueue (100000000 ) // cache 100000000 keys
7274 r .needRestoreChan = make (chan * needRestoreItem , 1024 ) // inflight 1024 keys
7375 log .Infof ("[%s] scanStandaloneReader init finished. dbs=[%v]" , r .stat .Name , r .dbs )
7476 return r
7577}
7678
7779func (r * scanStandaloneReader ) StartRead (ctx context.Context ) []chan * entry.Entry {
7880 r .ctx = ctx
81+ if r .opts .KSN {
82+ r .subWG .Add (1 )
83+ go r .subscribe ()
84+ r .subWG .Wait ()
85+ }
7986 if r .opts .Scan {
8087 go r .scan ()
8188 }
82- if r .opts .KSN {
83- go r .subscript ()
84- }
8589 go r .dump ()
8690 go r .restore ()
8791 return []chan * entry.Entry {r .ch }
8892}
8993
90- func (r * scanStandaloneReader ) subscript () {
94+ func (r * scanStandaloneReader ) subscribe () {
9195 c := client .NewRedisClient (r .ctx , r .opts .Address , r .opts .Username , r .opts .Password , r .opts .Tls , r .opts .TlsConfig , r .opts .PreferReplica )
92- log .Infof ("[%s] scanStandaloneReader subscript started. dbs=[%v]" , r .stat .Name , r .dbs )
96+ log .Infof ("[%s] scanStandaloneReader subscribe started. dbs=[%v]" , r .stat .Name , r .dbs )
9397 if len (r .dbs ) == 0 {
9498 c .Send ("psubscribe" , "__keyevent@*__:*" )
9599 _ , err := c .Receive ()
@@ -110,11 +114,14 @@ func (r *scanStandaloneReader) subscript() {
110114 }
111115 }
112116
117+ // wait
118+ r .subWG .Done ()
119+
113120 regex := regexp .MustCompile (`\d+` )
114121 for {
115122 select {
116123 case <- r .ctx .Done ():
117- log .Infof ("[%s] scanStandaloneReader subscript finished." , r .stat .Name )
124+ log .Infof ("[%s] scanStandaloneReader subscribe finished." , r .stat .Name )
118125 r .needDumpQueue .Close ()
119126 return
120127 default :
@@ -269,6 +276,9 @@ func (r *scanStandaloneReader) restore() {
269276 switch v := iPttl .(type ) {
270277 case int64 :
271278 pttl = int (v )
279+ if pttl == 0 {
280+ pttl = 1
281+ }
272282 case string :
273283 log .Panicf ("iPttl is string, this should not happen. key=[%s], pttl=[%s]" , key , v )
274284 default :
0 commit comments