Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2396,6 +2396,23 @@ def _prepend_note_to_message(message, note: str):
return message


def _get_cron_failure_digest_for_user() -> Optional[str]:
"""Build a user-visible cron failure digest if enabled and failures exist.

Returns a formatted digest string when ``cron.failure_digest`` is enabled
and there are un-acknowledged cron failures within the last 24 hours.
Returns ``None`` otherwise. The underlying implementation updates ack
timestamps only when it actually emits a digest, so calling this on every
user turn is safe and will not repeat the same failure.
"""
try:
from cron.scheduler import build_cron_failure_digest

return build_cron_failure_digest()
except Exception:
return None


# ---------------------------------------------------------------------------
# File-drop / local attachment detection — extracted as pure helpers for tests.
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -10984,6 +11001,13 @@ def chat(self, message, images: list = None) -> Optional[str]:
from run_agent import _sanitize_surrogates
message = _sanitize_surrogates(message)

# Surface recent cron failures to the operator before this turn.
# The digest is opt-in via ``cron.failure_digest`` and acks on delivery,
# so the same failure is surfaced only once per user interaction cycle.
_cron_failure_digest = _get_cron_failure_digest_for_user()
if _cron_failure_digest:
_cprint(f"\n{_cron_failure_digest}\n")

# Add user message to history
self.conversation_history.append({"role": "user", "content": message})

Expand Down Expand Up @@ -11102,9 +11126,14 @@ def run_agent():
reset_current_session_key = None # type: ignore[assignment]
_approval_session_token = None
agent_message = _voice_prefix + message if _voice_prefix else message
# If recent cron failures were surfaced, prepend them to the
# user message so the model sees them without adding a phantom
# turn to conversation_history.
if _cron_failure_digest:
agent_message = _prepend_note_to_message(agent_message, _cron_failure_digest)
# Prepend pending notes via _prepend_note_to_message, which
# handles both plain-string and multimodal content-parts list
# messages. Naive ``note + "\n\n" + agent_message`` crashed with
# messages. Naive ``note + "\\n\\n" + agent_message`` crashed with
# TypeError when an image was attached (agent_message is a list)
# and a /model or /reload-skills note was queued for the turn.
_msn = getattr(self, '_pending_model_switch_note', None)
Expand Down
90 changes: 90 additions & 0 deletions cron/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
_jobs_file_lock = threading.RLock()
_jobs_lock_state = threading.local()
OUTPUT_DIR = CRON_DIR / "output"
FAILURE_DIR = CRON_DIR / "failures"
ONESHOT_GRACE_SECONDS = 120


Expand Down Expand Up @@ -272,8 +273,10 @@ def ensure_dirs():
"""Ensure cron directories exist with secure permissions."""
CRON_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
FAILURE_DIR.mkdir(parents=True, exist_ok=True)
_secure_dir(CRON_DIR)
_secure_dir(OUTPUT_DIR)
_secure_dir(FAILURE_DIR)


# =============================================================================
Expand Down Expand Up @@ -1483,6 +1486,93 @@ def save_job_output(job_id: str, output: str):
return output_file


def save_job_failure(
job: Dict[str, Any],
*,
success: bool,
error: Optional[str] = None,
output: str = "",
exit_code: Optional[int] = None,
traceback_text: Optional[str] = None,
max_output_chars: int = 4000,
) -> Path:
"""Persist a per-job failure record under ``FAILURE_DIR``.

Captures the last N characters of the job output plus any traceback so
operators can diagnose why a cron job failed without re-running it.
Records are keyed by job id and timestamp; the most recent file per job
is the canonical "latest failure". Failures are written even when the
job later recovers, so the record reflects the *most recent* run status.

Returns the path of the written record.
"""
ensure_dirs()
job_id = str(job.get("id") or "unknown")
failure_job_dir = FAILURE_DIR / job_id
failure_job_dir.mkdir(parents=True, exist_ok=True)
_secure_dir(failure_job_dir)

now = _hermes_now()
# Include sub-seconds in the filename so rapid successive failures don't
# collide and overwrite each other.
timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") + f"_{now.microsecond:06d}"
record_file = failure_job_dir / f"{timestamp}.json"

