Skip to content

Commit 585d572

Browse files
committed
[watcher] dedup WithdrawalSignedEvent applies via signatures.is_some()
Sui's SubscribeCheckpoints stream can redeliver a checkpoint, which made every pod's watcher apply the same WithdrawalSignedEvent twice — the second apply correctly hit InsufficientCapacity but tripped the sticky guardian_limiter_drifted bit even though the local limiter was still in lockstep with the guardian. Gate the apply on the txn's signatures: apply only when they are still unset, and skip the event when signatures.is_some(). The same gate also skips historical events that the bootstrap snapshot already counted via GetGuardianInfo's next_seq.
1 parent 7d1e4c9 commit 585d572

2 files changed

Lines changed: 26 additions & 2 deletions

File tree

crates/hashi/src/metrics.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub struct Metrics {
4747
pub guardian_limiter_validate_total: IntCounterVec,
4848
pub guardian_limiter_apply_total: IntCounterVec,
4949
pub guardian_limiter_anchor_events_total: IntCounter,
50+
pub guardian_limiter_anchor_events_skipped_total: IntCounter,
5051
pub guardian_limiter_batch_truncated_total: IntCounter,
5152
pub guardian_limiter_batch_stuck_head_total: IntCounter,
5253
pub guardian_rpc_total: IntCounterVec,
@@ -352,6 +353,12 @@ impl Metrics {
352353
registry,
353354
)
354355
.unwrap(),
356+
guardian_limiter_anchor_events_skipped_total: register_int_counter_with_registry!(
357+
"hashi_guardian_limiter_anchor_events_skipped_total",
358+
"WithdrawalSignedEvent observations skipped because signatures were already recorded — covers checkpoint-stream redelivery and bootstrap-replay events",
359+
registry,
360+
)
361+
.unwrap(),
355362
guardian_limiter_batch_truncated_total: register_int_counter_with_registry!(
356363
"hashi_guardian_limiter_batch_truncated_total",
357364
"Times the leader's approved batch was truncated by local-limiter capacity (the head fits, the tail does not)",

crates/hashi/src/onchain/watcher.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,14 @@ async fn handle_events(
451451
// Advance uses the event's checkpoint timestamp (~sign-time)
452452
// rather than `txn.timestamp_ms` (creation time) to stay in
453453
// lockstep with the guardian's `last_updated_at`.
454+
//
455+
// `signatures.is_some()` doubles as a "have we already
456+
// accounted for this txn?" flag, set on the first apply
457+
// here and by `scrape_hashi` at bootstrap. It keeps the
458+
// limiter idempotent if `SubscribeCheckpoints` redelivers
459+
// a checkpoint, and it also skips historical events that
460+
// the bootstrap snapshot already counted via the
461+
// guardian's `next_seq`.
454462
let (limiter_inputs, pick_to_sign_ms) = {
455463
let mut state = state.state_mut();
456464
state
@@ -462,17 +470,26 @@ async fn handle_events(
462470
.withdrawal_txns
463471
.get_mut(&event.withdrawal_txn_id)
464472
{
465-
Some(txn) => {
473+
Some(txn) if txn.signatures.is_none() => {
466474
txn.signatures = Some(event.signatures.clone());
467475
let amount_sats = withdrawal_limiter_consumption_amount(txn);
468476
let timestamp_secs = checkpoint_timestamp_ms / 1000;
469477
let pick_to_sign =
470478
checkpoint_timestamp_ms.saturating_sub(txn.timestamp_ms);
471479
(Some((amount_sats, timestamp_secs)), Some(pick_to_sign))
472480
}
473-
None => (None, None),
481+
_ => (None, None),
474482
}
475483
};
484+
if limiter_inputs.is_none() {
485+
tracing::debug!(
486+
withdrawal_txn_id = %event.withdrawal_txn_id,
487+
"Skipping limiter apply: WithdrawalSignedEvent for already-signed txn"
488+
);
489+
if let Some(metrics) = state.metrics() {
490+
metrics.guardian_limiter_anchor_events_skipped_total.inc();
491+
}
492+
}
476493
if let Some(d) = pick_to_sign_ms
477494
&& let Some(metrics) = state.metrics()
478495
{

0 commit comments

Comments
 (0)