Skip to content

Commit cff265a

Browse files
committed
Optimize v8
1 parent 106856b commit cff265a

1 file changed

Lines changed: 151 additions & 43 deletions

File tree

shared/services/rewards/generator-impl-v8.go

Lines changed: 151 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ import (
44
"context"
55
"fmt"
66
"math/big"
7+
"os"
8+
"runtime/pprof"
79
"sort"
10+
"strconv"
11+
"sync"
812
"time"
913

1014
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -40,7 +44,7 @@ type treeGeneratorImpl_v8 struct {
4044
smoothingPoolBalance *big.Int
4145
intervalDutiesInfo *IntervalDutiesInfo
4246
slotsPerEpoch uint64
43-
validatorIndexMap map[string]*MinipoolInfo
47+
validatorIndexMap map[uint64]*MinipoolInfo
4448
elStartTime time.Time
4549
elEndTime time.Time
4650
validNetworkCache map[uint64]bool
@@ -88,7 +92,7 @@ func newTreeGeneratorImpl_v8(log *log.ColorLogger, logPrefix string, index uint6
8892
},
8993
},
9094
validatorStatusMap: map[rptypes.ValidatorPubkey]beacon.ValidatorStatus{},
91-
validatorIndexMap: map[string]*MinipoolInfo{},
95+
validatorIndexMap: map[uint64]*MinipoolInfo{},
9296
elSnapshotHeader: elSnapshotHeader,
9397
log: log,
9498
logPrefix: logPrefix,
@@ -727,6 +731,14 @@ func (r *treeGeneratorImpl_v8) calculateNodeRewards() (*big.Int, *big.Int, error
727731

728732
// Get all of the duties for a range of epochs
729733
func (r *treeGeneratorImpl_v8) processAttestationsForInterval() error {
734+
if os.Getenv("DUTIES_PPROF") != "" {
735+
f, err := os.Create(os.Getenv("DUTIES_PPROF"))
736+
if err != nil {
737+
return err
738+
}
739+
pprof.StartCPUProfile(f)
740+
defer pprof.StopCPUProfile()
741+
}
730742

731743
startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch
732744
endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch
@@ -741,91 +753,183 @@ func (r *treeGeneratorImpl_v8) processAttestationsForInterval() error {
741753
r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch)
742754
r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix)
743755

744-
epochsDone := 0
756+
// Workers need a channel to send back errors
757+
errs := make(chan error)
758+
745759
reportStartTime := time.Now()
746-
for epoch := startEpoch; epoch < endEpoch+1; epoch++ {
747-
if epochsDone == 100 {
748-
timeTaken := time.Since(reportStartTime)
749-
r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", r.logPrefix, epoch, endEpoch, float64(epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, timeTaken)
750-
epochsDone = 0
751-
}
752760

753-
err := r.processEpoch(true, epoch)
754-
if err != nil {
755-
return err
756-
}
761+
// Start populating epochStates
762+
r.fetchEpochs(startEpoch, endEpoch, errs, reportStartTime)
757763

758-
epochsDone++
764+
// Read until the error channel is closed
765+
for err := range errs {
766+
return err
759767
}
760768

761769
// Check the epoch after the end of the interval for any lingering attestations
762770
epoch := endEpoch + 1
763-
err = r.processEpoch(false, epoch)
771+
es, err := r.fetchEpoch(false, epoch)
764772
if err != nil {
765773
return err
766774
}
775+
r.processEpoch(es)
767776

768777
r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime))
769778
return nil
770779

771780
}
772781

773782
// Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance
774-
func (r *treeGeneratorImpl_v8) processEpoch(getDuties bool, epoch uint64) error {
783+
func (r *treeGeneratorImpl_v8) fetchEpoch(getDuties bool, epoch uint64) (*epochState, error) {
775784

776785
// Get the committee info and attestation records for this epoch
777-
var committeeData beacon.Committees
778-
attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch)
786+
out := &epochState{
787+
epoch: epoch,
788+
attestations: make([][]beacon.AttestationInfo, r.slotsPerEpoch),
789+
}
779790
var wg errgroup.Group
780791

781792
if getDuties {
782793
wg.Go(func() error {
783794
var err error
784-
committeeData, err = r.bc.GetCommitteesForEpoch(&epoch)
795+
out.committees, err = r.bc.GetCommitteesForEpoch(&epoch)
785796
return err
786797
})
787798
}
788799

789-
for i := uint64(0); i < r.slotsPerEpoch; i++ {
790-
i := i
791-
slot := epoch*r.slotsPerEpoch + i
792-
wg.Go(func() error {
800+
wg.Go(func() error {
801+
for i := uint64(0); i < r.slotsPerEpoch; i++ {
802+
slot := epoch*r.slotsPerEpoch + i
793803
attestations, found, err := r.bc.GetAttestations(fmt.Sprint(slot))
794804
if err != nil {
795805
return err
796806
}
797807
if found {
798-
attestationsPerSlot[i] = attestations
808+
//out.attestationsResponses[i] = attestations
809+
out.attestations[i] = attestations
799810
} else {
800-
attestationsPerSlot[i] = []beacon.AttestationInfo{}
811+
//out.attestationsResponses[i] = nil
812+
out.attestations[i] = nil
801813
}
802-
return nil
803-
})
804-
}
814+
}
815+
return nil
816+
})
805817
err := wg.Wait()
806818
if err != nil {
807-
return fmt.Errorf("error getting committee and attestaion records for epoch %d: %w", epoch, err)
819+
return nil, fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err)
808820
}
809821

