Skip to content

Commit 01562cf

Browse files
authored
Probe cycle discrimination + scheduler health snapshot (#50)
* resolved issue_01 and issue_04 for week 4 * addressed ai reivew * fixed lint error * addressed AI reviews and supplemented unit tests * fixed lint error * addressed AI reviews * addressed AI reviews
1 parent f7b0bd5 commit 01562cf

16 files changed

Lines changed: 770 additions & 98 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Discriminated ISO probe cycle outcomes (`CycleResult`: success / empty / failed) with distinct logging and `/health` fields (`last_cycle_status`, `last_cycle_error`).
13+
- Atomic `SchedulerSnapshot` for `/health` scheduler extras (`last_updated`, `poll_count`, probe stats) published under a lock from `Scheduler.health_snapshot()`.
1214
- Post the same Slack **status** summary as the interactive command to `NOTIFICATION_CHANNEL` once when the process starts (when that channel is configured).
1315
- Open-source hygiene: contributing guide, security policy, code of conduct, onboarding and handoff docs, pre-commit (Ruff), GitHub issue templates, Dependabot, CodeQL, CODEOWNERS template, and `.gitattributes`.
1416

docs/architecture.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ These components share one thread and the main event loop. They may await I/O bu
88

99
- **`Scheduler.run_forever` / `poll_once`** — orchestrates index refresh, probing, and notifications.
1010
- **`WG21Index.refresh`** — fetches and parses wg21.link index (httpx async).
11-
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client.
11+
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client. `run_cycle` returns a discriminated `CycleResult` (success / empty / failed).
1212
- **Slack Bolt handlers** — run on Bolt’s thread; they should not read mutable source state directly (use snapshots or health callbacks).
1313

1414
`ISOProber._stats` is updated from many coroutines in one `run_cycle()`. This is safe on the event loop because asyncio never preempts between awaits. A `threading.Lock` guards `_stats` as defense-in-depth if code is ever called from a worker thread by mistake.
@@ -17,7 +17,7 @@ These components share one thread and the main event loop. They may await I/O bu
1717

1818
| Thread | Role |
1919
|--------|------|
20-
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler snapshot fields. |
20+
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler fields from `Scheduler.health_snapshot()` (immutable snapshot, lock-protected publish). |
2121
| **MessageQueue sender** (`scout.py`) | Drains Slack post queue with rate limiting. |
2222
| **`run_blocking_io` / `asyncio.to_thread`** | Runs blocking psycopg2 calls (e.g. `UserWatchlist.matches_for_users`) off the loop. |
2323

src/paperscout/__main__.py

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,74 @@
2727

2828
log = logging.getLogger("paperscout")
2929

30+
# MessageQueue keys allowed in /health extras (must not overlap scheduler.health_snapshot()).
31+
_MQ_HEALTH_FIELD_NAMES = frozenset(
32+
{
33+
"mq_depth",
34+
"mq_max_size",
35+
"mq_utilization",
36+
"mq_circuit_state",
37+
}
38+
)
39+
40+
41+
def _mq_health_fields(mq: MessageQueue) -> dict:
42+
"""MQ metrics for /health; from health_fields() when present, else depth only."""
43+
if hasattr(mq, "health_fields"):
44+
try:
45+
raw = mq.health_fields()
46+
except Exception as exc:
47+
log.warning(
48+
"health: mq.health_fields() failed for %s id=%s: %s",
49+
type(mq).__name__,
50+
id(mq),
51+
exc,
52+
exc_info=True,
53+
)
54+
try:
55+
return {"mq_depth": mq.depth()}
56+
except Exception:
57+
log.warning(
58+
"health: mq.depth() fallback failed; omitting MQ fields",
59+
exc_info=True,
60+
)
61+
return {}
62+
if isinstance(raw, dict):
63+
return raw
64+
log.warning("health: mq.health_fields() returned non-dict, using mq_depth only")
65+
try:
66+
return {"mq_depth": mq.depth()}
67+
except Exception:
68+
log.warning("health: mq.depth() failed; omitting MQ fields", exc_info=True)
69+
return {}
70+
71+
72+
def _merge_extra_health_fields(
73+
scheduler_snap: dict,
74+
mq_extra: dict,
75+
db_pool: dict,
76+
) -> dict:
77+
"""Merge health JSON with scheduler winning on key conflicts."""
78+
scheduler_keys = set(scheduler_snap)
79+
mq_filtered: dict = {}
80+
for key, value in mq_extra.items():
81+
if key in _MQ_HEALTH_FIELD_NAMES:
82+
if key in scheduler_keys:
83+
log.debug(
84+
"health: mq_extra key %r conflicts with scheduler snapshot; scheduler wins",
85+
key,
86+
)
87+
else:
88+
mq_filtered[key] = value
89+
elif key in scheduler_keys:
90+
log.debug(
91+
"health: mq_extra key %r not allow-listed; scheduler snapshot kept",
92+
key,
93+
)
94+
else:
95+
log.debug("health: mq_extra key %r not allow-listed, dropping", key)
96+
return {**scheduler_snap, **mq_filtered, "db_pool": db_pool}
97+
3098

3199
def _setup_logging(data_dir: Path, console_level: str = "INFO", retention_days: int = 7) -> None:
32100
"""Console + daily rotating file logging; third-party loggers capped at WARNING."""
@@ -141,20 +209,11 @@ def _pool_status(p) -> dict:
141209
)
142210

