Skip to content

Commit 8593a3e

Browse files
Lexus2016 and Hermes Evolution authored
feat: wire cron failure digest into CLI user turn (Closes #433) (#476)
* feat: cron failure records + digest for silent job failures (#433)

  - Persist per-job run status under ~/.hermes/cron/failures/ via save_job_failure/list_job_failures/get_latest_failure.
  - run_one_job writes a failure record (last output + traceback) on every failed run and a success marker on recovery, replacing invisible cron failures with a durable audit trail.
  - Add opt-in cron.failure_digest config key; build_cron_failure_digest() surfaces recent un-acked failures to the user on the next interaction.
  - Recognize circuit_breaker as a known custom-provider config key.
  - Add tests covering persistence, latest resolution, run_one_job hooks, digest gating, and ack behavior.

  Closes #433

  Co-Authored-By: Hermes Evolution <evolution@hermes.ai>

* feat: wire cron failure digest into CLI user turn (#433)

  The persistence and build_cron_failure_digest helper from the first slice were not connected to any user-interaction path, so the digest never reached the operator. This change:

  - Adds a small lazy-import helper _get_cron_failure_digest_for_user().
  - Surfaces the digest at the start of HermesCLI.chat() before the user message is sent to the agent.
  - Prepends the same digest to the model's user_message so the agent sees the recent cron failures without adding a phantom turn to history.
  - Adds integration tests proving the digest reaches run_conversation and that it is skipped when no digest is available.

  Closes #433

  Co-Authored-By: Hermes Evolution <evolution@hermes.ai>

---------

Co-authored-by: Hermes Evolution <evolution@hermes.ai>
1 parent b9e3dd3 commit 8593a3e

6 files changed

Lines changed: 588 additions & 4 deletions

File tree

cli.py

Lines changed: 30 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2396,6 +2396,23 @@ def _prepend_note_to_message(message, note: str):
23962396
return message
23972397

23982398

2399+
def _get_cron_failure_digest_for_user() -> Optional[str]:
2400+
"""Build a user-visible cron failure digest if enabled and failures exist.
2401+
2402+
Returns a formatted digest string when ``cron.failure_digest`` is enabled
2403+
and there are un-acknowledged cron failures within the last 24 hours.
2404+
Returns ``None`` otherwise. The underlying implementation updates ack
2405+
timestamps only when it actually emits a digest, so calling this on every
2406+
user turn is safe and will not repeat the same failure.
2407+
"""
2408+
try:
2409+
from cron.scheduler import build_cron_failure_digest
2410+
2411+
return build_cron_failure_digest()
2412+
except Exception:
2413+
return None
2414+
2415+
23992416
# ---------------------------------------------------------------------------
24002417
# File-drop / local attachment detection — extracted as pure helpers for tests.
24012418
# ---------------------------------------------------------------------------
@@ -10984,6 +11001,13 @@ def chat(self, message, images: list = None) -> Optional[str]:
1098411001
from run_agent import _sanitize_surrogates
1098511002
message = _sanitize_surrogates(message)
1098611003

11004+
# Surface recent cron failures to the operator before this turn.
11005+
# The digest is opt-in via ``cron.failure_digest`` and acks on delivery,
11006+
# so the same failure is surfaced only once per user interaction cycle.
11007+
_cron_failure_digest = _get_cron_failure_digest_for_user()
11008+
if _cron_failure_digest:
11009+
_cprint(f"\n{_cron_failure_digest}\n")
11010+
1098711011
# Add user message to history
1098811012
self.conversation_history.append({"role": "user", "content": message})
1098911013

@@ -11102,9 +11126,14 @@ def run_agent():
1110211126
reset_current_session_key = None # type: ignore[assignment]
1110311127
_approval_session_token = None
1110411128
agent_message = _voice_prefix + message if _voice_prefix else message
11129+
# If recent cron failures were surfaced, prepend them to the
11130+
# user message so the model sees them without adding a phantom
11131+
# turn to conversation_history.
11132+
if _cron_failure_digest:
11133+
agent_message = _prepend_note_to_message(agent_message, _cron_failure_digest)
1110511134
# Prepend pending notes via _prepend_note_to_message, which
1110611135
# handles both plain-string and multimodal content-parts list
11107-
# messages. Naive ``note + "\n\n" + agent_message`` crashed with
11136+
# messages. Naive ``note + "\\n\\n" + agent_message`` crashed with
1110811137
# TypeError when an image was attached (agent_message is a list)
1110911138
# and a /model or /reload-skills note was queued for the turn.
1111011139
_msn = getattr(self, '_pending_model_switch_note', None)

cron/jobs.py

Lines changed: 90 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,7 @@
7373
_jobs_file_lock = threading.RLock()
7474
_jobs_lock_state = threading.local()
7575
OUTPUT_DIR = CRON_DIR / "output"
76+
FAILURE_DIR = CRON_DIR / "failures"
7677
ONESHOT_GRACE_SECONDS = 120
7778

7879

@@ -272,8 +273,10 @@ def ensure_dirs():
272273
"""Ensure cron directories exist with secure permissions."""
273274
CRON_DIR.mkdir(parents=True, exist_ok=True)
274275
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
276+
FAILURE_DIR.mkdir(parents=True, exist_ok=True)
275277
_secure_dir(CRON_DIR)
276278
_secure_dir(OUTPUT_DIR)
279+
_secure_dir(FAILURE_DIR)
277280

278281

279282
# =============================================================================
@@ -1483,6 +1486,93 @@ def save_job_output(job_id: str, output: str):
14831486
return output_file
14841487

14851488

1489+
def save_job_failure(
    job: Dict[str, Any],
    *,
    success: bool,
    error: Optional[str] = None,
    output: str = "",
    exit_code: Optional[int] = None,
    traceback_text: Optional[str] = None,
    max_output_chars: int = 4000,
) -> Path:
    """Persist a per-run status record for a job under ``FAILURE_DIR``.

    Captures the tail of the job output plus any traceback so operators can
    diagnose why a cron job failed without re-running it. One JSON file is
    written per call into ``FAILURE_DIR/<job id>/``, named after the current
    timestamp. Records are written for successful runs too
    (``success=True``), so the newest record reflects the most recent run
    status rather than only the last failure.

    Args:
        job: Job dict; ``id`` selects the sub-directory (``"unknown"`` when
            missing or falsy) and ``name`` is stored as ``job_name``.
        success: Whether the run succeeded.
        error: Short error description, stored as-is.
        output: Job output. ``None`` is treated as empty, bytes are decoded
            as UTF-8 with replacement, other non-strings go through ``str``.
        exit_code: Process exit code, if any.
        traceback_text: Formatted traceback, if any.
        max_output_chars: Maximum number of trailing output characters to
            keep. Values ``<= 0`` keep no output at all.

    Returns:
        The path of the written record.

    Raises:
        Any exception raised while creating directories or writing the file
        is re-raised after the temporary file has been cleaned up.
    """
    ensure_dirs()
    job_id = str(job.get("id") or "unknown")
    failure_job_dir = FAILURE_DIR / job_id
    failure_job_dir.mkdir(parents=True, exist_ok=True)
    _secure_dir(failure_job_dir)

    now = _hermes_now()
    # Include sub-seconds in the filename so rapid successive records don't
    # collide and overwrite each other.
    timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") + f"_{now.microsecond:06d}"
    record_file = failure_job_dir / f"{timestamp}.json"

    # Normalise the output first: a None/bytes/other value must not make
    # len() or slicing raise and cost us the whole record.
    if output is None:
        text = ""
    elif isinstance(output, bytes):
        text = output.decode("utf-8", errors="replace")
    elif isinstance(output, str):
        text = output
    else:
        text = str(output)

    # Keep only the tail. A plain ``text[-n:]`` is wrong for n == 0 because
    # ``text[-0:]`` is the *entire* string, so that case is handled apart.
    keep = max(max_output_chars, 0)
    if len(text) <= keep:
        trimmed_output = text
    elif keep == 0:
        trimmed_output = "..."
    else:
        trimmed_output = "..." + text[-keep:]

    record = {
        "job_id": job_id,
        "job_name": str(job.get("name") or job_id),
        "timestamp": now.isoformat(),
        "success": bool(success),
        "exit_code": exit_code,
        "error": error,
        "traceback": traceback_text,
        "last_output": trimmed_output,
    }

    # NOTE(review): nothing visible here prunes old records, so a frequently
    # running job accumulates one file per run - confirm retention elsewhere.

    # Write to a temp file in the same directory, fsync, then swap it into
    # place so readers never observe a half-written record.
    fd, tmp_path = tempfile.mkstemp(dir=str(failure_job_dir), suffix=".tmp", prefix=".failure_")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, default=str)
            f.flush()
            os.fsync(f.fileno())
        atomic_replace(tmp_path, record_file)
        _secure_file(record_file)
    except BaseException:
        # Remove the temp file if it is still around (it is already gone
        # once the replace succeeded), then let the original error surface.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return record_file
1552+
1553+
1554+
def list_job_failures(job_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Return recent run records for a job, newest first.

    Reads every ``*.json`` file in ``FAILURE_DIR/<job_id>/`` ordered by file
    modification time (newest first); files with equal modification times
    are ordered by filename, descending. Files that vanish while being
    listed, cannot be read, do not contain valid JSON, or whose JSON is not
    an object are skipped.

    Args:
        job_id: Job identifier, used as the sub-directory name.
        limit: Maximum number of records to return; ``None`` means no limit
            and values ``<= 0`` return an empty list.

    Returns:
        A list of record dicts, possibly empty.
    """
    if limit is not None and limit <= 0:
        return []
    failure_job_dir = FAILURE_DIR / job_id
    if not failure_job_dir.exists():
        return []

    # Stat each file once, up front: a record removed between glob() and
    # stat() is skipped here instead of raising out of the sort key.
    candidates = []
    for path in failure_job_dir.glob("*.json"):
        try:
            candidates.append((path.stat().st_mtime, path.name, path))
        except OSError:
            continue
    candidates.sort(key=lambda item: (item[0], item[1]), reverse=True)

    records: List[Dict[str, Any]] = []
    for _mtime, _name, path in candidates:
        try:
            record = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            continue
        # Callers treat records as dicts; ignore any other JSON payload.
        if not isinstance(record, dict):
            continue
        records.append(record)
        if limit is not None and len(records) >= limit:
            break
    return records
1568+
1569+
1570+
def get_latest_failure(job_id: str) -> Optional[Dict[str, Any]]:
    """Return the newest stored run record for ``job_id``, or ``None``.

    Asks ``list_job_failures`` for at most one record and hands back the
    first entry it returns. The record is returned as stored; callers that
    only care about failing runs should inspect its ``success`` field.
    """
    newest = list_job_failures(job_id, limit=1)
    if not newest:
        return None
    return newest[0]
1574+
1575+
14861576
# =============================================================================
14871577
# Skill reference rewriting (curator integration)
14881578
# =============================================================================

cron/scheduler.py

Lines changed: 142 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
import subprocess
2121
import sys
2222
import threading
23+
import traceback
2324

2425
# fcntl is Unix-only; on Windows use msvcrt for file locking
2526
try:
@@ -68,14 +69,14 @@ def _summarize_cron_failure_for_delivery(job: dict, error: str | None) -> str:
6869
return (
6970
f"⚠️ Cron '{job_name}' failed: provider {reason}. "
7071
"Fallback chain was exhausted or unavailable. "
71-
"Full details saved in cron output."
72+
"Full details saved in cron output / cron/failures."
7273
)
7374

7475
if "readtimeout" in lower or "timed out" in lower or "timeout" in lower:
7576
return (
7677
f"⚠️ Cron '{job_name}' failed: provider timeout. "
7778
"Fallback chain was exhausted or unavailable. "
78-
"Full details saved in cron output."
79+
"Full details saved in cron output / cron/failures."
7980
)
8081

8182
# Match authentication/authorization wording at a word boundary and the
@@ -84,7 +85,7 @@ def _summarize_cron_failure_for_delivery(job: dict, error: str | None) -> str:
8485
if re.search(r"authenticat|authoriz", lower) or re.search(r"\b(401|403)\b", text):
8586
return (
8687
f"⚠️ Cron '{job_name}' failed: provider authentication error. "
87-
"Full details saved in cron output."
88+
"Full details saved in cron output / cron/failures."
8889
)
8990

9091
# Strip common exception wrappers and collapse provider payloads. Bound
@@ -206,9 +207,15 @@ def _resolve_cron_enabled_toolsets(job: dict, cfg: dict) -> list[str] | None:
206207

207208
from cron.jobs import (
208209
get_due_jobs,
210+
load_jobs,
209211
mark_job_run,
210212
mark_job_started,
213+
save_jobs,
211214
save_job_output,
215+
save_job_failure,
216+
list_job_failures,
217+
get_latest_failure,
218+
_jobs_lock,
212219
advance_next_run,
213220
)
214221

@@ -289,6 +296,112 @@ def _get_hermes_home() -> Path:
289296
return _hermes_home or get_hermes_home()
290297

291298

299+
def _failure_digest_enabled(cfg: dict) -> bool:
300+
"""Return whether ``cron.failure_digest`` is enabled in config.yaml.
301+
302+
The digest surfaces recent cron failures to the user on the next
303+
interaction. Default disabled (False); opt-in via config.yaml.
304+
"""
305+
try:
306+
cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {}
307+
return bool(cron_cfg.get("failure_digest", False))
308+
except Exception:
309+
return False
310+
311+
312+
def _load_cron_config() -> dict:
313+
"""Load config.yaml, returning an empty dict on any failure."""
314+
try:
315+
from hermes_cli.config import load_config
316+
317+
return load_config() or {}
318+
except Exception:
319+
return {}
320+
321+
322+
def build_cron_failure_digest(adapters=None, loop=None) -> Optional[str]:
    """Build a user-visible digest of recent cron failures.

    Scans all enabled jobs and emits one bullet for each job whose latest
    run record reports a non-success, is at most 24 hours old, and is newer
    than the job's last acknowledged digest timestamp (stored on the job as
    ``failure_digest_last_at``). Jobs that make it into the digest - and
    only those - then get their ack timestamp updated so the same failure is
    not repeated on the next call.

    Args:
        adapters: Unused by this function; kept for call compatibility.
        loop: Unused by this function; kept for call compatibility.

    Returns:
        The digest text, or ``None`` when ``cron.failure_digest`` is
        disabled or there is nothing new to surface.
    """
    cfg = _load_cron_config()
    if not _failure_digest_enabled(cfg):
        return None

    import datetime as _dt

    now = _hermes_now()
    cutoff = now - _dt.timedelta(hours=24)

    def _parse_ts(value):
        """Parse an ISO timestamp into something comparable with ``now``."""
        try:
            parsed = _dt.datetime.fromisoformat(str(value or ""))
        except (TypeError, ValueError):
            return None
        # Comparing naive and aware datetimes raises TypeError, which would
        # abort the whole digest. Align the parsed value with ``now``.
        # NOTE(review): assumes naive timestamps are in the same zone as
        # _hermes_now(), and that a naive _hermes_now() is system-local time.
        if parsed.tzinfo is None and now.tzinfo is not None:
            return parsed.replace(tzinfo=now.tzinfo)
        if parsed.tzinfo is not None and now.tzinfo is None:
            return parsed.astimezone().replace(tzinfo=None)
        return parsed

    lines: List[str] = []
    surfaced = {}  # job id (as str) -> timestamp of the failure we reported
    for job in load_jobs():
        raw_id = job.get("id")
        if not raw_id or not job.get("enabled", True):
            continue
        # save_job_failure keys its directory by str(id); look up the same way.
        job_id = str(raw_id)
        record = get_latest_failure(job_id)
        if not record or record.get("success") is True:
            continue
        ts = _parse_ts(record.get("timestamp"))
        if ts is None or ts < cutoff:
            continue

        # An unparseable ack value is ignored, i.e. the failure is shown.
        last_ack_dt = _parse_ts(job.get("failure_digest_last_at"))
        if last_ack_dt is not None and ts <= last_ack_dt:
            continue

        job_name = record.get("job_name") or job.get("name") or job_id
        # Collapse whitespace so a multi-line error stays on its own bullet.
        err = " ".join(str(record.get("error") or "unknown error").split())[:120]
        lines.append(f"• '{job_name}' failed at {ts.strftime('%Y-%m-%d %H:%M')}: {err}")
        surfaced[job_id] = ts

    if not lines:
        return None

    digest = (
        "⚠️ Cron failure digest (last 24h):\n"
        + "\n".join(lines)
        + "\n\nFull details: ~/.hermes/cron/failures/"
    )

    # Update ack timestamps so we don't repeat the same failures every turn.
    # Only jobs that were actually surfaced are acknowledged; stamping others
    # (e.g. disabled jobs) would hide failures the user never saw.
    try:
        with _jobs_lock():
            jobs = load_jobs()
            changed = False
            for job in jobs:
                raw_id = job.get("id")
                if not raw_id or str(raw_id) not in surfaced:
                    continue
                # Never ack with a time older than the reported failure, or
                # that same failure would be surfaced again next turn.
                ack_at = max(now, surfaced[str(raw_id)])
                job["failure_digest_last_at"] = ack_at.isoformat()
                changed = True
            if changed:
                save_jobs(jobs)
    except Exception:
        logger.debug("Could not update failure_digest_last_at", exc_info=True)

    return digest
403+
404+
292405
def _get_lock_paths() -> tuple[Path, Path]:
293406
"""Resolve cron lock paths at call time so profile/env changes are honored."""
294407
hermes_home = _get_hermes_home()
@@ -2391,6 +2504,32 @@ def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -
23912504
if verbose:
23922505
logger.info("Output saved to: %s", output_file)
23932506

2507+
# Persist a failure record whenever a job fails or the agent returns an
2508+
# empty response. This is the per-job audit trail that makes silent
2509+
failures visible; successful runs append a success marker so the
2510+
# digest only shows current problems.
2511+
if not success:
2512+
tb = traceback.format_exc() if sys.exc_info()[0] is not None else None
2513+
try:
2514+
save_job_failure(
2515+
job,
2516+
success=False,
2517+
error=error,
2518+
output=output,
2519+
traceback_text=tb,
2520+
)
2521+
logger.warning(
2522+
"Job '%s' failure record saved to cron/failures",
2523+
job.get("id"),
2524+
)
2525+
except Exception as fe:
2526+
logger.error("Could not save cron failure record: %s", fe)
2527+
else:
2528+
try:
2529+
save_job_failure(job, success=True, output=output)
2530+
except Exception:
2531+
pass
2532+
23942533
# Deliver the final response to the origin/target chat.
23952534
# If the agent responded with [SILENT], skip delivery (but
23962535
# output is already saved above). Failed jobs always deliver.

hermes_cli/config.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2345,6 +2345,11 @@ def _ensure_hermes_home_managed(home: Path):
23452345
# 1 = serial (pre-v0.9 behaviour).
23462346
# Also overridable via HERMES_CRON_MAX_PARALLEL env var.
23472347
"max_parallel_jobs": None,
2348+
# Optional user-visible digest that surfaces recent cron failures on the
2349+
# next interaction. Set ``cron.failure_digest: true`` in config.yaml to
2350+
# enable; default is false so existing users are not surprised by new
2351+
# messages. No env var — config.yaml is the canonical UI.
2352+
"failure_digest": False,
23482353
},
23492354

23502355
# Kanban multi-agent coordination — controls the dispatcher loop that
@@ -4091,6 +4096,7 @@ def _normalize_custom_provider_entry(
40914096
"api_mode", "transport", "model", "default_model", "models",
40924097
"context_length", "rate_limit_delay",
40934098
"request_timeout_seconds", "stale_timeout_seconds",
4099+
"circuit_breaker",
40944100
"discover_models", "extra_body",
40954101
}
40964102
for camel, snake in _CAMEL_ALIASES.items():

0 commit comments

Comments
 (0)