Skip to content

Commit b902e4f

Browse files
committed
update events to check connection if new update receved
1 parent a6949c3 commit b902e4f

2 files changed

Lines changed: 12 additions & 2 deletions

File tree

pkg/events/events.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ type Callback func(events *substrate.EventRecords)
6666
// Events processor receives all events starting from the given state
6767
// and for each set of events calls callback cb
6868
type Processor struct {
69-
sub substrate.Manager
69+
sub substrate.Manager
70+
update chan substrate.Manager
7071

7172
cb Callback
7273
state State
@@ -173,6 +174,15 @@ func (e *Processor) subscribe(ctx context.Context) error {
173174
if err := e.state.Set(block.Number); err != nil {
174175
return errors.Wrap(err, "failed to commit last block number")
175176
}
177+
case newSub := <-e.update:
178+
// if new update is received check if the current manager is healty, if not:
179+
// update the manager and return error to force resubscribe with the new manager
180+
cl, _, err := e.sub.Raw()
181+
if err != nil {
182+
e.sub = newSub
183+
return err
184+
}
185+
cl.Client.Close()
176186
}
177187
}
178188
}

pkg/events/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func NewRedisStream(sub substrate.Manager, address string, farm pkg.FarmID, node
5454

5555
func (r *RedisStream) UpdateSubstrateManager(sub substrate.Manager) {
5656
r.sub = sub
57-
r.processor.sub = sub
57+
r.processor.update <- sub
5858
}
5959

6060
func (r *RedisStream) push(con redis.Conn, queue string, event interface{}) error {

0 commit comments

Comments
 (0)