Skip to content

Commit 83773b2

Browse files
committed
refactor retry logic for SendSignedTaskResponseToAggregator + Local testnet works w/ telemetry
1 parent f667829 commit 83773b2

File tree

3 files changed

+59
-24
lines changed

3 files changed

+59
-24
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Common variables for all the services
2+
# 'production' only prints info and above. 'development' also prints debug
3+
environment: 'development'
4+
aligned_layer_deployment_config_file_path: '../contracts/script/output/devnet/alignedlayer_deployment_output.json'
5+
eigen_layer_deployment_config_file_path: '../contracts/script/output/devnet/eigenlayer_deployment_output.json'
6+
eth_rpc_url: 'http://localhost:8545'
7+
eth_rpc_url_fallback: 'http://localhost:8545'
8+
eth_ws_url: 'ws://localhost:8545'
9+
eth_ws_url_fallback: 'ws://localhost:8545'
10+
eigen_metrics_ip_port_address: 'localhost:9090'
11+
12+
## ECDSA Configurations
13+
ecdsa:
14+
private_key_store_path: '../config-files/devnet/keys/operator-1.ecdsa.key.json'
15+
private_key_store_password: ''
16+
17+
## BLS Configurations
18+
bls:
19+
private_key_store_path: '../config-files/devnet/keys/operator-1.bls.key.json'
20+
private_key_store_password: ''
21+
22+
## Operator Configurations
23+
operator:
24+
aggregator_rpc_server_ip_port_address: localhost:8090
25+
operator_tracker_ip_port_address: http://localhost:4001
26+
address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
27+
earnings_receiver_address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
28+
delegation_approver_address: '0x0000000000000000000000000000000000000000'
29+
staker_opt_out_window_blocks: 0
30+
metadata_url: 'https://yetanotherco.github.io/operator_metadata/metadata.json'
31+
enable_metrics: true
32+
metrics_ip_port_address: localhost:9092
33+
max_batch_size: 268435456 # 256 MiB
34+
last_processed_batch_filepath: '../config-files/operator-1.last_processed_batch.json'
35+
36+
# Operators variables needed for register it in EigenLayer
37+
el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9'
38+
private_key_store_path: config-files/devnet/keys/operator-1.ecdsa.key.json
39+
bls_private_key_store_path: config-files/devnet/keys/operator-1.bls.key.json
40+
signer_type: local_keystore
41+
chain_id: 31337

operator/pkg/operator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (o *Operator) handleNewBatchLogV2(newBatchLog *servicemanager.ContractAlign
334334
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
335335
)
336336

337-
o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
337+
o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse)
338338
}
339339
func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) error {
340340

@@ -415,7 +415,7 @@ func (o *Operator) handleNewBatchLogV3(newBatchLog *servicemanager.ContractAlign
415415
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
416416
)
417417

418-
o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
418+
o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse)
419419
}
420420
func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) error {
421421

operator/pkg/rpc_client.go

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/Layr-Labs/eigensdk-go/logging"
9+
retry "github.com/yetanotherco/aligned_layer/core"
910
"github.com/yetanotherco/aligned_layer/core/types"
1011
)
1112

@@ -16,11 +17,6 @@ type AggregatorRpcClient struct {
1617
logger logging.Logger
1718
}
1819

19-
const (
20-
MaxRetries = 10
21-
RetryInterval = 10 * time.Second
22-
)
23-
2420
func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger) (*AggregatorRpcClient, error) {
2521
client, err := rpc.DialHTTP("tcp", aggregatorIpPortAddr)
2622
if err != nil {
@@ -34,31 +30,29 @@ func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger)
3430
}, nil
3531
}
3632

37-
// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send
38-
// their signed task response.
39-
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregator(signedTaskResponse *types.SignedTaskResponse) {
40-
var reply uint8
41-
for retries := 0; retries < MaxRetries; retries++ {
33+
func SendSignedTaskResponse(c *AggregatorRpcClient, signedTaskResponse *types.SignedTaskResponse) func() (uint8, error) {
34+
send_task_func := func() (uint8, error) {
35+
var reply uint8
4236
err := c.rpcClient.Call("Aggregator.ProcessOperatorSignedTaskResponseV2", signedTaskResponse, &reply)
4337
if err != nil {
4438
c.logger.Error("Received error from aggregator", "err", err)
4539
if errors.Is(err, rpc.ErrShutdown) {
4640
c.logger.Error("Aggregator is shutdown. Reconnecting...")
47-
client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr)
48-
if err != nil {
49-
c.logger.Error("Could not reconnect to aggregator", "err", err)
50-
time.Sleep(RetryInterval)
51-
} else {
52-
c.rpcClient = client
53-
c.logger.Info("Reconnected to aggregator")
54-
}
55-
} else {
56-
c.logger.Infof("Received error from aggregator: %s. Retrying ProcessOperatorSignedTaskResponseV2 RPC call...", err)
57-
time.Sleep(RetryInterval)
5841
}
5942
} else {
6043
c.logger.Info("Signed task response header accepted by aggregator.", "reply", reply)
61-
return
6244
}
45+
return reply, err
6346
}
47+
return send_task_func
48+
}
49+
50+
// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send
51+
// their signed task response.
52+
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregatorRetryable(signedTaskResponse *types.SignedTaskResponse) (uint8, error) {
53+
config := retry.DefaultRetryConfig()
54+
config.NumRetries = 10
55+
config.Multiplier = 1 // Constant retry interval
56+
config.InitialInterval = 10 * time.Second
57+
return retry.RetryWithData(SendSignedTaskResponse(c, signedTaskResponse), config)
6458
}

0 commit comments

Comments
 (0)