Skip to content

Commit 5c5e520

Browse files
updated scout.py to introduce message queue (#51)
* updated scout.py to introduce message queue
* addressed AI reviews
* rebased and resolved
  Co-authored-by: Cursor <cursoragent@cursor.com>
* fixed the gap between PRs and addressed AI review results
* addressed AI review
---------
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 01562cf commit 5c5e520

4 files changed

Lines changed: 429 additions & 34 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- `MessageQueue`: capped HTTP 429 retries; circuit breaker (CLOSED/OPEN/HALF_OPEN) with operator-visible trip logging; dead-letter logging on retry exhaustion; bounded queue with drop-oldest overflow; `enqueue()` returning a bool backpressure signal; and `health_fields()` / `depth()` exposed on `/health` (merged via `_mq_health_fields()`). Thread-safe breaker state, safe `Retry-After` parsing, and redacted MQ logs.
13+
- Config: `mq_max_retries`, `mq_circuit_breaker_threshold`, `mq_circuit_breaker_cooldown_seconds`, `mq_max_size`.
1214
- Discriminated ISO probe cycle outcomes (`CycleResult`: success / empty / failed) with distinct logging and `/health` fields (`last_cycle_status`, `last_cycle_error`).
1315
- Atomic `SchedulerSnapshot` for `/health` scheduler extras (`last_updated`, `poll_count`, probe stats) published under a lock from `Scheduler.health_snapshot()`.
1416
- Post the same Slack **status** summary as the interactive command to `NOTIFICATION_CHANNEL` once when the process starts (when that channel is configured).

src/paperscout/config.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,6 @@ class Settings(BaseSettings):
8484
notification_channel: str = ""
8585
# Slack channel ID for ops alerts (stale poll). Empty = disabled.
8686
ops_alert_channel: str = ""
87-
# Log a warning when MessageQueue depth reaches or exceeds this (legacy threshold).
88-
mq_backpressure_threshold: int = Field(default=100, ge=1)
89-
mq_max_size: int = Field(default=1000, ge=1)
90-
mq_max_retries: int = Field(default=10, ge=0)
91-
mq_circuit_breaker_threshold: int = Field(default=5, ge=1)
92-
mq_circuit_breaker_cooldown_seconds: int = Field(default=60, ge=1)
9387
notify_on_frontier_hit: bool = True
9488
notify_on_any_draft: bool = True
9589
# Alert when a D-paper we previously probed appears in the wg21.link index
@@ -107,6 +101,12 @@ class Settings(BaseSettings):
107101
# Days of log files to keep (one file per day).
108102
log_retention_days: int = 7
109103

104+
# -- Message queue (Slack outbound) --
105+
mq_max_retries: int = Field(default=8, ge=1)
106+
mq_circuit_breaker_threshold: int = Field(default=5, ge=1)
107+
mq_circuit_breaker_cooldown_seconds: int = Field(default=60, ge=1)
108+
mq_max_size: int = Field(default=1000, ge=1)
109+
110110
@model_validator(mode="after")
111111
def _require_slack_credentials_unless_testing(self) -> Settings:
112112
"""Slack tokens must be set for real runs; pytest sets ``_PAPERSCOUT_TESTING=1``."""

src/paperscout/scout.py

Lines changed: 223 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
from __future__ import annotations
44

5+
import hashlib
56
import logging
67
import queue
78
import threading
89
import time
10+
from dataclasses import dataclass
911
from datetime import datetime, timezone
12+
from enum import Enum
1013
from typing import Any
1114

1215
from slack_bolt import App
@@ -34,16 +37,132 @@ def create_app() -> App:
3437
# ── Message Queue ─────────────────────────────────────────────────────────────
3538

3639

40+
def _redact_channel(channel: str) -> str:
    """Return a short, stable log token that stands in for a Slack channel/user id.

    The token is ``ch:`` followed by the first eight hex characters of the
    SHA-256 digest of *channel*, so the raw id never reaches the logs while
    equal ids still produce equal tokens.
    """
    fingerprint = hashlib.sha256(channel.encode()).hexdigest()[:8]
    return "ch:" + fingerprint
44+
45+
46+
def _payload_meta(text: str, kwargs: dict | None = None) -> str:
    """Summarise a message for logging without exposing its content.

    Only the length of *text* and, when *kwargs* is non-empty, the sorted
    names of its keys are reported; neither the text nor any kwarg value is
    included.
    """
    summary = f"text_len={len(text)}"
    if not kwargs:
        return summary
    key_names = ",".join(sorted(kwargs))
    return f"{summary} kwargs_keys={key_names}"
52+
53+
54+
def _parse_retry_after(raw: str | None) -> int:
    """Convert a ``Retry-After`` header value into a whole number of seconds.

    Returns 5 when *raw* is ``None`` or cannot be parsed as an integer (the
    latter case is logged as a warning). Parsed values are clamped to a
    minimum of 1 second.
    """
    if raw is not None:
        try:
            seconds = int(raw)
        except (TypeError, ValueError):
            log.warning("MQ invalid-retry-after value=%r using=5s", raw)
        else:
            # Never return zero or a negative delay.
            return max(1, seconds)
    return 5
63+
64+
65+
def _log_enqueue_rejected(context: str) -> None:
    """Emit a caller-side warning that ``enqueue()`` returned False.

    *context* is logged verbatim to identify the call site. The queue itself
    already logs the circuit-open rejection; this line only ties the rejection
    to the notifying code path.
    """
    log.warning("MQ notify-enqueue-rejected context=%s", context)
68+
69+
70+
class CircuitState(str, Enum):
    """States of the outbound-message circuit breaker.

    Members are also ``str`` instances; ``.value`` is the lowercase name that
    is reported in health output.
    """

    # Sends proceed normally.
    CLOSED = "closed"
    # Sends are refused until the cooldown has elapsed.
    OPEN = "open"
    # Cooldown elapsed; sends are allowed again and a failure re-opens the circuit.
    HALF_OPEN = "half_open"
74+
75+
76+
@dataclass(frozen=True, slots=True)
77+
class RetryPolicy:
78+
"""Per-message 429 retry cap (from settings)."""
79+
80+
max_retries: int
81+
82+
83+
class CircuitBreaker:
    """Consecutive-failure circuit breaker with a cooldown and half-open probe.

    The breaker starts CLOSED. It trips to OPEN once ``threshold`` consecutive
    failures have been recorded, or on any failure recorded while HALF_OPEN.
    While OPEN, ``allow_send()`` refuses until ``cooldown_seconds`` have
    passed since the trip, at which point it moves to HALF_OPEN and admits
    sends again. Any recorded success returns the breaker to CLOSED.

    Every read and write of breaker state happens under an internal lock.
    """

    def __init__(
        self,
        threshold: int,
        cooldown_seconds: int,
    ) -> None:
        self._lock = threading.Lock()
        self._threshold = threshold
        self._cooldown_seconds = cooldown_seconds
        self._state = CircuitState.CLOSED
        self._consecutive_failures = 0
        # ``time.monotonic()`` reading taken at the most recent trip; None while not tripped.
        self._opened_at: float | None = None

    @property
    def state(self) -> CircuitState:
        """Current state, read under the lock."""
        with self._lock:
            return self._state

    @property
    def consecutive_failures(self) -> int:
        """Failures recorded since the last success, read under the lock."""
        with self._lock:
            return self._consecutive_failures

    def record_success(self) -> None:
        """Reset the failure count and close the circuit."""
        with self._lock:
            self._state = CircuitState.CLOSED
            self._consecutive_failures = 0
            self._opened_at = None

    def record_failure(self) -> None:
        """Count one failure and trip the circuit when warranted."""
        with self._lock:
            self._consecutive_failures += 1
            probing = self._state == CircuitState.HALF_OPEN
            # A failed probe re-opens immediately; otherwise wait for the threshold.
            if probing or self._consecutive_failures >= self._threshold:
                self._trip_locked()

    def _trip_locked(self) -> None:
        """Trip to OPEN; caller must hold ``_lock``."""
        self._state = CircuitState.OPEN
        self._opened_at = time.monotonic()
        log.error(
            "MQ-CIRCUIT-OPEN failures=%d cooldown=%ds",
            self._consecutive_failures,
            self._cooldown_seconds,
        )

    def allow_send(self) -> bool:
        """Return whether a send attempt may proceed (may transition OPEN → HALF_OPEN)."""
        with self._lock:
            if self._state != CircuitState.OPEN:
                return True
            opened_at = self._opened_at
            if opened_at is None:
                # OPEN without a recorded trip time: do not block sends.
                return True
            if time.monotonic() - opened_at < self._cooldown_seconds:
                return False
            self._state = CircuitState.HALF_OPEN
            log.info("MQ-CIRCUIT-HALF-OPEN probing after cooldown")
            return True
146+
147+
37148
class MessageQueue:
38-
"""Background queue for Slack posts: per-channel throttle and 429 retry-after."""
149+
"""Background queue for Slack posts: throttle, capped 429 retries, circuit breaker."""
39150

40151
def __init__(self, app: App):
    """Build the queue, throttle state, retry policy and circuit breaker.

    Capacity, retry cap and breaker parameters are read from ``settings``
    at construction time.
    """
    self._app = app
    self._thread: threading.Thread | None = None

    # Bounded FIFO of (channel, text, kwargs) items awaiting send.
    self._q: queue.Queue[tuple[str, str, dict]] = queue.Queue(
        maxsize=settings.mq_max_size,
    )
    # Held by enqueue() and depth() around their queue operations.
    self._queue_lock = threading.Lock()
    # Latch so the high-water warning is logged once per excursion.
    self._warned_high_water = False

    # channel → ``time.monotonic()`` reading of the last send / 429 back-off;
    # guarded by ``_lock``.
    self._last_send: dict[str, float] = {}
    self._lock = threading.Lock()

    self._retry = RetryPolicy(max_retries=settings.mq_max_retries)
    self._breaker = CircuitBreaker(
        threshold=settings.mq_circuit_breaker_threshold,
        cooldown_seconds=settings.mq_circuit_breaker_cooldown_seconds,
    )
47166

48167
def start(self) -> None:
49168
"""Start the background sender thread."""
@@ -52,8 +171,9 @@ def start(self) -> None:
52171
log.info("MessageQueue started")
53172

54173
def depth(self) -> int:
55-
"""Approximate number of messages waiting to be sent (see ``queue.Queue.qsize``)."""
56-
return self._q.qsize()
174+
"""Approximate number of messages waiting to send."""
175+
with self._queue_lock:
176+
return self._q.qsize()
57177

58178
def health_fields(self) -> dict[str, Any]:
59179
"""Metrics for the ``/health`` endpoint (merged by ``__main__``)."""
@@ -65,21 +185,52 @@ def health_fields(self) -> dict[str, Any]:
65185
"mq_depth": d,
66186
"mq_max_size": m,
67187
"mq_utilization": round(utilization, 4),
68-
"mq_circuit_state": "closed",
188+
"mq_circuit_state": self._breaker.state.value,
69189
}
70190