810-
if getDuties {
811-
// Get all of the expected duties for the epoch
812-
err = r.getDutiesForEpoch(committeeData)
813-
if err != nil {
814-
return fmt.Errorf("error getting duties for epoch %d: %w", epoch, err)
815-
}
822+
return out, nil
823+
824+
}
825+
826+
func (r *treeGeneratorImpl_v8) fetchEpochs(startEpoch uint64, endEpoch uint64, errChan chan error, startTime time.Time) {
827+
// seq tracks the next expected epoch in the sequence to be sent to the caller
828+
// Since we fetch epochs in parallel, each thread will sleep until it becomes its turn
829+
// to publish an epoch to the caller via the resp channel. Every time seq is updated,
830+
// therefor, all threads must wake up to check the value of seq. If it isn't their turn,
831+
// they will go back to sleep.
832+
var seq uint64
833+
834+
// If we encounter an error, we will wake up all the threads so they can exit. Therefor,
835+
// the first time we encounter an error we should set 'done' to true, and then each thread
836+
// should check its value every time they are woken.
837+
var done bool
838+
839+
// A cond to help the workers synchronize- whenever one thread wants to wake up the other
840+
// threads, it does so by broadcasting on this cond.
841+
cond := sync.NewCond(&sync.Mutex{})
842+
workers := getWorkerCount()
843+
844+
// seq should start with the first epoch the caller is expecting
845+
seq = startEpoch
846+
847+
for j := uint64(0); j < workers; j++ {
848+
849+
id := j
850+
go func() {
851+
// each worker will iterate modulo its id
852+
for epoch := startEpoch + id; epoch < endEpoch+1; epoch += workers {
853+
// Fetch the duties and participation for a single epoch
854+
es, err := r.fetchEpoch(true, epoch)
855+
if err != nil {
856+
// Return the error to the caller
857+
errChan <- err
858+
// Note that an error was encountered
859+
done = true
860+
// Tell other threads to wake up and exit
861+
cond.Broadcast()
862+
// Exit this thread
863+
return
864+
}
865+
866+
// Wait until it's this worker's turn to produce a result
867+
cond.L.Lock()
868+
for seq != epoch && !done {
869+
// No errors have been encountered, and it is not this worker's
870+
// turn yet, so go back to sleep
871+
cond.Wait()
872+
}
873+
874+
// Check if this worker was woken up due to an error
875+
if done {
876+
// Another worker encountered an error, so exit now
877+
return
878+
}
879+
880+
// No error was encountered, and seq indicates it's this worker's turn
881+
// to update the state, so process the epoch
882+
r.processEpoch(es)
883+
884+
if epoch == endEpoch {
885+
// The last result has been produced, so close the channels
886+
close(errChan)
887+
// This worker produced the last result, so
888+
// signal to the other workers that it is time to exit
889+
done = true
890+
} else {
891+
seq++
892+
if seq%100 == 99 {
893+
timeTaken := time.Since(startTime)
894+
r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)",
895+
r.logPrefix,
896+
es.epoch,
897+
endEpoch,
898+
float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0,
899+
timeTaken)
900+
}
901+
}
902+
903+
// Either seq has been updated or the last result was produced
904+
// signal to the other workers to wake up and either do work,
905+
// or exit now.
906+
cond.Broadcast()
907+
cond.L.Unlock()
908+
}
909+
}()
910+
}
911+
}
912+
913+
// Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance
914+
func (r *treeGeneratorImpl_v8) processEpoch(es *epochState) {
915+
916+
// Get all of the expected duties for the epoch
917+
// Note: committees will be nil for the last epoch
918+
if es.committees != nil {
919+
r.getDutiesForEpoch(es.committees)
816920
}
817921

818922
// Process all of the slots in the epoch
819923
for i := uint64(0); i < r.slotsPerEpoch; i++ {
820-
slot := epoch*r.slotsPerEpoch + i
821-
attestations := attestationsPerSlot[i]
822-
if len(attestations) > 0 {
823-
r.checkDutiesForSlot(attestations, slot)
924+
slot := es.epoch*r.slotsPerEpoch + i
925+
926+
// The element will be nil if there was no block at the slot
927+
if len(es.attestations[i]) > 0 {
928+
// There was a block - process its attestations
929+
r.checkDutiesForSlot(es.attestations[i], slot)
824930
}
825931
}
826932

827-
return nil
828-
829933
}
830934

831935
// Handle all of the attestations in the given slot
@@ -960,7 +1064,7 @@ func (r *treeGeneratorImpl_v8) getDutiesForEpoch(committees beacon.Committees) e
9601064
func (r *treeGeneratorImpl_v8) createMinipoolIndexMap() error {
9611065

9621066
// Get the status for all uncached minipool validators and add them to the cache
963-
r.validatorIndexMap = map[string]*MinipoolInfo{}
1067+
r.validatorIndexMap = map[uint64]*MinipoolInfo{}
9641068
for _, details := range r.nodeDetails {
9651069
if details.IsEligible {
9661070
for _, minipoolInfo := range details.Minipools {
@@ -978,7 +1082,11 @@ func (r *treeGeneratorImpl_v8) createMinipoolIndexMap() error {
9781082
default:
9791083
// Get the validator index
9801084
minipoolInfo.ValidatorIndex = status.Index
981-
r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo
1085+
vIdx, err := strconv.ParseUint(minipoolInfo.ValidatorIndex, 10, 64)
1086+
if err != nil {
1087+
vIdx = 0
1088+
}
1089+
r.validatorIndexMap[vIdx] = minipoolInfo
9821090

9831091
// Get the validator's activation start and end slots
9841092
startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch

0 commit comments

Comments
 (0)