trimmed_output = output
if len(trimmed_output) > max_output_chars:
trimmed_output = "..." + trimmed_output[-max_output_chars:]

record = {
"job_id": job_id,
"job_name": str(job.get("name") or job_id),
"timestamp": now.isoformat(),
"success": bool(success),
"exit_code": exit_code,
"error": error,
"traceback": traceback_text,
"last_output": trimmed_output,
}

fd, tmp_path = tempfile.mkstemp(dir=str(failure_job_dir), suffix=".tmp", prefix=".failure_")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(record, f, indent=2, default=str)
f.flush()
os.fsync(f.fileno())
atomic_replace(tmp_path, record_file)
_secure_file(record_file)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise

return record_file


def list_job_failures(job_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""Return recent failure records for a job, newest first."""
failure_job_dir = FAILURE_DIR / job_id
if not failure_job_dir.exists():
return []
records: List[Dict[str, Any]] = []
for path in sorted(failure_job_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True):
try:
records.append(json.loads(path.read_text(encoding="utf-8")))
except Exception:
continue
if limit is not None and len(records) >= limit:
break
return records


def get_latest_failure(job_id: str) -> Optional[Dict[str, Any]]:
"""Return the most recent failure record for a job, or None."""
records = list_job_failures(job_id, limit=1)
return records[0] if records else None


# =============================================================================
# Skill reference rewriting (curator integration)
# =============================================================================
Expand Down
145 changes: 142 additions & 3 deletions cron/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import subprocess
import sys
import threading
import traceback

# fcntl is Unix-only; on Windows use msvcrt for file locking
try:
Expand Down Expand Up @@ -68,14 +69,14 @@ def _summarize_cron_failure_for_delivery(job: dict, error: str | None) -> str:
return (
f"⚠️ Cron '{job_name}' failed: provider {reason}. "
"Fallback chain was exhausted or unavailable. "
"Full details saved in cron output."
"Full details saved in cron output / cron/failures."
)

if "readtimeout" in lower or "timed out" in lower or "timeout" in lower:
return (
f"⚠️ Cron '{job_name}' failed: provider timeout. "
"Fallback chain was exhausted or unavailable. "
"Full details saved in cron output."
"Full details saved in cron output / cron/failures."
)

# Match authentication/authorization wording at a word boundary and the
Expand All @@ -84,7 +85,7 @@ def _summarize_cron_failure_for_delivery(job: dict, error: str | None) -> str:
if re.search(r"authenticat|authoriz", lower) or re.search(r"\b(401|403)\b", text):
return (
f"⚠️ Cron '{job_name}' failed: provider authentication error. "
"Full details saved in cron output."
"Full details saved in cron output / cron/failures."
)

# Strip common exception wrappers and collapse provider payloads. Bound
Expand Down Expand Up @@ -206,9 +207,15 @@ def _resolve_cron_enabled_toolsets(job: dict, cfg: dict) -> list[str] | None:

from cron.jobs import (
get_due_jobs,
load_jobs,
mark_job_run,
mark_job_started,
save_jobs,
save_job_output,
save_job_failure,
list_job_failures,
get_latest_failure,
_jobs_lock,
advance_next_run,
)

Expand Down Expand Up @@ -289,6 +296,112 @@ def _get_hermes_home() -> Path:
return _hermes_home or get_hermes_home()


def _failure_digest_enabled(cfg: dict) -> bool:
"""Return whether ``cron.failure_digest`` is enabled in config.yaml.

The digest surfaces recent cron failures to the user on the next
interaction. Default disabled (False); opt-in via config.yaml.
"""
try:
cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {}
return bool(cron_cfg.get("failure_digest", False))
except Exception:
return False


def _load_cron_config() -> dict:
"""Load config.yaml, returning an empty dict on any failure."""
try:
from hermes_cli.config import load_config

return load_config() or {}
except Exception:
return {}


def build_cron_failure_digest(adapters=None, loop=None) -> Optional[str]:
"""Build a user-visible digest of recent cron failures.

Scans all jobs and emits a compact message for any job whose latest
failure record reports success=False and is newer than the job's last
acknowledged digest timestamp (stored in ``failure_digest_last_at``).
Updates that timestamp when a failure is included.

Returns the digest text, or None if there is nothing new to surface.
"""
cfg = _load_cron_config()
if not _failure_digest_enabled(cfg):
return None

import datetime as _dt

now = _hermes_now()
cutoff = now - _dt.timedelta(hours=24)
lines: List[str] = []
jobs = load_jobs()
for job in jobs:
if not job.get("enabled", True):
continue
record = get_latest_failure(job["id"])
if not record:
continue
if record.get("success") is True:
continue
try:
ts = _dt.datetime.fromisoformat(str(record.get("timestamp") or ""))
except (TypeError, ValueError):
continue
if ts < cutoff:
continue

last_ack = job.get("failure_digest_last_at")
if last_ack:
try:
last_ack_dt = _dt.datetime.fromisoformat(str(last_ack))
if ts <= last_ack_dt:
continue
except (TypeError, ValueError):
pass

job_name = record.get("job_name") or job.get("name") or job["id"]
err = (record.get("error") or "unknown error")[:120]
lines.append(f"• '{job_name}' failed at {ts.strftime('%Y-%m-%d %H:%M')}: {err}")

if not lines:
return None

digest = (
"⚠️ Cron failure digest (last 24h):\n"
+ "\n".join(lines)
+ "\n\nFull details: ~/.hermes/cron/failures/"
)

# Update ack timestamps so we don't repeat the same failures every turn.
try:
with _jobs_lock():
jobs = load_jobs()
now_iso = now.isoformat()
changed = False
for job in jobs:
record = get_latest_failure(job["id"])
if not record or record.get("success") is True:
continue
try:
ts = _dt.datetime.fromisoformat(str(record.get("timestamp") or ""))
except (TypeError, ValueError):
continue
if ts < cutoff:
continue
job["failure_digest_last_at"] = now_iso
changed = True
if changed:
save_jobs(jobs)
except Exception:
logger.debug("Could not update failure_digest_last_at", exc_info=True)

return digest


def _get_lock_paths() -> tuple[Path, Path]:
"""Resolve cron lock paths at call time so profile/env changes are honored."""
hermes_home = _get_hermes_home()
Expand Down Expand Up @@ -2391,6 +2504,32 @@ def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -
if verbose:
logger.info("Output saved to: %s", output_file)

# Persist a failure record whenever a job fails or the agent returns an
# empty response. This is the per-job audit trail that makes silent
# failures visible; successful runs overwrite the latest record so the
# digest only shows current problems.
if not success:
tb = traceback.format_exc() if sys.exc_info()[0] is not None else None
try:
save_job_failure(
job,
success=False,
error=error,
output=output,
traceback_text=tb,
)
logger.warning(
"Job '%s' failure record saved to cron/failures",
job.get("id"),
)
except Exception as fe:
logger.error("Could not save cron failure record: %s", fe)
else:
try:
save_job_failure(job, success=True, output=output)
except Exception:
pass

# Deliver the final response to the origin/target chat.
# If the agent responded with [SILENT], skip delivery (but
# output is already saved above). Failed jobs always deliver.
Expand Down
6 changes: 6 additions & 0 deletions hermes_cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2345,6 +2345,11 @@ def _ensure_hermes_home_managed(home: Path):
# 1 = serial (pre-v0.9 behaviour).
# Also overridable via HERMES_CRON_MAX_PARALLEL env var.
"max_parallel_jobs": None,
# Optional user-visible digest that surfaces recent cron failures on the
# next interaction. Set ``cron.failure_digest: true`` in config.yaml to
# enable; default is false so existing users are not surprised by new
# messages. No env var — config.yaml is the canonical UI.
"failure_digest": False,
},

# Kanban multi-agent coordination — controls the dispatcher loop that
Expand Down Expand Up @@ -4091,6 +4096,7 @@ def _normalize_custom_provider_entry(
"api_mode", "transport", "model", "default_model", "models",
"context_length", "rate_limit_delay",
"request_timeout_seconds", "stale_timeout_seconds",
"circuit_breaker",
"discover_models", "extra_body",
}
for camel, snake in _CAMEL_ALIASES.items():
Expand Down
Loading
Loading