@@ -95,20 +95,16 @@ func NewRedisTransport(u *url.URL, l Logger) (Transport, error) { //nolint:iretu
9595}
9696
9797func subscribeToUpdate (t * RedisTransport ) {
98- t .logger .Info ("subscribeToUpdate:Subscribe" )
9998 pubsub := t .client .Subscribe (t .ctx , t .bucketName )
100- t .logger .Info ("subscribeToUpdate:pubsub.Channel" )
10199 ch := pubsub .Channel ()
102100 for msg := range ch {
103101 var update * Update
104- t .logger .Info ("subscribeToUpdate:Unmarshal" )
105102 errUnmarshal := json .Unmarshal ([]byte (msg .Payload ), & update )
106103 if errUnmarshal != nil {
107104 t .logger .Error ("error when unmarshaling message" , zap .Any ("message" , msg ), zap .Error (errUnmarshal ))
108105
109106 continue
110107 }
111- t .logger .Info ("subscribeToUpdate:dispatch" )
112108 t .dispatch (update )
113109 }
114110}
@@ -130,32 +126,26 @@ func getRedisLastEventID(ctx context.Context, client *redis.Client, bucketName s
130126
131127// Dispatch dispatches an update to all subscribers and persists it in Bolt DB.
132128func (t * RedisTransport ) Dispatch (update * Update ) error {
133- t .logger .Info ("Dispatch:select" )
134129 select {
135130 case <- t .closed :
136131 return ErrClosedTransport
137132 default :
138133 }
139134
140- t .logger .Info ("Dispatch:AssignUUID" )
141135 AssignUUID (update )
142136
143- t .logger .Info ("Dispatch:Lock" )
144137 t .Lock ()
145138 defer t .Unlock ()
146139
147- t .logger .Info ("Dispatch:Marshal" )
148140 updateJSON , err := json .Marshal (* update )
149141 if err != nil {
150142 return fmt .Errorf ("error when marshaling update: %w" , err )
151143 }
152144
153- t .logger .Info ("Dispatch:persist" )
154145 if err := t .persist (update .ID , updateJSON ); err != nil {
155146 return err
156147 }
157148
158- t .logger .Info ("Dispatch:Publish" )
159149 // publish in pubsub for others mercure instances to consume the update and dispatch it to its subscribers
160150 if err := t .client .Publish (t .ctx , t .bucketName , updateJSON ).Err (); err != nil {
161151 return fmt .Errorf ("error when publishing update: %w" , err )
@@ -166,22 +156,17 @@ func (t *RedisTransport) Dispatch(update *Update) error {
166156
167157// Called when a pubsub message is received.
168158func (t * RedisTransport ) dispatch (update * Update ) error {
169- t .logger .Info ("dispatch:select" )
170159 select {
171160 case <- t .closed :
172161 return ErrClosedTransport
173162 default :
174163 }
175164
176- t .logger .Info ("dispatch:Lock" )
177165 t .Lock ()
178166 defer t .Unlock ()
179167
180- t .logger .Info ("dispatch:update" )
181168 for _ , s := range t .subscribers .MatchAny (update ) {
182- t .logger .Info ("dispatch:Dispatch" )
183169 if ! s .Dispatch (update , false ) {
184- t .logger .Info ("dispatch:Remove" )
185170 t .subscribers .Remove (s )
186171 }
187172 }
@@ -191,58 +176,46 @@ func (t *RedisTransport) dispatch(update *Update) error {
191176
192177// persist stores update in the database.
193178func (t * RedisTransport ) persist (updateID string , updateJSON []byte ) error {
194- t .logger .Info ("persist" )
195179 t .lastEventID = updateID
196- t .logger .Info ("persist:LPush" )
197180 err := t .client .LPush (t .ctx , t .bucketName , updateJSON ).Err ()
198181 if err != nil {
199182 return fmt .Errorf ("error while persisting to redis: %w" , err )
200183 }
201- t .logger .Info ("persist:cleanup" )
202184
203185 return t .cleanup ()
204186}
205187
206188// AddSubscriber adds a new subscriber to the transport.
207189func (t * RedisTransport ) AddSubscriber (s * Subscriber ) error {
208- t .logger .Info ("AddSubscriber:select" )
209190 select {
210191 case <- t .closed :
211192 return ErrClosedTransport
212193 default :
213194 }
214195
215- t .logger .Info ("AddSubscriber:Lock" )
216196 t .Lock ()
217- t .logger .Info ("AddSubscriber:Add" )
218197 t .subscribers .Add (s )
219- t .logger .Info ("AddSubscriber:Unlock" )
220198 t .Unlock ()
221199
222200 if s .RequestLastEventID != "" {
223- t .logger .Info ("AddSubscriber:dispatchHistory" )
224201 t .dispatchHistory (s )
225202 }
226203
227- t .logger .Info ("AddSubscriber:Ready" )
228204 s .Ready ()
229205
230206 return nil
231207}
232208
233209// RemoveSubscriber removes a new subscriber from the transport.
234210func (t * RedisTransport ) RemoveSubscriber (s * Subscriber ) error {
235- t .logger .Info ("RemoveSubscriber:select" )
236211 select {
237212 case <- t .closed :
238213 return ErrClosedTransport
239214 default :
240215 }
241216
242- t .logger .Info ("RemoveSubscriber:Lock" )
243217 t .Lock ()
244218 defer t .Unlock ()
245- t .logger .Info ("RemoveSubscriber:Remove" )
246219 t .subscribers .Remove (s )
247220
248221 return nil
@@ -257,10 +230,8 @@ func (t *RedisTransport) GetSubscribers() (string, []*Subscriber, error) {
257230}
258231
259232func (t * RedisTransport ) dispatchHistory (s * Subscriber ) {
260- t .logger .Info ("dispatchHistory:LRange" )
261233 updates , err := t .client .LRange (t .ctx , t .bucketName , 0 , int64 (t .size )).Result ()
262234 if err != nil {
263- t .logger .Info ("dispatchHistory:HistoryDispatched" )
264235 s .HistoryDispatched (EarliestLastEventID )
265236
266237 return
@@ -270,7 +241,6 @@ func (t *RedisTransport) dispatchHistory(s *Subscriber) {
270241 afterFromID := s .RequestLastEventID == EarliestLastEventID
271242 for _ , update := range updates {
272243 var lastUpdate * Update
273- t .logger .Info ("dispatchHistory:Unmarshal" )
274244 errUnmarshal := json .Unmarshal ([]byte (update ), & lastUpdate )
275245 if errUnmarshal != nil {
276246 s .HistoryDispatched (responseLastEventID )
@@ -288,7 +258,6 @@ func (t *RedisTransport) dispatchHistory(s *Subscriber) {
288258 continue
289259 }
290260
291- t .logger .Info ("dispatchHistory:Dispatch" )
292261 if ! s .Dispatch (lastUpdate , true ) {
293262 s .HistoryDispatched (responseLastEventID )
294263
@@ -303,7 +272,6 @@ func (t *RedisTransport) dispatchHistory(s *Subscriber) {
303272
304273// Close closes the Transport.
305274func (t * RedisTransport ) Close () (err error ) {
306- t .logger .Info ("Close" )
307275 t .closedOnce .Do (func () {
308276 close (t .closed )
309277
@@ -328,7 +296,6 @@ func (t *RedisTransport) Close() (err error) {
328296
329297// cleanup removes entries in the history above the size limit, triggered probabilistically.
330298func (t * RedisTransport ) cleanup () error {
331- t .logger .Info ("cleanup:LLen" )
332299 sizeUpdates , errLen := t .client .LLen (t .ctx , t .bucketName ).Result ()
333300 if errLen != nil {
334301 return fmt .Errorf ("error when getting updates length: %w" , errLen )
@@ -341,7 +308,6 @@ func (t *RedisTransport) cleanup() error {
341308 return nil
342309 }
343310
344- t .logger .Info ("cleanup:LTrim" )
345311 errTrim := t .client .LTrim (t .ctx , t .bucketName , 0 , int64 (t .size )).Err ()
346312 if errTrim != nil {
347313 return fmt .Errorf ("error when trimming update length: %w" , errLen )
0 commit comments