diff --git a/go.mod b/go.mod index b2f14299c..de772a136 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/sethvargo/go-password v0.2.0 github.com/shirou/gopsutil/v3 v3.23.1 github.com/tyler-smith/go-bip39 v1.1.0 + github.com/umbracle/go-eth-consensus v0.1.2 github.com/urfave/cli v1.22.12 github.com/wealdtech/go-ens/v3 v3.5.5 github.com/wealdtech/go-eth2-types/v2 v2.8.1-0.20230131115251-b93cf60cee26 @@ -148,7 +149,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344 // indirect + github.com/supranational/blst v0.3.10 // indirect github.com/thomaso-mirodin/intmath v0.0.0-20160323211736-5dc6d854e46e // indirect github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect diff --git a/go.sum b/go.sum index 6d344def7..bfa39dff4 100644 --- a/go.sum +++ b/go.sum @@ -1527,8 +1527,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= -github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344 h1:m+8fKfQwCAy1QjzINvKe/pYtLjo2dl59x2w9YSEJxuY= -github.com/supranational/blst v0.3.8-0.20220526154634-513d2456b344/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/supranational/blst v0.3.10 h1:CMciDZ/h4pXDDXQASe8ZGTNKUiVNxVVA5hpci2Uuhuk= +github.com/supranational/blst v0.3.10/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= @@ -1560,6 +1560,8 @@ github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCB github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0= github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY= +github.com/umbracle/go-eth-consensus v0.1.2 h1:oRAZwURW3u6kWPBCYYp2WpknSy5rAKf5OwMJahdPK3c= +github.com/umbracle/go-eth-consensus v0.1.2/go.mod h1:FutcwopvmyWSPl3FQRT9Oru83/tJeGMxy4k0OfJ0sdY= github.com/umbracle/gohashtree v0.0.2-alpha.0.20230207094856-5b775a815c10 h1:CQh33pStIp/E30b7TxDlXfM0145bn2e8boI30IxAhTg= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= diff --git a/rocketpool/watchtower/generate-rewards-tree.go b/rocketpool/watchtower/generate-rewards-tree.go index 49826a532..e02070892 100644 --- a/rocketpool/watchtower/generate-rewards-tree.go +++ b/rocketpool/watchtower/generate-rewards-tree.go @@ -19,6 +19,7 @@ import ( "github.com/rocket-pool/rocketpool-go/rocketpool" "github.com/rocket-pool/smartnode/shared/services" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client" "github.com/rocket-pool/smartnode/shared/services/config" rprewards "github.com/rocket-pool/smartnode/shared/services/rewards" "github.com/rocket-pool/smartnode/shared/services/state" @@ -52,9 +53,18 @@ func newGenerateRewardsTree(c *cli.Context, logger log.ColorLogger, errorLogger if err != nil { return nil, err } - bc, err := services.GetBeaconClient(c) - if err != nil { - return nil, err + var bc beacon.Client + // Override the beacon client, if requested + if beaconOverride := os.Getenv("TREEGEN_BEACON_CLIENT_ENDPOINT"); beaconOverride != "" { + logger.Printlnf("Using %s as the Beacon Node for GenerateRewardsTree", beaconOverride) + bc = client.NewStandardHttpClient(beaconOverride) + } else { + var err error + + bc, err = services.GetBeaconClient(c) + if err != nil { + return nil, err + } } rp, err := services.GetRocketPool(c) if err != nil { diff --git a/rocketpool/watchtower/submit-rewards-tree-stateless.go b/rocketpool/watchtower/submit-rewards-tree-stateless.go index 0a9d3f7ca..117d4a763 100644 --- a/rocketpool/watchtower/submit-rewards-tree-stateless.go +++ b/rocketpool/watchtower/submit-rewards-tree-stateless.go @@ -23,6 +23,7 @@ import ( "github.com/rocket-pool/smartnode/rocketpool/watchtower/utils" "github.com/rocket-pool/smartnode/shared/services" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client" "github.com/rocket-pool/smartnode/shared/services/config" rprewards "github.com/rocket-pool/smartnode/shared/services/rewards" "github.com/rocket-pool/smartnode/shared/services/state" @@ -68,10 +69,21 @@ func newSubmitRewardsTree_Stateless(c *cli.Context, logger log.ColorLogger, erro if err != nil { return nil, err } - bc, err := services.GetBeaconClient(c) - if err != nil { - return nil, err + + var bc beacon.Client + // Override the beacon client, if requested + if beaconOverride := os.Getenv("TREEGEN_BEACON_CLIENT_ENDPOINT"); beaconOverride != "" { + logger.Printlnf("Using %s as the Beacon Node for SubmitRewardsTree", beaconOverride) + bc = client.NewStandardHttpClient(beaconOverride) + } else { + var err error + + bc, err = services.GetBeaconClient(c) + if err != nil { + return nil, err + } } + rp, err := services.GetRocketPool(c) if err != nil { return nil, err @@ -566,7 +578,8 @@ func (t *submitRewardsTree_Stateless) getSnapshotConsensusBlock(endTime time.Tim targetSlot := uint64(math.Ceil(totalTimespan.Seconds() / float64(eth2Config.SecondsPerSlot))) targetSlotEpoch := targetSlot / eth2Config.SlotsPerEpoch targetSlot = targetSlotEpoch*eth2Config.SlotsPerEpoch + (eth2Config.SlotsPerEpoch - 1) // The target slot becomes the last one in the Epoch - requiredEpoch := targetSlotEpoch + 1 // The smoothing pool requires 1 epoch beyond the target to be finalized, to check for late attestations + // XXX Jacob, you changed this from targetSlotEpoch + 1 so we start generating when the subsequent epoch justifies instead + requiredEpoch := targetSlotEpoch // The smoothing pool requires 1 epoch beyond the target to be finalized, to check for late attestations // Check if the required epoch is finalized yet if beaconHead.FinalizedEpoch < requiredEpoch { diff --git a/rocketpool/watchtower/watchtower.go b/rocketpool/watchtower/watchtower.go index aab7ec107..9d05f1f4e 100644 --- a/rocketpool/watchtower/watchtower.go +++ b/rocketpool/watchtower/watchtower.go @@ -5,6 +5,7 @@ import ( "math/big" "math/rand" "net/http" + "os" "sync" "time" @@ -18,6 +19,7 @@ import ( "github.com/rocket-pool/smartnode/rocketpool/watchtower/collectors" "github.com/rocket-pool/smartnode/shared/services" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client" "github.com/rocket-pool/smartnode/shared/services/state" "github.com/rocket-pool/smartnode/shared/utils/log" ) @@ -82,10 +84,6 @@ func run(c *cli.Context) error { if err != nil { return err } - bc, err := services.GetBeaconClient(c) - if err != nil { - return err - } // Print the current mode if cfg.IsNativeMode { @@ -109,6 +107,20 @@ func run(c *cli.Context) error { errorLog := log.NewColorLogger(ErrorColor) updateLog := log.NewColorLogger(UpdateColor) + var bc beacon.Client + // Override the beacon client, if requested + if beaconOverride := os.Getenv("TREEGEN_BEACON_CLIENT_ENDPOINT"); beaconOverride != "" { + updateLog.Printlnf("Overriding the Beacon Node URL to %s", beaconOverride) + bc = client.NewStandardHttpClient(beaconOverride) + } else { + var err error + + bc, err = services.GetBeaconClient(c) + if err != nil { + return err + } + } + // Create the state manager m, err := state.NewNetworkStateManager(rp, cfg, rp.Client, bc, &updateLog) if err != nil { diff --git a/shared/services/beacon/client.go b/shared/services/beacon/client.go index d91ec847d..d09aadbef 100644 --- a/shared/services/beacon/client.go +++ b/shared/services/beacon/client.go @@ -79,7 +79,7 @@ type Committees interface { Slot(int) uint64 // Validators returns the list of validators of the committee at // the provided offset - Validators(int) []string + Validators(int) []uint64 // Count returns the number of committees in the response Count() int // Release returns the reused validators slice buffer to the pool for diff --git a/shared/services/beacon/client/pb/beacon.pb.go b/shared/services/beacon/client/pb/beacon.pb.go new file mode 100644 index 000000000..c350bb4e2 --- /dev/null +++ b/shared/services/beacon/client/pb/beacon.pb.go @@ -0,0 +1,247 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.12.4 +// source: beacon.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type CommitteesResponseData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` + Slot uint64 `protobuf:"varint,2,opt,name=slot,proto3" json:"slot,omitempty"` + Validators []uint64 `protobuf:"varint,3,rep,packed,name=validators,proto3" json:"validators,omitempty"` +} + +func (x *CommitteesResponseData) Reset() { + *x = CommitteesResponseData{} + if protoimpl.UnsafeEnabled { + mi := &file_beacon_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommitteesResponseData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommitteesResponseData) ProtoMessage() {} + +func (x *CommitteesResponseData) ProtoReflect() protoreflect.Message { + mi := &file_beacon_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommitteesResponseData.ProtoReflect.Descriptor instead. +func (*CommitteesResponseData) Descriptor() ([]byte, []int) { + return file_beacon_proto_rawDescGZIP(), []int{0} +} + +func (x *CommitteesResponseData) GetIndex() uint64 { + if x != nil { + return x.Index + } + return 0 +} + +func (x *CommitteesResponseData) GetSlot() uint64 { + if x != nil { + return x.Slot + } + return 0 +} + +func (x *CommitteesResponseData) GetValidators() []uint64 { + if x != nil { + return x.Validators + } + return nil +} + +type CommitteesResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []*CommitteesResponseData `protobuf:"bytes,1,rep,name=data,proto3" json:"data,omitempty"` + ExecutionOptimistic bool `protobuf:"varint,2,opt,name=execution_optimistic,json=executionOptimistic,proto3" json:"execution_optimistic,omitempty"` + Finalized bool `protobuf:"varint,3,opt,name=finalized,proto3" json:"finalized,omitempty"` +} + +func (x *CommitteesResponse) Reset() { + *x = CommitteesResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_beacon_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommitteesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommitteesResponse) ProtoMessage() {} + +func (x *CommitteesResponse) ProtoReflect() protoreflect.Message { + mi := &file_beacon_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommitteesResponse.ProtoReflect.Descriptor instead. +func (*CommitteesResponse) Descriptor() ([]byte, []int) { + return file_beacon_proto_rawDescGZIP(), []int{1} +} + +func (x *CommitteesResponse) GetData() []*CommitteesResponseData { + if x != nil { + return x.Data + } + return nil +} + +func (x *CommitteesResponse) GetExecutionOptimistic() bool { + if x != nil { + return x.ExecutionOptimistic + } + return false +} + +func (x *CommitteesResponse) GetFinalized() bool { + if x != nil { + return x.Finalized + } + return false +} + +var File_beacon_proto protoreflect.FileDescriptor + +var file_beacon_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x62, 0x65, 0x61, 0x63, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, + 0x70, 0x62, 0x22, 0x62, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x65, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, + 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, + 0x74, 0x6f, 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x04, 0x52, 0x0a, 0x76, 0x61, 0x6c, 0x69, + 0x64, 0x61, 0x74, 0x6f, 0x72, 0x73, 0x22, 0x95, 0x01, 0x0a, 0x12, 0x43, 0x6f, 0x6d, 0x6d, 0x69, + 0x74, 0x74, 0x65, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x62, + 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, + 0x14, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6d, + 0x69, 0x73, 0x74, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x13, 0x65, 0x78, 0x65, + 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x70, 0x74, 0x69, 0x6d, 0x69, 0x73, 0x74, 0x69, 0x63, + 0x12, 0x1c, 0x0a, 0x09, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x09, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x42, 0x06, + 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_beacon_proto_rawDescOnce sync.Once + file_beacon_proto_rawDescData = file_beacon_proto_rawDesc +) + +func file_beacon_proto_rawDescGZIP() []byte { + file_beacon_proto_rawDescOnce.Do(func() { + file_beacon_proto_rawDescData = protoimpl.X.CompressGZIP(file_beacon_proto_rawDescData) + }) + return file_beacon_proto_rawDescData +} + +var file_beacon_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_beacon_proto_goTypes = []interface{}{ + (*CommitteesResponseData)(nil), // 0: pb.CommitteesResponseData + (*CommitteesResponse)(nil), // 1: pb.CommitteesResponse +} +var file_beacon_proto_depIdxs = []int32{ + 0, // 0: pb.CommitteesResponse.data:type_name -> pb.CommitteesResponseData + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_beacon_proto_init() } +func file_beacon_proto_init() { + if File_beacon_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_beacon_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommitteesResponseData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_beacon_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommitteesResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_beacon_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_beacon_proto_goTypes, + DependencyIndexes: file_beacon_proto_depIdxs, + MessageInfos: file_beacon_proto_msgTypes, + }.Build() + File_beacon_proto = out.File + file_beacon_proto_rawDesc = nil + file_beacon_proto_goTypes = nil + file_beacon_proto_depIdxs = nil +} diff --git a/shared/services/beacon/client/std-http-client.go b/shared/services/beacon/client/std-http-client.go index 7bbd79254..e72f69f46 100644 --- a/shared/services/beacon/client/std-http-client.go +++ b/shared/services/beacon/client/std-http-client.go @@ -8,17 +8,20 @@ import ( "net/http" "strconv" "strings" - "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/goccy/go-json" "github.com/prysmaticlabs/prysm/v3/crypto/bls" - "github.com/rocket-pool/rocketpool-go/types" + gec "github.com/umbracle/go-eth-consensus" eth2types "github.com/wealdtech/go-eth2-types/v2" "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "github.com/rocket-pool/rocketpool-go/types" "github.com/rocket-pool/smartnode/shared/services/beacon" + "github.com/rocket-pool/smartnode/shared/services/beacon/client/pb" "github.com/rocket-pool/smartnode/shared/utils/eth2" hexutil "github.com/rocket-pool/smartnode/shared/utils/hex" ) @@ -462,28 +465,111 @@ func (c *StandardHttpClient) GetEth1DataForEth2Block(blockId string) (beacon.Eth } -func (c *StandardHttpClient) GetAttestations(blockId string) ([]beacon.AttestationInfo, bool, error) { - attestations, exists, err := c.getAttestations(blockId) +type attestationsResponseRaw struct { + body []byte + version string +} + +func (c *StandardHttpClient) GetAttestationsRaw(blockId string) (*attestationsResponseRaw, bool, error) { + + // Build the request + req, err := http.NewRequest("GET", fmt.Sprintf("%s/eth/v2/beacon/blocks/%s", c.providerAddress, blockId), nil) if err != nil { return nil, false, err } - if !exists { + + req.Header.Set("accept", "application/octet-stream") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, false, err + } + defer func() { + _ = resp.Body.Close() + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, false, fmt.Errorf("Could not get attestations data for slot %s: HTTP status %d", blockId, resp.StatusCode) + } + + if resp.StatusCode == http.StatusNotFound { return nil, false, nil } - // Add attestation info - attestationInfo := make([]beacon.AttestationInfo, len(attestations.Data)) - for i, attestation := range attestations.Data { - bitString := hexutil.RemovePrefix(attestation.AggregationBits) - attestationInfo[i].SlotIndex = uint64(attestation.Data.Slot) - attestationInfo[i].CommitteeIndex = uint64(attestation.Data.Index) - attestationInfo[i].AggregationBits, err = hex.DecodeString(bitString) - if err != nil { - return nil, false, fmt.Errorf("Error decoding aggregation bits for attestation %d of block %s: %w", i, blockId, err) + if resp.StatusCode != http.StatusOK { + return nil, false, fmt.Errorf("Could not get attestations data for slot %s: HTTP status %d; reponse body: '%s'", blockId, resp.StatusCode, string(body)) + } + + return &attestationsResponseRaw{ + body: body, + version: resp.Header.Get("Eth-Consensus-Version"), + }, true, nil +} + +func (c *StandardHttpClient) ParseAttestationsResponseRaw(resp *attestationsResponseRaw) ([]beacon.AttestationInfo, error) { + var attestations []*gec.Attestation + + // Unmarshal block SSZ + if strings.EqualFold(resp.version, "phase0") { + block := new(gec.SignedBeaconBlockPhase0) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err + } + + attestations = block.Block.Body.Attestations + } else if strings.EqualFold(resp.version, "altair") { + block := new(gec.SignedBeaconBlockAltair) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err + } + + attestations = block.Block.Body.Attestations + } else if strings.EqualFold(resp.version, "bellatrix") { + block := new(gec.SignedBeaconBlockBellatrix) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err + } + + attestations = block.Block.Body.Attestations + } else if strings.EqualFold(resp.version, "capella") { + block := new(gec.SignedBeaconBlockCapella) + if err := block.UnmarshalSSZ(resp.body); err != nil { + return nil, err } + + attestations = block.Block.Body.Attestations + } else { + return nil, fmt.Errorf("unknown consensus version header: %s", resp.version) + } + + out := make([]beacon.AttestationInfo, len(attestations)) + for i := range attestations { + out[i].AggregationBits = attestations[i].AggregationBits + out[i].SlotIndex = attestations[i].Data.Slot + out[i].CommitteeIndex = attestations[i].Data.Index + } + + return out, nil + +} + +func (c *StandardHttpClient) GetAttestations(blockId string) ([]beacon.AttestationInfo, bool, error) { + resp, found, err := c.GetAttestationsRaw(blockId) + if err != nil { + return nil, found, err + } + + if found == false { + return nil, found, err + } + + out, err := c.ParseAttestationsResponseRaw(resp) + if err != nil { + return nil, found, err } - return attestationInfo, true, nil + return out, true, nil } func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock, bool, error) { @@ -526,14 +612,78 @@ func (c *StandardHttpClient) GetBeaconBlock(blockId string) (beacon.BeaconBlock, return beaconBlock, true, nil } -// Get the attestation committees for the given epoch, or the current epoch if nil +type committeeWrapper struct { + resp *pb.CommitteesResponse +} + +func (w *committeeWrapper) Count() int { + return len(w.resp.Data) +} + +func (w *committeeWrapper) Index(i int) uint64 { + return w.resp.Data[i].Index +} + +func (w *committeeWrapper) Slot(i int) uint64 { + return w.resp.Data[i].Slot +} + +func (w *committeeWrapper) Validators(i int) []uint64 { + return w.resp.Data[i].Validators +} + +func (w *committeeWrapper) Release() { + +} + func (c *StandardHttpClient) GetCommitteesForEpoch(epoch *uint64) (beacon.Committees, error) { - response, err := c.getCommittees("head", epoch) + query := "" + if epoch != nil { + query = fmt.Sprintf("?epoch=%d", *epoch) + } + responseBody, header, status, err := c.protoGetRequest(fmt.Sprintf(RequestCommitteePath, "head") + query) if err != nil { - return nil, err + return nil, fmt.Errorf("Could not get committees: %w", err) + } + if status != http.StatusOK { + return nil, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", status, string(responseBody)) + } + + m := pb.CommitteesResponse{} + if header == nil || !strings.EqualFold(header.Get("Content-Type"), "application/protobuf") { + // Parse json + err := protojson.Unmarshal(responseBody, &m) + if err != nil { + return nil, err + } + } else { + // Parse proto + err = proto.Unmarshal(responseBody, &m) + if err != nil { + return nil, err + } } - return &response, nil + out := committeeWrapper{ + resp: &m, + } + + /* + out := make(beacon.Committees, len(m.Data)) + for i, committee := range m.Data { + validators := make([]uint64, len(committee.Validators)) + + for j, validator := range committee.Validators { + validators[j] = uint64(validator) + } + out[i] = beacon.Committee{ + Index: uint64(committee.Index), + Slot: uint64(committee.Slot), + Validators: validators, + } + }*/ + + return &out, nil } // Perform a withdrawal credentials change on a validator @@ -781,90 +931,43 @@ func (c *StandardHttpClient) getBeaconBlock(blockId string) (BeaconBlockResponse return beaconBlock, true, nil } -type committeesDecoder struct { - decoder *json.Decoder - currentReader *io.ReadCloser -} - -// Read will be called by the json decoder to request more bytes of data from -// the beacon node's committees response. Since the decoder is reused, we -// need to avoid sending it io.EOF, or it will enter an unusable state and can -// not be reused later. -// -// On subsequent calls to Decode, the decoder resets its internal buffer, which -// means any data it reads between the last json token and EOF is correctly -// discarded. -func (c *committeesDecoder) Read(p []byte) (int, error) { - n, err := (*c.currentReader).Read(p) - if err == io.EOF { - return n, nil +// Send withdrawal credentials change request +func (c *StandardHttpClient) postWithdrawalCredentialsChange(request BLSToExecutionChangeRequest) error { + requestArray := []BLSToExecutionChangeRequest{request} // This route must be wrapped in an array + responseBody, status, err := c.postRequest(RequestWithdrawalCredentialsChangePath, requestArray) + if err != nil { + return fmt.Errorf("Could not broadcast withdrawal credentials change for validator %d: %w", request.Message.ValidatorIndex, err) } - - return n, err -} - -var committeesDecoderPool sync.Pool = sync.Pool{ - New: func() any { - var out committeesDecoder - - out.decoder = json.NewDecoder(&out) - return &out - }, -} - -// Get the committees for the epoch -func (c *StandardHttpClient) getCommittees(stateId string, epoch *uint64) (CommitteesResponse, error) { - var committees CommitteesResponse - - query := "" - if epoch != nil { - query = fmt.Sprintf("?epoch=%d", *epoch) + if status != http.StatusOK { + return fmt.Errorf("Could not broadcast withdrawal credentials change for validator %d: HTTP status %d; response body: '%s'", request.Message.ValidatorIndex, status, string(responseBody)) } + return nil +} - // Committees responses are large, so let the json decoder read it in a buffered fashion - reader, status, err := c.getRequestReader(fmt.Sprintf(RequestCommitteePath, stateId) + query) +func (c *StandardHttpClient) protoGetRequest(requestPath string) ([]byte, http.Header, int, error) { + req, err := http.NewRequest("GET", fmt.Sprintf(RequestUrlFormat, c.providerAddress, requestPath), nil) if err != nil { - return CommitteesResponse{}, fmt.Errorf("Could not get committees: %w", err) + return nil, nil, 0, err } - defer func() { - _ = reader.Close() - }() - if status != http.StatusOK { - body, _ := io.ReadAll(reader) - return CommitteesResponse{}, fmt.Errorf("Could not get committees: HTTP status %d; response body: '%s'", status, string(body)) + req.Header.Set("Accept", "application/protobuf") + response, err := http.DefaultClient.Do(req) + if err != nil { + return nil, nil, 0, err } - - d := committeesDecoderPool.Get().(*committeesDecoder) defer func() { - d.currentReader = nil - committeesDecoderPool.Put(d) + _ = response.Body.Close() }() - d.currentReader = &reader - - // Begin decoding - if err := d.decoder.Decode(&committees); err != nil { - return CommitteesResponse{}, fmt.Errorf("Could not decode committees: %w", err) - } - - return committees, nil -} - -// Send withdrawal credentials change request -func (c *StandardHttpClient) postWithdrawalCredentialsChange(request BLSToExecutionChangeRequest) error { - requestArray := []BLSToExecutionChangeRequest{request} // This route must be wrapped in an array - responseBody, status, err := c.postRequest(RequestWithdrawalCredentialsChangePath, requestArray) + body, err := io.ReadAll(response.Body) if err != nil { - return fmt.Errorf("Could not broadcast withdrawal credentials change for validator %d: %w", request.Message.ValidatorIndex, err) + return nil, nil, 0, err } - if status != http.StatusOK { - return fmt.Errorf("Could not broadcast withdrawal credentials change for validator %d: HTTP status %d; response body: '%s'", request.Message.ValidatorIndex, status, string(responseBody)) - } - return nil + + return body, response.Header, response.StatusCode, nil } -// Make a GET request but do not read its body yet (allows buffered decoding) +// Make a GET request to the beacon node func (c *StandardHttpClient) getRequestReader(requestPath string) (io.ReadCloser, int, error) { // Send request diff --git a/shared/services/beacon/client/types.go b/shared/services/beacon/client/types.go index 9c4540265..1623e0e5c 100644 --- a/shared/services/beacon/client/types.go +++ b/shared/services/beacon/client/types.go @@ -2,11 +2,11 @@ package client import ( "encoding/hex" + "fmt" "strconv" "github.com/ethereum/go-ethereum/common" "github.com/goccy/go-json" - hexutil "github.com/rocket-pool/smartnode/shared/utils/hex" ) @@ -145,6 +145,9 @@ type Attestation struct { } `json:"data"` } +// Don't mind meeeee +const intSize = 32 << (^uint(0) >> 63) + // Unsigned integer type type uinteger uint64 @@ -155,8 +158,29 @@ func (i *uinteger) UnmarshalJSON(data []byte) error { // Unmarshal string var dataStr string - if err := json.Unmarshal(data, &dataStr); err != nil { + /*if err := json.Unmarshal(data, &dataStr); err != nil { return err + }*/ + dataLen := len(data) + if dataLen <= 2 { + return fmt.Errorf("Invalid json uinteger '%s'", string(data)) + } + dataStr = string(data[1 : dataLen-1]) + sLen := dataLen - 2 + + // Check fast paths + if intSize == 32 && (0 < sLen && sLen < 10) || + intSize == 64 && (0 < sLen && sLen < 19) { + + if sLen > 0 && dataStr[0] != '-' { + signed, err := strconv.Atoi(dataStr) + if err == nil { + *i = uinteger(signed) + return nil + } + } + + // If fast path failed just fall through } // Parse integer value diff --git a/shared/services/rewards/generator-impl-v1.go b/shared/services/rewards/generator-impl-v1.go index 2bad61033..73ebe3439 100644 --- a/shared/services/rewards/generator-impl-v1.go +++ b/shared/services/rewards/generator-impl-v1.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -44,7 +45,7 @@ type treeGeneratorImpl_v1 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1000,7 +1001,7 @@ func (r *treeGeneratorImpl_v1) createMinipoolIndexMap() error { } // Get indices for all minipool validators - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(minipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1011,7 +1012,11 @@ func (r *treeGeneratorImpl_v1) createMinipoolIndexMap() error { if details.IsEligible { for _, minipoolInfo := range details.Minipools { minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + idx, err := strconv.ParseUint(minipoolInfo.ValidatorIndex, 10, 64) + if err != nil { + idx = 0 + } + r.validatorIndexMap[idx] = minipoolInfo } } } diff --git a/shared/services/rewards/generator-impl-v2.go b/shared/services/rewards/generator-impl-v2.go index 364139664..0ad2e0916 100644 --- a/shared/services/rewards/generator-impl-v2.go +++ b/shared/services/rewards/generator-impl-v2.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -48,7 +49,7 @@ type treeGeneratorImpl_v2 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1019,7 +1020,7 @@ func (r *treeGeneratorImpl_v2) createMinipoolIndexMap() error { } // Get indices for all minipool validators - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(minipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1037,6 +1038,10 @@ func (r *treeGeneratorImpl_v2) createMinipoolIndexMap() error { minipoolInfo.EndSlot = 0 minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -1047,7 +1052,7 @@ func (r *treeGeneratorImpl_v2) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v3.go b/shared/services/rewards/generator-impl-v3.go index 270a34912..de2ae6f26 100644 --- a/shared/services/rewards/generator-impl-v3.go +++ b/shared/services/rewards/generator-impl-v3.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -44,7 +45,7 @@ type treeGeneratorImpl_v3 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1015,7 +1016,7 @@ func (r *treeGeneratorImpl_v3) createMinipoolIndexMap() error { } // Get indices for all minipool validators - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(minipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1033,6 +1034,10 @@ func (r *treeGeneratorImpl_v3) createMinipoolIndexMap() error { minipoolInfo.EndSlot = 0 minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -1043,7 +1048,7 @@ func (r *treeGeneratorImpl_v3) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v4.go b/shared/services/rewards/generator-impl-v4.go index e6b3a4dba..33b789174 100644 --- a/shared/services/rewards/generator-impl-v4.go +++ b/shared/services/rewards/generator-impl-v4.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sort" + "strconv" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -46,7 +47,7 @@ type treeGeneratorImpl_v4 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -1075,7 +1076,7 @@ func (r *treeGeneratorImpl_v4) createMinipoolIndexMap() error { } // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(uncachedMinipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) @@ -1098,6 +1099,10 @@ func (r *treeGeneratorImpl_v4) createMinipoolIndexMap() error { minipoolInfo.EndSlot = 0 minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -1108,7 +1113,7 @@ func (r *treeGeneratorImpl_v4) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = statusMap[minipoolInfo.ValidatorPubkey].Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch @@ -1472,7 +1477,7 @@ func (r *treeGeneratorImpl_v4) getNodeEffectiveRPLStakes() ([]*big.Int, error) { // Get the status for all staking minipool validators r.log.Printlnf("%s Getting validator statuses for all eligible minipools", r.logPrefix) - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} statusMap, err := r.bc.GetValidatorStatuses(r.stakingMinipoolPubkeys, &beacon.ValidatorStatusOptions{ Slot: &r.rewardsFile.ConsensusEndBlock, }) diff --git a/shared/services/rewards/generator-impl-v5.go b/shared/services/rewards/generator-impl-v5.go index a02ae24ac..ec458b24a 100644 --- a/shared/services/rewards/generator-impl-v5.go +++ b/shared/services/rewards/generator-impl-v5.go @@ -5,7 +5,12 @@ import ( "encoding/hex" "fmt" "math/big" + "os" + "runtime" + "runtime/pprof" "sort" + "strconv" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -43,7 +48,7 @@ type treeGeneratorImpl_v5 struct { smoothingPoolAddress common.Address intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -56,6 +61,12 @@ type treeGeneratorImpl_v5 struct { zero *big.Int } +type epochState struct { + epoch uint64 + committees beacon.Committees + attestations [][]beacon.AttestationInfo +} + // Create a new tree generator func newTreeGeneratorImpl_v5(log *log.ColorLogger, logPrefix string, index uint64, startTime time.Time, endTime time.Time, consensusBlock uint64, elSnapshotHeader *types.Header, intervalsPassed uint64, state *state.NetworkState) *treeGeneratorImpl_v5 { return &treeGeneratorImpl_v5{ @@ -92,7 +103,7 @@ func newTreeGeneratorImpl_v5(log *log.ColorLogger, logPrefix string, index uint6 }, }, validatorStatusMap: map[rptypes.ValidatorPubkey]beacon.ValidatorStatus{}, - validatorIndexMap: map[string]*MinipoolInfo{}, + validatorIndexMap: map[uint64]*MinipoolInfo{}, elSnapshotHeader: elSnapshotHeader, log: log, logPrefix: logPrefix, @@ -728,8 +739,115 @@ func (r *treeGeneratorImpl_v5) calculateNodeRewards() (*big.Int, *big.Int, error } +// The number of workers to use - clamps nproc/2 between 1 and 4 for now +func getWorkerCount() uint64 { + nproc := runtime.NumCPU() + + target := nproc - 2 + if target < 1 { + return 1 + } + + return uint64(target) +} + +func (r *treeGeneratorImpl_v5) fetchEpochs(startEpoch uint64, endEpoch uint64, errChan chan error, startTime time.Time) { + // seq tracks the next expected epoch in the sequence to be sent to the caller + // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn + // to publish an epoch to the caller via the resp channel. Every time seq is updated, + // therefor, all threads must wake up to check the value of seq. If it isn't their turn, + // they will go back to sleep. + var seq uint64 + + // If we encounter an error, we will wake up all the threads so they can exit. Therefor, + // the first time we encounter an error we should set 'done' to true, and then each thread + // should check its value every time they are woken. + var done bool + + // A cond to help the workers synchronize- whenever one thread wants to wake up the other + // threads, it does so by broadcasting on this cond. + cond := sync.NewCond(&sync.Mutex{}) + workers := getWorkerCount() + + // seq should start with the first epoch the caller is expecting + seq = startEpoch + + for j := uint64(0); j < workers; j++ { + + id := j + go func() { + // each worker will iterate modulo its id + for epoch := startEpoch + id; epoch < endEpoch+1; epoch += workers { + // Fetch the duties and participation for a single epoch + es, err := r.fetchEpoch(true, epoch) + if err != nil { + // Return the error to the caller + errChan <- err + // Note that an error was encountered + done = true + // Tell other threads to wake up and exit + cond.Broadcast() + // Exit this thread + return + } + + // Wait until it's this worker's turn to produce a result + cond.L.Lock() + for seq != epoch && !done { + // No errors have been encountered, and it is not this worker's + // turn yet, so go back to sleep + cond.Wait() + } + + // Check if this worker was woken up due to an error + if done { + // Another worker encountered an error, so exit now + return + } + + // No error was encountered, and seq indicates it's this worker's turn + // to update the state, so process the epoch + r.processEpoch(es) + + if epoch == endEpoch { + // The last result has been produced, so close the channels + close(errChan) + // This worker produced the last result, so + // signal to the other workers that it is time to exit + done = true + } else { + seq++ + if seq%100 == 99 { + timeTaken := time.Since(startTime) + r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", + r.logPrefix, + es.epoch, + endEpoch, + float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, + timeTaken) + } + } + + // Either seq has been updated or the last result was produced + // signal to the other workers to wake up and either do work, + // or exit now. + cond.Broadcast() + cond.L.Unlock() + } + }() + } +} + // Get all of the duties for a range of epochs func (r *treeGeneratorImpl_v5) processAttestationsForInterval() error { + if os.Getenv("DUTIES_PPROF") != "" { + f, err := os.Create(os.Getenv("DUTIES_PPROF")) + if err != nil { + return err + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch @@ -744,90 +862,93 @@ func (r *treeGeneratorImpl_v5) processAttestationsForInterval() error { r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) - epochsDone := 0 + // Workers need a channel to send back errors + errs := make(chan error) + reportStartTime := time.Now() - for epoch := startEpoch; epoch < endEpoch+1; epoch++ { - if epochsDone == 100 { - timeTaken := time.Since(reportStartTime) - r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", r.logPrefix, epoch, endEpoch, float64(epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, timeTaken) - epochsDone = 0 - } - err := r.processEpoch(true, epoch) - if err != nil { - return err - } + // Start populating epochStates + r.fetchEpochs(startEpoch, endEpoch, errs, reportStartTime) - epochsDone++ + // Read until the error channel is closed + for err := range errs { + return err } // Check the epoch after the end of the interval for any lingering attestations epoch := endEpoch + 1 - err = r.processEpoch(false, epoch) + es, err := r.fetchEpoch(false, epoch) if err != nil { return err } + r.processEpoch(es) r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) return nil } +func (r *treeGeneratorImpl_v5) processEpoch(es *epochState) { + + // Get all of the expected duties for the epoch + // Note: committees will be nil for the last epoch + if es.committees != nil { + r.getDutiesForEpoch(es.committees) + } + + // Process all of the slots in the epoch + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := es.epoch*r.slotsPerEpoch + i + + // The element will be nil if there was no block at the slot + if len(es.attestations[i]) > 0 { + // There was a block - process its attestations + r.checkDutiesForSlot(es.attestations[i], slot) + } + } +} + // Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance -func (r *treeGeneratorImpl_v5) processEpoch(getDuties bool, epoch uint64) error { +func (r *treeGeneratorImpl_v5) fetchEpoch(getDuties bool, epoch uint64) (*epochState, error) { // Get the committee info and attestation records for this epoch - var committeeData beacon.Committees - attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch) + out := &epochState{ + epoch: epoch, + attestations: make([][]beacon.AttestationInfo, r.slotsPerEpoch), + } var wg errgroup.Group if getDuties { wg.Go(func() error { var err error - committeeData, err = r.bc.GetCommitteesForEpoch(&epoch) + out.committees, err = r.bc.GetCommitteesForEpoch(&epoch) return err }) } - for i := uint64(0); i < r.slotsPerEpoch; i++ { - i := i - slot := epoch*r.slotsPerEpoch + i - wg.Go(func() error { + wg.Go(func() error { + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := epoch*r.slotsPerEpoch + i attestations, found, err := r.bc.GetAttestations(fmt.Sprint(slot)) if err != nil { return err } if found { - attestationsPerSlot[i] = attestations + //out.attestationsResponses[i] = attestations + out.attestations[i] = attestations } else { - attestationsPerSlot[i] = []beacon.AttestationInfo{} + //out.attestationsResponses[i] = nil + out.attestations[i] = nil } - return nil - }) - } + } + return nil + }) err := wg.Wait() if err != nil { - return fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) + return nil, fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) } - if getDuties { - // Get all of the expected duties for the epoch - err = r.getDutiesForEpoch(committeeData) - if err != nil { - return fmt.Errorf("Error getting duties for epoch %d: %w", epoch, err) - } - } - - // Process all of the slots in the epoch - for i := uint64(0); i < r.slotsPerEpoch; i++ { - slot := epoch*r.slotsPerEpoch + i - attestations := attestationsPerSlot[i] - if len(attestations) > 0 { - r.checkDutiesForSlot(attestations, slot) - } - } - - return nil + return out, nil } @@ -842,47 +963,51 @@ func (r *treeGeneratorImpl_v5) checkDutiesForSlot(attestations []beacon.Attestat // Get the RP committees for this attestation's slot and index slotInfo, exists := r.intervalDutiesInfo.Slots[attestation.SlotIndex] - if exists { - rpCommittee, exists := slotInfo.Committees[attestation.CommitteeIndex] - if exists { - blockTime := time.Unix(int64(r.networkState.BeaconConfig.GenesisTime), 0).Add(time.Second * time.Duration(r.networkState.BeaconConfig.SecondsPerSlot*attestation.SlotIndex)) - - // Check if each RP validator attested successfully - for position, validator := range rpCommittee.Positions { - if attestation.AggregationBits.BitAt(uint64(position)) { - // This was seen, so remove it from the missing attestations and add it to the completed ones - delete(rpCommittee.Positions, position) - if len(rpCommittee.Positions) == 0 { - delete(slotInfo.Committees, attestation.CommitteeIndex) - } - if len(slotInfo.Committees) == 0 { - delete(r.intervalDutiesInfo.Slots, attestation.SlotIndex) - } - validator.CompletedAttestations[attestation.SlotIndex] = true - delete(validator.MissingAttestationSlots, attestation.SlotIndex) + if !exists { + continue + } - // Check if this minipool was opted into the SP for this block - nodeDetails := r.nodeDetails[validator.NodeIndex] - if blockTime.Sub(nodeDetails.OptInTime) < 0 || nodeDetails.OptOutTime.Sub(blockTime) < 0 { - // Not opted in - continue - } + rpCommittee, exists := slotInfo.Committees[attestation.CommitteeIndex] + if !exists { + continue + } + blockTime := time.Unix(int64(r.networkState.BeaconConfig.GenesisTime), 0).Add(time.Second * time.Duration(r.networkState.BeaconConfig.SecondsPerSlot*attestation.SlotIndex)) - // Get the pseudoscore for this attestation - details := r.networkState.MinipoolDetailsByAddress[validator.Address] - bond, fee := r.getMinipoolBondAndNodeFee(details, blockTime) - minipoolScore := big.NewInt(0).Sub(one, fee) // 1 - fee - minipoolScore.Mul(minipoolScore, bond) // Multiply by bond - minipoolScore.Div(minipoolScore, validatorReq) // Divide by 32 to get the bond as a fraction of a total validator - minipoolScore.Add(minipoolScore, fee) // Total = fee + (bond/32)(1 - fee) - - // Add it to the minipool's score and the total score - validator.AttestationScore.Add(&validator.AttestationScore.Int, minipoolScore) - r.totalAttestationScore.Add(r.totalAttestationScore, minipoolScore) - r.successfulAttestations++ - } - } + // Check if each RP validator attested successfully + for position, validator := range rpCommittee.Positions { + if !attestation.AggregationBits.BitAt(uint64(position)) { + continue + } + // This was seen, so remove it from the missing attestations and add it to the completed ones + delete(rpCommittee.Positions, position) + if len(rpCommittee.Positions) == 0 { + delete(slotInfo.Committees, attestation.CommitteeIndex) + } + if len(slotInfo.Committees) == 0 { + delete(r.intervalDutiesInfo.Slots, attestation.SlotIndex) } + validator.CompletedAttestations[attestation.SlotIndex] = true + delete(validator.MissingAttestationSlots, attestation.SlotIndex) + + // Check if this minipool was opted into the SP for this block + nodeDetails := r.nodeDetails[validator.NodeIndex] + if blockTime.Sub(nodeDetails.OptInTime) < 0 || nodeDetails.OptOutTime.Sub(blockTime) < 0 { + // Not opted in + continue + } + + // Get the pseudoscore for this attestation + details := r.networkState.MinipoolDetailsByAddress[validator.Address] + bond, fee := r.getMinipoolBondAndNodeFee(details, blockTime) + minipoolScore := big.NewInt(0).Sub(one, fee) // 1 - fee + minipoolScore.Mul(minipoolScore, bond) // Multiply by bond + minipoolScore.Div(minipoolScore, validatorReq) // Divide by 32 to get the bond as a fraction of a total validator + minipoolScore.Add(minipoolScore, fee) // Total = fee + (bond/32)(1 - fee) + + // Add it to the minipool's score and the total score + validator.AttestationScore.Add(&validator.AttestationScore.Int, minipoolScore) + r.totalAttestationScore.Add(r.totalAttestationScore, minipoolScore) + r.successfulAttestations++ } } @@ -891,9 +1016,7 @@ func (r *treeGeneratorImpl_v5) checkDutiesForSlot(attestations []beacon.Attestat } // Maps out the attestaion duties for the given epoch -func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) error { - - defer committees.Release() +func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) { // Crawl the committees for idx := 0; idx < committees.Count(); idx++ { @@ -930,16 +1053,13 @@ func (r *treeGeneratorImpl_v5) getDutiesForEpoch(committees beacon.Committees) e } } } - - return nil - } // Maps all minipools to their validator indices and creates a map of indices to minipool info func (r *treeGeneratorImpl_v5) createMinipoolIndexMap() error { // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} for _, details := range r.nodeDetails { if details.IsEligible { for _, minipoolInfo := range details.Minipools { @@ -949,6 +1069,10 @@ func (r *treeGeneratorImpl_v5) createMinipoolIndexMap() error { r.log.Printlnf("NOTE: minipool %s (pubkey %s) didn't exist at this slot; removing it", minipoolInfo.Address.Hex(), minipoolInfo.ValidatorPubkey.Hex()) minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -957,7 +1081,7 @@ func (r *treeGeneratorImpl_v5) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = status.Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v6.go b/shared/services/rewards/generator-impl-v6.go index 64ab76e36..f23aaadee 100644 --- a/shared/services/rewards/generator-impl-v6.go +++ b/shared/services/rewards/generator-impl-v6.go @@ -5,7 +5,11 @@ import ( "encoding/hex" "fmt" "math/big" + "os" + "runtime/pprof" "sort" + "strconv" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -41,7 +45,7 @@ type treeGeneratorImpl_v6 struct { smoothingPoolBalance *big.Int intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -91,7 +95,7 @@ func newTreeGeneratorImpl_v6(log *log.ColorLogger, logPrefix string, index uint6 }, }, validatorStatusMap: map[rptypes.ValidatorPubkey]beacon.ValidatorStatus{}, - validatorIndexMap: map[string]*MinipoolInfo{}, + validatorIndexMap: map[uint64]*MinipoolInfo{}, elSnapshotHeader: elSnapshotHeader, log: log, logPrefix: logPrefix, @@ -709,109 +713,207 @@ func (r *treeGeneratorImpl_v6) calculateNodeRewards() (*big.Int, *big.Int, error } -// Get all of the duties for a range of epochs -func (r *treeGeneratorImpl_v6) processAttestationsForInterval() error { - - startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch - endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch - - // Determine the validator indices of each minipool - err := r.createMinipoolIndexMap() - if err != nil { - return err - } +func (r *treeGeneratorImpl_v6) fetchEpochs(startEpoch uint64, endEpoch uint64, errChan chan error, startTime time.Time) { + // seq tracks the next expected epoch in the sequence to be sent to the caller + // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn + // to publish an epoch to the caller via the resp channel. Every time seq is updated, + // therefor, all threads must wake up to check the value of seq. If it isn't their turn, + // they will go back to sleep. + var seq uint64 + + // If we encounter an error, we will wake up all the threads so they can exit. Therefor, + // the first time we encounter an error we should set 'done' to true, and then each thread + // should check its value every time they are woken. + var done bool + + // A cond to help the workers synchronize- whenever one thread wants to wake up the other + // threads, it does so by broadcasting on this cond. + cond := sync.NewCond(&sync.Mutex{}) + workers := getWorkerCount() + + // seq should start with the first epoch the caller is expecting + seq = startEpoch + + for j := uint64(0); j < workers; j++ { + + id := j + go func() { + // each worker will iterate modulo its id + for epoch := startEpoch + id; epoch < endEpoch+1; epoch += workers { + // Fetch the duties and participation for a single epoch + es, err := r.fetchEpoch(true, epoch) + if err != nil { + // Return the error to the caller + errChan <- err + // Note that an error was encountered + done = true + // Tell other threads to wake up and exit + cond.Broadcast() + // Exit this thread + return + } - // Check all of the attestations for each epoch - r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) - r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + // Wait until it's this worker's turn to produce a result + cond.L.Lock() + for seq != epoch && !done { + // No errors have been encountered, and it is not this worker's + // turn yet, so go back to sleep + cond.Wait() + } - epochsDone := 0 - reportStartTime := time.Now() - for epoch := startEpoch; epoch < endEpoch+1; epoch++ { - if epochsDone == 100 { - timeTaken := time.Since(reportStartTime) - r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", r.logPrefix, epoch, endEpoch, float64(epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, timeTaken) - epochsDone = 0 - } + // Check if this worker was woken up due to an error + if done { + // Another worker encountered an error, so exit now + return + } - err := r.processEpoch(true, epoch) - if err != nil { - return err - } + // No error was encountered, and seq indicates it's this worker's turn + // to update the state, so process the epoch + r.processEpoch(es) - epochsDone++ - } + if epoch == endEpoch { + // The last result has been produced, so close the channels + close(errChan) + // This worker produced the last result, so + // signal to the other workers that it is time to exit + done = true + } else { + seq++ + if seq%100 == 99 { + timeTaken := time.Since(startTime) + r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", + r.logPrefix, + es.epoch, + endEpoch, + float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, + timeTaken) + } + } - // Check the epoch after the end of the interval for any lingering attestations - epoch := endEpoch + 1 - err = r.processEpoch(false, epoch) - if err != nil { - return err + // Either seq has been updated or the last result was produced + // signal to the other workers to wake up and either do work, + // or exit now. + cond.Broadcast() + cond.L.Unlock() + } + }() } - - r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) - return nil - } // Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance -func (r *treeGeneratorImpl_v6) processEpoch(getDuties bool, epoch uint64) error { +func (r *treeGeneratorImpl_v6) fetchEpoch(getDuties bool, epoch uint64) (*epochState, error) { // Get the committee info and attestation records for this epoch - var committeeData beacon.Committees - attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch) + out := &epochState{ + epoch: epoch, + attestations: make([][]beacon.AttestationInfo, r.slotsPerEpoch), + } var wg errgroup.Group if getDuties { wg.Go(func() error { var err error - committeeData, err = r.bc.GetCommitteesForEpoch(&epoch) + out.committees, err = r.bc.GetCommitteesForEpoch(&epoch) return err }) } - for i := uint64(0); i < r.slotsPerEpoch; i++ { - i := i - slot := epoch*r.slotsPerEpoch + i - wg.Go(func() error { + wg.Go(func() error { + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := epoch*r.slotsPerEpoch + i attestations, found, err := r.bc.GetAttestations(fmt.Sprint(slot)) if err != nil { return err } if found { - attestationsPerSlot[i] = attestations + //out.attestationsResponses[i] = attestations + out.attestations[i] = attestations } else { - attestationsPerSlot[i] = []beacon.AttestationInfo{} + //out.attestationsResponses[i] = nil + out.attestations[i] = nil } - return nil - }) - } + } + return nil + }) err := wg.Wait() if err != nil { - return fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) + return nil, fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) } - if getDuties { - // Get all of the expected duties for the epoch - err = r.getDutiesForEpoch(committeeData) + return out, nil + +} + +// Get all of the duties for a range of epochs +func (r *treeGeneratorImpl_v6) processAttestationsForInterval() error { + if os.Getenv("DUTIES_PPROF") != "" { + f, err := os.Create(os.Getenv("DUTIES_PPROF")) if err != nil { - return fmt.Errorf("Error getting duties for epoch %d: %w", epoch, err) + return err } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() } - // Process all of the slots in the epoch - for i := uint64(0); i < r.slotsPerEpoch; i++ { - slot := epoch*r.slotsPerEpoch + i - attestations := attestationsPerSlot[i] - if len(attestations) > 0 { - r.checkDutiesForSlot(attestations, slot) - } + startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch + endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch + + // Determine the validator indices of each minipool + err := r.createMinipoolIndexMap() + if err != nil { + return err + } + + // Check all of the attestations for each epoch + r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) + r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + + // Workers need a channel to send back errors + errs := make(chan error) + + reportStartTime := time.Now() + + // Start populating epochStates + r.fetchEpochs(startEpoch, endEpoch, errs, reportStartTime) + + // Read until the error channel is closed + for err := range errs { + return err } + // Check the epoch after the end of the interval for any lingering attestations + epoch := endEpoch + 1 + es, err := r.fetchEpoch(false, epoch) + if err != nil { + return err + } + r.processEpoch(es) + + r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) return nil } +func (r *treeGeneratorImpl_v6) processEpoch(es *epochState) { + + // Get all of the expected duties for the epoch + // Note: committees will be nil for the last epoch + if es.committees != nil { + r.getDutiesForEpoch(es.committees) + } + + // Process all of the slots in the epoch + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := es.epoch*r.slotsPerEpoch + i + + // The element will be nil if there was no block at the slot + if len(es.attestations[i]) > 0 { + // There was a block - process its attestations + r.checkDutiesForSlot(es.attestations[i], slot) + } + } +} + // Handle all of the attestations in the given slot func (r *treeGeneratorImpl_v6) checkDutiesForSlot(attestations []beacon.AttestationInfo, slot uint64) error { @@ -944,7 +1046,7 @@ func (r *treeGeneratorImpl_v6) getDutiesForEpoch(committees beacon.Committees) e func (r *treeGeneratorImpl_v6) createMinipoolIndexMap() error { // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} for _, details := range r.nodeDetails { if details.IsEligible { for _, minipoolInfo := range details.Minipools { @@ -954,6 +1056,10 @@ func (r *treeGeneratorImpl_v6) createMinipoolIndexMap() error { //r.log.Printlnf("NOTE: minipool %s (pubkey %s) didn't exist at this slot; removing it", minipoolInfo.Address.Hex(), minipoolInfo.ValidatorPubkey.Hex()) minipoolInfo.WasActive = false } else { + idx, err := strconv.ParseUint(status.Index, 10, 64) + if err != nil { + idx = 0 + } switch status.Status { case beacon.ValidatorState_PendingInitialized, beacon.ValidatorState_PendingQueued: // Remove minipools that don't have indices yet since they're not actually viable @@ -962,7 +1068,7 @@ func (r *treeGeneratorImpl_v6) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = status.Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + r.validatorIndexMap[idx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/generator-impl-v7.go b/shared/services/rewards/generator-impl-v7.go index 0dd237b27..9aa079040 100644 --- a/shared/services/rewards/generator-impl-v7.go +++ b/shared/services/rewards/generator-impl-v7.go @@ -5,7 +5,11 @@ import ( "encoding/hex" "fmt" "math/big" + "os" + "runtime/pprof" "sort" + "strconv" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -41,7 +45,7 @@ type treeGeneratorImpl_v7 struct { smoothingPoolBalance *big.Int intervalDutiesInfo *IntervalDutiesInfo slotsPerEpoch uint64 - validatorIndexMap map[string]*MinipoolInfo + validatorIndexMap map[uint64]*MinipoolInfo elStartTime time.Time elEndTime time.Time validNetworkCache map[uint64]bool @@ -89,7 +93,7 @@ func newTreeGeneratorImpl_v7(log *log.ColorLogger, logPrefix string, index uint6 }, }, validatorStatusMap: map[rptypes.ValidatorPubkey]beacon.ValidatorStatus{}, - validatorIndexMap: map[string]*MinipoolInfo{}, + validatorIndexMap: map[uint64]*MinipoolInfo{}, elSnapshotHeader: elSnapshotHeader, log: log, logPrefix: logPrefix, @@ -733,107 +737,206 @@ func (r *treeGeneratorImpl_v7) calculateNodeRewards() (*big.Int, *big.Int, error } -// Get all of the duties for a range of epochs -func (r *treeGeneratorImpl_v7) processAttestationsForInterval() error { - - startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch - endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch - - // Determine the validator indices of each minipool - err := r.createMinipoolIndexMap() - if err != nil { - return err - } +func (r *treeGeneratorImpl_v7) fetchEpochs(startEpoch uint64, endEpoch uint64, errChan chan error, startTime time.Time) { + // seq tracks the next expected epoch in the sequence to be sent to the caller + // Since we fetch epochs in parallel, each thread will sleep until it becomes its turn + // to publish an epoch to the caller via the resp channel. Every time seq is updated, + // therefor, all threads must wake up to check the value of seq. If it isn't their turn, + // they will go back to sleep. + var seq uint64 + + // If we encounter an error, we will wake up all the threads so they can exit. Therefor, + // the first time we encounter an error we should set 'done' to true, and then each thread + // should check its value every time they are woken. + var done bool + + // A cond to help the workers synchronize- whenever one thread wants to wake up the other + // threads, it does so by broadcasting on this cond. + cond := sync.NewCond(&sync.Mutex{}) + workers := getWorkerCount() + + // seq should start with the first epoch the caller is expecting + seq = startEpoch + + for j := uint64(0); j < workers; j++ { + + id := j + go func() { + // each worker will iterate modulo its id + for epoch := startEpoch + id; epoch < endEpoch+1; epoch += workers { + // Fetch the duties and participation for a single epoch + es, err := r.fetchEpoch(true, epoch) + if err != nil { + // Return the error to the caller + errChan <- err + // Note that an error was encountered + done = true + // Tell other threads to wake up and exit + cond.Broadcast() + // Exit this thread + return + } - // Check all of the attestations for each epoch - r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) - r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + // Wait until it's this worker's turn to produce a result + cond.L.Lock() + for seq != epoch && !done { + // No errors have been encountered, and it is not this worker's + // turn yet, so go back to sleep + cond.Wait() + } - epochsDone := 0 - reportStartTime := time.Now() - for epoch := startEpoch; epoch < endEpoch+1; epoch++ { - if epochsDone == 100 { - timeTaken := time.Since(reportStartTime) - r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", r.logPrefix, epoch, endEpoch, float64(epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, timeTaken) - epochsDone = 0 - } + // Check if this worker was woken up due to an error + if done { + // Another worker encountered an error, so exit now + return + } - err := r.processEpoch(true, epoch) - if err != nil { - return err - } + // No error was encountered, and seq indicates it's this worker's turn + // to update the state, so process the epoch + r.processEpoch(es) - epochsDone++ - } + if epoch == endEpoch { + // The last result has been produced, so close the channels + close(errChan) + // This worker produced the last result, so + // signal to the other workers that it is time to exit + done = true + } else { + seq++ + if seq%100 == 99 { + timeTaken := time.Since(startTime) + r.log.Printlnf("%s On Epoch %d of %d (%.2f%%)... (%s so far)", + r.logPrefix, + es.epoch, + endEpoch, + float64(es.epoch-startEpoch)/float64(endEpoch-startEpoch)*100.0, + timeTaken) + } + } - // Check the epoch after the end of the interval for any lingering attestations - epoch := endEpoch + 1 - err = r.processEpoch(false, epoch) - if err != nil { - return err + // Either seq has been updated or the last result was produced + // signal to the other workers to wake up and either do work, + // or exit now. + cond.Broadcast() + cond.L.Unlock() + } + }() } - - r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) - return nil - } // Process an epoch, optionally getting the duties for all eligible minipools in it and checking each one's attestation performance -func (r *treeGeneratorImpl_v7) processEpoch(getDuties bool, epoch uint64) error { +func (r *treeGeneratorImpl_v7) fetchEpoch(getDuties bool, epoch uint64) (*epochState, error) { // Get the committee info and attestation records for this epoch - var committeeData beacon.Committees - attestationsPerSlot := make([][]beacon.AttestationInfo, r.slotsPerEpoch) + out := &epochState{ + epoch: epoch, + attestations: make([][]beacon.AttestationInfo, r.slotsPerEpoch), + } var wg errgroup.Group if getDuties { wg.Go(func() error { var err error - committeeData, err = r.bc.GetCommitteesForEpoch(&epoch) + out.committees, err = r.bc.GetCommitteesForEpoch(&epoch) return err }) } - for i := uint64(0); i < r.slotsPerEpoch; i++ { - i := i - slot := epoch*r.slotsPerEpoch + i - wg.Go(func() error { + wg.Go(func() error { + for i := uint64(0); i < r.slotsPerEpoch; i++ { + slot := epoch*r.slotsPerEpoch + i attestations, found, err := r.bc.GetAttestations(fmt.Sprint(slot)) if err != nil { return err } if found { - attestationsPerSlot[i] = attestations + //out.attestationsResponses[i] = attestations + out.attestations[i] = attestations } else { - attestationsPerSlot[i] = []beacon.AttestationInfo{} + //out.attestationsResponses[i] = nil + out.attestations[i] = nil } - return nil - }) - } + } + return nil + }) err := wg.Wait() if err != nil { - return fmt.Errorf("error getting committee and attestaion records for epoch %d: %w", epoch, err) + return nil, fmt.Errorf("Error getting committee and attestaion records for epoch %d: %w", epoch, err) } - if getDuties { - // Get all of the expected duties for the epoch - err = r.getDutiesForEpoch(committeeData) + return out, nil + +} + +// Get all of the duties for a range of epochs +func (r *treeGeneratorImpl_v7) processAttestationsForInterval() error { + if os.Getenv("DUTIES_PPROF") != "" { + f, err := os.Create(os.Getenv("DUTIES_PPROF")) if err != nil { - return fmt.Errorf("error getting duties for epoch %d: %w", epoch, err) + return err } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + startEpoch := r.rewardsFile.ConsensusStartBlock / r.beaconConfig.SlotsPerEpoch + endEpoch := r.rewardsFile.ConsensusEndBlock / r.beaconConfig.SlotsPerEpoch + + // Determine the validator indices of each minipool + err := r.createMinipoolIndexMap() + if err != nil { + return err + } + + // Check all of the attestations for each epoch + r.log.Printlnf("%s Checking participation of %d minipools for epochs %d to %d", r.logPrefix, len(r.validatorIndexMap), startEpoch, endEpoch) + r.log.Printlnf("%s NOTE: this will take a long time, progress is reported every 100 epochs", r.logPrefix) + + // Workers need a channel to send back errors + errs := make(chan error) + + reportStartTime := time.Now() + + // Start populating epochStates + r.fetchEpochs(startEpoch, endEpoch, errs, reportStartTime) + + // Read until the error channel is closed + for err := range errs { + return err + } + + // Check the epoch after the end of the interval for any lingering attestations + epoch := endEpoch + 1 + es, err := r.fetchEpoch(false, epoch) + if err != nil { + return err + } + r.processEpoch(es) + + r.log.Printlnf("%s Finished participation check (total time = %s)", r.logPrefix, time.Since(reportStartTime)) + return nil + +} + +func (r *treeGeneratorImpl_v7) processEpoch(es *epochState) { + + // Get all of the expected duties for the epoch + // Note: committees will be nil for the last epoch + if es.committees != nil { + r.getDutiesForEpoch(es.committees) } // Process all of the slots in the epoch for i := uint64(0); i < r.slotsPerEpoch; i++ { - slot := epoch*r.slotsPerEpoch + i - attestations := attestationsPerSlot[i] - if len(attestations) > 0 { - r.checkDutiesForSlot(attestations, slot) + slot := es.epoch*r.slotsPerEpoch + i + + // The element will be nil if there was no block at the slot + if len(es.attestations[i]) > 0 { + // There was a block - process its attestations + r.checkDutiesForSlot(es.attestations[i], slot) } } - return nil - } // Handle all of the attestations in the given slot @@ -968,7 +1071,7 @@ func (r *treeGeneratorImpl_v7) getDutiesForEpoch(committees beacon.Committees) e func (r *treeGeneratorImpl_v7) createMinipoolIndexMap() error { // Get the status for all uncached minipool validators and add them to the cache - r.validatorIndexMap = map[string]*MinipoolInfo{} + r.validatorIndexMap = map[uint64]*MinipoolInfo{} for _, details := range r.nodeDetails { if details.IsEligible { for _, minipoolInfo := range details.Minipools { @@ -986,7 +1089,11 @@ func (r *treeGeneratorImpl_v7) createMinipoolIndexMap() error { default: // Get the validator index minipoolInfo.ValidatorIndex = status.Index - r.validatorIndexMap[minipoolInfo.ValidatorIndex] = minipoolInfo + vIdx, err := strconv.ParseUint(minipoolInfo.ValidatorIndex, 10, 64) + if err != nil { + vIdx = 0 + } + r.validatorIndexMap[vIdx] = minipoolInfo // Get the validator's activation start and end slots startSlot := status.ActivationEpoch * r.beaconConfig.SlotsPerEpoch diff --git a/shared/services/rewards/rolling-record.go b/shared/services/rewards/rolling-record.go index 544d2aebc..d268e216f 100644 --- a/shared/services/rewards/rolling-record.go +++ b/shared/services/rewards/rolling-record.go @@ -3,6 +3,7 @@ package rewards import ( "fmt" "math/big" + "strconv" "time" "github.com/ethereum/go-ethereum/common" @@ -23,7 +24,7 @@ const ( type RollingRecord struct { StartSlot uint64 `json:"startSlot"` LastDutiesSlot uint64 `json:"lastDutiesSlot"` - ValidatorIndexMap map[string]*MinipoolInfo `json:"validatorIndexMap"` + ValidatorIndexMap map[uint64]*MinipoolInfo `json:"validatorIndexMap"` RewardsInterval uint64 `json:"rewardsInterval"` SmartnodeVersion string `json:"smartnodeVersion,omitempty"` @@ -45,7 +46,7 @@ func NewRollingRecord(log *log.ColorLogger, logPrefix string, bc beacon.Client, return &RollingRecord{ StartSlot: startSlot, LastDutiesSlot: 0, - ValidatorIndexMap: map[string]*MinipoolInfo{}, + ValidatorIndexMap: map[uint64]*MinipoolInfo{}, RewardsInterval: rewardsInterval, SmartnodeVersion: shared.RocketPoolVersion, @@ -169,7 +170,7 @@ func (r *RollingRecord) Serialize() ([]byte, error) { LastDutiesSlot: r.LastDutiesSlot, RewardsInterval: r.RewardsInterval, SmartnodeVersion: r.SmartnodeVersion, - ValidatorIndexMap: map[string]*MinipoolInfo{}, + ValidatorIndexMap: map[uint64]*MinipoolInfo{}, } // Remove minipool perf records with zero attestations from the serialization @@ -202,7 +203,12 @@ func (r *RollingRecord) updateValidatorIndices(state *state.NetworkState) { continue } - _, exists = r.ValidatorIndexMap[validator.Index] + idx, err := strconv.ParseUint(validator.Index, 10, 64) + if err != nil { + idx = 0 + } + + _, exists = r.ValidatorIndexMap[idx] if !exists && mpd.Status == types.Staking { // Validator exists and is staking but it hasn't been recorded yet, add it to the map and update the latest index so we don't remap stuff we've already seen minipoolInfo := &MinipoolInfo{ @@ -213,7 +219,7 @@ func (r *RollingRecord) updateValidatorIndices(state *state.NetworkState) { MissingAttestationSlots: map[uint64]bool{}, AttestationScore: NewQuotedBigInt(0), } - r.ValidatorIndexMap[validator.Index] = minipoolInfo + r.ValidatorIndexMap[idx] = minipoolInfo } } } diff --git a/shared/services/state/network-state.go b/shared/services/state/network-state.go index 012d45b12..aec0b4661 100644 --- a/shared/services/state/network-state.go +++ b/shared/services/state/network-state.go @@ -2,6 +2,7 @@ package state import ( "fmt" + "math" "math/big" "time" @@ -373,19 +374,55 @@ func (s *NetworkState) CalculateTrueEffectiveStakes(scaleByParticipation bool, a minCollateral := big.NewInt(0).Mul(eligibleBorrowedEth, s.NetworkDetails.MinCollateralFraction) minCollateral.Div(minCollateral, s.NetworkDetails.RplPrice) - // maxCollateral := bondedEth * maxCollateralFraction / ratio - // NOTE: maxCollateralFraction and ratio are both percentages, but multiplying and dividing by them cancels out the need for normalization by eth.EthToWei(1) - maxCollateral := big.NewInt(0).Mul(eligibleBondedEth, s.NetworkDetails.MaxCollateralFraction) - maxCollateral.Div(maxCollateral, s.NetworkDetails.RplPrice) - // Calculate the effective stake nodeStake := big.NewInt(0).Set(node.RplStake) - if nodeStake.Cmp(minCollateral) == -1 { + if nodeStake.Cmp(minCollateral) == -1 || eligibleBorrowedEth.Cmp(big.NewInt(0)) == 0 { // Under min collateral nodeStake.SetUint64(0) - } else if nodeStake.Cmp(maxCollateral) == 1 { - // Over max collateral - nodeStake.Set(maxCollateral) + } else { + // Calculate a few terms. + stakedRplValueInEth := big.NewInt(0).Mul(nodeStake, s.NetworkDetails.RplPrice) + stakedRplValueInEth.Div(stakedRplValueInEth, big.NewInt(1e18)) + + // If between (inclusive 0.1 and 0.15, weight is just 100 * staked_rpl_value_in_eth + // we already know we're above 10% of borrowed. + midCollateral := big.NewInt(0).Mul(eligibleBorrowedEth, big.NewInt(150000000000000000)) + midCollateral.Div(midCollateral, s.NetworkDetails.RplPrice) + + var weight *big.Float + if nodeStake.Cmp(midCollateral) <= 0 { + weight = big.NewFloat(0).Mul(big.NewFloat(100), big.NewFloat(0).SetInt(stakedRplValueInEth)) + } else { + lnArgs, _ := + big.NewFloat(0).Sub( + big.NewFloat(0).Mul( + big.NewFloat(100.0), + big.NewFloat(0).Quo( + big.NewFloat(0.0).SetInt(stakedRplValueInEth), + big.NewFloat(0.0).SetInt(eligibleBorrowedEth), + ), + ), + big.NewFloat(13.0), + ).Float64() + weight = big.NewFloat(0).Mul( + big.NewFloat(0).Add( + big.NewFloat(13.6137), + big.NewFloat(0).Mul( + big.NewFloat(2.0), + big.NewFloat(math.Log(lnArgs)), + ), + ), + big.NewFloat(0).SetInt(eligibleBorrowedEth), + ) + } + + approx, _ := weight.Float64() + if math.IsNaN(approx) { + nodeStake.SetUint64(0) + } else { + integered, _ := weight.Int(nil) + nodeStake.Set(integered) + } } // Scale the effective stake by the participation in the current interval