Skip to content

Commit c77e73c

Browse files
authored
chains/txmgr: support interfaces for broadcaster, confirmer, tracker (#93)
1 parent bcbe08e commit c77e73c

4 files changed

Lines changed: 146 additions & 74 deletions

File tree

chains/txmgr/broadcaster.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) closeInternal()
228228
return nil
229229
}
230230

231+
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) SetEnabledAddresses(addrs []ADDR) {
232+
eb.enabledAddresses = addrs
233+
}
234+
231235
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) SetResumeCallback(callback ResumeCallback) {
232236
eb.resumeCallback = callback
233237
}
@@ -240,6 +244,10 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) HealthReport() m
240244
return map[string]error{eb.Name(): eb.Healthy()}
241245
}
242246

247+
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) Ready() error {
248+
return eb.StateMachine.Ready()
249+
}
250+
243251
// Trigger forces the monitor for a particular address to recheck for new txes
244252
// Logs error and does nothing if address was not registered on startup
245253
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) Trigger(addr ADDR) {

chains/txmgr/confirmer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) closeInternal()
169169
return nil
170170
}
171171

172+
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Deliver(head HEAD) {
173+
ec.mb.Deliver(head)
174+
}
175+
176+
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) SetEnabledAddresses(addrs []ADDR) {
177+
ec.enabledAddresses = addrs
178+
}
179+
172180
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) SetResumeCallback(callback ResumeCallback) {
173181
ec.resumeCallback = callback
174182
}
@@ -181,6 +189,10 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport()
181189
return map[string]error{ec.Name(): ec.Healthy()}
182190
}
183191

192+
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Ready() error {
193+
return ec.StateMachine.Ready()
194+
}
195+
184196
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
185197
defer ec.wg.Done()
186198
ctx, cancel := ec.stopCh.NewCtx()

chains/txmgr/tracker.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Close() error {
100100
})
101101
}
102102

103+
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Name() string {
104+
return tr.lggr.Name()
105+
}
106+
107+
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport() map[string]error {
108+
return map[string]error{tr.Name(): tr.Healthy()}
109+
}
110+
103111
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) closeInternal() error {
104112
tr.initSync.Lock()
105113
defer tr.initSync.Unlock()
@@ -163,6 +171,10 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop(ctx context.Con
163171
}
164172
}
165173

174+
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Deliver(num int64) {
175+
tr.mb.Deliver(num)
176+
}
177+
166178
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) GetAbandonedAddresses() []ADDR {
167179
tr.lock.Lock()
168180
defer tr.lock.Unlock()
@@ -187,7 +199,7 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) IsStarted() bool {
187199
return tr.isStarted
188200
}
189201

190-
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) getEnabledAddresses() []ADDR {
202+
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) GetEnabledAddresses() []ADDR {
191203
tr.lock.RLock()
192204
defer tr.lock.RUnlock()
193205
return slices.Collect(maps.Keys(tr.enabledAddrs))
@@ -206,12 +218,12 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) ensureEnabledAddresses(
206218
if err != nil {
207219
return fmt.Errorf("failed to get enabled addresses for chain: %w", err)
208220
}
209-
tr.setEnabledAddresses(enabledAddrs)
221+
tr.SetEnabledAddresses(enabledAddrs)
210222
return nil
211223
}
212224

