From 8c1eff6be50fc16feb8d69032209b6006e1e9756 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Thu, 23 Mar 2023 19:07:17 +0000 Subject: [PATCH 01/11] Allow custom BN in watchtower treegen --- .../watchtower/generate-rewards-tree.go | 16 ++++++++++++--- .../submit-rewards-tree-stateless.go | 18 ++++++++++++++--- rocketpool/watchtower/watchtower.go | 20 +++++++++++++++---- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/rocketpool/watchtower/generate-rewards-tree.go b/rocketpool/watchtower/generate-rewards-tree.go index 49826a532..e02070892 100644 --- a/rocketpool/watchtower/generate-rewards-tree.go +++ b/rocketpool/watchtower/generate-rewards-tree.go @@ -19,6 +19,7 @@ import ( "github.com/rocket-pool/rocketpool-go/rocketpool" "github.com/rocket-pool/smartnode/shared/services" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client" "github.com/rocket-pool/smartnode/shared/services/config" rprewards "github.com/rocket-pool/smartnode/shared/services/rewards" "github.com/rocket-pool/smartnode/shared/services/state" @@ -52,9 +53,18 @@ func newGenerateRewardsTree(c *cli.Context, logger log.ColorLogger, errorLogger if err != nil { return nil, err } - bc, err := services.GetBeaconClient(c) - if err != nil { - return nil, err + var bc beacon.Client + // Override the beacon client, if requested + if beaconOverride := os.Getenv("TREEGEN_BEACON_CLIENT_ENDPOINT"); beaconOverride != "" { + logger.Printlnf("Using %s as the Beacon Node for GenerateRewardsTree", beaconOverride) + bc = client.NewStandardHttpClient(beaconOverride) + } else { + var err error + + bc, err = services.GetBeaconClient(c) + if err != nil { + return nil, err + } } rp, err := services.GetRocketPool(c) if err != nil { diff --git a/rocketpool/watchtower/submit-rewards-tree-stateless.go b/rocketpool/watchtower/submit-rewards-tree-stateless.go index 0a9d3f7ca..41e86ce4b 100644 --- a/rocketpool/watchtower/submit-rewards-tree-stateless.go +++ b/rocketpool/watchtower/submit-rewards-tree-stateless.go @@ -23,6 +23,7 @@ import ( "github.com/rocket-pool/smartnode/rocketpool/watchtower/utils" "github.com/rocket-pool/smartnode/shared/services" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client" "github.com/rocket-pool/smartnode/shared/services/config" rprewards "github.com/rocket-pool/smartnode/shared/services/rewards" "github.com/rocket-pool/smartnode/shared/services/state" @@ -68,10 +69,21 @@ func newSubmitRewardsTree_Stateless(c *cli.Context, logger log.ColorLogger, erro if err != nil { return nil, err } - bc, err := services.GetBeaconClient(c) - if err != nil { - return nil, err + + var bc beacon.Client + // Override the beacon client, if requested + if beaconOverride := os.Getenv("TREEGEN_BEACON_CLIENT_ENDPOINT"); beaconOverride != "" { + logger.Printlnf("Using %s as the Beacon Node for SubmitRewardsTree", beaconOverride) + bc = client.NewStandardHttpClient(beaconOverride) + } else { + var err error + + bc, err = services.GetBeaconClient(c) + if err != nil { + return nil, err + } } + rp, err := services.GetRocketPool(c) if err != nil { return nil, err diff --git a/rocketpool/watchtower/watchtower.go b/rocketpool/watchtower/watchtower.go index aab7ec107..9d05f1f4e 100644 --- a/rocketpool/watchtower/watchtower.go +++ b/rocketpool/watchtower/watchtower.go @@ -5,6 +5,7 @@ import ( "math/big" "math/rand" "net/http" + "os" "sync" "time" @@ -18,6 +19,7 @@ import ( "github.com/rocket-pool/smartnode/rocketpool/watchtower/collectors" "github.com/rocket-pool/smartnode/shared/services" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client" "github.com/rocket-pool/smartnode/shared/services/state" "github.com/rocket-pool/smartnode/shared/utils/log" ) @@ -82,10 +84,6 @@ func run(c *cli.Context) error { if err != nil { return err } - bc, err := services.GetBeaconClient(c) - if err != nil { - return err - } // Print the current mode if cfg.IsNativeMode { @@ -109,6 +107,20 @@ func run(c *cli.Context) error { errorLog := log.NewColorLogger(ErrorColor) updateLog := log.NewColorLogger(UpdateColor) + var bc beacon.Client + // Override the beacon client, if requested + if beaconOverride := os.Getenv("TREEGEN_BEACON_CLIENT_ENDPOINT"); beaconOverride != "" { + updateLog.Printlnf("Overriding the Beacon Node URL to %s", beaconOverride) + bc = client.NewStandardHttpClient(beaconOverride) + } else { + var err error + + bc, err = services.GetBeaconClient(c) + if err != nil { + return err + } + } + // Create the state manager m, err := state.NewNetworkStateManager(rp, cfg, rp.Client, bc, &updateLog) if err != nil { From d6d086bf16fe3e6c973e710408815cc36c69e78a Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Thu, 23 Mar 2023 19:07:31 +0000 Subject: [PATCH 02/11] Process epochs in parallel during participation checks --- shared/services/rewards/generator-impl-v5.go | 320 ++++++++++++++----- 1 file changed, 233 insertions(+), 87 deletions(-) diff --git a/shared/services/rewards/generator-impl-v5.go b/shared/services/rewards/generator-impl-v5.go index a02ae24ac..d20cae882 100644 --- a/shared/services/rewards/generator-impl-v5.go +++ b/shared/services/rewards/generator-impl-v5.go @@ -5,7 +5,9 @@ import ( "encoding/hex" "fmt" "math/big" + "runtime" "sort" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -56,6 +58,12 @@ type treeGeneratorImpl_v5 struct { zero *big.Int } +type epochState struct { + epoch uint64 + committees beacon.Committees + attestations [][]beacon.AttestationInfo +} + // Create a new tree generator func newTreeGeneratorImpl_v5(log *log.ColorLogger, logPrefix string, index uint64, startTime time.Time, endTime time.Time, consensusBlock uint64, elSnapshotHeader *types.Header, intervalsPassed uint64, state *state.NetworkState) *treeGeneratorImpl_v5 { return &treeGeneratorImpl_v5{ @@ -728,6 +736,98 @@ func (r *treeGeneratorImpl_v5) calculateNodeRewards() (*big.Int, *big.Int, error } +// The number of workers to use - clamps nproc/2 between 1 and 4 for now +func getWorkerCount() uint64 { + nproc := runtime.NumCPU() + + target := nproc / 2 + if target < 1 { + return 1 + } + + if target >= 4 { + return 4 + } + + return uint64(target) +} + +func (r *treeGeneratorImpl_v5) fetchEpochs(startEpoch uint64, endEpoch uint64, resp chan *epochState, errChan chan error) { + // seq tracks the next expected epoch in the sequence to be sent to the caller + // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn + // to publish an epoch to the caller via the resp channel. Every time seq is updated, + // therefor, all threads must wake up to check the value of seq. If it isn't their turn, + // they will go back to sleep. + var seq uint64 + + // If we encounter an error, we will wake up all the threads so they can exit. Therefor, + // the first time we encounter an error we should set 'done' to true, and then each thread + // should check its value every time they are woken. + var done bool + + // A cond to help the workers synchronize- whenever one thread wants to wake up the other + // threads, it does so by broadcasting on this cond. + cond := sync.NewCond(&sync.Mutex{}) + workers := getWorkerCount() + + // seq should start with the first epoch the caller is expecting + seq = startEpoch + for j := uint64(0); j < workers; j++ { + + id := j + go func() { + // each worker will iterate modulo its id + for epoch := startEpoch + id; epoch < endEpoch+1; epoch += workers { + // Fetch the duties and participation for a single epoch + es, err := r.fetchEpoch(true, epoch) + if err != nil { + // Return the error to the caller + errChan <- err + // Note that an error was encountered + done = true + // Tell other threads to wake up and exit + cond.Broadcast() + // Exit this thread + return + } + + // Wait until it's this worker's turn to produce a result + cond.L.Lock() + for seq != epoch && !done { + // No errors have been encountered, and it is not this worker's + // turn yet, so go back to sleep + cond.Wait() + } + + // Check if this worker was woken up due to an error + if done { + // Another worker encountered an error, so exit now + return + } + + // No error was encountered, and seq indicates it's this worker's turn + // to reply, so produce a result on resp + resp <- es + if epoch == endEpoch { + // The last result has been produced, so close the channels + close(errChan) + close(resp) + // This worker produced the last result, so + // signal to the other workers that it is time to exit + done = true + } else { + seq++ + } + // Either seq has been updated or the last result was produced + // signal to the other workers to wake up and either do work, + // or exit now. + cond.Broadcast() + cond.L.Unlock() + } + }() + } +} + // Get all of the duties for a range of epochs func (r *treeGeneratorImpl_v5) processAttestationsForInterval() error { @@ -744,90 +844,137 @@ func (r *treeGeneratorImpl_v5) processAttestationsForInterval() error { r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + // Make a channel for fetchEpochs to reply on + // Buffer just 1 epoch in the channel- that way, workers don't block until + // the main thread is working on one epoch and has one on deck. + epochStates := make(chan *epochState, 1) + // Workers need a channel to send back errors + errs := make(chan error) + epochsDone := 0 reportStartTime := time.Now() - for epoch := startEpoch; epoch < endEpoch+1; epoch++ { - if epochsDone == 100 { - timeTaken := time.Since(reportStartTime) - r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", r.logPrefix, epoch, endEpoch, float64(epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, timeTaken) - epochsDone = 0 - } - err := r.processEpoch(true, epoch) - if err != nil { - return err + // Start populating epochStates + r.fetchEpochs(startEpoch, endEpoch, epochStates, errs) + + // Read until both channels are closed + for { + select { + case es, ok := <-epochStates: + if !ok { + // The channel has been closed. + // Clear the reference to it so we can exit + epochStates = nil + } else { + // We received work from the channel, process it. + if epochsDone == 100 { + timeTaken := time.Since(reportStartTime) + r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", + r.logPrefix, + es.epoch, + endEpoch, + float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, + timeTaken) + epochsDone = 0 + } + // Process epoch state + r.processEpoch(es) + epochsDone++ + } + + case err, ok := <-errs: + if !ok { + // The error channel has been closed. + // Clear the reference so we can exit. + errs = nil + } + + if err != nil { + // We received an error- exit and propagate it. + return err + } } - epochsDone++ + // If both channels have been closed, stop polling. + if epochStates == nil && errs == nil { + break + } } // Check the epoch after the end of the interval for any lingering attestations epoch := endEpoch + 1 - err = r.processEpoch(false, epoch) + es, err := r.fetchEpoch(false, epoch) if err != nil { return err } + r.processEpoch(es) r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) return nil } +func (r *treeGeneratorImpl_v5) processEpoch(es *epochState) { + + // Get all of the expected duties for the epoch + // Note: committees will be nil for the last epoch + if es.committees != nil { + r.getDutiesForEpoch(es.committees) + } + + // Process all of the slots in the epoch + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := es.epoch*r.slotsPerEpoch + i + + // The element will be nil if there was no block at the slot + if len(es.attestations[i]) > 0 { + // There was a block - process its attestations + r.checkDutiesForSlot(es.attestations[i], slot) + } + } +} + // Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance -func (r *treeGeneratorImpl_v5) processEpoch(getDuties bool, epoch uint64) error { +func (r *treeGeneratorImpl_v5) fetchEpoch(getDuties bool, epoch uint64) (*epochState, error) { // Get the committee info and attestation records for this epoch - var committeeData beacon.Committees - attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch) + out := &epochState{ + epoch: epoch, + attestations: make([][]beacon.AttestationInfo, r.slotsPerEpoch), + } var wg errgroup.Group if getDuties { wg.Go(func() error { var err error - committeeData, err = r.bc.GetCommitteesForEpoch(&epoch) + out.committees, err = r.bc.GetCommitteesForEpoch(&epoch) return err }) } - for i := uint64(0); i < r.slotsPerEpoch; i++ { - i := i - slot := epoch*r.slotsPerEpoch + i - wg.Go(func() error { + wg.Go(func() error { + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := epoch*r.slotsPerEpoch + i attestations, found, err := r.bc.GetAttestations(fmt.Sprint(slot)) if err != nil { return err } if found { - attestationsPerSlot[i] = attestations + //out.attestationsResponses[i] = attestations + out.attestations[i] = attestations } else { - attestationsPerSlot[i] = []beacon.AttestationInfo{} + //out.attestationsResponses[i] = nil + out.attestations[i] = nil } - return nil - }) - } + } + return nil + }) err := wg.Wait() if err != nil { - return fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) - } - - if getDuties { - // Get all of the expected duties for the epoch - err = r.getDutiesForEpoch(committeeData) - if err != nil { - return fmt.Errorf("Error getting duties for epoch %d: %w", epoch, err) - } - } - - // Process all of the slots in the epoch - for i := uint64(0); i < r.slotsPerEpoch; i++ { - slot := epoch*r.slotsPerEpoch + i - attestations := attestationsPerSlot[i] - if len(attestations) > 0 { - r.checkDutiesForSlot(attestations, slot) - } + return nil, fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) } - return nil + return out, nil } @@ -842,47 +989,51 @@ func (r *treeGeneratorImpl_v5) checkDutiesForSlot(attestations []beacon.Attestat // Get the RP committees for this attestation's slot and index slotInfo, exists := r.intervalDutiesInfo.Slots[attestation.SlotIndex] - if exists { - rpCommittee, exists := slotInfo.Committees[attestation.CommitteeIndex] - if exists { - blockTime := time.Unix(int64(r.networkState.BeaconConfig.GenesisTime), 0).Add(time.Second * time.Duration(r.networkState.BeaconConfig.SecondsPerSlot*attestation.SlotIndex)) - - // Check if each RP validator attested successfully - for position, validator := range rpCommittee.Positions { - if attestation.AggregationBits.BitAt(uint64(position)) { - // This was seen, so remove it from the missing attestations and add it to the completed ones - delete(rpCommittee.Positions, position) - if len(rpCommittee.Positions) == 0 { - delete(slotInfo.Committees, attestation.CommitteeIndex) - } - if len(slotInfo.Committees) == 0 { - delete(r.intervalDutiesInfo.Slots, attestation.SlotIndex) - } - validator.CompletedAttestations[attestation.SlotIndex] = true - delete(validator.MissingAttestationSlots, attestation.SlotIndex) + if !exists { + continue + } - // Check if this minipool was opted into the SP for this block - nodeDetails := r.nodeDetails[validator.NodeIndex] - if blockTime.Sub(nodeDetails.OptInTime) < 0 || nodeDetails.OptOutTime.Sub(blockTime) < 0 { - // Not opted in - continue - } + rpCommittee, exists := slotInfo.Committees[attestation.CommitteeIndex] + if !exists { + continue + } + blockTime := time.Unix(int64(r.networkState.BeaconConfig.GenesisTime), 0).Add(time.Second * time.Duration(r.networkState.BeaconConfig.SecondsPerSlot*attestation.SlotIndex)) - // Get the pseudoscore for this attestation - details := r.networkState.MinipoolDetailsByAddress[validator.Address] - bond, fee := r.getMinipoolBondAndNodeFee(details, blockTime) - minipoolScore := big.NewInt(0).Sub(one, fee) // 1 - fee - minipoolScore.Mul(minipoolScore, bond) // Multiply by bond - minipoolScore.Div(minipoolScore, validatorReq) // Divide by 32 to get the bond as a fraction of a total validator - minipoolScore.Add(minipoolScore, fee) // Total = fee + (bond/32)(1 - fee) - - // Add it to the minipool's score and the total score - validator.AttestationScore.Add(&validator.AttestationScore.Int, minipoolScore) - r.totalAttestationScore.Add(r.totalAttestationScore, minipoolScore) - r.successfulAttestations++ - } - } + // Check if each RP validator attested successfully + for position, validator := range rpCommittee.Positions { + if !attestation.AggregationBits.BitAt(uint64(position)) { + continue + } + // This was seen, so remove it from the missing attestations and add it to the completed ones + delete(rpCommittee.Positions, position) + if len(rpCommittee.Positions) == 0 { + delete(slotInfo.Committees, attestation.CommitteeIndex) + } + if len(slotInfo.Committees) == 0 { + delete(r.intervalDutiesInfo.Slots, attestation.SlotIndex) + } + validator.CompletedAttestations[attestation.SlotIndex] = true + delete(validator.MissingAttestationSlots, attestation.SlotIndex) + + // Check if this minipool was opted into the SP for this block + nodeDetails := r.nodeDetails[validator.NodeIndex] + if blockTime.Sub(nodeDetails.OptInTime) < 0 || nodeDetails.OptOutTime.Sub(blockTime) < 0 { + // Not opted in + continue } + + // Get the pseudoscore for this attestation + details := r.networkState.MinipoolDetailsByAddress[validator.Address] + bond, fee := r.getMinipoolBondAndNodeFee(details, blockTime) + minipoolScore := big.NewInt(0).Sub(one, fee) // 1 - fee + minipoolScore.Mul(minipoolScore, bond) // Multiply by bond + minipoolScore.Div(minipoolScore, validatorReq) // Divide by 32 to get the bond as a fraction of a total validator + minipoolScore.Add(minipoolScore, fee) // Total = fee + (bond/32)(1 - fee) + + // Add it to the minipool's score and the total score + validator.AttestationScore.Add(&validator.AttestationScore.Int, minipoolScore) + r.totalAttestationScore.Add(r.totalAttestationScore, minipoolScore) + r.successfulAttestations++ } } @@ -891,9 +1042,7 @@ func (r *treeGeneratorImpl_v5) checkDutiesForSlot(attestations []beacon.Attestat } // Maps out the attestaion duties for the given epoch -func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) error { - - defer committees.Release() +func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) { // Crawl the committees for idx := 0; idx < committees.Count(); idx++ { @@ -930,9 +1079,6 @@ func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) e } } } - - return nil - } // Maps all minipools to their validator indices and creates a map of indices to minipool info From e987d0b37fc53158dc31e28852bfefa3e3370a84 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Thu, 23 Mar 2023 19:40:09 +0000 Subject: [PATCH 03/11] Optimize the standard beacon client for treegen --- go.mod | 4 +- go.sum | 6 + .../services/beacon/client/std-http-client.go | 195 ++++++++++++++++-- 3 files changed, 186 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index b2f14299c..43908b68b 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/shirou/gopsutil/v3 v3.23.1 github.com/tyler-smith/go-bip39 v1.1.0 github.com/urfave/cli v1.22.12 + github.com/valyala/fastjson v1.6.4 github.com/wealdtech/go-ens/v3 v3.5.5 github.com/wealdtech/go-eth2-types/v2 v2.8.1-0.20230131115251-b93cf60cee26 github.com/wealdtech/go-eth2-util v1.8.0 @@ -148,11 +149,12 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344 // indirect + github.com/supranational/blst v0.3.10 // indirect github.com/thomaso-mirodin/intmath v0.0.0-20160323211736-5dc6d854e46e // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 // indirect + github.com/umbracle/go-eth-consensus v0.1.2 // indirect github.com/wealdtech/go-bytesutil v1.2.0 // indirect github.com/wealdtech/go-multicodec v1.4.0 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect diff --git a/go.sum b/go.sum index 6d344def7..369ad9019 100644 --- a/go.sum +++ b/go.sum @@ -1529,6 +1529,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344 h1:m+8fKfQwCAy1QjzINvKe/pYtLjo2dl59x2w9YSEJxuY= github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/supranational/blst v0.3.10 h1:CMciDZ/h4pXDDXQASe8ZGTNKUiVNxVVA5hpci2Uuhuk= +github.com/supranational/blst v0.3.10/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= @@ -1560,6 +1562,8 @@ github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCB github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= +github.com/umbracle/go-eth-consensus v0.1.2 h1:oRAZwURW3u6kWPBCYYp2WpknSy5rAKf5OwMJahdPK3c= +github.com/umbracle/go-eth-consensus v0.1.2/go.mod h1:FutcwopvmyWSPl3FQRT9Oru83/tJeGMxy4k0OfJ0sdY= github.com/umbracle/gohashtree v0.0.2-alpha.0.20230207094856-5b775a815c10 h1:CQh33pStIp/E30b7TxDlXfM0145bn2e8boI30IxAhTg= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -1569,6 +1573,8 @@ github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8= github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8= github.com/urfave/cli/v2 v2.14.1 h1:0Sx+C9404t2+DPuIJ3UpZFOEFhNG3wPxMj7uZHyZKFA= github.com/urfave/cli/v2 v2.14.1/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= +github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= +github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= diff --git a/shared/services/beacon/client/std-http-client.go b/shared/services/beacon/client/std-http-client.go index 7bbd79254..0a0f2a726 100644 --- a/shared/services/beacon/client/std-http-client.go +++ b/shared/services/beacon/client/std-http-client.go @@ -14,10 +14,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/goccy/go-json" "github.com/prysmaticlabs/prysm/v3/crypto/bls" - "github.com/rocket-pool/rocketpool-go/types" + gec "github.com/umbracle/go-eth-consensus" + "github.com/valyala/fastjson" eth2types "github.com/wealdtech/go-eth2-types/v2" "golang.org/x/sync/errgroup" + "github.com/rocket-pool/rocketpool-go/types" "github.com/rocket-pool/smartnode/shared/services/beacon" "github.com/rocket-pool/smartnode/shared/utils/eth2" hexutil "github.com/rocket-pool/smartnode/shared/utils/hex" @@ -50,6 +52,7 @@ const ( // Beacon client using the standard Beacon HTTP REST API (https://ethereum.github.io/beacon-APIs/) type StandardHttpClient struct { providerAddress string + parsers fastjson.ParserPool } // Create a new client instance @@ -462,28 +465,111 @@ func (c *StandardHttpClient) GetEth1DataForEth2Block(blockId string) (beacon.Eth } -func (c *StandardHttpClient) GetAttestations(blockId string) ([]beacon.AttestationInfo, bool, error) { - attestations, exists, err := c.getAttestations(blockId) +type attestationsResponseRaw struct { + body []byte + version string +} + +func (c *StandardHttpClient) GetAttestationsRaw(blockId string) (*attestationsResponseRaw, bool, error) { + + // Build the request + req, err := http.NewRequest("GET", fmt.Sprintf("%s/eth/v2/beacon/blocks/%s", c.providerAddress, blockId), nil) if err != nil { return nil, false, err } - if !exists { + + req.Header.Set("accept", "application/octet-stream") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, false, err + } + defer func() { + _ = resp.Body.Close() + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, false, fmt.Errorf("Could not get attestations data for slot %s: HTTP status %d", blockId, resp.StatusCode) + } + + if resp.StatusCode == http.StatusNotFound { return nil, false, nil } - // Add attestation info - attestationInfo := make([]beacon.AttestationInfo, len(attestations.Data)) - for i, attestation := range attestations.Data { - bitString := hexutil.RemovePrefix(attestation.AggregationBits) - attestationInfo[i].SlotIndex = uint64(attestation.Data.Slot) - attestationInfo[i].CommitteeIndex = uint64(attestation.Data.Index) - attestationInfo[i].AggregationBits, err = hex.DecodeString(bitString) - if err != nil { - return nil, false, fmt.Errorf("Error decoding aggregation bits for attestation %d of block %s: %w", i, blockId, err) + if resp.StatusCode != http.StatusOK { + return nil, false, fmt.Errorf("Could not get attestations data for slot %s: HTTP status %d; reponse body: '%s'", blockId, resp.StatusCode, string(body)) + } + + return &attestationsResponseRaw{ + body: body, + version: resp.Header.Get("Eth-Consensus-Version"), + }, true, nil +} + +func (c *StandardHttpClient) ParseAttestationsResponseRaw(resp *attestationsResponseRaw) ([]beacon.AttestationInfo, error) { + var attestations []*gec.Attestation + + // Unmarshal block SSZ + if strings.EqualFold(resp.version, "phase0") { + block := new(gec.SignedBeaconBlockPhase0) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err + } + + attestations = block.Block.Body.Attestations + } else if strings.EqualFold(resp.version, "altair") { + block := new(gec.SignedBeaconBlockAltair) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err + } + + attestations = block.Block.Body.Attestations + } else if strings.EqualFold(resp.version, "bellatrix") { + block := new(gec.SignedBeaconBlockBellatrix) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err + } + + attestations = block.Block.Body.Attestations + } else if strings.EqualFold(resp.version, "capella") { + block := new(gec.SignedBeaconBlockCapella) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err } + + attestations = block.Block.Body.Attestations + } else { + return nil, fmt.Errorf("unknown consensus version header: %s", resp.version) + } + + out := make([]beacon.AttestationInfo, len(attestations)) + for i := range attestations { + out[i].AggregationBits = attestations[i].AggregationBits + out[i].SlotIndex = attestations[i].Data.Slot + out[i].CommitteeIndex = attestations[i].Data.Index + } + + return out, nil + +} + +func (c *StandardHttpClient) GetAttestations(blockId string) ([]beacon.AttestationInfo, bool, error) { + resp, found, err := c.GetAttestationsRaw(blockId) + if err != nil { + return nil, found, err } - return attestationInfo, true, nil + if found == false { + return nil, found, err + } + + out, err := c.ParseAttestationsResponseRaw(resp) + if err != nil { + return nil, found, err + } + + return out, true, nil } func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock, bool, error) { @@ -526,14 +612,87 @@ func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock, return beaconBlock, true, nil } -// Get the attestation committees for the given epoch, or the current epoch if nil -func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) (beacon.Committees, error) { - response, err := c.getCommittees("head", epoch) +type committeesResponseRaw []byte + +func (c *StandardHttpClient) GetCommitteesForEpochRaw(epoch *uint64) (committeesResponseRaw, error) { + + query := "" + if epoch != nil { + query = fmt.Sprintf("?epoch=%d", *epoch) + } + resp, err := http.Get(fmt.Sprintf("%s/eth/v1/beacon/states/head/committees%s", c.providerAddress, query)) + if err != nil { + return nil, fmt.Errorf("Could not get committees: %w", err) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("Could not get committees: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", resp.StatusCode, string(body)) + } + + return body, nil +} + +// Use a faster json lib here +func (c *StandardHttpClient) ParseCommitteesResponseRaw(body committeesResponseRaw) ([]beacon.Committee, error) { + + p := c.parsers.Get() + defer c.parsers.Put(p) + + v, err := p.ParseBytes(body) + if err != nil { + return nil, fmt.Errorf("Could not get committees: error parsing json: %w\n", err) + } + v = v.Get("data") + committees, err := v.Array() + if err != nil { + return nil, fmt.Errorf("Could not get committees: error parsing json: %w\n", err) + } + + out := make([]beacon.Committee, len(committees)) + + for i := range committees { + idx, err := strconv.ParseUint(string(committees[i].GetStringBytes("index")), 10, 64) + if err != nil { + return nil, fmt.Errorf("Could not get committees: error converting idx to uint64: %w\n", err) + } + slot, err := strconv.ParseUint(string(committees[i].GetStringBytes("slot")), 10, 64) + if err != nil { + return nil, fmt.Errorf("Could not get committees: error converting idx to uint64: %w\n", err) + } + out[i] = beacon.Committee{ + Index: idx, + Slot: slot, + } + + validators, err := committees[i].Get("validators").Array() + if err != nil { + return nil, fmt.Errorf("Could not get committees: error parsing validators json: %w\n", err) + } + + out[i].Validators = make([]uint64, len(validators)) + for j, validator := range validators { + u64, err := strconv.ParseUint(string(validator.GetStringBytes()), 10, 64) + if err != nil { + return nil, fmt.Errorf("Could not get committees: error parsing validator key json: %w\n", err) + } + out[i].Validators[j] = u64 + } + } + + return out, nil + +} + +func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) ([]beacon.Committee, error) { + body, err := c.GetCommitteesForEpochRaw(epoch) if err != nil { return nil, err } - return &response, nil + return c.ParseCommitteesResponseRaw(body) } // Perform a withdrawal credentials change on a validator From 2bd43e6c459f26ab67acaba30d05170beb8362e9 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Thu, 23 Mar 2023 21:03:58 +0000 Subject: [PATCH 04/11] Switch to goccy/go-json to trade a bit of speed for better memory footprint --- go.mod | 3 +- go.sum | 4 - .../services/beacon/client/std-http-client.go | 84 +++---------------- 3 files changed, 12 insertions(+), 79 deletions(-) diff --git a/go.mod b/go.mod index 43908b68b..de772a136 100644 --- a/go.mod +++ b/go.mod @@ -35,8 +35,8 @@ require ( github.com/sethvargo/go-password v0.2.0 github.com/shirou/gopsutil/v3 v3.23.1 github.com/tyler-smith/go-bip39 v1.1.0 + github.com/umbracle/go-eth-consensus v0.1.2 github.com/urfave/cli v1.22.12 - github.com/valyala/fastjson v1.6.4 github.com/wealdtech/go-ens/v3 v3.5.5 github.com/wealdtech/go-eth2-types/v2 v2.8.1-0.20230131115251-b93cf60cee26 github.com/wealdtech/go-eth2-util v1.8.0 @@ -154,7 +154,6 @@ require ( github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 // indirect - github.com/umbracle/go-eth-consensus v0.1.2 // indirect github.com/wealdtech/go-bytesutil v1.2.0 // indirect github.com/wealdtech/go-multicodec v1.4.0 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect diff --git a/go.sum b/go.sum index 369ad9019..bfa39dff4 100644 --- a/go.sum +++ b/go.sum @@ -1527,8 +1527,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= -github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344 h1:m+8fKfQwCAy1QjzINvKe/pYtLjo2dl59x2w9YSEJxuY= -github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/supranational/blst v0.3.10 h1:CMciDZ/h4pXDDXQASe8ZGTNKUiVNxVVA5hpci2Uuhuk= github.com/supranational/blst v0.3.10/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= @@ -1573,8 +1571,6 @@ github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8= github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8= github.com/urfave/cli/v2 v2.14.1 h1:0Sx+C9404t2+DPuIJ3UpZFOEFhNG3wPxMj7uZHyZKFA= github.com/urfave/cli/v2 v2.14.1/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= -github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= -github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= diff --git a/shared/services/beacon/client/std-http-client.go b/shared/services/beacon/client/std-http-client.go index 0a0f2a726..fabcbf1dc 100644 --- a/shared/services/beacon/client/std-http-client.go +++ b/shared/services/beacon/client/std-http-client.go @@ -15,7 +15,6 @@ import ( "github.com/goccy/go-json" "github.com/prysmaticlabs/prysm/v3/crypto/bls" gec "github.com/umbracle/go-eth-consensus" - "github.com/valyala/fastjson" eth2types "github.com/wealdtech/go-eth2-types/v2" "golang.org/x/sync/errgroup" @@ -52,7 +51,6 @@ const ( // Beacon client using the standard Beacon HTTP REST API (https://ethereum.github.io/beacon-APIs/) type StandardHttpClient struct { providerAddress string - parsers fastjson.ParserPool } // Create a new client instance @@ -612,87 +610,27 @@ func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock, return beaconBlock, true, nil } -type committeesResponseRaw []byte - -func (c *StandardHttpClient) GetCommitteesForEpochRaw(epoch *uint64) (committeesResponseRaw, error) { - - query := "" - if epoch != nil { - query = fmt.Sprintf("?epoch=%d", *epoch) - } - resp, err := http.Get(fmt.Sprintf("%s/eth/v1/beacon/states/head/committees%s", c.providerAddress, query)) - if err != nil { - return nil, fmt.Errorf("Could not get committees: %w", err) - } - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("Could not get committees: %w", err) - } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", resp.StatusCode, string(body)) - } - - return body, nil -} - -// Use a faster json lib here -func (c *StandardHttpClient) ParseCommitteesResponseRaw(body committeesResponseRaw) ([]beacon.Committee, error) { - - p := c.parsers.Get() - defer c.parsers.Put(p) - - v, err := p.ParseBytes(body) - if err != nil { - return nil, fmt.Errorf("Could not get committees: error parsing json: %w\n", err) - } - v = v.Get("data") - committees, err := v.Array() +func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) ([]beacon.Committee, error) { + response, err := c.getCommittees("head", epoch) if err != nil { - return nil, fmt.Errorf("Could not get committees: error parsing json: %w\n", err) + return nil, err } - out := make([]beacon.Committee, len(committees)) + out := make([]beacon.Committee, len(response.Data)) + for i, committee := range response.Data { + validators := make([]uint64, len(committee.Validators)) - for i := range committees { - idx, err := strconv.ParseUint(string(committees[i].GetStringBytes("index")), 10, 64) - if err != nil { - return nil, fmt.Errorf("Could not get committees: error converting idx to uint64: %w\n", err) - } - slot, err := strconv.ParseUint(string(committees[i].GetStringBytes("slot")), 10, 64) - if err != nil { - return nil, fmt.Errorf("Could not get committees: error converting idx to uint64: %w\n", err) + for j, validator := range committee.Validators { + validators[j] = uint64(validator) } out[i] = beacon.Committee{ - Index: idx, - Slot: slot, - } - - validators, err := committees[i].Get("validators").Array() - if err != nil { - return nil, fmt.Errorf("Could not get committees: error parsing validators json: %w\n", err) - } - - out[i].Validators = make([]uint64, len(validators)) - for j, validator := range validators { - u64, err := strconv.ParseUint(string(validator.GetStringBytes()), 10, 64) - if err != nil { - return nil, fmt.Errorf("Could not get committees: error parsing validator key json: %w\n", err) - } - out[i].Validators[j] = u64 + Index: uint64(committee.Index), + Slot: uint64(committee.Slot), + Validators: validators, } } return out, nil - -} - -func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) ([]beacon.Committee, error) { - body, err := c.GetCommitteesForEpochRaw(epoch) - if err != nil { - return nil, err - } - - return c.ParseCommitteesResponseRaw(body) } // Perform a withdrawal credentials change on a validator From c0a06fa8955184f8de4e972e150e1e6354cf8adf Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Fri, 31 Mar 2023 17:24:23 +0000 Subject: [PATCH 05/11] Tighten the watchtower loop --- rocketpool/watchtower/watchtower.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rocketpool/watchtower/watchtower.go b/rocketpool/watchtower/watchtower.go index 9d05f1f4e..e83ff9fcf 100644 --- a/rocketpool/watchtower/watchtower.go +++ b/rocketpool/watchtower/watchtower.go @@ -25,9 +25,9 @@ import ( ) // Config -var minTasksInterval, _ = time.ParseDuration("4m") -var maxTasksInterval, _ = time.ParseDuration("6m") -var taskCooldown, _ = time.ParseDuration("5s") +var minTasksInterval, _ = time.ParseDuration("30s") +var maxTasksInterval, _ = time.ParseDuration("1m") +var taskCooldown, _ = time.ParseDuration("2s") const ( MaxConcurrentEth1Requests = 200 From 8805d84fef7cdcfd58332e85a06d637ee2ac6d27 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Wed, 5 Apr 2023 18:25:22 +0000 Subject: [PATCH 06/11] Last refactors for oomph --- shared/services/beacon/client/pb/beacon.pb.go | 247 ++++++++++++++++++ .../services/beacon/client/std-http-client.go | 131 ++++------ shared/services/beacon/client/types.go | 28 +- shared/services/rewards/generator-impl-v5.go | 87 +++--- 4 files changed, 357 insertions(+), 136 deletions(-) create mode 100644 shared/services/beacon/client/pb/beacon.pb.go diff --git a/shared/services/beacon/client/pb/beacon.pb.go b/shared/services/beacon/client/pb/beacon.pb.go new file mode 100644 index 000000000..c350bb4e2 --- /dev/null +++ b/shared/services/beacon/client/pb/beacon.pb.go @@ -0,0 +1,247 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.12.4 +// source: beacon.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type CommitteesResponseData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` + Slot uint64 `protobuf:"varint,2,opt,name=slot,proto3" json:"slot,omitempty"` + Validators []uint64 `protobuf:"varint,3,rep,packed,name=validators,proto3" json:"validators,omitempty"` +} + +func (x *CommitteesResponseData) Reset() { + *x = CommitteesResponseData{} + if protoimpl.UnsafeEnabled { + mi := &file_beacon_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommitteesResponseData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommitteesResponseData) ProtoMessage() {} + +func (x *CommitteesResponseData) ProtoReflect() protoreflect.Message { + mi := &file_beacon_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommitteesResponseData.ProtoReflect.Descriptor instead. +func (*CommitteesResponseData) Descriptor() ([]byte, []int) { + return file_beacon_proto_rawDescGZIP(), []int{0} +} + +func (x *CommitteesResponseData) GetIndex() uint64 { + if x != nil { + return x.Index + } + return 0 +} + +func (x *CommitteesResponseData) GetSlot() uint64 { + if x != nil { + return x.Slot + } + return 0 +} + +func (x *CommitteesResponseData) GetValidators() []uint64 { + if x != nil { + return x.Validators + } + return nil +} + +type CommitteesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []*CommitteesResponseData `protobuf:"bytes,1,rep,name=data,proto3" json:"data,omitempty"` + ExecutionOptimistic bool `protobuf:"varint,2,opt,name=execution_optimistic,json=executionOptimistic,proto3" json:"execution_optimistic,omitempty"` + Finalized bool `protobuf:"varint,3,opt,name=finalized,proto3" json:"finalized,omitempty"` +} + +func (x *CommitteesResponse) Reset() { + *x = CommitteesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_beacon_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommitteesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommitteesResponse) ProtoMessage() {} + +func (x *CommitteesResponse) ProtoReflect() protoreflect.Message { + mi := &file_beacon_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommitteesResponse.ProtoReflect.Descriptor instead. +func (*CommitteesResponse) Descriptor() ([]byte, []int) { + return file_beacon_proto_rawDescGZIP(), []int{1} +} + +func (x *CommitteesResponse) GetData() []*CommitteesResponseData { + if x != nil { + return x.Data + } + return nil +} + +func (x *CommitteesResponse) GetExecutionOptimistic() bool { + if x != nil { + return x.ExecutionOptimistic + } + return false +} + +func (x *CommitteesResponse) GetFinalized() bool { + if x != nil { + return x.Finalized + } + return false +} + +var File_beacon_proto protoreflect.FileDescriptor + +var file_beacon_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x62, 0x65, 0x61, 0x63, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, + 0x70, 0x62, 0x22, 0x62, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x65, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, + 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, + 0x74, 0x6f, 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x04, 0x52, 0x0a, 0x76, 0x61, 0x6c, 0x69, + 0x64, 0x61, 0x74, 0x6f, 0x72, 0x73, 0x22, 0x95, 0x01, 0x0a, 0x12, 0x43, 0x6f, 0x6d, 0x6d, 0x69, + 0x74, 0x74, 0x65, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x62, + 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, + 0x14, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6d, + 0x69, 0x73, 0x74, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x13, 0x65, 0x78, 0x65, + 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x70, 0x74, 0x69, 0x6d, 0x69, 0x73, 0x74, 0x69, 0x63, + 0x12, 0x1c, 0x0a, 0x09, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x09, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x42, 0x06, + 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_beacon_proto_rawDescOnce sync.Once + file_beacon_proto_rawDescData = file_beacon_proto_rawDesc +) + +func file_beacon_proto_rawDescGZIP() []byte { + file_beacon_proto_rawDescOnce.Do(func() { + file_beacon_proto_rawDescData = protoimpl.X.CompressGZIP(file_beacon_proto_rawDescData) + }) + return file_beacon_proto_rawDescData +} + +var file_beacon_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_beacon_proto_goTypes = []interface{}{ + (*CommitteesResponseData)(nil), // 0: pb.CommitteesResponseData + (*CommitteesResponse)(nil), // 1: pb.CommitteesResponse +} +var file_beacon_proto_depIdxs = []int32{ + 0, // 0: pb.CommitteesResponse.data:type_name -> pb.CommitteesResponseData + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_beacon_proto_init() } +func file_beacon_proto_init() { + if File_beacon_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_beacon_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommitteesResponseData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_beacon_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommitteesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_beacon_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_beacon_proto_goTypes, + DependencyIndexes: file_beacon_proto_depIdxs, + MessageInfos: file_beacon_proto_msgTypes, + }.Build() + File_beacon_proto = out.File + file_beacon_proto_rawDesc = nil + file_beacon_proto_goTypes = nil + file_beacon_proto_depIdxs = nil +} diff --git a/shared/services/beacon/client/std-http-client.go b/shared/services/beacon/client/std-http-client.go index fabcbf1dc..063cb8c72 100644 --- a/shared/services/beacon/client/std-http-client.go +++ b/shared/services/beacon/client/std-http-client.go @@ -8,7 +8,6 @@ import ( "net/http" "strconv" "strings" - "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -17,9 +16,12 @@ import ( gec "github.com/umbracle/go-eth-consensus" eth2types "github.com/wealdtech/go-eth2-types/v2" "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" "github.com/rocket-pool/rocketpool-go/types" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client/pb" "github.com/rocket-pool/smartnode/shared/utils/eth2" hexutil "github.com/rocket-pool/smartnode/shared/utils/hex" ) @@ -611,13 +613,35 @@ func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock, } func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) ([]beacon.Committee, error) { - response, err := c.getCommittees("head", epoch) + query := "" + if epoch != nil { + query = fmt.Sprintf("?epoch=%d", *epoch) + } + responseBody, header, status, err := c.protoGetRequest(fmt.Sprintf(RequestCommitteePath, "head") + query) if err != nil { - return nil, err + return nil, fmt.Errorf("Could not get committees: %w", err) + } + if status != http.StatusOK { + return nil, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", status, string(responseBody)) + } + + m := pb.CommitteesResponse{} + if header == nil || !strings.EqualFold(header.Get("Content-Type"), "application/protobuf") { + // Parse json + err := protojson.Unmarshal(responseBody, &m) + if err != nil { + return nil, err + } + } else { + // Parse proto + err = proto.Unmarshal(responseBody, &m) + if err != nil { + return nil, err + } } - out := make([]beacon.Committee, len(response.Data)) - for i, committee := range response.Data { + out := make([]beacon.Committee, len(m.Data)) + for i, committee := range m.Data { validators := make([]uint64, len(committee.Validators)) for j, validator := range committee.Validators { @@ -878,76 +902,6 @@ func (c *StandardHttpClient) getBeaconBlock(blockId string) (BeaconBlockResponse return beaconBlock, true, nil } -type committeesDecoder struct { - decoder *json.Decoder - currentReader *io.ReadCloser -} - -// Read will be called by the json decoder to request more bytes of data from -// the beacon node's committees response. Since the decoder is reused, we -// need to avoid sending it io.EOF, or it will enter an unusable state and can -// not be reused later. -// -// On subsequent calls to Decode, the decoder resets its internal buffer, which -// means any data it reads between the last json token and EOF is correctly -// discarded. -func (c *committeesDecoder) Read(p []byte) (int, error) { - n, err := (*c.currentReader).Read(p) - if err == io.EOF { - return n, nil - } - - return n, err -} - -var committeesDecoderPool sync.Pool = sync.Pool{ - New: func() any { - var out committeesDecoder - - out.decoder = json.NewDecoder(&out) - return &out - }, -} - -// Get the committees for the epoch -func (c *StandardHttpClient) getCommittees(stateId string, epoch *uint64) (CommitteesResponse, error) { - var committees CommitteesResponse - - query := "" - if epoch != nil { - query = fmt.Sprintf("?epoch=%d", *epoch) - } - - // Committees responses are large, so let the json decoder read it in a buffered fashion - reader, status, err := c.getRequestReader(fmt.Sprintf(RequestCommitteePath, stateId) + query) - if err != nil { - return CommitteesResponse{}, fmt.Errorf("Could not get committees: %w", err) - } - defer func() { - _ = reader.Close() - }() - - if status != http.StatusOK { - body, _ := io.ReadAll(reader) - return CommitteesResponse{}, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", status, string(body)) - } - - d := committeesDecoderPool.Get().(*committeesDecoder) - defer func() { - d.currentReader = nil - committeesDecoderPool.Put(d) - }() - - d.currentReader = &reader - - // Begin decoding - if err := d.decoder.Decode(&committees); err != nil { - return CommitteesResponse{}, fmt.Errorf("Could not decode committees: %w", err) - } - - return committees, nil -} - // Send withdrawal credentials change request func (c *StandardHttpClient) postWithdrawalCredentialsChange(request BLSToExecutionChangeRequest) error { requestArray := []BLSToExecutionChangeRequest{request} // This route must be wrapped in an array @@ -961,8 +915,31 @@ func (c *StandardHttpClient) postWithdrawalCredentialsChange(request BLSToExecut return nil } -// Make a GET request but do not read its body yet (allows buffered decoding) -func (c *StandardHttpClient) getRequestReader(requestPath string) (io.ReadCloser, int, error) { +func (c *StandardHttpClient) protoGetRequest(requestPath string) ([]byte, http.Header, int, error) { + req, err := http.NewRequest("GET", fmt.Sprintf(RequestUrlFormat, c.providerAddress, requestPath), nil) + if err != nil { + return nil, nil, 0, err + } + + req.Header.Set("Accept", "application/protobuf") + response, err := http.DefaultClient.Do(req) + if err != nil { + return nil, nil, 0, err + } + defer func() { + _ = response.Body.Close() + }() + + body, err := io.ReadAll(response.Body) + if err != nil { + return nil, nil, 0, err + } + + return body, response.Header, response.StatusCode, nil +} + +// Make a GET request to the beacon node +func (c *StandardHttpClient) getRequest(requestPath string) ([]byte, int, error) { // Send request response, err := http.Get(fmt.Sprintf(RequestUrlFormat, c.providerAddress, requestPath)) diff --git a/shared/services/beacon/client/types.go b/shared/services/beacon/client/types.go index 9c4540265..1623e0e5c 100644 --- a/shared/services/beacon/client/types.go +++ b/shared/services/beacon/client/types.go @@ -2,11 +2,11 @@ package client import ( "encoding/hex" + "fmt" "strconv" "github.com/ethereum/go-ethereum/common" "github.com/goccy/go-json" - hexutil "github.com/rocket-pool/smartnode/shared/utils/hex" ) @@ -145,6 +145,9 @@ type Attestation struct { } `json:"data"` } +// Don't mind meeeee +const intSize = 32 << (^uint(0) >> 63) + // Unsigned integer type type uinteger uint64 @@ -155,8 +158,29 @@ func (i *uinteger) UnmarshalJSON(data []byte) error { // Unmarshal string var dataStr string - if err := json.Unmarshal(data, &dataStr); err != nil { + /*if err := json.Unmarshal(data, &dataStr); err != nil { return err + }*/ + dataLen := len(data) + if dataLen <= 2 { + return fmt.Errorf("Invalid json uinteger '%s'", string(data)) + } + dataStr = string(data[1 : dataLen-1]) + sLen := dataLen - 2 + + // Check fast paths + if intSize == 32 && (0 < sLen && sLen < 10) || + intSize == 64 && (0 < sLen && sLen < 19) { + + if sLen > 0 && dataStr[0] != '-' { + signed, err := strconv.Atoi(dataStr) + if err == nil { + *i = uinteger(signed) + return nil + } + } + + // If fast path failed just fall through } // Parse integer value diff --git a/shared/services/rewards/generator-impl-v5.go b/shared/services/rewards/generator-impl-v5.go index d20cae882..e44806502 100644 --- a/shared/services/rewards/generator-impl-v5.go +++ b/shared/services/rewards/generator-impl-v5.go @@ -5,7 +5,9 @@ import ( "encoding/hex" "fmt" "math/big" + "os" "runtime" + "runtime/pprof" "sort" "sync" "time" @@ -740,19 +742,15 @@ func (r *treeGeneratorImpl_v5) calculateNodeRewards() (*big.Int, *big.Int, error func getWorkerCount() uint64 { nproc := runtime.NumCPU() - target := nproc / 2 + target := nproc - 2 if target < 1 { return 1 } - if target >= 4 { - return 4 - } - return uint64(target) } -func (r *treeGeneratorImpl_v5) fetchEpochs(startEpoch uint64, endEpoch uint64, resp chan *epochState, errChan chan error) { +func (r *treeGeneratorImpl_v5) fetchEpochs(startEpoch uint64, endEpoch uint64, errChan chan error, startTime time.Time) { // seq tracks the next expected epoch in the sequence to be sent to the caller // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn // to publish an epoch to the caller via the resp channel. Every time seq is updated, @@ -772,6 +770,7 @@ func (r *treeGeneratorImpl_v5) fetchEpochs(startEpoch uint64, endEpoch uint64, r // seq should start with the first epoch the caller is expecting seq = startEpoch + for j := uint64(0); j < workers; j++ { id := j @@ -806,18 +805,28 @@ func (r *treeGeneratorImpl_v5) fetchEpochs(startEpoch uint64, endEpoch uint64, r } // No error was encountered, and seq indicates it's this worker's turn - // to reply, so produce a result on resp - resp <- es + // to update the state, so process the epoch + r.processEpoch(es) + if epoch == endEpoch { // The last result has been produced, so close the channels close(errChan) - close(resp) // This worker produced the last result, so // signal to the other workers that it is time to exit done = true } else { seq++ + if seq%100 == 99 { + timeTaken := time.Since(startTime) + r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", + r.logPrefix, + es.epoch, + endEpoch, + float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, + timeTaken) + } } + // Either seq has been updated or the last result was produced // signal to the other workers to wake up and either do work, // or exit now. @@ -830,6 +839,14 @@ func (r *treeGeneratorImpl_v5) fetchEpochs(startEpoch uint64, endEpoch uint64, r // Get all of the duties for a range of epochs func (r *treeGeneratorImpl_v5) processAttestationsForInterval() error { + if os.Getenv("DUTIES_PPROF") != "" { + f, err := os.Create(os.Getenv("DUTIES_PPROF")) + if err != nil { + return err + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch @@ -844,61 +861,17 @@ func (r *treeGeneratorImpl_v5) processAttestationsForInterval() error { r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) - // Make a channel for fetchEpochs to reply on - // Buffer just 1 epoch in the channel- that way, workers don't block until - // the main thread is working on one epoch and has one on deck. - epochStates := make(chan *epochState, 1) // Workers need a channel to send back errors errs := make(chan error) - epochsDone := 0 reportStartTime := time.Now() // Start populating epochStates - r.fetchEpochs(startEpoch, endEpoch, epochStates, errs) + r.fetchEpochs(startEpoch, endEpoch, errs, reportStartTime) - // Read until both channels are closed - for { - select { - case es, ok := <-epochStates: - if !ok { - // The channel has been closed. - // Clear the reference to it so we can exit - epochStates = nil - } else { - // We received work from the channel, process it. - if epochsDone == 100 { - timeTaken := time.Since(reportStartTime) - r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", - r.logPrefix, - es.epoch, - endEpoch, - float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, - timeTaken) - epochsDone = 0 - } - // Process epoch state - r.processEpoch(es) - epochsDone++ - } - - case err, ok := <-errs: - if !ok { - // The error channel has been closed. - // Clear the reference so we can exit. - errs = nil - } - - if err != nil { - // We received an error- exit and propagate it. - return err - } - } - - // If both channels have been closed, stop polling. - if epochStates == nil && errs == nil { - break - } + // Read until the error channel is closed + for err := range errs { + return err } // Check the epoch after the end of the interval for any lingering attestations From 65e832b24b4d93da4ac1ab6c24a6d567d9460b27 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Wed, 5 Jul 2023 20:33:20 +0000 Subject: [PATCH 07/11] Start tree generation when targetEpoch is justified --- rocketpool/watchtower/submit-rewards-tree-stateless.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rocketpool/watchtower/submit-rewards-tree-stateless.go b/rocketpool/watchtower/submit-rewards-tree-stateless.go index 41e86ce4b..117d4a763 100644 --- a/rocketpool/watchtower/submit-rewards-tree-stateless.go +++ b/rocketpool/watchtower/submit-rewards-tree-stateless.go @@ -578,7 +578,8 @@ func (t *submitRewardsTree_Stateless) getSnapshotConsensusBlock(endTime time.Tim targetSlot := uint64(math.Ceil(totalTimespan.Seconds() / float64(eth2Config.SecondsPerSlot))) targetSlotEpoch := targetSlot / eth2Config.SlotsPerEpoch targetSlot = targetSlotEpoch*eth2Config.SlotsPerEpoch + (eth2Config.SlotsPerEpoch - 1) // The target slot becomes the last one in the Epoch - requiredEpoch := targetSlotEpoch + 1 // The smoothing pool requires 1 epoch beyond the target to be finalized, to check for late attestations + // XXX Jacob, you changed this from targetSlotEpoch + 1 so we start generating when the subsequent epoch justifies instead + requiredEpoch := targetSlotEpoch // The smoothing pool requires 1 epoch beyond the target to be finalized, to check for late attestations // Check if the required epoch is finalized yet if beaconHead.FinalizedEpoch < requiredEpoch { From 20f61af069848ba01f40235bee238d1bda66cd04 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Tue, 25 Jul 2023 00:46:03 +0000 Subject: [PATCH 08/11] Extend hacks to v6 --- shared/services/beacon/client.go | 2 +- .../services/beacon/client/std-http-client.go | 59 +++-- shared/services/rewards/generator-impl-v1.go | 11 +- shared/services/rewards/generator-impl-v2.go | 11 +- shared/services/rewards/generator-impl-v3.go | 11 +- shared/services/rewards/generator-impl-v4.go | 13 +- shared/services/rewards/generator-impl-v5.go | 13 +- shared/services/rewards/generator-impl-v6.go | 238 +++++++++++++----- shared/services/rewards/generator-impl-v7.go | 13 +- shared/services/rewards/rolling-record.go | 16 +- 10 files changed, 279 insertions(+), 108 deletions(-) diff --git a/shared/services/beacon/client.go b/shared/services/beacon/client.go index d91ec847d..d09aadbef 100644 --- a/shared/services/beacon/client.go +++ b/shared/services/beacon/client.go @@ -79,7 +79,7 @@ type Committees interface { Slot(int) uint64 // Validators returns the list of validators of the committee at // the provided offset - Validators(int) []string + Validators(int) []uint64 // Count returns the number of committees in the response Count() int // Release returns the reused validators slice buffer to the pool for diff --git a/shared/services/beacon/client/std-http-client.go b/shared/services/beacon/client/std-http-client.go index 063cb8c72..e72f69f46 100644 --- a/shared/services/beacon/client/std-http-client.go +++ b/shared/services/beacon/client/std-http-client.go @@ -612,7 +612,31 @@ func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock, return beaconBlock, true, nil } -func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) ([]beacon.Committee, error) { +type committeeWrapper struct { + resp *pb.CommitteesResponse +} + +func (w *committeeWrapper) Count() int { + return len(w.resp.Data) +} + +func (w *committeeWrapper) Index(i int) uint64 { + return w.resp.Data[i].Index +} + +func (w *committeeWrapper) Slot(i int) uint64 { + return w.resp.Data[i].Slot +} + +func (w *committeeWrapper) Validators(i int) []uint64 { + return w.resp.Data[i].Validators +} + +func (w *committeeWrapper) Release() { + +} + +func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) (beacon.Committees, error) { query := "" if epoch != nil { query = fmt.Sprintf("?epoch=%d", *epoch) @@ -640,21 +664,26 @@ func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) ([]beacon.Comm } } - out := make([]beacon.Committee, len(m.Data)) - for i, committee := range m.Data { - validators := make([]uint64, len(committee.Validators)) - - for j, validator := range committee.Validators { - validators[j] = uint64(validator) - } - out[i] = beacon.Committee{ - Index: uint64(committee.Index), - Slot: uint64(committee.Slot), - Validators: validators, - } + out := committeeWrapper{ + resp: &m, } - return out, nil + /* + out := make(beacon.Committees, len(m.Data)) + for i, committee := range m.Data { + validators := make([]uint64, len(committee.Validators)) + + for j, validator := range committee.Validators { + validators[j] = uint64(validator) + } + out[i] = beacon.Committee{ + Index: uint64(committee.Index), + Slot: uint64(committee.Slot), + Validators: validators, + } + }*/ + + return &out, nil } // Perform a withdrawal credentials change on a validator @@ -939,7 +968,7 @@ func (c *StandardHttpClient) protoGetRequest(requestPath string) ([]byte, http.H } // Make a GET request to the beacon node -func (c *StandardHttpClient) getRequest(requestPath string) ([]byte, int, error) { +func (c *StandardHttpClient) getRequestReader(requestPath string) (io.ReadCloser, int, error) { // Send request response, err := http.Get(fmt.Sprintf(RequestUrlFormat, c.providerAddress, requestPath)) diff --git a/shared/services/rewards/generator-impl-v1.go b/shared/services/rewards/generator-impl-v1.go index 2bad61033..73ebe3439 100644 --- a/shared/services/rewards/generator-impl-v1.go +++ b/shared/services/rewards/generator-impl-v1.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -44,7 +45,7 @@ type treeGeneratorImpl_v1 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1000,7 +1001,7 @@ func (r *treeGeneratorImpl_v1) createMinipoolIndexMap() error { } // Get indices for all minipool validators - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(minipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1011,7 +1012,11 @@ func (r *treeGeneratorImpl_v1) createMinipoolIndexMap() error { if details.IsEligible { for _, minipoolInfo := range details.Minipools { minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + idx, err := strconv.ParseUint(minipoolInfo.ValidatorIndex, 10, 64) + if err != nil { + idx = 0 + } + r.validatorIndexMap[idx] = minipoolInfo } } } diff --git a/shared/services/rewards/generator-impl-v2.go b/shared/services/rewards/generator-impl-v2.go index 364139664..0ad2e0916 100644 --- a/shared/services/rewards/generator-impl-v2.go +++ b/shared/services/rewards/generator-impl-v2.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -48,7 +49,7 @@ type treeGeneratorImpl_v2 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1019,7 +1020,7 @@ func (r *treeGeneratorImpl_v2) createMinipoolIndexMap() error { } // Get indices for all minipool validators - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(minipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1037,6 +1038,10 @@ func (r *treeGeneratorImpl_v2) createMinipoolIndexMap() error { minipoolInfo.EndSlot = 0 minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -1047,7 +1052,7 @@ func (r *treeGeneratorImpl_v2) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v3.go b/shared/services/rewards/generator-impl-v3.go index 270a34912..de2ae6f26 100644 --- a/shared/services/rewards/generator-impl-v3.go +++ b/shared/services/rewards/generator-impl-v3.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -44,7 +45,7 @@ type treeGeneratorImpl_v3 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1015,7 +1016,7 @@ func (r *treeGeneratorImpl_v3) createMinipoolIndexMap() error { } // Get indices for all minipool validators - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(minipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1033,6 +1034,10 @@ func (r *treeGeneratorImpl_v3) createMinipoolIndexMap() error { minipoolInfo.EndSlot = 0 minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -1043,7 +1048,7 @@ func (r *treeGeneratorImpl_v3) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v4.go b/shared/services/rewards/generator-impl-v4.go index e6b3a4dba..33b789174 100644 --- a/shared/services/rewards/generator-impl-v4.go +++ b/shared/services/rewards/generator-impl-v4.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -46,7 +47,7 @@ type treeGeneratorImpl_v4 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1075,7 +1076,7 @@ func (r *treeGeneratorImpl_v4) createMinipoolIndexMap() error { } // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(uncachedMinipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1098,6 +1099,10 @@ func (r *treeGeneratorImpl_v4) createMinipoolIndexMap() error { minipoolInfo.EndSlot = 0 minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -1108,7 +1113,7 @@ func (r *treeGeneratorImpl_v4) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch @@ -1472,7 +1477,7 @@ func (r *treeGeneratorImpl_v4) getNodeEffectiveRPLStakes() ([]*big.Int, error) { // Get the status for all staking minipool validators r.log.Printlnf("%s Getting validator statuses for all eligible minipools", r.logPrefix) - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(r.stakingMinipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) diff --git a/shared/services/rewards/generator-impl-v5.go b/shared/services/rewards/generator-impl-v5.go index e44806502..ec458b24a 100644 --- a/shared/services/rewards/generator-impl-v5.go +++ b/shared/services/rewards/generator-impl-v5.go @@ -9,6 +9,7 @@ import ( "runtime" "runtime/pprof" "sort" + "strconv" "sync" "time" @@ -47,7 +48,7 @@ type treeGeneratorImpl_v5 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -102,7 +103,7 @@ func newTreeGeneratorImpl_v5(log *log.ColorLogger, logPrefix string, index uint6 }, }, validatorStatusMap: map[rptypes.ValidatorPubkey]beacon.ValidatorStatus{}, - validatorIndexMap: map[string]*MinipoolInfo{}, + validatorIndexMap: map[uint64]*MinipoolInfo{}, elSnapshotHeader: elSnapshotHeader, log: log, logPrefix: logPrefix, @@ -1058,7 +1059,7 @@ func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) { func (r *treeGeneratorImpl_v5) createMinipoolIndexMap() error { // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} for _, details := range r.nodeDetails { if details.IsEligible { for _, minipoolInfo := range details.Minipools { @@ -1068,6 +1069,10 @@ func (r *treeGeneratorImpl_v5) createMinipoolIndexMap() error { r.log.Printlnf("NOTE: minipool %s (pubkey %s) didn't exist at this slot; removing it", minipoolInfo.Address.Hex(), minipoolInfo.ValidatorPubkey.Hex()) minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -1076,7 +1081,7 @@ func (r *treeGeneratorImpl_v5) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = status.Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v6.go b/shared/services/rewards/generator-impl-v6.go index 64ab76e36..f23aaadee 100644 --- a/shared/services/rewards/generator-impl-v6.go +++ b/shared/services/rewards/generator-impl-v6.go @@ -5,7 +5,11 @@ import ( "encoding/hex" "fmt" "math/big" + "os" + "runtime/pprof" "sort" + "strconv" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -41,7 +45,7 @@ type treeGeneratorImpl_v6 struct { smoothingPoolBalance *big.Int intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -91,7 +95,7 @@ func newTreeGeneratorImpl_v6(log *log.ColorLogger, logPrefix string, index uint6 }, }, validatorStatusMap: map[rptypes.ValidatorPubkey]beacon.ValidatorStatus{}, - validatorIndexMap: map[string]*MinipoolInfo{}, + validatorIndexMap: map[uint64]*MinipoolInfo{}, elSnapshotHeader: elSnapshotHeader, log: log, logPrefix: logPrefix, @@ -709,109 +713,207 @@ func (r *treeGeneratorImpl_v6) calculateNodeRewards() (*big.Int, *big.Int, error } -// Get all of the duties for a range of epochs -func (r *treeGeneratorImpl_v6) processAttestationsForInterval() error { - - startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch - endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch - - // Determine the validator indices of each minipool - err := r.createMinipoolIndexMap() - if err != nil { - return err - } +func (r *treeGeneratorImpl_v6) fetchEpochs(startEpoch uint64, endEpoch uint64, errChan chan error, startTime time.Time) { + // seq tracks the next expected epoch in the sequence to be sent to the caller + // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn + // to publish an epoch to the caller via the resp channel. Every time seq is updated, + // therefor, all threads must wake up to check the value of seq. If it isn't their turn, + // they will go back to sleep. + var seq uint64 + + // If we encounter an error, we will wake up all the threads so they can exit. Therefor, + // the first time we encounter an error we should set 'done' to true, and then each thread + // should check its value every time they are woken. + var done bool + + // A cond to help the workers synchronize- whenever one thread wants to wake up the other + // threads, it does so by broadcasting on this cond. + cond := sync.NewCond(&sync.Mutex{}) + workers := getWorkerCount() + + // seq should start with the first epoch the caller is expecting + seq = startEpoch + + for j := uint64(0); j < workers; j++ { + + id := j + go func() { + // each worker will iterate modulo its id + for epoch := startEpoch + id; epoch < endEpoch+1; epoch += workers { + // Fetch the duties and participation for a single epoch + es, err := r.fetchEpoch(true, epoch) + if err != nil { + // Return the error to the caller + errChan <- err + // Note that an error was encountered + done = true + // Tell other threads to wake up and exit + cond.Broadcast() + // Exit this thread + return + } - // Check all of the attestations for each epoch - r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) - r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + // Wait until it's this worker's turn to produce a result + cond.L.Lock() + for seq != epoch && !done { + // No errors have been encountered, and it is not this worker's + // turn yet, so go back to sleep + cond.Wait() + } - epochsDone := 0 - reportStartTime := time.Now() - for epoch := startEpoch; epoch < endEpoch+1; epoch++ { - if epochsDone == 100 { - timeTaken := time.Since(reportStartTime) - r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", r.logPrefix, epoch, endEpoch, float64(epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, timeTaken) - epochsDone = 0 - } + // Check if this worker was woken up due to an error + if done { + // Another worker encountered an error, so exit now + return + } - err := r.processEpoch(true, epoch) - if err != nil { - return err - } + // No error was encountered, and seq indicates it's this worker's turn + // to update the state, so process the epoch + r.processEpoch(es) - epochsDone++ - } + if epoch == endEpoch { + // The last result has been produced, so close the channels + close(errChan) + // This worker produced the last result, so + // signal to the other workers that it is time to exit + done = true + } else { + seq++ + if seq%100 == 99 { + timeTaken := time.Since(startTime) + r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", + r.logPrefix, + es.epoch, + endEpoch, + float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, + timeTaken) + } + } - // Check the epoch after the end of the interval for any lingering attestations - epoch := endEpoch + 1 - err = r.processEpoch(false, epoch) - if err != nil { - return err + // Either seq has been updated or the last result was produced + // signal to the other workers to wake up and either do work, + // or exit now. + cond.Broadcast() + cond.L.Unlock() + } + }() } - - r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) - return nil - } // Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance -func (r *treeGeneratorImpl_v6) processEpoch(getDuties bool, epoch uint64) error { +func (r *treeGeneratorImpl_v6) fetchEpoch(getDuties bool, epoch uint64) (*epochState, error) { // Get the committee info and attestation records for this epoch - var committeeData beacon.Committees - attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch) + out := &epochState{ + epoch: epoch, + attestations: make([][]beacon.AttestationInfo, r.slotsPerEpoch), + } var wg errgroup.Group if getDuties { wg.Go(func() error { var err error - committeeData, err = r.bc.GetCommitteesForEpoch(&epoch) + out.committees, err = r.bc.GetCommitteesForEpoch(&epoch) return err }) } - for i := uint64(0); i < r.slotsPerEpoch; i++ { - i := i - slot := epoch*r.slotsPerEpoch + i - wg.Go(func() error { + wg.Go(func() error { + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := epoch*r.slotsPerEpoch + i attestations, found, err := r.bc.GetAttestations(fmt.Sprint(slot)) if err != nil { return err } if found { - attestationsPerSlot[i] = attestations + //out.attestationsResponses[i] = attestations + out.attestations[i] = attestations } else { - attestationsPerSlot[i] = []beacon.AttestationInfo{} + //out.attestationsResponses[i] = nil + out.attestations[i] = nil } - return nil - }) - } + } + return nil + }) err := wg.Wait() if err != nil { - return fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) + return nil, fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) } - if getDuties { - // Get all of the expected duties for the epoch - err = r.getDutiesForEpoch(committeeData) + return out, nil + +} + +// Get all of the duties for a range of epochs +func (r *treeGeneratorImpl_v6) processAttestationsForInterval() error { + if os.Getenv("DUTIES_PPROF") != "" { + f, err := os.Create(os.Getenv("DUTIES_PPROF")) if err != nil { - return fmt.Errorf("Error getting duties for epoch %d: %w", epoch, err) + return err } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() } - // Process all of the slots in the epoch - for i := uint64(0); i < r.slotsPerEpoch; i++ { - slot := epoch*r.slotsPerEpoch + i - attestations := attestationsPerSlot[i] - if len(attestations) > 0 { - r.checkDutiesForSlot(attestations, slot) - } + startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch + endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch + + // Determine the validator indices of each minipool + err := r.createMinipoolIndexMap() + if err != nil { + return err + } + + // Check all of the attestations for each epoch + r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) + r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + + // Workers need a channel to send back errors + errs := make(chan error) + + reportStartTime := time.Now() + + // Start populating epochStates + r.fetchEpochs(startEpoch, endEpoch, errs, reportStartTime) + + // Read until the error channel is closed + for err := range errs { + return err } + // Check the epoch after the end of the interval for any lingering attestations + epoch := endEpoch + 1 + es, err := r.fetchEpoch(false, epoch) + if err != nil { + return err + } + r.processEpoch(es) + + r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) return nil } +func (r *treeGeneratorImpl_v6) processEpoch(es *epochState) { + + // Get all of the expected duties for the epoch + // Note: committees will be nil for the last epoch + if es.committees != nil { + r.getDutiesForEpoch(es.committees) + } + + // Process all of the slots in the epoch + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := es.epoch*r.slotsPerEpoch + i + + // The element will be nil if there was no block at the slot + if len(es.attestations[i]) > 0 { + // There was a block - process its attestations + r.checkDutiesForSlot(es.attestations[i], slot) + } + } +} + // Handle all of the attestations in the given slot func (r *treeGeneratorImpl_v6) checkDutiesForSlot(attestations []beacon.AttestationInfo, slot uint64) error { @@ -944,7 +1046,7 @@ func (r *treeGeneratorImpl_v6) getDutiesForEpoch(committees beacon.Committees) e func (r *treeGeneratorImpl_v6) createMinipoolIndexMap() error { // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} for _, details := range r.nodeDetails { if details.IsEligible { for _, minipoolInfo := range details.Minipools { @@ -954,6 +1056,10 @@ func (r *treeGeneratorImpl_v6) createMinipoolIndexMap() error { //r.log.Printlnf("NOTE: minipool %s (pubkey %s) didn't exist at this slot; removing it", minipoolInfo.Address.Hex(), minipoolInfo.ValidatorPubkey.Hex()) minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -962,7 +1068,7 @@ func (r *treeGeneratorImpl_v6) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = status.Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v7.go b/shared/services/rewards/generator-impl-v7.go index 0dd237b27..b3be2f9a1 100644 --- a/shared/services/rewards/generator-impl-v7.go +++ b/shared/services/rewards/generator-impl-v7.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -41,7 +42,7 @@ type treeGeneratorImpl_v7 struct { smoothingPoolBalance *big.Int intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -89,7 +90,7 @@ func newTreeGeneratorImpl_v7(log *log.ColorLogger, logPrefix string, index uint6 }, }, validatorStatusMap: map[rptypes.ValidatorPubkey]beacon.ValidatorStatus{}, - validatorIndexMap: map[string]*MinipoolInfo{}, + validatorIndexMap: map[uint64]*MinipoolInfo{}, elSnapshotHeader: elSnapshotHeader, log: log, logPrefix: logPrefix, @@ -968,7 +969,7 @@ func (r *treeGeneratorImpl_v7) getDutiesForEpoch(committees beacon.Committees) e func (r *treeGeneratorImpl_v7) createMinipoolIndexMap() error { // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} for _, details := range r.nodeDetails { if details.IsEligible { for _, minipoolInfo := range details.Minipools { @@ -986,7 +987,11 @@ func (r *treeGeneratorImpl_v7) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = status.Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + vIdx, err := strconv.ParseUint(minipoolInfo.ValidatorIndex, 10, 64) + if err != nil { + vIdx = 0 + } + r.validatorIndexMap[vIdx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/rolling-record.go b/shared/services/rewards/rolling-record.go index 544d2aebc..d268e216f 100644 --- a/shared/services/rewards/rolling-record.go +++ b/shared/services/rewards/rolling-record.go @@ -3,6 +3,7 @@ package rewards import ( "fmt" "math/big" + "strconv" "time" "github.com/ethereum/go-ethereum/common" @@ -23,7 +24,7 @@ const ( type RollingRecord struct { StartSlot uint64 `json:"startSlot"` LastDutiesSlot uint64 `json:"lastDutiesSlot"` - ValidatorIndexMap map[string]*MinipoolInfo `json:"validatorIndexMap"` + ValidatorIndexMap map[uint64]*MinipoolInfo `json:"validatorIndexMap"` RewardsInterval uint64 `json:"rewardsInterval"` SmartnodeVersion string `json:"smartnodeVersion,omitempty"` @@ -45,7 +46,7 @@ func NewRollingRecord(log *log.ColorLogger, logPrefix string, bc beacon.Client, return &RollingRecord{ StartSlot: startSlot, LastDutiesSlot: 0, - ValidatorIndexMap: map[string]*MinipoolInfo{}, + ValidatorIndexMap: map[uint64]*MinipoolInfo{}, RewardsInterval: rewardsInterval, SmartnodeVersion: shared.RocketPoolVersion, @@ -169,7 +170,7 @@ func (r *RollingRecord) Serialize() ([]byte, error) { LastDutiesSlot: r.LastDutiesSlot, RewardsInterval: r.RewardsInterval, SmartnodeVersion: r.SmartnodeVersion, - ValidatorIndexMap: map[string]*MinipoolInfo{}, + ValidatorIndexMap: map[uint64]*MinipoolInfo{}, } // Remove minipool perf records with zero attestations from the serialization @@ -202,7 +203,12 @@ func (r *RollingRecord) updateValidatorIndices(state *state.NetworkState) { continue } - _, exists = r.ValidatorIndexMap[validator.Index] + idx, err := strconv.ParseUint(validator.Index, 10, 64) + if err != nil { + idx = 0 + } + + _, exists = r.ValidatorIndexMap[idx] if !exists && mpd.Status == types.Staking { // Validator exists and is staking but it hasn't been recorded yet, add it to the map and update the latest index so we don't remap stuff we've already seen minipoolInfo := &MinipoolInfo{ @@ -213,7 +219,7 @@ func (r *RollingRecord) updateValidatorIndices(state *state.NetworkState) { MissingAttestationSlots: map[uint64]bool{}, AttestationScore: NewQuotedBigInt(0), } - r.ValidatorIndexMap[validator.Index] = minipoolInfo + r.ValidatorIndexMap[idx] = minipoolInfo } } } From d7ff6810da4a370e3853f4febf8a104792b5a02a Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Wed, 26 Jul 2023 20:21:21 +0000 Subject: [PATCH 09/11] Make watchtower intervals stock again --- rocketpool/watchtower/watchtower.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rocketpool/watchtower/watchtower.go b/rocketpool/watchtower/watchtower.go index e83ff9fcf..9d05f1f4e 100644 --- a/rocketpool/watchtower/watchtower.go +++ b/rocketpool/watchtower/watchtower.go @@ -25,9 +25,9 @@ import ( ) // Config -var minTasksInterval, _ = time.ParseDuration("30s") -var maxTasksInterval, _ = time.ParseDuration("1m") -var taskCooldown, _ = time.ParseDuration("2s") +var minTasksInterval, _ = time.ParseDuration("4m") +var maxTasksInterval, _ = time.ParseDuration("6m") +var taskCooldown, _ = time.ParseDuration("5s") const ( MaxConcurrentEth1Requests = 200 From f94a618770045adc55c2e1ca926a9b2066d8aad9 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Fri, 13 Oct 2023 04:01:49 +0000 Subject: [PATCH 10/11] extend hacks to v7 --- shared/services/rewards/generator-impl-v7.go | 224 ++++++++++++++----- 1 file changed, 163 insertions(+), 61 deletions(-) diff --git a/shared/services/rewards/generator-impl-v7.go b/shared/services/rewards/generator-impl-v7.go index b3be2f9a1..9aa079040 100644 --- a/shared/services/rewards/generator-impl-v7.go +++ b/shared/services/rewards/generator-impl-v7.go @@ -5,8 +5,11 @@ import ( "encoding/hex" "fmt" "math/big" + "os" + "runtime/pprof" "sort" "strconv" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -734,107 +737,206 @@ func (r *treeGeneratorImpl_v7) calculateNodeRewards() (*big.Int, *big.Int, error } -// Get all of the duties for a range of epochs -func (r *treeGeneratorImpl_v7) processAttestationsForInterval() error { - - startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch - endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch - - // Determine the validator indices of each minipool - err := r.createMinipoolIndexMap() - if err != nil { - return err - } +func (r *treeGeneratorImpl_v7) fetchEpochs(startEpoch uint64, endEpoch uint64, errChan chan error, startTime time.Time) { + // seq tracks the next expected epoch in the sequence to be sent to the caller + // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn + // to publish an epoch to the caller via the resp channel. Every time seq is updated, + // therefor, all threads must wake up to check the value of seq. If it isn't their turn, + // they will go back to sleep. + var seq uint64 + + // If we encounter an error, we will wake up all the threads so they can exit. Therefor, + // the first time we encounter an error we should set 'done' to true, and then each thread + // should check its value every time they are woken. + var done bool + + // A cond to help the workers synchronize- whenever one thread wants to wake up the other + // threads, it does so by broadcasting on this cond. + cond := sync.NewCond(&sync.Mutex{}) + workers := getWorkerCount() + + // seq should start with the first epoch the caller is expecting + seq = startEpoch + + for j := uint64(0); j < workers; j++ { + + id := j + go func() { + // each worker will iterate modulo its id + for epoch := startEpoch + id; epoch < endEpoch+1; epoch += workers { + // Fetch the duties and participation for a single epoch + es, err := r.fetchEpoch(true, epoch) + if err != nil { + // Return the error to the caller + errChan <- err + // Note that an error was encountered + done = true + // Tell other threads to wake up and exit + cond.Broadcast() + // Exit this thread + return + } - // Check all of the attestations for each epoch - r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) - r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + // Wait until it's this worker's turn to produce a result + cond.L.Lock() + for seq != epoch && !done { + // No errors have been encountered, and it is not this worker's + // turn yet, so go back to sleep + cond.Wait() + } - epochsDone := 0 - reportStartTime := time.Now() - for epoch := startEpoch; epoch < endEpoch+1; epoch++ { - if epochsDone == 100 { - timeTaken := time.Since(reportStartTime) - r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", r.logPrefix, epoch, endEpoch, float64(epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, timeTaken) - epochsDone = 0 - } + // Check if this worker was woken up due to an error + if done { + // Another worker encountered an error, so exit now + return + } - err := r.processEpoch(true, epoch) - if err != nil { - return err - } + // No error was encountered, and seq indicates it's this worker's turn + // to update the state, so process the epoch + r.processEpoch(es) - epochsDone++ - } + if epoch == endEpoch { + // The last result has been produced, so close the channels + close(errChan) + // This worker produced the last result, so + // signal to the other workers that it is time to exit + done = true + } else { + seq++ + if seq%100 == 99 { + timeTaken := time.Since(startTime) + r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", + r.logPrefix, + es.epoch, + endEpoch, + float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, + timeTaken) + } + } - // Check the epoch after the end of the interval for any lingering attestations - epoch := endEpoch + 1 - err = r.processEpoch(false, epoch) - if err != nil { - return err + // Either seq has been updated or the last result was produced + // signal to the other workers to wake up and either do work, + // or exit now. + cond.Broadcast() + cond.L.Unlock() + } + }() } - - r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) - return nil - } // Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance -func (r *treeGeneratorImpl_v7) processEpoch(getDuties bool, epoch uint64) error { +func (r *treeGeneratorImpl_v7) fetchEpoch(getDuties bool, epoch uint64) (*epochState, error) { // Get the committee info and attestation records for this epoch - var committeeData beacon.Committees - attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch) + out := &epochState{ + epoch: epoch, + attestations: make([][]beacon.AttestationInfo, r.slotsPerEpoch), + } var wg errgroup.Group if getDuties { wg.Go(func() error { var err error - committeeData, err = r.bc.GetCommitteesForEpoch(&epoch) + out.committees, err = r.bc.GetCommitteesForEpoch(&epoch) return err }) } - for i := uint64(0); i < r.slotsPerEpoch; i++ { - i := i - slot := epoch*r.slotsPerEpoch + i - wg.Go(func() error { + wg.Go(func() error { + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := epoch*r.slotsPerEpoch + i attestations, found, err := r.bc.GetAttestations(fmt.Sprint(slot)) if err != nil { return err } if found { - attestationsPerSlot[i] = attestations + //out.attestationsResponses[i] = attestations + out.attestations[i] = attestations } else { - attestationsPerSlot[i] = []beacon.AttestationInfo{} + //out.attestationsResponses[i] = nil + out.attestations[i] = nil } - return nil - }) - } + } + return nil + }) err := wg.Wait() if err != nil { - return fmt.Errorf("error getting committee and attestaion records for epoch %d: %w", epoch, err) + return nil, fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) } - if getDuties { - // Get all of the expected duties for the epoch - err = r.getDutiesForEpoch(committeeData) + return out, nil + +} + +// Get all of the duties for a range of epochs +func (r *treeGeneratorImpl_v7) processAttestationsForInterval() error { + if os.Getenv("DUTIES_PPROF") != "" { + f, err := os.Create(os.Getenv("DUTIES_PPROF")) if err != nil { - return fmt.Errorf("error getting duties for epoch %d: %w", epoch, err) + return err } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch + endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch + + // Determine the validator indices of each minipool + err := r.createMinipoolIndexMap() + if err != nil { + return err + } + + // Check all of the attestations for each epoch + r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) + r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + + // Workers need a channel to send back errors + errs := make(chan error) + + reportStartTime := time.Now() + + // Start populating epochStates + r.fetchEpochs(startEpoch, endEpoch, errs, reportStartTime) + + // Read until the error channel is closed + for err := range errs { + return err + } + + // Check the epoch after the end of the interval for any lingering attestations + epoch := endEpoch + 1 + es, err := r.fetchEpoch(false, epoch) + if err != nil { + return err + } + r.processEpoch(es) + + r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) + return nil + +} + +func (r *treeGeneratorImpl_v7) processEpoch(es *epochState) { + + // Get all of the expected duties for the epoch + // Note: committees will be nil for the last epoch + if es.committees != nil { + r.getDutiesForEpoch(es.committees) } // Process all of the slots in the epoch for i := uint64(0); i < r.slotsPerEpoch; i++ { - slot := epoch*r.slotsPerEpoch + i - attestations := attestationsPerSlot[i] - if len(attestations) > 0 { - r.checkDutiesForSlot(attestations, slot) + slot := es.epoch*r.slotsPerEpoch + i + + // The element will be nil if there was no block at the slot + if len(es.attestations[i]) > 0 { + // There was a block - process its attestations + r.checkDutiesForSlot(es.attestations[i], slot) } } - return nil - } // Handle all of the attestations in the given slot From 3f6dfd33fa1400c182bae7aa957229741994c014 Mon Sep 17 00:00:00 2001 From: Jacob Shufro Date: Fri, 27 Oct 2023 14:30:34 +0000 Subject: [PATCH 11/11] RPIP-30 test implementation --- shared/services/state/network-state.go | 55 +++++++++++++++++++++----- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/shared/services/state/network-state.go b/shared/services/state/network-state.go index 012d45b12..aec0b4661 100644 --- a/shared/services/state/network-state.go +++ b/shared/services/state/network-state.go @@ -2,6 +2,7 @@ package state import ( "fmt" + "math" "math/big" "time" @@ -373,19 +374,55 @@ func (s *NetworkState) CalculateTrueEffectiveStakes(scaleByParticipation bool, a minCollateral := big.NewInt(0).Mul(eligibleBorrowedEth, s.NetworkDetails.MinCollateralFraction) minCollateral.Div(minCollateral, s.NetworkDetails.RplPrice) - // maxCollateral := bondedEth * maxCollateralFraction / ratio - // NOTE: maxCollateralFraction and ratio are both percentages, but multiplying and dividing by them cancels out the need for normalization by eth.EthToWei(1) - maxCollateral := big.NewInt(0).Mul(eligibleBondedEth, s.NetworkDetails.MaxCollateralFraction) - maxCollateral.Div(maxCollateral, s.NetworkDetails.RplPrice) - // Calculate the effective stake nodeStake := big.NewInt(0).Set(node.RplStake) - if nodeStake.Cmp(minCollateral) == -1 { + if nodeStake.Cmp(minCollateral) == -1 || eligibleBorrowedEth.Cmp(big.NewInt(0)) == 0 { // Under min collateral nodeStake.SetUint64(0) - } else if nodeStake.Cmp(maxCollateral) == 1 { - // Over max collateral - nodeStake.Set(maxCollateral) + } else { + // Calculate a few terms. + stakedRplValueInEth := big.NewInt(0).Mul(nodeStake, s.NetworkDetails.RplPrice) + stakedRplValueInEth.Div(stakedRplValueInEth, big.NewInt(1e18)) + + // If between (inclusive 0.1 and 0.15, weight is just 100 * staked_rpl_value_in_eth + // we already know we're above 10% of borrowed. + midCollateral := big.NewInt(0).Mul(eligibleBorrowedEth, big.NewInt(150000000000000000)) + midCollateral.Div(midCollateral, s.NetworkDetails.RplPrice) + + var weight *big.Float + if nodeStake.Cmp(midCollateral) <= 0 { + weight = big.NewFloat(0).Mul(big.NewFloat(100), big.NewFloat(0).SetInt(stakedRplValueInEth)) + } else { + lnArgs, _ := + big.NewFloat(0).Sub( + big.NewFloat(0).Mul( + big.NewFloat(100.0), + big.NewFloat(0).Quo( + big.NewFloat(0.0).SetInt(stakedRplValueInEth), + big.NewFloat(0.0).SetInt(eligibleBorrowedEth), + ), + ), + big.NewFloat(13.0), + ).Float64() + weight = big.NewFloat(0).Mul( + big.NewFloat(0).Add( + big.NewFloat(13.6137), + big.NewFloat(0).Mul( + big.NewFloat(2.0), + big.NewFloat(math.Log(lnArgs)), + ), + ), + big.NewFloat(0).SetInt(eligibleBorrowedEth), + ) + } + + approx, _ := weight.Float64() + if math.IsNaN(approx) { + nodeStake.SetUint64(0) + } else { + integered, _ := weight.Int(nil) + nodeStake.Set(integered) + } } // Scale the effective stake by the participation in the current interval