Skip to content

Commit 99caf8d

Browse files
committed
Address code review feedback in dora_monitor
- guard the slot-set trim against last_known_head=0 (previously the cutoff could go negative and silently never trim)
- pick canonical fork by client majority instead of highest head_slot (a minority fork can briefly be ahead during a split)
- offline alert only on status=offline; synchronizing/optimistic are normal transient states and were over-paging
- split Slack messages on line boundaries when they exceed 3800 chars instead of letting Slack silently truncate
- distinguish Slack 429 in the error log
- cap /clients/execution HTML read at 512KB to bound regex work
- clearer error on unknown YAML keys (top-level and under checks:)
- minor: docstring noting heartbeat snapshots aren't atomic, simpler dry-run prefix closure, cleaner status check in DoraClient._get
1 parent a76aa87 commit 99caf8d

5 files changed

Lines changed: 105 additions & 23 deletions

File tree

dora_monitor/dora_monitor/checks.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,13 @@ def check_client_head_forks(
6363
if not forks:
6464
return
6565

66-
# Canonical head = fork with the highest head_slot (majority assumption).
67-
canonical_fork = max(forks, key=lambda f: int(f.get("head_slot", 0)))
66+
# Canonical = fork followed by the most clients. Using head_slot would
67+
# mis-identify a minority fork that's briefly ahead during a split.
68+
# Tiebreak on highest head_slot just to be deterministic.
69+
canonical_fork = max(
70+
forks,
71+
key=lambda f: (len(f.get("clients") or []), int(f.get("head_slot", 0))),
72+
)
6873
canonical_slot = int(canonical_fork.get("head_slot", 0))
6974
canonical_root = canonical_fork.get("head_root", "")
7075
state.last_known_head = max(state.last_known_head, canonical_slot)
@@ -93,11 +98,14 @@ def check_client_head_forks(
9398
"is_canonical_fork": is_canonical,
9499
}
95100

96-
if cfg.checks.offline and status and status != "online":
101+
# Only `offline` is an actionable alert. `synchronizing` and
102+
# `optimistic` are normal transient states (esp. at startup); we
103+
# don't want to page on them. Use sync_lag for stuck-syncing nodes.
104+
if cfg.checks.offline and status == "offline":
97105
current_offline.add(name)
98106

99-
# Skip fork/lag judgement when the client isn't online; head_slot
100-
# is stale and would produce noisy alerts.
107+
# Skip fork/lag judgement when the client isn't fully online;
108+
# head_slot is stale and would produce noisy alerts.
101109
if status != "online":
102110
continue
103111

@@ -184,12 +192,22 @@ def _format_heartbeat(
184192
dora: DoraClient,
185193
cfg: Config,
186194
) -> str:
195+
"""Compose the heartbeat digest text.
196+
197+
Makes two separate HTTP requests (client_head_forks + slots) so the head
198+
slot shown and the missed/orphaned counts are sampled at slightly
199+
different instants. They may disagree by a slot or two; this is by
200+
design, not a bug.
201+
"""
187202
payload = dora.client_head_forks()
188203
forks = payload.get("forks") or []
189204
canonical_slot = 0
190205
canonical_root = ""
191206
if forks:
192-
canonical = max(forks, key=lambda f: int(f.get("head_slot", 0)))
207+
canonical = max(
208+
forks,
209+
key=lambda f: (len(f.get("clients") or []), int(f.get("head_slot", 0))),
210+
)
193211
canonical_slot = int(canonical.get("head_slot", 0))
194212
canonical_root = canonical.get("head_root", "")
195213

@@ -283,7 +301,7 @@ def maybe_heartbeat(
283301
return
284302
now = time.time()
285303
interval_s = cfg.heartbeat_interval_minutes * 60
286-
if state.last_heartbeat_ts and (now - state.last_heartbeat_ts) < interval_s:
304+
if state.last_heartbeat_ts > 0 and (now - state.last_heartbeat_ts) < interval_s:
287305
return
288306
try:
289307
text = _format_heartbeat(dora, cfg)
@@ -322,7 +340,10 @@ def run_checks(
322340
log.exception("heartbeat failed: %s", e)
323341

324342
# Trim reported-slots sets to keep state file from growing forever.
325-
cutoff = state.last_known_head - 10_000
326-
if cutoff > 0:
343+
# Guard against last_known_head being 0 (e.g. all client_head_forks
344+
# checks disabled or the check threw on every tick): without the guard,
345+
# cutoff would go negative and the trim would silently be a no-op.
346+
if state.last_known_head > 10_000:
347+
cutoff = state.last_known_head - 10_000
327348
state.reported_missed_slots = {s for s in state.reported_missed_slots if s >= cutoff}
328349
state.reported_orphan_slots = {s for s in state.reported_orphan_slots if s >= cutoff}

dora_monitor/dora_monitor/config.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,15 @@ def load_config(path: str, require_slack: bool = True) -> Config:
3737
raw = yaml.safe_load(f) or {}
3838

3939
checks_raw = raw.pop("checks", {}) or {}
40-
checks = Checks(**checks_raw)
41-
42-
cfg = Config(checks=checks, **raw)
40+
try:
41+
checks = Checks(**checks_raw)
42+
except TypeError as e:
43+
raise ValueError(f"config: unknown key under `checks:` ({e})") from e
44+
45+
try:
46+
cfg = Config(checks=checks, **raw)
47+
except TypeError as e:
48+
raise ValueError(f"config: unknown top-level key ({e})") from e
4349

4450
env_hook = os.environ.get("SLACK_WEBHOOK_URL")
4551
if env_hook:

dora_monitor/dora_monitor/dora.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ def _get(self, path: str, params: dict[str, Any] | None = None) -> Any:
2121
r = self._session.get(url, params=params, timeout=self.timeout)
2222
r.raise_for_status()
2323
data = r.json()
24-
if isinstance(data, dict) and data.get("status") and data["status"] != "OK":
25-
raise RuntimeError(f"dora API error at {path}: {data}")
24+
if isinstance(data, dict):
25+
api_status = data.get("status")
26+
if api_status and api_status != "OK":
27+
raise RuntimeError(f"dora API error at {path}: {data}")
2628
return data
2729

2830
def slots(self, limit: int = 64, with_orphaned: int = 1, with_missing: int = 1) -> list[dict]:
@@ -57,9 +59,14 @@ def execution_versions(self) -> dict[str, str]:
5759
which is empty for ethrex. So we parse the rendered table.
5860
"""
5961
url = f"{self.base_url}/clients/execution"
60-
r = self._session.get(url, timeout=self.timeout)
62+
# Cap the read at 512 KB. Dora's real page is ~220 KB; this guards
63+
# against a malformed/runaway response triggering pathological
64+
# regex backtracking or a huge in-memory string.
65+
r = self._session.get(url, timeout=self.timeout, stream=True)
6166
r.raise_for_status()
62-
body = r.text
67+
body = r.raw.read(512 * 1024, decode_content=True).decode(
68+
r.encoding or "utf-8", errors="replace"
69+
)
6370
out: dict[str, str] = {}
6471
for name, row in _ROW_RE.findall(body):
6572
m = _VERSION_RE.search(row)

dora_monitor/dora_monitor/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ def cli() -> None:
4141
dora = DoraClient(cfg.dora_url, timeout=cfg.http_timeout)
4242
slack = SlackNotifier(cfg.slack_webhook_url, cfg.network_label, timeout=cfg.http_timeout)
4343
if args.dry_run:
44-
def _dry_send(text: str, _orig=slack._prefix) -> None:
45-
print(f"[DRY-RUN] {_orig()}{text}")
44+
prefix = slack._prefix()
45+
def _dry_send(text: str) -> None:
46+
print(f"[DRY-RUN] {prefix}{text}")
4647
slack.send = _dry_send # type: ignore[assignment]
4748

4849
state = load_state(None if args.reset_state else cfg.state_file)

dora_monitor/dora_monitor/slack.py

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
log = logging.getLogger(__name__)
66

7+
# Slack mrkdwn text limit per message is 4000 chars. Keep some headroom for
8+
# the network-label prefix and the "(i/n)" series marker we may append.
9+
_MAX_TEXT = 3800
10+
711

812
class SlackNotifier:
913
def __init__(self, webhook_url: str, network_label: str = "", timeout: int = 10):
@@ -14,11 +18,54 @@ def __init__(self, webhook_url: str, network_label: str = "", timeout: int = 10)
1418
def _prefix(self) -> str:
1519
return f"[{self.network_label}] " if self.network_label else ""
1620

17-
def send(self, text: str) -> None:
18-
body = {"text": f"{self._prefix()}{text}"}
21+
def _post(self, text: str) -> None:
1922
try:
20-
r = requests.post(self.webhook_url, json=body, timeout=self.timeout)
21-
if r.status_code >= 300:
22-
log.error("slack webhook failed: %s %s", r.status_code, r.text)
23+
r = requests.post(self.webhook_url, json={"text": text}, timeout=self.timeout)
24+
if r.status_code == 429:
25+
retry = r.headers.get("Retry-After", "?")
26+
log.error("slack rate-limited (429, retry-after=%s); alert dropped", retry)
27+
elif r.status_code >= 300:
28+
log.error("slack webhook failed: %s %s", r.status_code, r.text[:200])
2329
except requests.RequestException as e:
2430
log.error("slack webhook error: %s", e)
31+
32+
def send(self, text: str) -> None:
33+
body = f"{self._prefix()}{text}"
34+
if len(body) <= _MAX_TEXT:
35+
self._post(body)
36+
return
37+
38+
chunks = _split_on_lines(body, _MAX_TEXT)
39+
total = len(chunks)
40+
for i, chunk in enumerate(chunks, 1):
41+
self._post(f"{chunk}\n_({i}/{total})_")
42+
43+
44+
def _split_on_lines(text: str, limit: int) -> list[str]:
45+
"""Split text on newline boundaries into chunks of at most `limit` chars.
46+
47+
Falls back to hard slicing for any single line longer than `limit` so a
48+
pathological input still gets through rather than being dropped.
49+
"""
50+
chunks: list[str] = []
51+
buf: list[str] = []
52+
buf_len = 0
53+
for line in text.split("\n"):
54+
# Hard-slice a single oversized line into limit-sized pieces.
55+
if len(line) > limit:
56+
if buf:
57+
chunks.append("\n".join(buf))
58+
buf, buf_len = [], 0
59+
for i in range(0, len(line), limit):
60+
chunks.append(line[i : i + limit])
61+
continue
62+
add = len(line) + (1 if buf else 0)
63+
if buf_len + add > limit:
64+
chunks.append("\n".join(buf))
65+
buf, buf_len = [line], len(line)
66+
else:
67+
buf.append(line)
68+
buf_len += add
69+
if buf:
70+
chunks.append("\n".join(buf))
71+
return chunks

0 commit comments

Comments
 (0)