71-
def enqueue(self, channel: str, text: str, **kwargs) -> None:
72-
"""Queue a ``chat.postMessage`` for *channel* (or user id for DMs)."""
73-
from .config import settings
74-
75-
self._q.put((channel, text, kwargs))
76-
depth = self._q.qsize()
77-
if depth >= settings.mq_backpressure_threshold:
191+
def enqueue(self, channel: str, text: str, **kwargs) -> bool:
192+
"""Queue a ``chat.postMessage``; return False when the circuit breaker rejects."""
193+
if not self._breaker.allow_send():
78194
log.warning(
79-
"MQ-BACKPRESSURE depth=%d threshold=%d",
80-
depth,
81-
settings.mq_backpressure_threshold,
195+
"MQ enqueue-rejected circuit=open %s %s",
196+
_redact_channel(channel),
197+
_payload_meta(text, kwargs),
82198
)
199+
return False
200+
201+
item = (channel, text, kwargs)
202+
max_size = settings.mq_max_size
203+
with self._queue_lock:
204+
while True:
205+
try:
206+
self._q.put_nowait(item)
207+
break
208+
except queue.Full:
209+
try:
210+
dropped_ch, dropped_text, dropped_kwargs = self._q.get_nowait()
211+
log.warning(
212+
"MQ drop-oldest %s %s",
213+
_redact_channel(dropped_ch),
214+
_payload_meta(dropped_text, dropped_kwargs),
215+
)
216+
except queue.Empty:
217+
# Consumer may have taken an item between Full and get_nowait; retry put.
218+
continue
219+
if max_size > 0:
220+
depth = self._q.qsize()
221+
high = 0.8 * max_size
222+
low = 0.7 * max_size
223+
if depth >= high and not self._warned_high_water:
224+
log.warning(
225+
"MQ high-water depth=%d max=%d utilization=%.0f%%",
226+
depth,
227+
max_size,
228+
100.0 * depth / max_size,
229+
)
230+
self._warned_high_water = True
231+
elif depth < low:
232+
self._warned_high_water = False
233+
return True
83234