143211
def _extra_health_fields() -> dict:
144-
lsp = scheduler._last_successful_poll
145-
s = scheduler._last_probe_stats
146-
# HTTP 200 outcomes / non-skipped probe attempts (excludes skipped_discovered, skipped_in_index).
147-
hits = s.get("hit_recent", 0) + s.get("hit_old", 0) + s.get("hit_no_lm", 0)
148-
attempted = hits + s.get("miss", 0) + s.get("error", 0)
149-
probe_success_rate = hits / attempted if attempted > 0 else None
150-
return {
151-
"last_successful_poll": (
152-
datetime.fromtimestamp(lsp, tz=timezone.utc).isoformat() if lsp else None
153-
),
154-
"probe_success_rate": probe_success_rate,
155-
"mq_depth": mq.depth(),
156-
"db_pool": _pool_status(pool),
157-
}
212+
return _merge_extra_health_fields(
213+
scheduler.health_snapshot(),
214+
_mq_health_fields(mq),
215+
_pool_status(pool),
216+
)
158217

159218
register_handlers(app, user_watchlist, state, paper_count_fn, launch_time)
160219

src/paperscout/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,12 @@ class Settings(BaseSettings):
8484
notification_channel: str = ""
8585
# Slack channel ID for ops alerts (stale poll). Empty = disabled.
8686
ops_alert_channel: str = ""
87-
# Log a warning when MessageQueue depth reaches or exceeds this (unbounded queue).
87+
# Log a warning when MessageQueue depth reaches or exceeds this (legacy threshold).
8888
mq_backpressure_threshold: int = Field(default=100, ge=1)
89+
mq_max_size: int = Field(default=1000, ge=1)
90+
mq_max_retries: int = Field(default=10, ge=0)
91+
mq_circuit_breaker_threshold: int = Field(default=5, ge=1)
92+
mq_circuit_breaker_cooldown_seconds: int = Field(default=60, ge=1)
8993
notify_on_frontier_hit: bool = True
9094
notify_on_any_draft: bool = True
9195
# Alert when a D-paper we previously probed appears in the wg21.link index

src/paperscout/health.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ def do_GET(self) -> None:
6161
except Exception:
6262
log.exception("health: extra_fields_fn failed")
6363
extra = {}
64-
body = json.dumps({**base, **extra}).encode()
64+
# Base handler fields win if extra_fields_fn returns overlapping keys.
65+
safe_extra = {k: v for k, v in extra.items() if k not in base}
66+
body = json.dumps({**base, **safe_extra}).encode()
6567

6668
self.send_response(200)
6769
self.send_header("Content-Type", "application/json")

src/paperscout/models.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,38 @@ class ProbeHit:
177177
is_recent: bool = False
178178

179179

180+
class CycleStatus(str, Enum):
181+
"""Outcome of one ``ISOProber.run_cycle()`` invocation."""
182+
183+
SUCCESS = "success"
184+
EMPTY = "empty"
185+
FAILED = "failed"
186+
187+
188+
@dataclass(frozen=True, slots=True)
189+
class CycleResult:
190+
"""Discriminated probe cycle result (success vs empty vs failed)."""
191+
192+
status: CycleStatus
193+
results: tuple[ProbeHit, ...] = ()
194+
error: str | None = None
195+
196+
@property
197+
def hits(self) -> list[ProbeHit]:
198+
"""Probe hits when ``status`` is ``SUCCESS``; otherwise empty."""
199+
return list(self.results) if self.status == CycleStatus.SUCCESS else []
200+
201+
def __post_init__(self) -> None:
202+
if self.status == CycleStatus.FAILED and not self.error:
203+
raise ValueError("CycleResult FAILED must carry a non-empty error string")
204+
if self.status == CycleStatus.SUCCESS and not self.results:
205+
raise ValueError("CycleResult SUCCESS must carry at least one ProbeHit")
206+
if self.status == CycleStatus.EMPTY and self.results:
207+
raise ValueError("CycleResult EMPTY must not carry results")
208+
if self.status != CycleStatus.FAILED and self.error is not None:
209+
raise ValueError("CycleResult error is only valid for FAILED status")
210+
211+
180212
@dataclass
181213
class PerUserMatches:
182214
"""One user's watchlist hits: ``(paper|hit, 'author'|'paper')`` tuples."""

0 commit comments

Comments
 (0)