diff --git a/cron/jobs.py b/cron/jobs.py index a120ad5d1..4ae13fe20 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -73,6 +73,7 @@ _jobs_file_lock = threading.RLock() _jobs_lock_state = threading.local() OUTPUT_DIR = CRON_DIR / "output" +FAILURE_DIR = CRON_DIR / "failures" ONESHOT_GRACE_SECONDS = 120 @@ -272,8 +273,10 @@ def ensure_dirs(): """Ensure cron directories exist with secure permissions.""" CRON_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + FAILURE_DIR.mkdir(parents=True, exist_ok=True) _secure_dir(CRON_DIR) _secure_dir(OUTPUT_DIR) + _secure_dir(FAILURE_DIR) # ============================================================================= @@ -1483,6 +1486,93 @@ def save_job_output(job_id: str, output: str): return output_file +def save_job_failure( + job: Dict[str, Any], + *, + success: bool, + error: Optional[str] = None, + output: str = "", + exit_code: Optional[int] = None, + traceback_text: Optional[str] = None, + max_output_chars: int = 4000, +) -> Path: + """Persist a per-job failure record under ``FAILURE_DIR``. + + Captures the last N characters of the job output plus any traceback so + operators can diagnose why a cron job failed without re-running it. + Records are keyed by job id and timestamp; the most recent file per job + is the canonical "latest failure". Failures are written even when the + job later recovers, so the record reflects the *most recent* run status. + + Returns the path of the written record. + """ + ensure_dirs() + job_id = str(job.get("id") or "unknown") + failure_job_dir = FAILURE_DIR / job_id + failure_job_dir.mkdir(parents=True, exist_ok=True) + _secure_dir(failure_job_dir) + + now = _hermes_now() + # Include sub-seconds in the filename so rapid successive failures don't + # collide and overwrite each other. + timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") + f"_{now.microsecond:06d}" + record_file = failure_job_dir / f"{timestamp}.json" + + trimmed_output = output + if len(trimmed_output) > max_output_chars: + trimmed_output = "..." + trimmed_output[-max_output_chars:] + + record = { + "job_id": job_id, + "job_name": str(job.get("name") or job_id), + "timestamp": now.isoformat(), + "success": bool(success), + "exit_code": exit_code, + "error": error, + "traceback": traceback_text, + "last_output": trimmed_output, + } + + fd, tmp_path = tempfile.mkstemp(dir=str(failure_job_dir), suffix=".tmp", prefix=".failure_") + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(record, f, indent=2, default=str) + f.flush() + os.fsync(f.fileno()) + atomic_replace(tmp_path, record_file) + _secure_file(record_file) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + return record_file + + +def list_job_failures(job_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]: + """Return recent failure records for a job, newest first.""" + failure_job_dir = FAILURE_DIR / job_id + if not failure_job_dir.exists(): + return [] + records: List[Dict[str, Any]] = [] + for path in sorted(failure_job_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True): + try: + records.append(json.loads(path.read_text(encoding="utf-8"))) + except Exception: + continue + if limit is not None and len(records) >= limit: + break + return records + + +def get_latest_failure(job_id: str) -> Optional[Dict[str, Any]]: + """Return the most recent failure record for a job, or None.""" + records = list_job_failures(job_id, limit=1) + return records[0] if records else None + + # ============================================================================= # Skill reference rewriting (curator integration) # ============================================================================= diff --git a/cron/scheduler.py b/cron/scheduler.py index af48de7c1..176cf1f42 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -20,6 +20,7 @@ import subprocess import sys import threading +import traceback # fcntl is Unix-only; on Windows use msvcrt for file locking try: @@ -68,14 +69,14 @@ def _summarize_cron_failure_for_delivery(job: dict, error: str | None) -> str: return ( f"⚠️ Cron '{job_name}' failed: provider {reason}. " "Fallback chain was exhausted or unavailable. " - "Full details saved in cron output." + "Full details saved in cron output / cron/failures." ) if "readtimeout" in lower or "timed out" in lower or "timeout" in lower: return ( f"⚠️ Cron '{job_name}' failed: provider timeout. " "Fallback chain was exhausted or unavailable. " - "Full details saved in cron output." + "Full details saved in cron output / cron/failures." ) # Match authentication/authorization wording at a word boundary and the @@ -84,7 +85,7 @@ def _summarize_cron_failure_for_delivery(job: dict, error: str | None) -> str: if re.search(r"authenticat|authoriz", lower) or re.search(r"\b(401|403)\b", text): return ( f"⚠️ Cron '{job_name}' failed: provider authentication error. " - "Full details saved in cron output." + "Full details saved in cron output / cron/failures." ) # Strip common exception wrappers and collapse provider payloads. Bound @@ -206,9 +207,15 @@ def _resolve_cron_enabled_toolsets(job: dict, cfg: dict) -> list[str] | None: from cron.jobs import ( get_due_jobs, + load_jobs, mark_job_run, mark_job_started, + save_jobs, save_job_output, + save_job_failure, + list_job_failures, + get_latest_failure, + _jobs_lock, advance_next_run, ) @@ -289,6 +296,112 @@ def _get_hermes_home() -> Path: return _hermes_home or get_hermes_home() +def _failure_digest_enabled(cfg: dict) -> bool: + """Return whether ``cron.failure_digest`` is enabled in config.yaml. + + The digest surfaces recent cron failures to the user on the next + interaction. Default disabled (False); opt-in via config.yaml. + """ + try: + cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {} + return bool(cron_cfg.get("failure_digest", False)) + except Exception: + return False + + +def _load_cron_config() -> dict: + """Load config.yaml, returning an empty dict on any failure.""" + try: + from hermes_cli.config import load_config + + return load_config() or {} + except Exception: + return {} + + +def build_cron_failure_digest(adapters=None, loop=None) -> Optional[str]: + """Build a user-visible digest of recent cron failures. + + Scans all jobs and emits a compact message for any job whose latest + failure record reports success=False and is newer than the job's last + acknowledged digest timestamp (stored in ``failure_digest_last_at``). + Updates that timestamp when a failure is included. + + Returns the digest text, or None if there is nothing new to surface. + """ + cfg = _load_cron_config() + if not _failure_digest_enabled(cfg): + return None + + import datetime as _dt + + now = _hermes_now() + cutoff = now - _dt.timedelta(hours=24) + lines: List[str] = [] + jobs = load_jobs() + for job in jobs: + if not job.get("enabled", True): + continue + record = get_latest_failure(job["id"]) + if not record: + continue + if record.get("success") is True: + continue + try: + ts = _dt.datetime.fromisoformat(str(record.get("timestamp") or "")) + except (TypeError, ValueError): + continue + if ts < cutoff: + continue + + last_ack = job.get("failure_digest_last_at") + if last_ack: + try: + last_ack_dt = _dt.datetime.fromisoformat(str(last_ack)) + if ts <= last_ack_dt: + continue + except (TypeError, ValueError): + pass + + job_name = record.get("job_name") or job.get("name") or job["id"] + err = (record.get("error") or "unknown error")[:120] + lines.append(f"• '{job_name}' failed at {ts.strftime('%Y-%m-%d %H:%M')}: {err}") + + if not lines: + return None + + digest = ( + "⚠️ Cron failure digest (last 24h):\n" + + "\n".join(lines) + + "\n\nFull details: ~/.hermes/cron/failures/" + ) + + # Update ack timestamps so we don't repeat the same failures every turn. + try: + with _jobs_lock(): + jobs = load_jobs() + now_iso = now.isoformat() + changed = False + for job in jobs: + record = get_latest_failure(job["id"]) + if not record or record.get("success") is True: + continue + try: + ts = _dt.datetime.fromisoformat(str(record.get("timestamp") or "")) + except (TypeError, ValueError): + continue + if ts < cutoff: + continue + job["failure_digest_last_at"] = now_iso + changed = True + if changed: + save_jobs(jobs) + except Exception: + logger.debug("Could not update failure_digest_last_at", exc_info=True) + + return digest + + def _get_lock_paths() -> tuple[Path, Path]: """Resolve cron lock paths at call time so profile/env changes are honored.""" hermes_home = _get_hermes_home() @@ -2391,6 +2504,32 @@ def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) - if verbose: logger.info("Output saved to: %s", output_file) + # Persist a failure record whenever a job fails or the agent returns an + # empty response. This is the per-job audit trail that makes silent + # failures visible; successful runs overwrite the latest record so the + # digest only shows current problems. + if not success: + tb = traceback.format_exc() if sys.exc_info()[0] is not None else None + try: + save_job_failure( + job, + success=False, + error=error, + output=output, + traceback_text=tb, + ) + logger.warning( + "Job '%s' failure record saved to cron/failures", + job.get("id"), + ) + except Exception as fe: + logger.error("Could not save cron failure record: %s", fe) + else: + try: + save_job_failure(job, success=True, output=output) + except Exception: + pass + # Deliver the final response to the origin/target chat. # If the agent responded with [SILENT], skip delivery (but # output is already saved above). Failed jobs always deliver. diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 669537b30..026fd67cd 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2345,6 +2345,11 @@ def _ensure_hermes_home_managed(home: Path): # 1 = serial (pre-v0.9 behaviour). # Also overridable via HERMES_CRON_MAX_PARALLEL env var. "max_parallel_jobs": None, + # Optional user-visible digest that surfaces recent cron failures on the + # next interaction. Set ``cron.failure_digest: true`` in config.yaml to + # enable; default is false so existing users are not surprised by new + # messages. No env var — config.yaml is the canonical UI. + "failure_digest": False, }, # Kanban multi-agent coordination — controls the dispatcher loop that @@ -4091,6 +4096,7 @@ def _normalize_custom_provider_entry( "api_mode", "transport", "model", "default_model", "models", "context_length", "rate_limit_delay", "request_timeout_seconds", "stale_timeout_seconds", + "circuit_breaker", "discover_models", "extra_body", } for camel, snake in _CAMEL_ALIASES.items(): diff --git a/tests/cron/test_cron_failure_logging.py b/tests/cron/test_cron_failure_logging.py new file mode 100644 index 000000000..487bd114d --- /dev/null +++ b/tests/cron/test_cron_failure_logging.py @@ -0,0 +1,204 @@ +"""Tests for cron failure logging / per-job failure records / digest (issue #433). + +These tests exercise the focused first slice added to cron/scheduler.py and +cron/jobs.py: + +* ``save_job_failure`` / ``list_job_failures`` / ``get_latest_failure`` persistence +* ``run_one_job`` writes a failure record on agent/script failure +* ``run_one_job`` writes a success marker on recovery +* ``build_cron_failure_digest`` respects the ``cron.failure_digest`` config key +* failure records include last-N output and traceback +""" + +import contextlib +import json +import logging +import os +import time +from pathlib import Path +from unittest.mock import patch + +import pytest + +import cron.jobs as jobs +import cron.scheduler as scheduler +from cron.scheduler import build_cron_failure_digest + + +@pytest.fixture(autouse=True) +def _patch_hermes_home(tmp_path, monkeypatch): + """Redirect HERMES_HOME and scheduler's internal override to a temp dir.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + scheduler._hermes_home = tmp_path + jobs.HERMES_DIR = tmp_path + jobs.CRON_DIR = tmp_path / "cron" + jobs.OUTPUT_DIR = jobs.CRON_DIR / "output" + jobs.FAILURE_DIR = jobs.CRON_DIR / "failures" + jobs.JOBS_FILE = jobs.CRON_DIR / "jobs.json" + jobs.TICKER_HEARTBEAT_FILE = jobs.CRON_DIR / "ticker_heartbeat" + jobs.TICKER_SUCCESS_FILE = jobs.CRON_DIR / "ticker_last_success" + jobs.ensure_dirs() + + +def _write_jobs(jobs_list): + """Persist a raw jobs list directly to the temp jobs.json.""" + jobs.CRON_DIR.mkdir(parents=True, exist_ok=True) + jobs.JOBS_FILE.write_text( + json.dumps({"jobs": jobs_list, "updated_at": jobs._hermes_now().isoformat()}), + encoding="utf-8", + ) + + +def test_save_job_failure_writes_record(tmp_path): + job = {"id": "j1", "name": "test job"} + record_path = jobs.save_job_failure( + job, + success=False, + error="boom", + output="x" * 5000 + "\nLAST LINE", + traceback_text="Traceback (most recent call last):\n ...", + ) + + assert record_path.exists() + assert jobs.FAILURE_DIR in record_path.parents + data = json.loads(record_path.read_text(encoding="utf-8")) + assert data["job_id"] == "j1" + assert data["job_name"] == "test job" + assert data["success"] is False + assert data["error"] == "boom" + assert "Traceback" in data["traceback"] + # last-N output trimming + assert data["last_output"].startswith("...") + assert "LAST LINE" in data["last_output"] + + +def test_save_job_failure_success_marker_overwrites_digest_state(tmp_path): + job = {"id": "j2", "name": "good job"} + jobs.save_job_failure(job, success=False, error="old") + path = jobs.save_job_failure(job, success=True, output="ok") + data = json.loads(path.read_text(encoding="utf-8")) + assert data["success"] is True + assert data["error"] is None + + +def test_list_and_get_latest_failure(tmp_path): + job = {"id": "j3", "name": "multi"} + p1 = jobs.save_job_failure(job, success=False, error="first") + time.sleep(0.05) + p2 = jobs.save_job_failure(job, success=False, error="second") + + latest = jobs.get_latest_failure("j3") + assert latest["error"] == "second" + + all_records = jobs.list_job_failures("j3") + assert len(all_records) == 2 + assert all_records[0]["error"] == "second" + assert all_records[1]["error"] == "first" + + +def test_run_one_job_writes_failure_record_on_agent_failure(monkeypatch): + def fake_run_job(job): + return False, "agent output", "", "provider 429 rate limit" + + monkeypatch.setattr(scheduler, "run_job", fake_run_job) + monkeypatch.setattr( + scheduler, "save_job_output", lambda jid, out: Path("/tmp/out.md") + ) + monkeypatch.setattr(scheduler, "_deliver_result", lambda *a, **kw: None) + monkeypatch.setattr(scheduler, "mark_job_run", lambda *a, **kw: None) + + scheduler.run_one_job({"id": "j4", "name": "fail job"}) + + latest = jobs.get_latest_failure("j4") + assert latest is not None + assert latest["success"] is False + assert latest["error"] + assert "429" in latest["error"] + assert latest["last_output"] == "agent output" + + +def test_run_one_job_writes_success_marker(monkeypatch): + def fake_run_job(job): + return True, "all good", "final response", None + + monkeypatch.setattr(scheduler, "run_job", fake_run_job) + monkeypatch.setattr( + scheduler, "save_job_output", lambda jid, out: Path("/tmp/out.md") + ) + monkeypatch.setattr(scheduler, "_deliver_result", lambda *a, **kw: None) + monkeypatch.setattr(scheduler, "mark_job_run", lambda *a, **kw: None) + + scheduler.run_one_job({"id": "j5", "name": "ok job"}) + + latest = jobs.get_latest_failure("j5") + assert latest is not None + assert latest["success"] is True + + +def test_failure_digest_disabled_by_default(monkeypatch): + assert scheduler._failure_digest_enabled({}) is False + assert ( + scheduler._failure_digest_enabled({"cron": {"failure_digest": False}}) is False + ) + assert ( + scheduler._failure_digest_enabled({"cron": {"failure_digest": "true"}}) is True + ) + + +def test_build_digest_respects_failure_digest_config(monkeypatch): + _write_jobs([{"id": "j6", "name": "digested", "enabled": True}]) + jobs.save_job_failure({"id": "j6", "name": "digested"}, success=False, error="boom") + + # Disabled → no digest + assert build_cron_failure_digest() is None + + # Enabled → digest emitted and ack timestamp updated + monkeypatch.setattr( + scheduler, "_load_cron_config", lambda: {"cron": {"failure_digest": True}} + ) + digest = build_cron_failure_digest() + assert digest is not None + assert "j6" in digest or "digested" in digest + assert "boom" in digest + + saved = json.loads(jobs.JOBS_FILE.read_text(encoding="utf-8")) + assert saved["jobs"][0].get("failure_digest_last_at") + + # Same failure is now acked → no second digest + assert build_cron_failure_digest() is None + + +def test_build_digest_ignores_success_records_and_old_failures(monkeypatch, tmp_path): + _write_jobs([{"id": "j7", "name": "mixed", "enabled": True}]) + monkeypatch.setattr( + scheduler, "_load_cron_config", lambda: {"cron": {"failure_digest": True}} + ) + + jobs.save_job_failure({"id": "j7", "name": "mixed"}, success=True) + assert build_cron_failure_digest() is None + + # Old failure (timestamp in 2020) should not surface + old_path = jobs.save_job_failure( + {"id": "j7", "name": "mixed"}, success=False, error="old" + ) + data = json.loads(old_path.read_text(encoding="utf-8")) + data["timestamp"] = "2020-01-01T00:00:00+00:00" + old_path.write_text(json.dumps(data), encoding="utf-8") + assert build_cron_failure_digest() is None + + +def test_run_one_job_failure_record_logs_warning(caplog, monkeypatch): + def fake_run_job(job): + return False, "out", "", "bang" + + monkeypatch.setattr(scheduler, "run_job", fake_run_job) + monkeypatch.setattr( + scheduler, "save_job_output", lambda jid, out: Path("/tmp/out.md") + ) + monkeypatch.setattr(scheduler, "_deliver_result", lambda *a, **kw: None) + monkeypatch.setattr(scheduler, "mark_job_run", lambda *a, **kw: None) + + with caplog.at_level(logging.WARNING, logger="cron.scheduler"): + scheduler.run_one_job({"id": "j8", "name": "warn job"}) + + assert any("failure record saved" in rec.message for rec in caplog.records)