84235
def _run(self) -> None:
85236
while True:
@@ -99,8 +250,30 @@ def _throttle(self, channel: str) -> None:
99250
if wait > 0:
100251
time.sleep(wait)
101252

253+
def _dead_letter(
    self,
    channel: str,
    text: str,
    *,
    reason: str,
    attempts: int = 0,
    kwargs: dict | None = None,
) -> None:
    """Log, at error level, a message that is being given up on.

    Only a redacted channel token, the *reason*, the number of *attempts*
    and a content-free payload summary are logged; the message itself is
    not stored anywhere by this method.
    """
    target = _redact_channel(channel)
    summary = _payload_meta(text, kwargs)
    log.error(
        "MQ-DEAD-LETTER %s reason=%s attempts=%d %s",
        target,
        reason,
        attempts,
        summary,
    )
269+
102270
def _send_with_retry(self, channel: str, text: str, kwargs: dict) -> None:
103-
while True:
271+
if not self._breaker.allow_send():
272+
self._dead_letter(channel, text, reason="circuit_open", kwargs=kwargs)
273+
return
274+
275+
max_attempts = self._retry.max_retries + 1
276+
for attempt in range(max_attempts):
104277
try:
105278
self._app.client.chat_postMessage(
106279
channel=channel,
@@ -111,24 +284,45 @@ def _send_with_retry(self, channel: str, text: str, kwargs: dict) -> None:
111284
)
112285
with self._lock:
113286
self._last_send[channel] = time.monotonic()
287+
self._breaker.record_success()
114288
return
115289
except SlackApiError as exc:
116290
if exc.response.status_code == 429:
117-
retry_after = int(exc.response.headers.get("Retry-After", "5"))
291+
if attempt >= self._retry.max_retries:
292+
self._dead_letter(
293+
channel,
294+
text,
295+
reason="retry_exhausted",
296+
attempts=attempt + 1,
297+
kwargs=kwargs,
298+
)
299+
self._breaker.record_failure()
300+
return
301+
retry_after = _parse_retry_after(
302+
exc.response.headers.get("Retry-After", "5"),
303+
)
118304
log.warning(
119-
"MQ 429 rate-limited channel=%s retry_after=%ds",
120-
channel,
305+
"MQ 429 rate-limited %s retry_after=%ds attempt=%d",
306+
_redact_channel(channel),
121307
retry_after,
308+
attempt + 1,
122309
)
123310
time.sleep(retry_after)
124-
# Re-throttle per-channel timer after sleeping
125311
with self._lock:
126312
self._last_send[channel] = time.monotonic()
127313
else:
128-
log.exception("MQ send-fail channel=%s", channel)
314+
log.exception(
315+
"MQ send-fail %s",
316+
_redact_channel(channel),
317+
)
318+
self._breaker.record_failure()
129319
return
130320
except Exception:
131-
log.exception("MQ send-fail channel=%s", channel)
321+
log.exception(
322+
"MQ send-fail %s",
323+
_redact_channel(channel),
324+
)
325+
self._breaker.record_failure()
132326
return
133327

