def _tail_of_output(output: Optional[str], max_chars: int) -> str:
    """Return at most the last ``max_chars`` characters of ``output``.

    A leading ``"..."`` marks that text was dropped. ``None`` is treated as
    empty output, and a non-positive ``max_chars`` keeps no output at all
    (``text[-0:]`` would otherwise return the *entire* string).
    """
    text = output or ""
    if max_chars <= 0:
        return "..." if text else ""
    if len(text) <= max_chars:
        return text
    return "..." + text[-max_chars:]


def _failure_record_paths(failure_job_dir: Path) -> List[Path]:
    """Return the ``*.json`` record files of one job, newest first.

    Files are ordered by modification time; the timestamped file name breaks
    ties so records written within one mtime tick (coarse-resolution
    filesystems) still come back in write order. Files that disappear while
    being inspected are skipped instead of raising.
    """
    stamped = []
    for path in failure_job_dir.glob("*.json"):
        try:
            mtime = path.stat().st_mtime
        except OSError:
            continue
        stamped.append((mtime, path.name, path))
    stamped.sort(key=lambda item: (item[0], item[1]), reverse=True)
    return [path for _, _, path in stamped]


def _prune_job_failures(failure_job_dir: Path, keep: Optional[int]) -> None:
    """Delete all but the ``keep`` newest records of a job (best effort).

    ``None`` or a value below 1 disables pruning.
    """
    if keep is None or keep < 1:
        return
    for stale in _failure_record_paths(failure_job_dir)[keep:]:
        try:
            stale.unlink()
        except OSError:
            # Another writer may have pruned it already; nothing to do.
            pass


def save_job_failure(
    job: Dict[str, Any],
    *,
    success: bool,
    error: Optional[str] = None,
    output: str = "",
    exit_code: Optional[int] = None,
    traceback_text: Optional[str] = None,
    max_output_chars: int = 4000,
    max_records: Optional[int] = 200,
) -> Path:
    """Persist a per-job run-status record under ``FAILURE_DIR``.

    One JSON file is written per call into ``FAILURE_DIR/<job id>/``. The
    newest file of a job is its canonical "latest" status: a record with
    ``success=True`` written after a failure marks the job as recovered.

    Args:
        job: Job dict; ``id`` and ``name`` are read (``"unknown"`` when the
            id is missing).
        success: Whether the run succeeded.
        error: Short error description, if any.
        output: Job output; only the last ``max_output_chars`` characters
            are stored. ``None`` is accepted and stored as ``""``.
        exit_code: Process exit code, if known.
        traceback_text: Formatted traceback, if any.
        max_output_chars: Size of the stored output tail.
        max_records: Number of newest records kept per job after writing.
            A record is written on every run, so without a cap a frequent
            job would grow its directory without bound. ``None`` (or a
            value below 1) keeps everything.

    Returns:
        The path of the written record.
    """
    ensure_dirs()
    job_id = str(job.get("id") or "unknown")
    failure_job_dir = FAILURE_DIR / job_id
    failure_job_dir.mkdir(parents=True, exist_ok=True)
    _secure_dir(failure_job_dir)

    now = _hermes_now()
    # Include sub-seconds in the filename so rapid successive records don't
    # collide and overwrite each other.
    timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") + f"_{now.microsecond:06d}"
    record_file = failure_job_dir / f"{timestamp}.json"

    record = {
        "job_id": job_id,
        "job_name": str(job.get("name") or job_id),
        "timestamp": now.isoformat(),
        "success": bool(success),
        "exit_code": exit_code,
        "error": error,
        "traceback": traceback_text,
        "last_output": _tail_of_output(output, max_output_chars),
    }

    # Write to a temp file in the same directory and atomically swap it in so
    # readers never observe a half-written record. The temp name does not
    # match ``*.json`` and is therefore invisible to list_job_failures().
    fd, tmp_path = tempfile.mkstemp(dir=str(failure_job_dir), suffix=".tmp", prefix=".failure_")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, default=str)
            f.flush()
            os.fsync(f.fileno())
        atomic_replace(tmp_path, record_file)
        _secure_file(record_file)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    _prune_job_failures(failure_job_dir, max_records)
    return record_file