213-
// setEnabledAddresses sets enabled addresses. Caller must hold tr.lock, or the Tracker must be unstarted (pre-startInternal, or post-closeInternal).
214-
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) setEnabledAddresses(enabledAddrs []ADDR) {
225+
// SetEnabledAddresses sets enabled addresses. Caller must hold tr.lock, or the Tracker must be unstarted (pre-startInternal, or post-closeInternal).
226+
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) SetEnabledAddresses(enabledAddrs []ADDR) {
215227
if len(enabledAddrs) == 0 {
216228
tr.lggr.Warnf("enabled address list is empty")
217229
}

chains/txmgr/txmgr.go

Lines changed: 110 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,32 @@ type reset struct {
8383
done chan error
8484
}
8585

86+
type BroadcasterI[ADDR chains.Hashable] interface {
87+
services.Service
88+
SetEnabledAddresses([]ADDR)
89+
SetResumeCallback(ResumeCallback)
90+
Trigger(ADDR)
91+
}
92+
93+
type ConfirmerI[HEAD chains.Head[BHASH], ADDR chains.Hashable, BHASH chains.Hashable] interface {
94+
services.Service
95+
Deliver(HEAD)
96+
SetEnabledAddresses([]ADDR)
97+
SetResumeCallback(ResumeCallback)
98+
}
99+
100+
type TrackerI[ADDR chains.Hashable] interface {
101+
services.Service
102+
Deliver(int64)
103+
GetEnabledAddresses() []ADDR
104+
SetEnabledAddresses([]ADDR)
105+
}
106+
107+
type resetableService interface {
108+
startInternal(ctx context.Context) error
109+
closeInternal() error
110+
}
111+
86112
type Txm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH chains.Hashable, BHASH chains.Hashable, R txmgrtypes.ChainReceipt[THASH, BHASH], SEQ chains.Sequence, FEE fees.Fee] struct {
87113
services.StateMachine
88114
logger logger.SugaredLogger
@@ -105,9 +131,9 @@ type Txm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH cha
105131

106132
reaper *Reaper[CID]
107133
resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
108-
broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
109-
confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]
110-
tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
134+
broadcaster BroadcasterI[ADDR]
135+
confirmer ConfirmerI[HEAD, ADDR, BHASH]
136+
tracker TrackerI[ADDR]
111137
finalizer txmgrtypes.Finalizer[BHASH, HEAD]
112138
fwdMgr txmgrtypes.ForwarderManager[ADDR]
113139
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
@@ -136,10 +162,10 @@ func NewTxm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH
136162
fwdMgr txmgrtypes.ForwarderManager[ADDR],
137163
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE],
138164
txStore txmgrtypes.TxStore[ADDR, CID, THASH, BHASH, R, SEQ, FEE],
139-
broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE],
140-
confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE],
165+
broadcaster BroadcasterI[ADDR],
166+
confirmer ConfirmerI[HEAD, ADDR, BHASH],
141167
resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE],
142-
tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE],
168+
tracker TrackerI[ADDR],
143169
finalizer txmgrtypes.Finalizer[BHASH, HEAD],
144170
newErrorClassifierFunc NewErrorClassifier,
145171
txmv2wrapper TxmV2Wrapper[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE],
@@ -201,7 +227,7 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Start(ctx context.Cont
201227
if err := ms.Start(ctx, b.tracker); err != nil {
202228
return fmt.Errorf("Txm: Tracker failed to start: %w", err)
203229
}
204-
b.enabledAddrs = b.tracker.getEnabledAddresses()
230+
b.enabledAddrs = b.tracker.GetEnabledAddresses()
205231
slices.SortFunc(b.enabledAddrs, func(a, b ADDR) int { return strings.Compare(a.String(), b.String()) })
206232

207233
if err := ms.Start(ctx, b.finalizer); err != nil {
@@ -325,9 +351,9 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport() map[str
325351
// setEnabled sets enabled addresses in the broadcaster, tracker, and confirmer.
326352
// Must only be called before starting, or after closing (during a reset).
327353
func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) setEnabled(addrs []ADDR) {
328-
b.broadcaster.enabledAddresses = addrs
329-
b.tracker.setEnabledAddresses(addrs)
330-
b.confirmer.enabledAddresses = addrs
354+
b.broadcaster.SetEnabledAddresses(addrs)
355+
b.tracker.SetEnabledAddresses(addrs)
356+
b.confirmer.SetEnabledAddresses(addrs)
331357
}
332358

333359
func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
@@ -349,14 +375,20 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
349375
// These should always close successfully, since it should be logically
350376
// impossible to enter this code path with ec/eb in a state other than
351377
// "Started"
352-
if err := b.broadcaster.closeInternal(); err != nil {
353-
b.logger.Panicw(fmt.Sprintf("Failed to Close Broadcaster: %v", err), "err", err)
378+
if r, ok := b.broadcaster.(resetableService); ok {
379+
if err := r.closeInternal(); err != nil {
380+
b.logger.Panicw(fmt.Sprintf("Failed to Close Broadcaster: %v", err), "err", err)
381+
}
354382
}
355-
if err := b.tracker.closeInternal(); err != nil {
356-
b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
383+
if r, ok := b.tracker.(resetableService); ok {
384+
if err := r.closeInternal(); err != nil {
385+
b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
386+
}
357387
}
358-
if err := b.confirmer.closeInternal(); err != nil {
359-
b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err)
388+
if r, ok := b.confirmer.(resetableService); ok {
389+
if err := r.closeInternal(); err != nil {
390+
b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err)
391+
}
360392
}
361393
if f != nil {
362394
f()
@@ -372,64 +404,72 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
372404
// execReset will not return until either:
373405
// 1. Broadcaster, Confirmer, and Tracker all started successfully
374406
// 2. chStop was closed (txmgr exit)
375-
wg.Add(3)
376-
go func() {
377-
defer wg.Done()
378-
// Retry indefinitely on failure
379-
backoff := newRedialBackoff()
380-
for {
381-
select {
382-
case <-time.After(backoff.Duration()):
383-
if err := b.broadcaster.startInternal(ctx); err != nil {
384-
b.logger.Criticalw("Failed to start Broadcaster", "err", err)
385-
b.SvcErrBuffer.Append(err)
386-
continue
407+
if r, ok := b.broadcaster.(resetableService); ok {
408+
wg.Add(1)
409+
go func() {
410+
defer wg.Done()
411+
// Retry indefinitely on failure
412+
backoff := newRedialBackoff()
413+
for {
414+
select {
415+
case <-time.After(backoff.Duration()):
416+
if err := r.startInternal(ctx); err != nil {
417+
b.logger.Criticalw("Failed to start Broadcaster", "err", err)
418+
b.SvcErrBuffer.Append(err)
419+
continue
420+
}
421+
return
422+
case <-b.chStop:
423+
stopOnce.Do(func() { stopped = true })
424+
return
387425
}
388-
return
389-
case <-b.chStop:
390-
stopOnce.Do(func() { stopped = true })
391-
return
392426
}
393-
}
394-
}()
395-
go func() {
396-
defer wg.Done()
397-
// Retry indefinitely on failure
398-
backoff := newRedialBackoff()
399-
for {
400-
select {
401-
case <-time.After(backoff.Duration()):
402-
if err := b.tracker.startInternal(ctx); err != nil {
403-
b.logger.Criticalw("Failed to start Tracker", "err", err)
404-
b.SvcErrBuffer.Append(err)
405-
continue
427+
}()
428+
}
429+
if r, ok := b.tracker.(resetableService); ok {
430+
wg.Add(1)
431+
go func() {
432+
defer wg.Done()
433+
// Retry indefinitely on failure
434+
backoff := newRedialBackoff()
435+
for {
436+
select {
437+
case <-time.After(backoff.Duration()):
438+
if err := r.startInternal(ctx); err != nil {
439+
b.logger.Criticalw("Failed to start Tracker", "err", err)
440+
b.SvcErrBuffer.Append(err)
441+
continue
442+
}
443+
return
444+
case <-b.chStop:
445+
stopOnce.Do(func() { stopped = true })
446+
return
406447
}
407-
return
408-
case <-b.chStop:
409-
stopOnce.Do(func() { stopped = true })
410-
return
411448
}
412-
}
413-
}()
414-
go func() {
415-
defer wg.Done()
416-
// Retry indefinitely on failure
417-
backoff := newRedialBackoff()
418-
for {
419-
select {
420-
case <-time.After(backoff.Duration()):
421-
if err := b.confirmer.startInternal(ctx); err != nil {
422-
b.logger.Criticalw("Failed to start Confirmer", "err", err)
423-
b.SvcErrBuffer.Append(err)
424-
continue
449+
}()
450+
}
451+
if r, ok := b.confirmer.(resetableService); ok {
452+
wg.Add(1)
453+
go func() {
454+
defer wg.Done()
455+
// Retry indefinitely on failure
456+
backoff := newRedialBackoff()
457+
for {
458+
select {
459+
case <-time.After(backoff.Duration()):
460+
if err := r.startInternal(ctx); err != nil {
461+
b.logger.Criticalw("Failed to start Confirmer", "err", err)
462+
b.SvcErrBuffer.Append(err)
463+
continue
464+
}
465+
return
466+
case <-b.chStop:
467+
stopOnce.Do(func() { stopped = true })
468+
return
425469
}
426-
return
427-
case <-b.chStop:
428-
stopOnce.Do(func() { stopped = true })
429-
return
430470
}
431-
}
432-
}()
471+
}()
472+
}
433473

434474
wg.Wait()
435475
}
@@ -442,8 +482,8 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
442482
case address := <-b.trigger:
443483
b.broadcaster.Trigger(address)
444484
case head := <-b.chHeads:
445-
b.confirmer.mb.Deliver(head)
446-
b.tracker.mb.Deliver(head.BlockNumber())
485+
b.confirmer.Deliver(head)
486+
b.tracker.Deliver(head.BlockNumber())
447487
b.finalizer.DeliverLatestHead(head)
448488
case reset := <-b.reset:
449489
// This check prevents the weird edge-case where you can select

0 commit comments

Comments
 (0)