1- package headtracker
1+ package heads
22
33import (
44 "context"
@@ -16,32 +16,32 @@ import (
1616
1717const TrackableCallbackTimeout = 2 * time .Second
1818
19- type callbackSet [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] map [int ]HeadTrackable [H , BLOCK_HASH ]
19+ type callbackSet [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] map [int ]Trackable [H , BLOCK_HASH ]
2020
21- func (set callbackSet [H , BLOCK_HASH ]) values () []HeadTrackable [H , BLOCK_HASH ] {
22- var values [] HeadTrackable [H , BLOCK_HASH ]
21+ func (set callbackSet [H , BLOCK_HASH ]) values () []Trackable [H , BLOCK_HASH ] {
22+ values := make ([] Trackable [H , BLOCK_HASH ], 0 , len ( set ))
2323 for _ , callback := range set {
2424 values = append (values , callback )
2525 }
2626 return values
2727}
2828
29- // HeadTrackable is implemented by the core txm to be able to receive head events from any chain.
29+ // Trackable is implemented by the core txm to be able to receive head events from any chain.
3030// Chain implementations should notify head events to the core txm via this interface.
31- type HeadTrackable [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] interface {
31+ type Trackable [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] interface {
3232 // OnNewLongestChain sends a new head when it becomes available. Subscribers can recursively trace the parent
3333 // of the head to the finalized block back.
3434 OnNewLongestChain (ctx context.Context , head H )
3535}
3636
37- // HeadBroadcaster relays new Heads to all subscribers.
38- type HeadBroadcaster [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] interface {
37+ // Broadcaster relays new Heads to all subscribers.
38+ type Broadcaster [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] interface {
3939 services.Service
4040 BroadcastNewLongestChain (H )
41- Subscribe (callback HeadTrackable [H , BLOCK_HASH ]) (currentLongestChain H , unsubscribe func ())
41+ Subscribe (callback Trackable [H , BLOCK_HASH ]) (currentLongestChain H , unsubscribe func ())
4242}
4343
44- type headBroadcaster [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] struct {
44+ type broadcaster [H chains.Head [BLOCK_HASH ], BLOCK_HASH chains.Hashable ] struct {
4545 services.Service
4646 eng * services.Engine
4747
@@ -52,14 +52,14 @@ type headBroadcaster[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] stru
5252 lastCallbackID int
5353}
5454
55- // NewHeadBroadcaster creates a new HeadBroadcaster
56- func NewHeadBroadcaster [
55+ // NewBroadcaster creates a new Broadcaster
56+ func NewBroadcaster [
5757 H chains.Head [BLOCK_HASH ],
5858 BLOCK_HASH chains.Hashable ,
5959](
6060 lggr logger.Logger ,
61- ) HeadBroadcaster [H , BLOCK_HASH ] {
62- hb := & headBroadcaster [H , BLOCK_HASH ]{
61+ ) Broadcaster [H , BLOCK_HASH ] {
62+ hb := & broadcaster [H , BLOCK_HASH ]{
6363 callbacks : make (callbackSet [H , BLOCK_HASH ]),
6464 mailbox : mailbox .NewSingle [H ](),
6565 }
@@ -71,70 +71,70 @@ func NewHeadBroadcaster[
7171 return hb
7272}
7373
74- func (hb * headBroadcaster [H , BLOCK_HASH ]) start (context.Context ) error {
75- hb .eng .Go (hb .run )
74+ func (b * broadcaster [H , BLOCK_HASH ]) start (context.Context ) error {
75+ b .eng .Go (b .run )
7676 return nil
7777}
7878
79- func (hb * headBroadcaster [H , BLOCK_HASH ]) close () error {
80- hb .mutex .Lock ()
79+ func (b * broadcaster [H , BLOCK_HASH ]) close () error {
80+ b .mutex .Lock ()
8181 // clear all callbacks
82- hb .callbacks = make (callbackSet [H , BLOCK_HASH ])
83- hb .mutex .Unlock ()
82+ b .callbacks = make (callbackSet [H , BLOCK_HASH ])
83+ b .mutex .Unlock ()
8484 return nil
8585}
8686
87- func (hb * headBroadcaster [H , BLOCK_HASH ]) BroadcastNewLongestChain (head H ) {
88- hb .mailbox .Deliver (head )
87+ func (b * broadcaster [H , BLOCK_HASH ]) BroadcastNewLongestChain (head H ) {
88+ b .mailbox .Deliver (head )
8989}
9090
91- // Subscribe subscribes to OnNewLongestChain and Connect until HeadBroadcaster is closed,
91+ // Subscribe subscribes to OnNewLongestChain and Connect until Broadcaster is closed,
9292// or unsubscribe callback is called explicitly
93- func (hb * headBroadcaster [H , BLOCK_HASH ]) Subscribe (callback HeadTrackable [H , BLOCK_HASH ]) (currentLongestChain H , unsubscribe func ()) {
94- hb .mutex .Lock ()
95- defer hb .mutex .Unlock ()
93+ func (b * broadcaster [H , BLOCK_HASH ]) Subscribe (callback Trackable [H , BLOCK_HASH ]) (currentLongestChain H , unsubscribe func ()) {
94+ b .mutex .Lock ()
95+ defer b .mutex .Unlock ()
9696
97- currentLongestChain = hb .latest
97+ currentLongestChain = b .latest
9898
99- hb .lastCallbackID ++
100- callbackID := hb .lastCallbackID
101- hb .callbacks [callbackID ] = callback
99+ b .lastCallbackID ++
100+ callbackID := b .lastCallbackID
101+ b .callbacks [callbackID ] = callback
102102 unsubscribe = func () {
103- hb .mutex .Lock ()
104- defer hb .mutex .Unlock ()
105- delete (hb .callbacks , callbackID )
103+ b .mutex .Lock ()
104+ defer b .mutex .Unlock ()
105+ delete (b .callbacks , callbackID )
106106 }
107107
108108 return
109109}
110110
111- func (hb * headBroadcaster [H , BLOCK_HASH ]) run (ctx context.Context ) {
111+ func (b * broadcaster [H , BLOCK_HASH ]) run (ctx context.Context ) {
112112 for {
113113 select {
114114 case <- ctx .Done ():
115115 return
116- case <- hb .mailbox .Notify ():
117- hb .executeCallbacks (ctx )
116+ case <- b .mailbox .Notify ():
117+ b .executeCallbacks (ctx )
118118 }
119119 }
120120}
121121
122122// DEV: the head relayer makes no promises about head delivery! Subscribing
123123// Jobs should expect to the relayer to skip heads if there is a large number of listeners
124124// and all callbacks cannot be completed in the allotted time.
125- func (hb * headBroadcaster [H , BLOCK_HASH ]) executeCallbacks (ctx context.Context ) {
126- head , exists := hb .mailbox .Retrieve ()
125+ func (b * broadcaster [H , BLOCK_HASH ]) executeCallbacks (ctx context.Context ) {
126+ head , exists := b .mailbox .Retrieve ()
127127 if ! exists {
128- hb .eng .Info ("No head to retrieve. It might have been skipped" )
128+ b .eng .Info ("No head to retrieve. It might have been skipped" )
129129 return
130130 }
131131
132- hb .mutex .Lock ()
133- callbacks := hb .callbacks .values ()
134- hb .latest = head
135- hb .mutex .Unlock ()
132+ b .mutex .Lock ()
133+ callbacks := b .callbacks .values ()
134+ b .latest = head
135+ b .mutex .Unlock ()
136136
137- hb .eng .Debugw ("Initiating callbacks" ,
137+ b .eng .Debugw ("Initiating callbacks" ,
138138 "headNum" , head .BlockNumber (),
139139 "numCallbacks" , len (callbacks ),
140140 )
@@ -143,14 +143,14 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks(ctx context.Context)
143143 wg .Add (len (callbacks ))
144144
145145 for _ , callback := range callbacks {
146- go func (trackable HeadTrackable [H , BLOCK_HASH ]) {
146+ go func (trackable Trackable [H , BLOCK_HASH ]) {
147147 defer wg .Done ()
148148 start := time .Now ()
149149 cctx , cancel := context .WithTimeout (ctx , TrackableCallbackTimeout )
150150 defer cancel ()
151151 trackable .OnNewLongestChain (cctx , head )
152152 elapsed := time .Since (start )
153- hb .eng .Debugw (fmt .Sprintf ("Finished callback in %s" , elapsed ),
153+ b .eng .Debugw (fmt .Sprintf ("Finished callback in %s" , elapsed ),
154154 "callbackType" , reflect .TypeOf (trackable ), "blockNumber" , head .BlockNumber (), "time" , elapsed )
155155 }(callback )
156156 }
0 commit comments