@@ -4,7 +4,11 @@ import (
44 "context"
55 "fmt"
66 "math/big"
7+ "os"
8+ "runtime/pprof"
79 "sort"
10+ "strconv"
11+ "sync"
812 "time"
913
1014 "github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -40,7 +44,7 @@ type treeGeneratorImpl_v8 struct {
4044 smoothingPoolBalance * big.Int
4145 intervalDutiesInfo * IntervalDutiesInfo
4246 slotsPerEpoch uint64
43- validatorIndexMap map [string ]* MinipoolInfo
47+ validatorIndexMap map [uint64 ]* MinipoolInfo
4448 elStartTime time.Time
4549 elEndTime time.Time
4650 validNetworkCache map [uint64 ]bool
@@ -88,7 +92,7 @@ func newTreeGeneratorImpl_v8(log *log.ColorLogger, logPrefix string, index uint6
8892 },
8993 },
9094 validatorStatusMap : map [rptypes.ValidatorPubkey ]beacon.ValidatorStatus {},
91- validatorIndexMap : map [string ]* MinipoolInfo {},
95+ validatorIndexMap : map [uint64 ]* MinipoolInfo {},
9296 elSnapshotHeader : elSnapshotHeader ,
9397 log : log ,
9498 logPrefix : logPrefix ,
@@ -724,6 +728,14 @@ func (r *treeGeneratorImpl_v8) calculateNodeRewards() (*big.Int, *big.Int, error
724728
725729// Get all of the duties for a range of epochs
726730func (r * treeGeneratorImpl_v8 ) processAttestationsForInterval () error {
731+ if os .Getenv ("DUTIES_PPROF" ) != "" {
732+ f , err := os .Create (os .Getenv ("DUTIES_PPROF" ))
733+ if err != nil {
734+ return err
735+ }
736+ pprof .StartCPUProfile (f )
737+ defer pprof .StopCPUProfile ()
738+ }
727739
728740 startEpoch := r .rewardsFile .ConsensusStartBlock / r .beaconConfig .SlotsPerEpoch
729741 endEpoch := r .rewardsFile .ConsensusEndBlock / r .beaconConfig .SlotsPerEpoch
@@ -738,91 +750,183 @@ func (r *treeGeneratorImpl_v8) processAttestationsForInterval() error {
738750 r .log .Printlnf ("%s Checking participation of %d minipools for epochs %d to %d" , r .logPrefix , len (r .validatorIndexMap ), startEpoch , endEpoch )
739751 r .log .Printlnf ("%s NOTE: this will take a long time, progress is reported every 100 epochs" , r .logPrefix )
740752
741- epochsDone := 0
753+ // Workers need a channel to send back errors
754+ errs := make (chan error )
755+
742756 reportStartTime := time .Now ()
743- for epoch := startEpoch ; epoch < endEpoch + 1 ; epoch ++ {
744- if epochsDone == 100 {
745- timeTaken := time .Since (reportStartTime )
746- r .log .Printlnf ("%s On Epoch %d of %d (%.2f%%)... (%s so far)" , r .logPrefix , epoch , endEpoch , float64 (epoch - startEpoch )/ float64 (endEpoch - startEpoch )* 100.0 , timeTaken )
747- epochsDone = 0
748- }
749757
750- err := r .processEpoch (true , epoch )
751- if err != nil {
752- return err
753- }
758+ // Start populating epochStates
759+ r .fetchEpochs (startEpoch , endEpoch , errs , reportStartTime )
754760
755- epochsDone ++
761+ // Read until the error channel is closed
762+ for err := range errs {
763+ return err
756764 }
757765
758766 // Check the epoch after the end of the interval for any lingering attestations
759767 epoch := endEpoch + 1
760- err = r .processEpoch (false , epoch )
768+ es , err : = r .fetchEpoch (false , epoch )
761769 if err != nil {
762770 return err
763771 }
772+ r .processEpoch (es )
764773
765774 r .log .Printlnf ("%s Finished participation check (total time = %s)" , r .logPrefix , time .Since (reportStartTime ))
766775 return nil
767776
768777}
769778
770779// Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance
771- func (r * treeGeneratorImpl_v8 ) processEpoch (getDuties bool , epoch uint64 ) error {
780+ func (r * treeGeneratorImpl_v8 ) fetchEpoch (getDuties bool , epoch uint64 ) ( * epochState , error ) {
772781
773782 // Get the committee info and attestation records for this epoch
774- var committeeData beacon.Committees
775- attestationsPerSlot := make ([][]beacon.AttestationInfo , r .slotsPerEpoch )
783+ out := & epochState {
784+ epoch : epoch ,
785+ attestations : make ([][]beacon.AttestationInfo , r .slotsPerEpoch ),
786+ }
776787 var wg errgroup.Group
777788
778789 if getDuties {
779790 wg .Go (func () error {
780791 var err error
781- committeeData , err = r .bc .GetCommitteesForEpoch (& epoch )
792+ out . committees , err = r .bc .GetCommitteesForEpoch (& epoch )
782793 return err
783794 })
784795 }
785796
786- for i := uint64 (0 ); i < r .slotsPerEpoch ; i ++ {
787- i := i
788- slot := epoch * r .slotsPerEpoch + i
789- wg .Go (func () error {
797+ wg .Go (func () error {
798+ for i := uint64 (0 ); i < r .slotsPerEpoch ; i ++ {
799+ slot := epoch * r .slotsPerEpoch + i
790800 attestations , found , err := r .bc .GetAttestations (fmt .Sprint (slot ))
791801 if err != nil {
792802 return err
793803 }
794804 if found {
795- attestationsPerSlot [i ] = attestations
805+ //out.attestationsResponses[i] = attestations
806+ out .attestations [i ] = attestations
796807 } else {
797- attestationsPerSlot [i ] = []beacon.AttestationInfo {}
808+ //out.attestationsResponses[i] = nil
809+ out .attestations [i ] = nil
798810 }
799- return nil
800- })
801- }
811+ }
812+ return nil
813+ })
802814 err := wg .Wait ()
803815 if err != nil {
804- return fmt .Errorf ("error getting committee and attestaion records for epoch %d: %w" , epoch , err )
816+ return nil , fmt .Errorf ("Error getting committee and attestaion records for epoch %d: %w" , epoch , err )
805817 }
806818
807- if getDuties {
808- // Get all of the expected duties for the epoch
809- err = r .getDutiesForEpoch (committeeData )
810- if err != nil {
811- return fmt .Errorf ("error getting duties for epoch %d: %w" , epoch , err )
812- }
819+ return out , nil
820+
821+ }
822+
823+ func (r * treeGeneratorImpl_v8 ) fetchEpochs (startEpoch uint64 , endEpoch uint64 , errChan chan error , startTime time.Time ) {
824+ // seq tracks the next expected epoch in the sequence to be sent to the caller
825+ // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn
826+ // to publish an epoch to the caller via the resp channel. Every time seq is updated,
827+ // therefor, all threads must wake up to check the value of seq. If it isn't their turn,
828+ // they will go back to sleep.
829+ var seq uint64
830+
831+ // If we encounter an error, we will wake up all the threads so they can exit. Therefor,
832+ // the first time we encounter an error we should set 'done' to true, and then each thread
833+ // should check its value every time they are woken.
834+ var done bool
835+
836+ // A cond to help the workers synchronize- whenever one thread wants to wake up the other
837+ // threads, it does so by broadcasting on this cond.
838+ cond := sync .NewCond (& sync.Mutex {})
839+ workers := getWorkerCount ()
840+
841+ // seq should start with the first epoch the caller is expecting
842+ seq = startEpoch
843+
844+ for j := uint64 (0 ); j < workers ; j ++ {
845+
846+ id := j
847+ go func () {
848+ // each worker will iterate modulo its id
849+ for epoch := startEpoch + id ; epoch < endEpoch + 1 ; epoch += workers {
850+ // Fetch the duties and participation for a single epoch
851+ es , err := r .fetchEpoch (true , epoch )
852+ if err != nil {
853+ // Return the error to the caller
854+ errChan <- err
855+ // Note that an error was encountered
856+ done = true
857+ // Tell other threads to wake up and exit
858+ cond .Broadcast ()
859+ // Exit this thread
860+ return
861+ }
862+
863+ // Wait until it's this worker's turn to produce a result
864+ cond .L .Lock ()
865+ for seq != epoch && ! done {
866+ // No errors have been encountered, and it is not this worker's
867+ // turn yet, so go back to sleep
868+ cond .Wait ()
869+ }
870+
871+ // Check if this worker was woken up due to an error
872+ if done {
873+ // Another worker encountered an error, so exit now
874+ return
875+ }
876+
877+ // No error was encountered, and seq indicates it's this worker's turn
878+ // to update the state, so process the epoch
879+ r .processEpoch (es )
880+
881+ if epoch == endEpoch {
882+ // The last result has been produced, so close the channels
883+ close (errChan )
884+ // This worker produced the last result, so
885+ // signal to the other workers that it is time to exit
886+ done = true
887+ } else {
888+ seq ++
889+ if seq % 100 == 99 {
890+ timeTaken := time .Since (startTime )
891+ r .log .Printlnf ("%s On Epoch %d of %d (%.2f%%)... (%s so far)" ,
892+ r .logPrefix ,
893+ es .epoch ,
894+ endEpoch ,
895+ float64 (es .epoch - startEpoch )/ float64 (endEpoch - startEpoch )* 100.0 ,
896+ timeTaken )
897+ }
898+ }
899+
900+ // Either seq has been updated or the last result was produced
901+ // signal to the other workers to wake up and either do work,
902+ // or exit now.
903+ cond .Broadcast ()
904+ cond .L .Unlock ()
905+ }
906+ }()
907+ }
908+ }
909+
910+ // Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance
911+ func (r * treeGeneratorImpl_v8 ) processEpoch (es * epochState ) {
912+
913+ // Get all of the expected duties for the epoch
914+ // Note: committees will be nil for the last epoch
915+ if es .committees != nil {
916+ r .getDutiesForEpoch (es .committees )
813917 }
814918
815919 // Process all of the slots in the epoch
816920 for i := uint64 (0 ); i < r .slotsPerEpoch ; i ++ {
817- slot := epoch * r .slotsPerEpoch + i
818- attestations := attestationsPerSlot [i ]
819- if len (attestations ) > 0 {
820- r .checkDutiesForSlot (attestations , slot )
921+ slot := es .epoch * r .slotsPerEpoch + i
922+
923+ // The element will be nil if there was no block at the slot
924+ if len (es .attestations [i ]) > 0 {
925+ // There was a block - process its attestations
926+ r .checkDutiesForSlot (es .attestations [i ], slot )
821927 }
822928 }
823929
824- return nil
825-
826930}
827931
828932// Handle all of the attestations in the given slot
@@ -957,7 +1061,7 @@ func (r *treeGeneratorImpl_v8) getDutiesForEpoch(committees beacon.Committees) e
9571061func (r * treeGeneratorImpl_v8 ) createMinipoolIndexMap () error {
9581062
9591063 // Get the status for all uncached minipool validators and add them to the cache
960- r .validatorIndexMap = map [string ]* MinipoolInfo {}
1064+ r .validatorIndexMap = map [uint64 ]* MinipoolInfo {}
9611065 for _ , details := range r .nodeDetails {
9621066 if details .IsEligible {
9631067 for _ , minipoolInfo := range details .Minipools {
@@ -975,7 +1079,11 @@ func (r *treeGeneratorImpl_v8) createMinipoolIndexMap() error {
9751079 default :
9761080 // Get the validator index
9771081 minipoolInfo .ValidatorIndex = status .Index
978- r .validatorIndexMap [minipoolInfo .ValidatorIndex ] = minipoolInfo
1082+ vIdx , err := strconv .ParseUint (minipoolInfo .ValidatorIndex , 10 , 64 )
1083+ if err != nil {
1084+ vIdx = 0
1085+ }
1086+ r .validatorIndexMap [vIdx ] = minipoolInfo
9791087
9801088 // Get the validator's activation start and end slots
9811089 startSlot := status .ActivationEpoch * r .beaconConfig .SlotsPerEpoch
0 commit comments