Skip to content

Commit 13b3bf0

Browse files
committed
feat(evolution): cached-digest provider pre-flight for cron stages, model-resolution fix (#486)
Restore the evolution cached-digest pre-flight as a tracked feature and fix the root bug that made it fail on prod with 'no model configured for pre-flight ping'. What: - cron/evolution_preflight.py: lightweight non-streaming provider ping plus most-recent on-disk digest fallback for evolution pipeline cron stages (introspection/analysis/implementation/research/funnel/integration). - cron/scheduler.py: integrate the pre-flight between runtime resolution and AIAgent construction. On ping failure, return the latest stale digest (graceful degradation) or raise if none exists. Purely additive — the #487 native model-resolution path is unchanged. Why: - When the configured provider is unreachable (e.g. Kimi timeout storms), evolution sessions burn retries/timeouts and deliver nothing. The pre-flight detects this fast and falls back to the last good digest so the pipeline keeps moving with stale-but-structured input instead of failing silently. ROOT-FIX: - preflight_provider() reads runtime['model'], but resolve_runtime_provider() never populates it — the scheduler resolves the model into a separate local 'model' variable (job.model > HERMES_MODEL > config.yaml model.default) and passes it to AIAgent(model=...) directly. On prod runtime['model'] was empty so the ping always returned 'no model configured' and the cached-digest fallback could never trigger. Fixed by syncing runtime['model'] = model (the already-resolved local) before the ping, without clobbering an ACP-resolved model. Provenance: - This feature previously existed only as untracked local code on the prod host; git stash -u hid it and upstream-merge #487 overwrote scheduler.py with the native version. Now restored into git from that stash, with the root bug fixed. 
Tested: - tests/cron/test_evolution_preflight.py (27 tests) pass, including a new unit test and a scheduler-level test that reproduces the prod failure (runtime returned with NO model + empty job.model + config.yaml model.default) and asserts runtime['model'] is synced to the resolved default. Verified the new test FAILS without the root-fix (captured model == None) and PASSES with it. - Adjacent suites green: test_scheduler_provider.py (29), scheduler model/runtime/run_job slice (77), test_run_one_job.py + codex paths (8).
1 parent 3e09f1f commit 13b3bf0

3 files changed

Lines changed: 768 additions & 0 deletions

File tree

cron/evolution_preflight.py

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
"""Pre-flight provider check + cached digest fallback for evolution cron jobs.
2+
3+
The evolution pipeline (introspection → analysis → implementation → research →
4+
funnel → integration) runs as regular cron agent sessions. When the configured
5+
provider is unreachable, those sessions burn retries/timeouts before producing
6+
zero deliverables. This module provides a lightweight ping and a fallback to
7+
the most recent on-disk digest so the pipeline can keep moving with stale but
8+
useful input instead of failing silently.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import logging
14+
import time
15+
from pathlib import Path
16+
from typing import Any, Dict, Optional
17+
18+
from hermes_constants import get_hermes_home
19+
from hermes_cli.config import load_config_readonly
20+
from hermes_cli.timeouts import get_provider_request_timeout
21+
22+
logger = logging.getLogger(__name__)
23+
24+
# Stages in the evolution pipeline and the file extension each one writes.
25+
_EVOLUTION_STAGES = {
26+
"introspection": ".json",
27+
"analysis": ".json",
28+
"implementation": ".md",
29+
"research": ".md",
30+
"funnel": ".md",
31+
"integration": ".md",
32+
}
33+
34+
35+
def evolution_job_stage(job: Dict[str, Any]) -> Optional[str]:
    """Return the evolution stage for a cron job, or None if it is not an
    evolution pipeline job.

    A job qualifies when its name (falling back to its id) starts with
    ``evolution`` or when its tags contain ``evolution``. The stage is then
    the first known stage name found as a substring of the job name; if the
    name carries none, the first known stage name present among the tags.

    Args:
        job: Cron job mapping. Only the ``name``, ``id`` and ``tags`` keys are
            read. ``tags`` may be a list, tuple, set or frozenset; any other
            type (including a bare string) is treated as "no tags".

    Returns:
        The matching stage name, or None when the job is not an evolution job
        or names no known stage.
    """
    name = str(job.get("name") or job.get("id") or "").lower()

    raw_tags = job.get("tags")
    if isinstance(raw_tags, (list, tuple, set, frozenset)):
        tags_lower = {str(tag).lower() for tag in raw_tags}
    else:
        # A bare string is deliberately excluded: iterating it would yield
        # single characters rather than tags.
        tags_lower = set()

    # ``startswith("evolution")`` already covers the ``evolution-`` prefix.
    if not name.startswith("evolution") and "evolution" not in tags_lower:
        return None

    # The job name takes precedence over tags.
    for stage in _EVOLUTION_STAGES:
        if stage in name:
            return stage

    for stage in _EVOLUTION_STAGES:
        if stage in tags_lower:
            return stage

    return None
58+
59+
60+
def _evolution_dir(hermes_home: Optional[Path] = None) -> Path:
61+
home = (hermes_home or get_hermes_home()).resolve()
62+
return home / "profiles" / "user1" / "evolution"
63+
64+
65+
def _preflight_timeout_seconds(cfg: Optional[Any] = None) -> float:
66+
"""Return the configured pre-flight timeout in seconds (default 30)."""
67+
if cfg is None:
68+
try:
69+
cfg = load_config_readonly() or {}
70+
except Exception:
71+
cfg = {}
72+
cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {}
73+
if not isinstance(cron_cfg, dict):
74+
cron_cfg = {}
75+
raw = cron_cfg.get("preflight_timeout_seconds", 30.0)
76+
try:
77+
value = float(raw)
78+
except (TypeError, ValueError):
79+
return 30.0
80+
if value <= 0:
81+
return 30.0
82+
return value
83+
84+
85+
def _preflight_enabled(cfg: Optional[Any] = None) -> bool:
86+
"""Return whether pre-flight checks are enabled (default True)."""
87+
if cfg is None:
88+
try:
89+
cfg = load_config_readonly() or {}
90+
except Exception:
91+
cfg = {}
92+
cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {}
93+
if not isinstance(cron_cfg, dict):
94+
cron_cfg = {}
95+
return str(cron_cfg.get("preflight_enabled", "true")).lower() not in {
96+
"false",
97+
"0",
98+
"no",
99+
"off",
100+
"disabled",
101+
}
102+
103+
104+
def find_latest_digest(
    stage: str, hermes_home: Optional[Path] = None
) -> Optional[Path]:
    """Return the most recent digest file for an evolution stage, or None.

    "Most recent" means the largest modification time among the regular files
    in the stage directory whose suffix matches the stage's extension.

    Args:
        stage: Evolution stage name; unknown stages yield None.
        hermes_home: Hermes home directory, forwarded to ``_evolution_dir``.

    Returns:
        Path of the newest matching digest, or None when the stage is unknown,
        its directory is missing or unreadable, or it holds no matching file.
    """
    if stage not in _EVOLUTION_STAGES:
        return None
    ext = _EVOLUTION_STAGES[stage]
    stage_dir = _evolution_dir(hermes_home) / stage
    if not stage_dir.is_dir():
        return None

    newest: Optional[Path] = None
    newest_mtime = float("-inf")
    try:
        for entry in stage_dir.iterdir():
            if entry.suffix != ext:
                continue
            try:
                if not entry.is_file():
                    continue
                mtime = entry.stat().st_mtime
            except OSError:
                # The entry vanished or became unreadable between listing and
                # stat (e.g. a concurrent writer rotating digests); skip it
                # instead of failing the whole fallback.
                continue
            # Strict ``>`` keeps the first-listed file on an mtime tie.
            if mtime > newest_mtime:
                newest, newest_mtime = entry, mtime
    except OSError as exc:
        logger.warning("Could not list digest directory %s: %s", stage_dir, exc)
        return None
    return newest
120+
121+
122+
def load_digest_as_fallback(
    stage: str,
    hermes_home: Optional[Path] = None,
    *,
    max_chars: int = 200_000,
) -> Optional[str]:
    """Load the most recent on-disk digest for a stage, bounded in size.

    Args:
        stage: Evolution stage whose digest should be loaded.
        hermes_home: Hermes home directory, forwarded to
            ``find_latest_digest``.
        max_chars: Maximum number of digest characters to keep. Longer digests
            are cut at this length and a truncation marker is appended.

    Returns:
        A warning header naming the digest file, followed by the (possibly
        truncated) digest text; None when no digest exists or it cannot be
        read.
    """
    digest_path = find_latest_digest(stage, hermes_home)
    if digest_path is None:
        return None

    try:
        # Undecodable bytes are replaced rather than raising, so a partially
        # corrupt digest is still usable.
        body = digest_path.read_text(encoding="utf-8", errors="replace")
    except Exception as exc:
        logger.warning("Could not read cached digest %s: %s", digest_path, exc)
        return None

    if len(body) > max_chars:
        body = body[:max_chars] + "\n\n[truncated: stale digest exceeded size limit]"

    return "".join(
        (
            f"⚠️ Provider unreachable for '{stage}' cron job. ",
            f"Using cached digest from {digest_path.name} instead.\n\n",
            body,
        )
    )
144+
145+
146+
def _provider_specific_timeout(runtime: Dict[str, Any], cfg: Optional[Any]) -> float:
    """Choose the timeout, in seconds, for the provider ping.

    The request timeout reported by ``get_provider_request_timeout`` for the
    runtime's provider and model is used when it is set and positive;
    otherwise the cron pre-flight timeout from ``cfg`` applies.

    Args:
        runtime: Resolved runtime mapping; ``provider`` and ``model`` are read.
        cfg: Config mapping forwarded to ``_preflight_timeout_seconds``.
    """
    provider_timeout = get_provider_request_timeout(
        runtime.get("provider") or "",
        runtime.get("model") or "",
    )
    has_provider_timeout = provider_timeout is not None and provider_timeout > 0
    return provider_timeout if has_provider_timeout else _preflight_timeout_seconds(cfg)
154+
155+
156+
def preflight_provider(
    runtime: Dict[str, Any], *, cfg: Optional[Any] = None
) -> Optional[str]:
    """Run a minimal, non-streaming provider ping.

    This is intentionally lightweight: a single-turn request with
    max_tokens=1, dispatched on the runtime's ``api_mode``.

    Args:
        runtime: Resolved runtime mapping. Reads ``api_key``, ``base_url``,
            ``provider``, ``api_mode`` (default ``chat_completions``),
            ``model`` and ``command``.
        cfg: Optional already-loaded config, used to pick the ping timeout.

    Returns:
        None on success, or a short human-readable error string on failure.
        Exceptions raised by the ping itself are caught and reported as an
        error string rather than propagated.
    """
    api_key = runtime.get("api_key") or ""
    base_url = runtime.get("base_url") or ""
    provider = runtime.get("provider") or ""
    api_mode = runtime.get("api_mode") or "chat_completions"
    model = runtime.get("model") or ""
    command = runtime.get("command")

    # ACP runtimes (identified by ``command``) and Bedrock runtimes are not
    # actually pinged below, and the Bedrock branch uses neither ``api_key``
    # nor ``model``. Requiring them here would report a resolved Bedrock
    # runtime as unreachable purely because it carries no API key.
    needs_credentials = not command and api_mode != "bedrock_converse"

    if needs_credentials and not api_key:
        return "no API key or ACP command available for pre-flight ping"

    if needs_credentials and not model:
        return "no model configured for pre-flight ping"

    timeout = _provider_specific_timeout(runtime, cfg)

    try:
        if command or api_mode == "copilot-acp":
            # ACP providers are subprocess-based; a real ping would require
            # spawning the ACP helper. For now treat them as reachable if the
            # runtime resolved (auth setup succeeded). A dedicated ACP ping can
            # be added later without changing the scheduler contract.
            return None

        if api_mode == "anthropic_messages":
            return _preflight_anthropic(api_key, base_url, model, timeout)
        if api_mode == "bedrock_converse":
            return _preflight_bedrock(runtime, timeout)
        return _preflight_openai_compatible(api_key, base_url, model, timeout, provider)
    except Exception as exc:
        # Any failure of the ping (network, auth, SDK import, ...) means the
        # provider is unusable for this run; report it instead of raising so
        # the caller can fall back to a cached digest.
        logger.debug("Pre-flight ping raised %s: %s", type(exc).__name__, exc)
        return f"pre-flight ping failed: {type(exc).__name__}: {exc}"
195+
196+
197+
def _preflight_openai_compatible(
    api_key: str,
    base_url: str,
    model: str,
    timeout: float,
    provider: str,
) -> Optional[str]:
    """Ping an OpenAI-compatible endpoint with a one-token chat completion.

    Args:
        api_key: API key passed to the client.
        base_url: Endpoint override; the SDK default is used when empty.
        model: Model to address; ``"default"`` is sent when empty.
        timeout: Client timeout in seconds.
        provider: Provider name, used only in the debug log line.

    Returns:
        None when the request completes. Any SDK or transport error
        propagates to the caller.
    """
    from openai import OpenAI

    client_kwargs: Dict[str, Any] = {"api_key": api_key, "timeout": timeout}
    if base_url:
        client_kwargs["base_url"] = base_url
    # NOTE(review): SDK-level automatic retries, if enabled by default, would
    # multiply the effective wait beyond ``timeout`` — confirm whether the
    # ping should disable them.
    client = OpenAI(**client_kwargs)
    # Monotonic clock: the elapsed time must not be skewed by wall-clock
    # adjustments while the request is in flight.
    start = time.monotonic()
    try:
        client.chat.completions.create(
            model=model or "default",
            messages=[{"role": "user", "content": "ping"}],
            max_tokens=1,
            stream=False,
        )
        elapsed = time.monotonic() - start
        logger.debug("Pre-flight ping to %s succeeded in %.2fs", provider, elapsed)
        return None
    finally:
        # Best-effort cleanup: a failure to close must not mask the ping
        # result or the original exception.
        try:
            client.close()
        except Exception:
            pass
226+
227+
228+
def _preflight_anthropic(
    api_key: str, base_url: str, model: str, timeout: float
) -> Optional[str]:
    """Ping an Anthropic Messages endpoint with a one-token request.

    Args:
        api_key: API key passed to the client.
        base_url: Endpoint override; the SDK default is used when empty.
        model: Model to address; ``"claude-3-5-haiku-latest"`` is sent when
            empty.
        timeout: Client timeout in seconds.

    Returns:
        None when the request completes. Any SDK or transport error
        propagates to the caller.
    """
    from anthropic import Anthropic

    client_kwargs: Dict[str, Any] = {"api_key": api_key, "timeout": timeout}
    if base_url:
        client_kwargs["base_url"] = base_url
    # NOTE(review): SDK-level automatic retries, if enabled by default, would
    # multiply the effective wait beyond ``timeout`` — confirm whether the
    # ping should disable them.
    client = Anthropic(**client_kwargs)
    # Monotonic clock: the elapsed time must not be skewed by wall-clock
    # adjustments while the request is in flight.
    start = time.monotonic()
    try:
        client.messages.create(
            model=model or "claude-3-5-haiku-latest",
            max_tokens=1,
            messages=[{"role": "user", "content": "ping"}],
        )
        elapsed = time.monotonic() - start
        logger.debug("Pre-flight ping to anthropic succeeded in %.2fs", elapsed)
        return None
    finally:
        # Best-effort cleanup: a failure to close must not mask the ping
        # result or the original exception.
        try:
            client.close()
        except Exception:
            pass
252+
253+
254+
def _preflight_bedrock(runtime: Dict[str, Any], timeout: float) -> Optional[str]:
255+
# Bedrock uses boto3; resolving the runtime already validates credentials.
256+
# A full converse ping would require a model id and may incur token cost,
257+
# so we treat the resolved runtime as reachable. This preserves the fallback
258+
# contract while avoiding unexpected Bedrock charges.
259+
_ = timeout
260+
_ = runtime
261+
return None

cron/scheduler.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2190,6 +2190,7 @@ def _run_job_impl(job: dict) -> tuple[bool, str, str, Optional[str]]:
21902190
format_runtime_provider_error,
21912191
)
21922192
from hermes_cli.auth import AuthError
2193+
from cron import evolution_preflight
21932194
try:
21942195
# Do not inject HERMES_INFERENCE_PROVIDER here. resolve_runtime_provider()
21952196
# already prefers persisted config over stale shell/env overrides when
@@ -2228,6 +2229,58 @@ def _run_job_impl(job: dict) -> tuple[bool, str, str, Optional[str]]:
22282229
message = format_runtime_provider_error(exc)
22292230
raise RuntimeError(message) from exc
22302231

2232+
# Evolution pipeline pre-flight: ping the resolved provider before we
2233+
# build an agent. If it fails, return the most recent on-disk digest
2234+
# so downstream evolution jobs still have stale-but-structured input
2235+
# instead of failing silently during retries. (#486)
2236+
stage = evolution_preflight.evolution_job_stage(job)
2237+
if stage and evolution_preflight._preflight_enabled(_cfg):
2238+
# ROOT-FIX (#486): resolve_runtime_provider() does NOT populate
2239+
# runtime["model"] — the model is resolved into the local ``model``
2240+
# variable above (job.model > HERMES_MODEL > config.yaml model.default)
2241+
# and passed separately to AIAgent(model=...). Without this sync the
2242+
# pre-flight ping saw an empty runtime["model"] and always bailed with
2243+
# "no model configured for pre-flight ping", so cached-digest fallback
2244+
# could never trigger on prod. Build a shallow copy carrying the
2245+
# resolved model for the ping rather than mutating ``runtime`` in
2246+
# place: ``runtime`` is a fresh, request-local dict from
2247+
# resolve_runtime_provider() today, but copying keeps the ping
2248+
# side-effect-free regardless. Never clobber a model the runtime may
2249+
# already carry (e.g. an ACP-resolved one).
2250+
preflight_runtime = (
2251+
runtime if runtime.get("model") else {**runtime, "model": model}
2252+
)
2253+
err = evolution_preflight.preflight_provider(preflight_runtime, cfg=_cfg)
2254+
if err:
2255+
logger.warning(
2256+
"Job '%s' (evolution-%s): provider pre-flight failed: %s",
2257+
job_id,
2258+
stage,
2259+
err,
2260+
)
2261+
digest = evolution_preflight.load_digest_as_fallback(
2262+
stage, _get_hermes_home()
2263+
)
2264+
if digest is not None:
2265+
now_iso = _hermes_now().strftime("%Y-%m-%d %H:%M:%S")
2266+
doc = (
2267+
f"# Cron Job: {job_name}\n\n"
2268+
f"**Job ID:** {job_id}\n"
2269+
f"**Run Time:** {now_iso}\n"
2270+
f"**Status:** provider unreachable — stale digest fallback\n\n"
2271+
f"{digest}\n"
2272+
)
2273+
logger.info(
2274+
"Job '%s' (evolution-%s): returning stale digest fallback",
2275+
job_id,
2276+
stage,
2277+
)
2278+
return True, doc, SILENT_MARKER, None
2279+
else:
2280+
raise RuntimeError(
2281+
f"Evolution pre-flight failed for '{stage}': {err}. No cached digest available."
2282+
)
2283+
22312284
fallback_model = _cfg.get("fallback_providers") or _cfg.get("fallback_model") or None
22322285
credential_pool = None
22332286
runtime_provider = str(runtime.get("provider") or "").strip().lower()

0 commit comments

Comments
 (0)