@@ -3,16 +3,25 @@ package rpc
33import (
44 "context"
55 "errors"
6+ "time"
7+
8+ "github.com/go-redis/redis/v8"
69 "github.com/panjf2000/ants/v2"
7- "github.com/twmb/franz-go/pkg/kgo"
810 "go.uber.org/zap"
911)
1012
11- const maxEventsPerPoll = 100
13+ const (
14+ maxEventsPerPoll = 100
15+ pollBlockDuration = 1 * time .Second
16+ maintenanceInterval = 30 * time .Second
17+ autoClaimMinIdle = 5 * time .Minute
18+ autoClaimBatchSize = 100
19+ defaultMaxLen = int64 (50000 )
20+ )
1221
1322func (c * Client ) StartConsumer () {
1423 if c .consumerRunning .Swap (true ) {
15- c .logger .Fatal ("Kafka client already running" )
24+ c .logger .Fatal ("Consumer already running" )
1625 return
1726 }
1827
@@ -25,64 +34,187 @@ func (c *Client) StartConsumer() {
2534 return
2635 }
2736
37+ streams := make ([]string , 0 , len (c .listeners )* 2 )
38+ for stream := range c .listeners {
39+ streams = append (streams , stream )
40+ }
41+ for range c .listeners {
42+ streams = append (streams , ">" )
43+ }
44+
45+ for stream := range c .listeners {
46+ go c .maintenanceLoop (ctx , stream )
47+ }
48+
2849 for {
2950 select {
3051 case <- ctx .Done ():
3152 return
3253 default :
33- records , err := c .poll (ctx )
54+ messages , err := c .poll (ctx , streams )
3455 if err != nil {
35- if errors .Is (err , kgo .ErrClientClosed ) {
36- c .logger .Info ("Kafka client closed, stopping read loop" )
37- return
38- } else if errors .Is (err , context .Canceled ) {
56+ if errors .Is (err , context .Canceled ) {
3957 c .logger .Info ("Context cancelled, stopping read loop" )
4058 return
41- } else {
42- c .logger .Error ("Failed to poll records" , zap .Error (err ))
43- continue
4459 }
60+ c .logger .Error ("Failed to poll records" , zap .Error (err ))
61+ continue
4562 }
4663
47- for _ , record := range records {
48- listener , ok := c .listeners [record . Topic ]
64+ for _ , xStream := range messages {
65+ listener , ok := c .listeners [xStream . Stream ]
4966 if ! ok {
50- c .logger .Warn ("No listener found for topic " , zap .String ("topic " , record . Topic ))
67+ c .logger .Warn ("No listener found for stream " , zap .String ("stream " , xStream . Stream ))
5168 continue
5269 }
5370
54- value := record .Value
55- if err := pool .Submit (func () {
56- ctx , cancel := listener .BuildContext ()
57- defer cancel ()
71+ streamName := xStream .Stream
72+ for _ , msg := range xStream .Messages {
73+ value , ok := msg .Values ["data" ]
74+ if ! ok {
75+ c .logger .Warn ("Message missing data field" , zap .String ("stream" , streamName ), zap .String ("id" , msg .ID ))
76+ c .redis .XAck (ctx , streamName , c .config .ConsumerGroup , msg .ID )
77+ continue
78+ }
5879
59- listener .HandleMessage (ctx , value )
60- }); err != nil {
61- c .logger .Error ("Failed to submit task to worker pool" , zap .Error (err ))
62- continue
80+ data := []byte (value .(string ))
81+ msgID := msg .ID
82+
83+ if err := pool .Submit (func () {
84+ listenerCtx , listenerCancel := listener .BuildContext ()
85+ defer listenerCancel ()
86+
87+ listener .HandleMessage (listenerCtx , data )
88+ c .redis .XAck (ctx , streamName , c .config .ConsumerGroup , msgID )
89+ }); err != nil {
90+ c .logger .Error ("Failed to submit task to worker pool" , zap .Error (err ))
91+ continue
92+ }
6393 }
6494 }
6595 }
6696 }
6797}
6898
69- func (c * Client ) poll (ctx context.Context ) ([]* kgo.Record , error ) {
70- fetches := c .client .PollRecords (ctx , maxEventsPerPoll )
71- if fetches .IsClientClosed () {
72- return nil , kgo .ErrClientClosed
73- }
99+ func (c * Client ) poll (ctx context.Context , streams []string ) ([]redis.XStream , error ) {
100+ result , err := c .redis .XReadGroup (ctx , & redis.XReadGroupArgs {
101+ Group : c .config .ConsumerGroup ,
102+ Consumer : c .config .ConsumerName ,
103+ Streams : streams ,
104+ Count : maxEventsPerPoll ,
105+ Block : pollBlockDuration ,
106+ }).Result ()
74107
75- if err := fetches .Err (); err != nil {
108+ if err != nil {
109+ if errors .Is (err , redis .Nil ) {
110+ return nil , nil
111+ }
76112 return nil , err
77113 }
78114
79- records := make ([]* kgo.Record , 0 , fetches .NumRecords ())
115+ return result , nil
116+ }
117+
118+ func (c * Client ) maintenanceLoop (ctx context.Context , stream string ) {
119+ ticker := time .NewTicker (maintenanceInterval )
120+ defer ticker .Stop ()
121+
122+ for {
123+ select {
124+ case <- ctx .Done ():
125+ return
126+ case <- ticker .C :
127+ c .trimStream (ctx , stream )
128+ c .autoClaimStale (ctx , stream )
129+ }
130+ }
131+ }
132+
133+ func (c * Client ) trimStream (ctx context.Context , stream string ) {
134+ maxLen := c .config .MaxLen
135+ if maxLen <= 0 {
136+ maxLen = defaultMaxLen
137+ }
138+
139+ trimmed , err := c .redis .XTrimMaxLenApprox (ctx , stream , maxLen , 0 ).Result ()
140+ if err != nil {
141+ if ! errors .Is (err , context .Canceled ) {
142+ c .logger .Warn ("Failed to trim stream" ,
143+ zap .String ("stream" , stream ),
144+ zap .Error (err ))
145+ }
146+ return
147+ }
148+
149+ if trimmed > 0 {
150+ c .logger .Debug ("Trimmed stream" ,
151+ zap .String ("stream" , stream ),
152+ zap .Int64 ("trimmed" , trimmed ))
153+ }
154+ }
80155
81- iter := fetches .RecordIter ()
82- for ! iter .Done () {
83- record := iter .Next ()
84- records = append (records , record )
156+ func (c * Client ) autoClaimStale (ctx context.Context , stream string ) {
157+ // go-redis/v8's XAutoClaim parser expects 2 response elements, but Redis 7+
158+ // returns 3 (messages, next start ID, deleted entry IDs). Use a raw Do call
159+ // and parse only the messages array ourselves.
160+ result , err := c .redis .Do (ctx ,
161+ "XAUTOCLAIM" , stream , c .config .ConsumerGroup , c .config .ConsumerName ,
162+ int64 (autoClaimMinIdle / time .Millisecond ), "0-0" , "COUNT" , autoClaimBatchSize ,
163+ ).Result ()
164+
165+ if err != nil {
166+ if ! errors .Is (err , context .Canceled ) && ! errors .Is (err , redis .Nil ) {
167+ c .logger .Warn ("Failed to auto-claim stale messages" ,
168+ zap .String ("stream" , stream ),
169+ zap .Error (err ))
170+ }
171+ return
85172 }
86173
87- return records , nil
174+ parts , ok := result .([]interface {})
175+ if ! ok || len (parts ) < 2 {
176+ return
177+ }
178+
179+ msgs , ok := parts [1 ].([]interface {})
180+ if ! ok || len (msgs ) == 0 {
181+ return
182+ }
183+
184+ listener , ok := c .listeners [stream ]
185+ if ! ok {
186+ return
187+ }
188+
189+ c .logger .Info ("Auto-claimed stale messages" ,
190+ zap .String ("stream" , stream ),
191+ zap .Int ("count" , len (msgs )))
192+
193+ for _ , raw := range msgs {
194+ entry , ok := raw .([]interface {})
195+ if ! ok || len (entry ) < 2 {
196+ continue
197+ }
198+
199+ msgID , _ := entry [0 ].(string )
200+ fields , _ := entry [1 ].([]interface {})
201+
202+ var data string
203+ for i := 0 ; i + 1 < len (fields ); i += 2 {
204+ if key , _ := fields [i ].(string ); key == "data" {
205+ data , _ = fields [i + 1 ].(string )
206+ break
207+ }
208+ }
209+
210+ if data == "" {
211+ c .redis .XAck (ctx , stream , c .config .ConsumerGroup , msgID )
212+ continue
213+ }
214+
215+ listenerCtx , listenerCancel := listener .BuildContext ()
216+ listener .HandleMessage (listenerCtx , []byte (data ))
217+ listenerCancel ()
218+ c .redis .XAck (ctx , stream , c .config .ConsumerGroup , msgID )
219+ }
88220}
0 commit comments