Skip to content

Commit bacaef6

Browse files
committed
bridge: implement additional methods for self_heal
1 parent fcf72a4 commit bacaef6

8 files changed

Lines changed: 701 additions & 104 deletions

File tree

bridge/listener/rootchain_selfheal.go

Lines changed: 99 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ import (
1717
"github.com/0xPolygon/heimdall-v2/helper"
1818
)
1919

20+
// maxStakeNoncesPerCycle bounds queued nonces per validator per self-heal tick.
21+
// Polygon PoS runs one self-healing node per network, so total per-cycle calls
22+
// cap at validatorSetSize * maxStakeNoncesPerCycle on that single node.
23+
const maxStakeNoncesPerCycle = 10
24+
2025
var (
2126
stateSyncedCounter = promauto.NewCounter(prometheus.CounterOpts{
2227
Namespace: "self_healing",
@@ -25,11 +30,11 @@ var (
2530
Help: "The total number of missing StateSynced events processed",
2631
})
2732

28-
stakeUpdateCounter = promauto.NewCounter(prometheus.CounterOpts{
33+
stakeEventCounter = promauto.NewCounter(prometheus.CounterOpts{
2934
Namespace: "self_healing",
3035
Subsystem: helper.GetConfig().Chain,
31-
Name: "StakeUpdate",
32-
Help: "The total number of missing StakeUpdate events processed",
36+
Name: "StakeEvent",
37+
Help: "The total number of missing nonce-gated stake events (StakeUpdate, SignerChange, UnstakeInit) processed",
3338
})
3439

3540
checkpointAckCounter = promauto.NewCounter(prometheus.CounterOpts{
@@ -57,23 +62,23 @@ func (rl *RootChainListener) startSelfHealing(ctx context.Context) {
5762
httpClient: &http.Client{Timeout: 5 * time.Second},
5863
}
5964

60-
stakeUpdateTicker := time.NewTicker(helper.GetConfig().SHStakeUpdateInterval)
65+
stakeEventsTicker := time.NewTicker(helper.GetConfig().SHStakeUpdateInterval)
6166
stateSyncedTicker := time.NewTicker(helper.GetConfig().SHStateSyncedInterval)
6267
checkpointAckTicker := time.NewTicker(helper.GetConfig().SHCheckpointAckInterval)
6368

6469
rl.Logger.Info("Self-healing: started")
6570

6671
for {
6772
select {
68-
case <-stakeUpdateTicker.C:
69-
rl.processStakeUpdate(ctx)
73+
case <-stakeEventsTicker.C:
74+
rl.processStakeEvents(ctx)
7075
case <-stateSyncedTicker.C:
7176
rl.processStateSynced(ctx)
7277
case <-checkpointAckTicker.C:
7378
rl.processCheckpointAck(ctx)
7479
case <-ctx.Done():
7580
rl.Logger.Info("Self-healing: stopping")
76-
stakeUpdateTicker.Stop()
81+
stakeEventsTicker.Stop()
7782
stateSyncedTicker.Stop()
7883
checkpointAckTicker.Stop()
7984

@@ -175,9 +180,11 @@ func (rl *RootChainListener) processCheckpointAck(ctx context.Context) {
175180
rl.Logger.Info("Self-healing: successfully queued checkpoint ACK task", "headerBlockId", l1HeaderBlockId, "logIndex", latestL1Checkpoint.LogIndex, "txHash", targetLog.TxHash.Hex())
176181
}
177182

178-
// processStakeUpdate checks if validators are in sync, otherwise syncs them by broadcasting missing events
179-
func (rl *RootChainListener) processStakeUpdate(ctx context.Context) {
180-
// Fetch all heimdall validators
183+
// processStakeEvents recovers any missing nonce-gated stake event for each validator.
184+
// StakeUpdate, SignerChange, and UnstakeInit events share a single per-validator nonce counter.
185+
// If Heimdall's nonce for a validator lags the L1 maximum across these three event types,
186+
// the missing event is fetched from the subgraph and replayed.
187+
func (rl *RootChainListener) processStakeEvents(ctx context.Context) {
181188
validatorSet, err := util.GetValidatorSet(rl.cliCtx.Codec)
182189
if err != nil {
183190
rl.Logger.Error("Self-healing: failed to fetch validator set from heimdall", "error", err)
@@ -186,61 +193,103 @@ func (rl *RootChainListener) processStakeUpdate(ctx context.Context) {
186193

187194
rl.Logger.Info("Self-healing: fetched validator list from heimdall", "validatorCount", len(validatorSet.Validators))
188195

189-
// Make sure each validator is in sync
190196
var wg sync.WaitGroup
191197
for _, validator := range validatorSet.Validators {
192198
wg.Add(1)
193-
194199
go func(id uint64) {
195200
defer wg.Done()
201+
rl.recoverStakeEventsForValidator(ctx, id)
202+
}(validator.ValId)
203+
}
204+
wg.Wait()
205+
}
196206

197-
nonce, err := util.GetValidatorNonce(id, rl.cliCtx.Codec)
198-
if err != nil {
199-
rl.Logger.Error("Self-healing: failed to fetch nonce for validator from Heimdall", "validatorId", id, "error", err)
200-
return
201-
}
202-
203-
var ethereumNonce uint64
204-
205-
if err = helper.ExponentialBackoff(func() error {
206-
ethereumNonce, err = rl.getLatestNonce(ctx, id)
207-
return err
208-
}, 3, time.Second); err != nil {
209-
rl.Logger.Error("Self-healing: failed to fetch latest nonce from Ethereum (L1) for validator", "validatorId", id, "error", err)
210-
return
211-
}
212-
rl.Logger.Info("Self-healing: retrieved nonces for validator", "validatorId", id, "ethereumNonce", ethereumNonce, "heimdallNonce", nonce)
207+
func (rl *RootChainListener) recoverStakeEventsForValidator(ctx context.Context, id uint64) {
208+
heimdallNonce, l1MaxNonce, ok := rl.fetchValidatorNonces(ctx, id)
209+
if !ok || l1MaxNonce <= heimdallNonce {
210+
return
211+
}
213212

214-
if ethereumNonce <= nonce {
215-
return
216-
}
213+
// Successive iterations are paced by util.StakeNonceRetryDelay so the bridge
214+
// processor sees nonces in committed order rather than firing its
215+
// deferred-retry path with growing per-task multipliers.
216+
queued := uint64(0)
217+
for nonce := heimdallNonce + 1; nonce <= l1MaxNonce && queued < maxStakeNoncesPerCycle; nonce++ {
218+
if !rl.replayStakeEvent(ctx, id, l1MaxNonce, nonce) {
219+
break
220+
}
221+
queued++
222+
if nonce < l1MaxNonce && queued < maxStakeNoncesPerCycle && !pauseBetweenReplays(ctx) {
223+
return
224+
}
225+
}
217226

218-
nonce++
227+
if remaining := l1MaxNonce - heimdallNonce - queued; remaining > 0 {
228+
rl.Logger.Warn("Self-healing: stake event backlog exceeds per-cycle limit; will continue next cycle", "validatorId", id, "queued", queued, "remaining", remaining, "perCycleLimit", maxStakeNoncesPerCycle)
229+
}
230+
}
219231

220-
rl.Logger.Info("Self-healing: validator is behind; processing missing stake update", "validatorId", id, "ethereumNonce", ethereumNonce, "nextExpectedNonce", nonce)
232+
// fetchValidatorNonces returns the validator's heimdall and L1 nonces
233+
func (rl *RootChainListener) fetchValidatorNonces(ctx context.Context, id uint64) (uint64, uint64, bool) {
234+
heimdallNonce, err := util.GetValidatorNonce(id, rl.cliCtx.Codec)
235+
if err != nil {
236+
rl.Logger.Error("Self-healing: failed to fetch nonce for validator from Heimdall", "validatorId", id, "error", err)
237+
return 0, 0, false
238+
}
221239

222-
var stakeUpdate *types.Log
240+
var l1MaxNonce uint64
241+
if err = helper.ExponentialBackoff(func() error {
242+
l1MaxNonce, err = rl.getMaxL1NonceForValidator(ctx, id)
243+
return err
244+
}, 3, time.Second); err != nil {
245+
rl.Logger.Error("Self-healing: failed to fetch latest nonce from Ethereum (L1) for validator", "validatorId", id, "error", err)
246+
return 0, 0, false
247+
}
248+
rl.Logger.Info("Self-healing: retrieved nonces for validator", "validatorId", id, "ethereumNonce", l1MaxNonce, "heimdallNonce", heimdallNonce)
249+
return heimdallNonce, l1MaxNonce, true
250+
}
223251

224-
if err = helper.ExponentialBackoff(func() error {
225-
stakeUpdate, err = rl.getStakeUpdate(ctx, id, nonce)
226-
return err
227-
}, 3, time.Second); err != nil {
228-
rl.Logger.Error("Self-healing: failed to retrieve StakeUpdate event from subgraph", "validatorId", id, "nonce", nonce, "error", err)
229-
return
230-
}
231-
rl.Logger.Info("Self-healing: fetched StakeUpdate event from Ethereum", "validatorId", id, "nonce", nonce, "blockNumber", stakeUpdate.BlockNumber, "txHash", stakeUpdate.TxHash.Hex())
252+
// pauseBetweenReplays returns false if context is canceled during the wait.
253+
func pauseBetweenReplays(ctx context.Context) bool {
254+
select {
255+
case <-ctx.Done():
256+
return false
257+
case <-time.After(util.StakeNonceRetryDelay):
258+
return true
259+
}
260+
}
232261

233-
stakeUpdateCounter.Inc()
262+
// replayStakeEvent fetches and replays the stake event at (id, nonce). Returns
263+
// false on any condition that makes further iteration pointless this cycle:
264+
// subgraph fetch error, processEvent error, or event too recent for
265+
// SHMaxDepthDuration (since later nonces have strictly later block times).
266+
func (rl *RootChainListener) replayStakeEvent(ctx context.Context, id, l1MaxNonce, nonce uint64) bool {
267+
rl.Logger.Info("Self-healing: validator is behind; processing missing stake event", "validatorId", id, "ethereumNonce", l1MaxNonce, "nextExpectedNonce", nonce)
268+
269+
var stakeEventLog *types.Log
270+
var err error
271+
if err = helper.ExponentialBackoff(func() error {
272+
stakeEventLog, err = rl.getStakeEventLogByNonce(ctx, id, nonce)
273+
return err
274+
}, 3, time.Second); err != nil {
275+
rl.Logger.Error("Self-healing: failed to retrieve stake event from subgraph", "validatorId", id, "nonce", nonce, "error", err)
276+
return false
277+
}
278+
rl.Logger.Info("Self-healing: fetched stake event from Ethereum", "validatorId", id, "nonce", nonce, "blockNumber", stakeEventLog.BlockNumber, "txHash", stakeEventLog.TxHash.Hex())
234279

235-
if _, err = rl.processEvent(ctx, stakeUpdate); err != nil {
236-
rl.Logger.Error("Self-healing: failed to process StakeUpdate event", "validatorId", id, "nonce", nonce, "error", err)
237-
} else {
238-
rl.Logger.Info("Self-healing: successfully processed StakeUpdate event", "validatorId", id, "nonce", nonce)
239-
}
240-
}(validator.ValId)
280+
skipped, err := rl.processEvent(ctx, stakeEventLog)
281+
if err != nil {
282+
rl.Logger.Error("Self-healing: failed to process stake event", "validatorId", id, "nonce", nonce, "error", err)
283+
return false
284+
}
285+
if skipped {
286+
rl.Logger.Info("Self-healing: stake event too recent for self-heal; will retry next cycle", "validatorId", id, "nonce", nonce)
287+
return false
241288
}
242289

243-
wg.Wait()
290+
stakeEventCounter.Inc()
291+
rl.Logger.Info("Self-healing: successfully processed stake event", "validatorId", id, "nonce", nonce)
292+
return true
244293
}
245294

246295
// processStateSynced checks if chains are in sync, otherwise syncs them by broadcasting missing events

0 commit comments

Comments
 (0)