def list_job_failures(job_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Return recent run-status records for a job, newest first.

    Unreadable, malformed, or non-object JSON files are skipped. ``limit``
    caps the number of records returned; ``None`` returns all of them and a
    value of 0 or less returns an empty list.
    """
    if limit is not None and limit <= 0:
        return []
    failure_job_dir = FAILURE_DIR / str(job_id)
    if not failure_job_dir.is_dir():
        return []
    records: List[Dict[str, Any]] = []
    for path in _failure_record_paths(failure_job_dir):
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Vanished, unreadable, badly encoded or invalid JSON.
            continue
        if not isinstance(payload, dict):
            continue
        records.append(payload)
        if limit is not None and len(records) >= limit:
            break
    return records


def get_latest_failure(job_id: str) -> Optional[Dict[str, Any]]:
    """Return the most recent run-status record for a job, or None."""
    records = list_job_failures(job_id, limit=1)
    return records[0] if records else None
# String spellings of "off" that must not enable the digest. ``bool("false")``
# is True in Python, so quoted YAML values need explicit handling.
_FAILURE_DIGEST_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off", "none", "null"})


def _failure_digest_enabled(cfg: dict) -> bool:
    """Return whether ``cron.failure_digest`` is enabled in config.yaml.

    The digest surfaces recent cron failures to the user on the next
    interaction. Default disabled (False); opt-in via config.yaml. String
    values are interpreted case-insensitively, so ``"false"``, ``"no"``,
    ``"off"`` and ``"0"`` disable the digest while ``"true"`` enables it.
    Anything that is not a dict (for ``cfg`` or its ``cron`` section) yields
    False.
    """
    cron_cfg = cfg.get("cron") if isinstance(cfg, dict) else None
    if not isinstance(cron_cfg, dict):
        return False
    value = cron_cfg.get("failure_digest", False)
    if isinstance(value, str):
        return value.strip().lower() not in _FAILURE_DIGEST_FALSE_STRINGS
    return bool(value)


def _load_cron_config() -> dict:
    """Load config.yaml, returning an empty dict on any failure."""
    try:
        from hermes_cli.config import load_config

        return load_config() or {}
    except Exception:
        # Deliberate best effort: a broken config must never break cron.
        return {}


def _parse_digest_time(value, reference):
    """Parse an ISO-8601 timestamp so it is comparable with ``reference``.

    Returns None for empty or unparsable values. When exactly one of the
    parsed value and ``reference`` carries a timezone, the parsed value is
    coerced to match (naive values adopt the reference's tzinfo; aware values
    are converted to local time and stripped), because comparing naive and
    aware datetimes raises ``TypeError``.
    """
    import datetime as _dt

    if not value:
        return None
    try:
        parsed = _dt.datetime.fromisoformat(str(value))
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is None and reference.tzinfo is not None:
        return parsed.replace(tzinfo=reference.tzinfo)
    if parsed.tzinfo is not None and reference.tzinfo is None:
        return parsed.astimezone().replace(tzinfo=None)
    return parsed


def build_cron_failure_digest(adapters=None, loop=None) -> Optional[str]:
    """Build a user-visible digest of recent cron failures.

    Scans all enabled jobs and emits one line for every job whose latest
    run record reports a failure, is at most 24 hours old, and is newer than
    the job's last acknowledged digest timestamp (``failure_digest_last_at``
    in the jobs file). Exactly the jobs that made it into the digest get
    their ack timestamp updated, so the same failure is not repeated and
    failures that were never shown are never silently acknowledged.

    ``adapters`` and ``loop`` are accepted for signature compatibility and
    are not used.

    Returns the digest text, or None if the digest is disabled or there is
    nothing new to surface.
    """
    cfg = _load_cron_config()
    if not _failure_digest_enabled(cfg):
        return None

    import datetime as _dt

    now = _hermes_now()
    cutoff = now - _dt.timedelta(hours=24)
    lines: List[str] = []
    surfaced_ids = set()
    for job in load_jobs():
        job_id = job.get("id")
        if not job_id or not job.get("enabled", True):
            continue
        record = get_latest_failure(str(job_id))
        if not record or record.get("success") is True:
            continue
        ts = _parse_digest_time(record.get("timestamp"), now)
        if ts is None or ts < cutoff:
            continue
        # An unparsable ack value is treated as "never acknowledged".
        last_ack = _parse_digest_time(job.get("failure_digest_last_at"), now)
        if last_ack is not None and ts <= last_ack:
            continue

        job_name = record.get("job_name") or job.get("name") or job_id
        # Collapse whitespace so a multi-line error stays on one bullet.
        err = " ".join(str(record.get("error") or "unknown error").split())[:120]
        lines.append(f"• '{job_name}' failed at {ts.strftime('%Y-%m-%d %H:%M')}: {err}")
        surfaced_ids.add(str(job_id))

    if not lines:
        return None

    digest = (
        "⚠️ Cron failure digest (last 24h):\n"
        + "\n".join(lines)
        + "\n\nFull details: ~/.hermes/cron/failures/"
    )

    # Update ack timestamps so we don't repeat the same failures every turn.
    # Jobs are reloaded under the lock so concurrent scheduler writes are not
    # clobbered by the snapshot read above.
    try:
        with _jobs_lock():
            current_jobs = load_jobs()
            now_iso = now.isoformat()
            changed = False
            for job in current_jobs:
                if str(job.get("id")) in surfaced_ids:
                    job["failure_digest_last_at"] = now_iso
                    changed = True
            if changed:
                save_jobs(current_jobs)
    except Exception:
        logger.debug("Could not update failure_digest_last_at", exc_info=True)

    return digest
diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 669537b30..026fd67cd 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2345,6 +2345,11 @@ def _ensure_hermes_home_managed(home: Path): # 1 = serial (pre-v0.9 behaviour). # Also overridable via HERMES_CRON_MAX_PARALLEL env var. "max_parallel_jobs": None, + # Optional user-visible digest that surfaces recent cron failures on the + # next interaction. Set ``cron.failure_digest: true`` in config.yaml to + # enable; default is false so existing users are not surprised by new + # messages. No env var — config.yaml is the canonical UI. + "failure_digest": False, }, # Kanban multi-agent coordination — controls the dispatcher loop that @@ -4091,6 +4096,7 @@ def _normalize_custom_provider_entry( "api_mode", "transport", "model", "default_model", "models", "context_length", "rate_limit_delay", "request_timeout_seconds", "stale_timeout_seconds", + "circuit_breaker", "discover_models", "extra_body", } for camel, snake in _CAMEL_ALIASES.items(): diff --git a/tests/cron/test_cron_failure_logging.py b/tests/cron/test_cron_failure_logging.py new file mode 100644 index 000000000..487bd114d --- /dev/null +++ b/tests/cron/test_cron_failure_logging.py @@ -0,0 +1,204 @@ +"""Tests for cron failure logging / per-job failure records / digest (issue #433). 
@pytest.fixture(autouse=True)
def _patch_hermes_home(tmp_path, monkeypatch):
    """Redirect HERMES_HOME and the cron module paths to a temp dir.

    Every override goes through ``monkeypatch`` so the original module
    globals are restored after each test; assigning them directly would
    leave ``cron.jobs`` / ``cron.scheduler`` pointing at a deleted temp
    directory for every test that runs afterwards in the same session.
    """
    cron_dir = tmp_path / "cron"
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    monkeypatch.setattr(scheduler, "_hermes_home", tmp_path, raising=False)

    overrides = {
        "HERMES_DIR": tmp_path,
        "CRON_DIR": cron_dir,
        "OUTPUT_DIR": cron_dir / "output",
        "FAILURE_DIR": cron_dir / "failures",
        "JOBS_FILE": cron_dir / "jobs.json",
        "TICKER_HEARTBEAT_FILE": cron_dir / "ticker_heartbeat",
        "TICKER_SUCCESS_FILE": cron_dir / "ticker_last_success",
    }
    for attr, value in overrides.items():
        # raising=False mirrors the old plain assignment, which also created
        # the attribute when the module did not define it yet.
        monkeypatch.setattr(jobs, attr, value, raising=False)

    jobs.ensure_dirs()
def test_list_and_get_latest_failure(tmp_path):
    """Records are listed newest first and the latest one is resolved."""
    job = {"id": "j3", "name": "multi"}
    older = jobs.save_job_failure(job, success=False, error="first")
    newer = jobs.save_job_failure(job, success=False, error="second")
    assert older != newer

    # Pin clearly distinct modification times instead of sleeping: a short
    # sleep is not enough on filesystems with 1-2 second mtime resolution and
    # would make the ordering assertions flaky.
    newest_mtime = newer.stat().st_mtime
    os.utime(older, (newest_mtime - 60, newest_mtime - 60))
    os.utime(newer, (newest_mtime, newest_mtime))

    latest = jobs.get_latest_failure("j3")
    assert latest["error"] == "second"

    all_records = jobs.list_job_failures("j3")
    assert len(all_records) == 2
    assert all_records[0]["error"] == "second"
    assert all_records[1]["error"] == "first"
def test_build_digest_respects_failure_digest_config(monkeypatch):
    """The digest is gated by ``cron.failure_digest`` and acks on delivery."""
    _write_jobs([{"id": "j6", "name": "digested", "enabled": True}])
    jobs.save_job_failure({"id": "j6", "name": "digested"}, success=False, error="boom")

    def _stored_job():
        return json.loads(jobs.JOBS_FILE.read_text(encoding="utf-8"))["jobs"][0]

    # Disabled → no digest. The config is pinned explicitly so the assertion
    # does not depend on whatever the real load_config() resolves to on the
    # machine running the tests.
    monkeypatch.setattr(
        scheduler, "_load_cron_config", lambda: {"cron": {"failure_digest": False}}
    )
    assert build_cron_failure_digest() is None
    # A disabled call must not acknowledge anything.
    assert not _stored_job().get("failure_digest_last_at")

    # Enabled → digest emitted and ack timestamp updated
    monkeypatch.setattr(
        scheduler, "_load_cron_config", lambda: {"cron": {"failure_digest": True}}
    )
    digest = build_cron_failure_digest()
    assert digest is not None
    assert "j6" in digest or "digested" in digest
    assert "boom" in digest
    assert _stored_job().get("failure_digest_last_at")

    # Same failure is now acked → no second digest
    assert build_cron_failure_digest() is None
jobs.save_job_failure({"id": "j7", "name": "mixed"}, success=True) + assert build_cron_failure_digest() is None + + # Old failure (timestamp in 2020) should not surface + old_path = jobs.save_job_failure( + {"id": "j7", "name": "mixed"}, success=False, error="old" + ) + data = json.loads(old_path.read_text(encoding="utf-8")) + data["timestamp"] = "2020-01-01T00:00:00+00:00" + old_path.write_text(json.dumps(data), encoding="utf-8") + assert build_cron_failure_digest() is None + + +def test_run_one_job_failure_record_logs_warning(caplog, monkeypatch): + def fake_run_job(job): + return False, "out", "", "bang" + + monkeypatch.setattr(scheduler, "run_job", fake_run_job) + monkeypatch.setattr( + scheduler, "save_job_output", lambda jid, out: Path("/tmp/out.md") + ) + monkeypatch.setattr(scheduler, "_deliver_result", lambda *a, **kw: None) + monkeypatch.setattr(scheduler, "mark_job_run", lambda *a, **kw: None) + + with caplog.at_level(logging.WARNING, logger="cron.scheduler"): + scheduler.run_one_job({"id": "j8", "name": "warn job"}) + + assert any("failure record saved" in rec.message for rec in caplog.records) From 9d8aafe2042876468eab53c6e6d10a2367ab4c53 Mon Sep 17 00:00:00 2001 From: Hermes Evolution Date: Tue, 23 Jun 2026 10:10:12 +0200 Subject: [PATCH 2/2] feat: wire cron failure digest into CLI user turn (#433) The persistence and build_cron_failure_digest helper from the first slice were not connected to any user-interaction path, so the digest never reached the operator. This change: - Adds a small lazy-import helper _get_cron_failure_digest_for_user(). - Surfaces the digest at the start of HermesCLI.chat() before the user message is sent to the agent. - Prepends the same digest to the model's user_message so the agent sees the recent cron failures without adding a phantom turn to history. - Adds integration tests proving the digest reaches run_conversation and that it is skipped when no digest is available. 
def _get_cron_failure_digest_for_user() -> Optional[str]:
    """Build a user-visible cron failure digest if enabled and failures exist.

    Returns a formatted digest string when ``cron.failure_digest`` is enabled
    and there are un-acknowledged cron failures within the last 24 hours.
    Returns ``None`` otherwise, including when the cron subsystem cannot be
    imported or the digest builder raises: the digest is a convenience and
    must never break a chat turn. Such failures are logged at debug level
    instead of being dropped without a trace.

    The underlying implementation updates ack timestamps only when it
    actually emits a digest, so calling this on every user turn will not
    repeat the same failure.
    """
    # Imported lazily, like the cron module below, to keep CLI start-up light.
    import logging

    try:
        from cron.scheduler import build_cron_failure_digest

        return build_cron_failure_digest() or None
    except Exception:
        logging.getLogger(__name__).debug(
            "Cron failure digest unavailable", exc_info=True
        )
        return None
+ _cron_failure_digest = _get_cron_failure_digest_for_user() + if _cron_failure_digest: + _cprint(f"\n{_cron_failure_digest}\n") + # Add user message to history self.conversation_history.append({"role": "user", "content": message}) @@ -11102,9 +11126,14 @@ def run_agent(): reset_current_session_key = None # type: ignore[assignment] _approval_session_token = None agent_message = _voice_prefix + message if _voice_prefix else message + # If recent cron failures were surfaced, prepend them to the + # user message so the model sees them without adding a phantom + # turn to conversation_history. + if _cron_failure_digest: + agent_message = _prepend_note_to_message(agent_message, _cron_failure_digest) # Prepend pending notes via _prepend_note_to_message, which # handles both plain-string and multimodal content-parts list - # messages. Naive ``note + "\n\n" + agent_message`` crashed with + # messages. Naive ``note + "\\n\\n" + agent_message`` crashed with # TypeError when an image was attached (agent_message is a list) # and a /model or /reload-skills note was queued for the turn. _msn = getattr(self, '_pending_model_switch_note', None) diff --git a/tests/cli/test_cron_failure_digest_integration.py b/tests/cli/test_cron_failure_digest_integration.py new file mode 100644 index 000000000..254583501 --- /dev/null +++ b/tests/cli/test_cron_failure_digest_integration.py @@ -0,0 +1,116 @@ +"""Integration tests for cron failure digest surfacing in the CLI (issue #433). + +The prior slice added ``build_cron_failure_digest`` in ``cron/scheduler.py`` and +persisted cron failures on disk, but the digest was dead code: no user +interaction path invoked it. This test verifies that ``HermesCLI.chat()`` +now surfaces the digest both to the terminal and to the model's +``user_message`` on the next user turn, and that ack timestamps are only +updated when a digest is actually delivered. 
+""" + +import os +from unittest.mock import MagicMock, patch + +import cli as cli_module +import pytest +from cli import HermesCLI, _get_cron_failure_digest_for_user + + +def _clean_config(): + return { + "model": { + "default": "anthropic/claude-opus-4.6", + "base_url": "https://openrouter.ai/api/v1", + "provider": "auto", + }, + "display": {"compact": False, "tool_progress": "all"}, + "agent": {}, + "terminal": {"env_type": "local"}, + } + + +class TestCronFailureDigestHelper: + def test_returns_digest_when_available(self): + with patch( + "cron.scheduler.build_cron_failure_digest", + return_value="⚠️ Cron failure digest", + ) as mock_digest: + assert _get_cron_failure_digest_for_user() == "⚠️ Cron failure digest" + mock_digest.assert_called_once_with() + + def test_swallows_exceptions_and_returns_none(self): + with patch( + "cron.scheduler.build_cron_failure_digest", side_effect=RuntimeError("boom") + ): + assert _get_cron_failure_digest_for_user() is None + + +class TestCronFailureDigestInChat: + @pytest.fixture + def cli_obj(self): + with patch("cli.get_tool_definitions", return_value=[]), patch.dict( + "os.environ", {"LLM_MODEL": "", "HERMES_MAX_ITERATIONS": ""}, clear=False + ), patch.dict(cli_module.__dict__, {"CLI_CONFIG": _clean_config()}): + obj = HermesCLI() + fake_agent = MagicMock() + fake_agent.run_conversation.return_value = { + "final_response": "ok", + "messages": [], + } + obj.agent = fake_agent + yield obj + + def test_digest_prepended_to_user_message(self, cli_obj): + digest = "⚠️ Cron failure digest (last 24h):\n• 'job' failed" + with patch( + "cli._get_cron_failure_digest_for_user", return_value=digest + ), patch.object(cli_obj, "_ensure_runtime_credentials", return_value=True), patch.object( + cli_obj, + "_resolve_turn_agent_config", + return_value={ + "signature": getattr(cli_obj, "_active_agent_route_signature", None), + "model": cli_obj.model, + "runtime": None, + "request_overrides": {}, + }, + ), patch.object( + cli_obj, "_init_agent", 
return_value=True + ), patch.object( + cli_obj, "_reset_stream_state" + ), patch.object(cli_obj, "_flush_stream"), patch.object( + cli_obj, "_flush_credit_notices" + ): + cli_obj.chat("hello") + + calls = cli_obj.agent.run_conversation.call_args_list + assert len(calls) == 1 + _, kwargs = calls[0] + user_message = kwargs["user_message"] + assert digest in user_message + assert "hello" in user_message + + def test_no_digest_when_none_available(self, cli_obj): + with patch( + "cli._get_cron_failure_digest_for_user", return_value=None + ), patch.object(cli_obj, "_ensure_runtime_credentials", return_value=True), patch.object( + cli_obj, + "_resolve_turn_agent_config", + return_value={ + "signature": getattr(cli_obj, "_active_agent_route_signature", None), + "model": cli_obj.model, + "runtime": None, + "request_overrides": {}, + }, + ), patch.object( + cli_obj, "_init_agent", return_value=True + ), patch.object( + cli_obj, "_reset_stream_state" + ), patch.object(cli_obj, "_flush_stream"), patch.object( + cli_obj, "_flush_credit_notices" + ): + cli_obj.chat("hello") + + calls = cli_obj.agent.run_conversation.call_args_list + assert len(calls) == 1 + _, kwargs = calls[0] + assert kwargs["user_message"] == "hello"