134328

@@ -228,7 +422,8 @@ def notify_channel(app: App, result: PollResult, mq: MessageQueue) -> None:
228422
len(other_hits),
229423
)
230424
for batch in batches:
231-
mq.enqueue(channel, batch)
425+
if not mq.enqueue(channel, batch):
426+
_log_enqueue_rejected("notify_channel")
232427

233428

234429
# ── Per-user DM notifications ─────────────────────────────────────────────────
@@ -269,7 +464,8 @@ def notify_users(app: App, result: PollResult, mq: MessageQueue) -> None:
269464
len(matches.probe_hits),
270465
)
271466
for batch in batches:
272-
mq.enqueue(user_id, batch)
467+
if not mq.enqueue(user_id, batch):
468+
_log_enqueue_rejected("notify_users")
273469

274470

275471
def _batch_lines(lines: list[str], max_len: int) -> list[str]:
@@ -501,7 +697,8 @@ def enqueue_startup_status(
501697
channel = settings.notification_channel
502698
if not channel:
503699
return
504-
mq.enqueue(channel, format_status_message(state, paper_count_fn))
700+
if not mq.enqueue(channel, format_status_message(state, paper_count_fn)):
701+
_log_enqueue_rejected("enqueue_startup_status")
505702

506703

507704
def _handle_version(say, reply_opts: dict) -> None:

0 commit comments

Comments
 (0)