Skip to content

Commit 6318120

Browse files
committed
Require N consecutive ticks of non-canonical head before fork alert
Propagation timing routinely produces transient 1-2 slot leads or lags that the previous code surfaced as fork alerts (and an immediate resolved alert a tick later). Configurable via fork_confirm_ticks (default 3 = ~90s at the default 30s poll), persisted per-client in the dedup state so it survives restarts.
1 parent d3d4fb6 commit 6318120

4 files changed

Lines changed: 34 additions & 1 deletion

File tree

dora_monitor/config.example.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ slot_scan_limit: 64
2020
# Sync lag threshold (slots behind the canonical head) before alerting.
2121
sync_lag_threshold: 16
2222

23+
# Number of consecutive polls a matched client must be on a non-canonical
24+
# head before we fire a fork alert. Filters propagation-timing jitter
25+
# (1-2 slot leads/lags that self-resolve in the next poll). With
26+
# poll_interval=30, fork_confirm_ticks=3 = ~90s of confirmation.
27+
fork_confirm_ticks: 3
28+
2329
# Path for persisted dedup state. Use null for in-memory only.
2430
state_file: "./dora_monitor_state.json"
2531

dora_monitor/dora_monitor/checks.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def check_client_head_forks(
7575
state.last_known_head = max(state.last_known_head, canonical_slot)
7676

7777
current_forked: set[str] = set()
78+
forked_candidates: set[str] = set()
7879
current_lagging: set[str] = set()
7980
current_offline: set[str] = set()
8081
matched_clients: dict[str, dict] = {}
@@ -110,7 +111,7 @@ def check_client_head_forks(
110111
continue
111112

112113
if cfg.checks.forks and not is_canonical:
113-
current_forked.add(name)
114+
forked_candidates.add(name)
114115

115116
if cfg.checks.sync_lag:
116117
distance = canonical_slot - client_head
@@ -131,6 +132,24 @@ def check_client_head_forks(
131132
state.offline_clients = current_offline
132133

133134
if cfg.checks.forks:
135+
# Bump the consecutive-tick counter for clients seen on a
136+
# non-canonical fork this tick; reset for everyone else. Only treat
137+
# a client as truly forked once the counter crosses fork_confirm_ticks
138+
# to filter propagation-timing noise (1-2 slot jitter that resolves
139+
# within a poll or two).
140+
threshold = max(cfg.fork_confirm_ticks, 1)
141+
seen_matched = set(matched_clients.keys())
142+
for name in list(state.pending_fork_ticks.keys()):
143+
if name not in seen_matched:
144+
# Client vanished from the payload (lost from network view);
145+
# drop the pending counter so it doesn't persist forever.
146+
del state.pending_fork_ticks[name]
147+
for name in forked_candidates:
148+
state.pending_fork_ticks[name] = state.pending_fork_ticks.get(name, 0) + 1
149+
for name in seen_matched - forked_candidates:
150+
state.pending_fork_ticks.pop(name, None)
151+
current_forked = {n for n, c in state.pending_fork_ticks.items() if c >= threshold}
152+
134153
new_forked = current_forked - state.forked_clients
135154
resolved_forked = state.forked_clients - current_forked
136155
for name in sorted(new_forked):

dora_monitor/dora_monitor/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class Config:
2222
poll_interval: int = 30
2323
slot_scan_limit: int = 64
2424
sync_lag_threshold: int = 16
25+
# Number of consecutive polls a matched client must be on a non-canonical
26+
# head before we fire a fork alert. Filters out propagation-timing noise
27+
# where one client briefly leads or lags by a slot or two.
28+
fork_confirm_ticks: int = 3
2529
state_file: str | None = "./dora_monitor_state.json"
2630
http_timeout: int = 10
2731
debug: bool = False

dora_monitor/dora_monitor/state.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ class State:
1313
offline_clients: set[str] = field(default_factory=set)
1414
lagging_clients: set[str] = field(default_factory=set)
1515
forked_clients: set[str] = field(default_factory=set)
16+
# Per-client consecutive non-canonical observations. Used to require
17+
# N ticks of confirmation before firing a fork alert.
18+
pending_fork_ticks: dict[str, int] = field(default_factory=dict)
1619
last_known_head: int = 0
1720
last_heartbeat_ts: float = 0.0
1821
client_versions: dict[str, str] = field(default_factory=dict)
@@ -40,6 +43,7 @@ def from_json(cls, d: dict) -> "State":
4043
last_known_head=int(d.get("last_known_head", 0)),
4144
last_heartbeat_ts=float(d.get("last_heartbeat_ts", 0.0)),
4245
client_versions=dict(d.get("client_versions", {})),
46+
pending_fork_ticks={k: int(v) for k, v in (d.get("pending_fork_ticks") or {}).items()},
4347
)
4448

4549

0 commit comments

Comments
 (0)