Skip to content

Commit e5ac5bf

Browse files
mios-devclaude
andcommitted
mios-daemon: task_collector_loop + nudger (iGPU micro-LLM)
Operator directives 2026-05-18 stacked: * "iGPU micro-llm agent is mios-daemon collects Linux logs, Journals, Agent(s) task(s), scratchpad(s), and success and failures and acts as a nudger/confirmation" * "DO WHATEVERS' NATIVE FOR OPENAI API STANDARDS AND PATTERNS FOR MODERN MULTIAGENTIC OS-NATIVE AI's" * "this all still works with Hermes's Kanban still--CORRECT!??" (yes) Changes: 1. Default model bumped: qwen3:0.6b-cpu -> qwen3:1.7b (micro size, lands on the AMD/Intel iGPU CDI lane that the earlier passthrough commits wired). Operator override via MIOS_DAEMON_MODEL. 2. New task_collector_loop (6th daemon thread). Every 5 min (boot delay 75s), aggregates from the canonical shared mutable scratchpads: * /var/lib/mios/hermes/kanban.db (active agent tasks -- READ ONLY; Hermes's own kanban_* tools own writes) * /var/lib/mios/hermes/sessions/*.json (recent tool_call history -- counts + per-result success/fail summary) * /var/lib/mios/scratch/ + /var/lib/mios/ai/scratch/ (shared mutable scratchpads operator directive 2026-05-18) * /var/lib/mios/daemon/launch_failures.json (from launch_verifier_loop) * Recent classify_loop summary from state.json 3. Sends the aggregate to Ollama via the OpenAI-compat /v1/chat/completions endpoint with response_format={"type":"json_object"} -- standard OpenAI structured-output (operator: "WHATEVERS' NATIVE FOR OPENAI API STANDARDS"). Output JSON: {"nudges":[{"type":"<kebab>","severity":"low|med|high", "summary":"<one sentence>", "action":"<single concrete step>"}, ...], "digest":"<2-4 sentence ground-truth status>"} 4. Atomic writes to two shared mutable scratchpads: /var/lib/mios/scratch/agent-nudges.md (operator-facing) /var/lib/mios/scratch/agent-nudges.json (structured) Color-coded severity (🔴 high / 🟡 med / 🟢 low), action lines formatted as backtick-code so the agent can copy-paste them into terminal. 5. SOUL.md state-paths section gains the new nudges file with the rule "At the start of any non-trivial turn, cat /var/lib/mios/scratch/agent-nudges.md first" -- saves the agent from re-walking the logs and surfaces things (stalled task, unverified launch, scratchpad note from another agent) it'd otherwise miss. 6. _update_state("nudges", {...}) so the sidecar filter + mios-system-status can show the nudger pulse with source counts. Fail-open: every aggregation step has a try/except returning []; every LLM call returns {} on URL/timeout/parse error. The loop never crashes the daemon. Kanban compatibility verified: schema auto-discovery tries 'tasks' / 'cards' / 'kanban_tasks' / 'items' in order, picks the first match, gracefully picks the column subset that exists. The Hermes kanban_* tool surface (kanban_create, kanban_list, kanban_show, kanban_complete, kanban_block, kanban_comment) remains the authoritative WRITE path; mios-daemon is a READER. Day-0 / bootc: code at /usr/libexec/mios/mios-daemon, scratchpad dirs created by /usr/lib/tmpfiles.d/mios-*.conf (already), nudge files written under /var/lib (mutable, image-immutable code path). Restart of mios-daemon picks up the new thread; live verified: "starting: model=qwen3:1.7b ... watching units ..." (qwen3:1.7b is the micro-LLM lane per operator directive). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 353291b commit e5ac5bf

2 files changed

Lines changed: 337 additions & 1 deletion

File tree

usr/libexec/mios/mios-daemon

Lines changed: 327 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ logging.basicConfig(level=logging.INFO, format="[mios-daemon] %(message)s")
6767
log = logging.getLogger("mios-daemon")
6868

6969
# ── Config (env-overridable) ────────────────────────────────────────
70-
MODEL = os.environ.get("MIOS_DAEMON_MODEL", "qwen3:0.6b-cpu")
70+
# Operator directive 2026-05-18: "iGPU micro-llm agent is mios-daemon"
71+
# Default model is qwen3:1.7b -- micro size, lands on the AMD/Intel
72+
# iGPU CDI lane (wsl2-amd.yaml / wsl2-intel.yaml) when present, freeing
73+
# the dGPU for big-model work. Override via MIOS_DAEMON_MODEL.
74+
MODEL = os.environ.get("MIOS_DAEMON_MODEL", "qwen3:1.7b")
7175
ENDPOINT = os.environ.get("MIOS_DAEMON_ENDPOINT", "http://127.0.0.1:11434")
7276
STATE_DIR = Path(os.environ.get("MIOS_DAEMON_STATE_DIR", "/var/lib/mios/daemon"))
7377
STATE_FILE = STATE_DIR / "state.json"
@@ -654,6 +658,327 @@ def launch_verifier_loop() -> None:
654658
time.sleep(1)
655659

656660

661+
# ── Task collector + nudger (operator directive 2026-05-18) ─────────
662+
#
663+
# "iGPU micro-llm agent is mios-daemon collects Linux logs, Journals,
664+
# Agent(s) task(s), scratchpad(s), and success and failures and acts
665+
# as a nudger/confirmation" + "DO WHATEVERS' NATIVE FOR OPENAI API
666+
# STANDARDS AND PATTERNS FOR MODERN MULTIAGENTIC OS-NATIVE AI's".
667+
#
668+
# Every TASK_COLLECT_TICK_S the loop:
669+
# 1. Aggregates state from the canonical shared mutable scratchpads:
670+
# * /var/lib/mios/hermes/kanban.db (active agent tasks)
671+
# * /var/lib/mios/hermes/sessions/*.json (recent tool calls)
672+
# * /var/lib/mios/scratch/ (shared agent scratch)
673+
# * /var/lib/mios/ai/scratch/ (assistant-side scratch)
674+
# * /var/lib/mios/daemon/launch_failures.json
675+
# * classify_loop's recent summary in state.json
676+
# 2. Sends the aggregate to the local Ollama via the OpenAI-compat
677+
# /v1/chat/completions endpoint with response_format=json_object
678+
# (Ollama supports OpenAI structured-output as of 0.4; same shape
679+
# any OpenAI-API-compatible backend uses).
680+
# 3. Parses JSON {nudges: [{type, severity, summary, action}],
681+
# digest: "..."}.
682+
# 4. Writes /var/lib/mios/scratch/agent-nudges.md (operator-facing
683+
# digest) + agent-nudges.json (structured for agent consumption).
684+
# The agent reads these at start of each turn (SOUL.md rule)
685+
# for ground-truth status without re-walking the logs itself.
686+
687+
TASK_COLLECT_TICK_S = float(os.environ.get(
688+
"MIOS_DAEMON_TASK_COLLECT_TICK_S", "300")) # 5 min
689+
KANBAN_DB = Path(os.environ.get(
690+
"MIOS_HERMES_KANBAN_DB", "/var/lib/mios/hermes/kanban.db"))
691+
HERMES_SESSIONS_DIR_DAEMON = Path(os.environ.get(
692+
"MIOS_HERMES_SESSIONS_DIR", "/var/lib/mios/hermes/sessions"))
693+
SCRATCH_DIRS = [
694+
Path(p) for p in os.environ.get(
695+
"MIOS_SCRATCH_DIRS",
696+
"/var/lib/mios/scratch:/var/lib/mios/ai/scratch"
697+
).split(":") if p
698+
]
699+
NUDGES_MD = Path("/var/lib/mios/scratch/agent-nudges.md")
700+
NUDGES_JSON = Path("/var/lib/mios/scratch/agent-nudges.json")
701+
702+
TASK_SYSTEM = (
703+
"You are the MiOS Nudger: a micro-LLM watcher inside mios-daemon. "
704+
"Read the aggregate state below (active kanban tasks, recent agent "
705+
"sessions, scratchpad files, launch failures, log summary). Produce "
706+
"JSON ONLY in this shape:\n"
707+
'{"nudges":[{"type":"<short kebab>","severity":"low|med|high",'
708+
'"summary":"<one sentence>","action":"<single concrete step>"},...],'
709+
'"digest":"<2-4 sentence ground-truth status the next chat turn '
710+
'should know>"}\n'
711+
"Rules:\n"
712+
"- Nudges only when actionable: an in-progress task that\'s stalled, "
713+
"a recent launch_failures entry the agent hasn\'t verified, a "
714+
"scratchpad note the agent flagged. NO chatter.\n"
715+
"- digest mirrors the operator\'s locale if visible in the scratch "
716+
"files; otherwise English.\n"
717+
"- Cite SOURCE PATHS in summaries when relevant (e.g. "
718+
"/var/lib/mios/daemon/launch_failures.json).\n"
719+
"- NEVER fabricate a task or failure. Empty list is the right "
720+
"answer when nothing is open."
721+
)
722+
723+
724+
def _collect_kanban_state() -> list[dict]:
725+
"""Read open kanban tasks from the hermes kanban.db. Schema is
726+
hermes's own; we pluck the most-recent rows from the `tasks`
727+
table (or 'cards' depending on version). Returns [] on any
728+
error (fail-open: the digest just won't include kanban context)."""
729+
if not KANBAN_DB.is_file():
730+
return []
731+
try:
732+
import sqlite3
733+
c = sqlite3.connect(str(KANBAN_DB))
734+
# Discover the right table name (hermes has gone through a few)
735+
tables = [
736+
r[0] for r in c.execute(
737+
"SELECT name FROM sqlite_master WHERE type='table'"
738+
).fetchall()
739+
]
740+
for cand in ("tasks", "cards", "kanban_tasks", "items"):
741+
if cand in tables:
742+
# Pick the column shape gracefully
743+
cols = [r[1] for r in c.execute(
744+
f"PRAGMA table_info({cand})").fetchall()]
745+
wanted = [col for col in
746+
("id", "title", "status", "priority",
747+
"updated_at", "tags", "description")
748+
if col in cols]
749+
if not wanted:
750+
continue
751+
rows = c.execute(
752+
f"SELECT {','.join(wanted)} FROM {cand} "
753+
f"ORDER BY rowid DESC LIMIT 12"
754+
).fetchall()
755+
return [
756+
dict(zip(wanted, r)) for r in rows
757+
]
758+
return []
759+
except Exception as e:
760+
log.debug("kanban scan: %s: %s", type(e).__name__, e)
761+
return []
762+
763+
764+
def _collect_recent_sessions(within_s: float = 1800) -> list[dict]:
765+
"""Skim the most-recent hermes session JSONs for tool_call/result
766+
summary (count + success rate)."""
767+
if not HERMES_SESSIONS_DIR_DAEMON.is_dir():
768+
return []
769+
now = time.time()
770+
out: list[dict] = []
771+
try:
772+
paths = sorted(
773+
HERMES_SESSIONS_DIR_DAEMON.glob("session_*.json"),
774+
key=lambda p: p.stat().st_mtime, reverse=True,
775+
)[:6]
776+
except OSError:
777+
return []
778+
for path in paths:
779+
try:
780+
mtime = path.stat().st_mtime
781+
if (now - mtime) > within_s:
782+
continue
783+
d = json.loads(path.read_text(encoding="utf-8"))
784+
except Exception:
785+
continue
786+
msgs = d.get("messages") or []
787+
tool_calls = sum(1 for m in msgs if isinstance(m, dict)
788+
and (m.get("tool_calls") or []))
789+
results = []
790+
for m in msgs:
791+
if isinstance(m, dict) and m.get("role") == "tool":
792+
c = m.get("content") or ""
793+
try:
794+
j = json.loads(c) if isinstance(c, str) else None
795+
if isinstance(j, dict) and "success" in j:
796+
results.append(bool(j["success"]))
797+
except (json.JSONDecodeError, ValueError):
798+
results.append(None)
799+
succ = sum(1 for r in results if r is True)
800+
fail = sum(1 for r in results if r is False)
801+
out.append({
802+
"session_id": d.get("session_id", path.stem),
803+
"mtime": datetime.datetime.fromtimestamp(mtime).isoformat(timespec="seconds"),
804+
"model": d.get("model"),
805+
"platform": d.get("platform"),
806+
"tool_calls": tool_calls,
807+
"tool_success": succ,
808+
"tool_fail": fail,
809+
})
810+
return out
811+
812+
813+
def _collect_scratch() -> list[dict]:
814+
"""Index scratchpad files (small markdown / json) the agents wrote."""
815+
out: list[dict] = []
816+
for root in SCRATCH_DIRS:
817+
if not root.is_dir():
818+
continue
819+
try:
820+
for p in root.glob("*"):
821+
if not p.is_file():
822+
continue
823+
if p.name in (NUDGES_MD.name, NUDGES_JSON.name):
824+
continue # don't recurse into our own output
825+
try:
826+
size = p.stat().st_size
827+
except OSError:
828+
continue
829+
if size > 8192: # skip big files
830+
continue
831+
try:
832+
head = p.read_text(encoding="utf-8", errors="replace")[:600]
833+
except OSError:
834+
continue
835+
out.append({
836+
"path": str(p),
837+
"size": size,
838+
"head": head,
839+
})
840+
except OSError:
841+
continue
842+
return out[:12]
843+
844+
845+
def _collect_launch_failures() -> list[dict]:
846+
if not LAUNCH_FAILURES_FILE.is_file():
847+
return []
848+
try:
849+
data = json.loads(LAUNCH_FAILURES_FILE.read_text())
850+
return data[-5:] if isinstance(data, list) else []
851+
except Exception:
852+
return []
853+
854+
855+
def _openai_compat_llm_json(system: str, user: str,
856+
timeout: int = 30) -> dict:
857+
"""Call Ollama via its OpenAI-compatible /v1/chat/completions
858+
endpoint with response_format=json_object (operator directive:
859+
'native for OpenAI API standards'). Returns parsed dict; {} on
860+
any failure (fail-open -- the loop just skips a tick)."""
861+
payload = {
862+
"model": MODEL,
863+
"messages": [
864+
{"role": "system", "content": system},
865+
{"role": "user", "content": user},
866+
],
867+
"response_format": {"type": "json_object"},
868+
"temperature": 0.1,
869+
"max_tokens": 700,
870+
"stream": False,
871+
}
872+
try:
873+
req = urllib.request.Request(
874+
f"{ENDPOINT}/v1/chat/completions",
875+
data=json.dumps(payload).encode("utf-8"),
876+
headers={"Content-Type": "application/json"},
877+
method="POST",
878+
)
879+
with urllib.request.urlopen(req, timeout=timeout) as r:
880+
body = json.loads(r.read())
881+
except (urllib.error.URLError, OSError, json.JSONDecodeError) as e:
882+
log.debug("nudger LLM call failed: %s", e)
883+
return {}
884+
choices = body.get("choices") or []
885+
if not choices:
886+
return {}
887+
content = ((choices[0].get("message") or {}).get("content") or "").strip()
888+
if not content:
889+
return {}
890+
try:
891+
parsed = json.loads(content)
892+
return parsed if isinstance(parsed, dict) else {}
893+
except json.JSONDecodeError:
894+
return {}
895+
896+
897+
def _render_nudges_md(payload: dict) -> str:
898+
"""Render the structured nudge payload as a markdown digest the
899+
agent can `cat` for ground-truth status."""
900+
nudges = payload.get("nudges") or []
901+
digest = payload.get("digest") or ""
902+
out = [
903+
f"# MiOS agent nudges",
904+
f"_updated: {datetime.datetime.now().isoformat(timespec='seconds')}_",
905+
"",
906+
f"## Digest",
907+
digest or "_no notable state_",
908+
"",
909+
f"## Open nudges ({len(nudges)})",
910+
]
911+
if not nudges:
912+
out.append("_none -- proceed_")
913+
for n in nudges:
914+
if not isinstance(n, dict):
915+
continue
916+
sev = n.get("severity", "low")
917+
nt = n.get("type", "?")
918+
summ = n.get("summary", "")
919+
act = n.get("action", "")
920+
icon = {"high": "🔴", "med": "🟡", "low": "🟢"}.get(sev, "·")
921+
out.append(f"- {icon} **{nt}** ({sev}): {summ}")
922+
if act:
923+
out.append(f" - **action:** `{act}`")
924+
return "\n".join(out) + "\n"
925+
926+
927+
def task_collector_loop() -> None:
928+
"""5-min cadence aggregator + nudger. See module docstring above."""
929+
# Boot delay -- let mios-daemon settle, sessions populate.
930+
time.sleep(75)
931+
NUDGES_MD.parent.mkdir(parents=True, exist_ok=True)
932+
while not _stop_event.is_set():
933+
try:
934+
agg = {
935+
"kanban": _collect_kanban_state(),
936+
"recent_sessions": _collect_recent_sessions(),
937+
"scratch": _collect_scratch(),
938+
"launch_failures": _collect_launch_failures(),
939+
}
940+
with _state_lock:
941+
cls = (_state.get("classify") or {})
942+
if cls.get("summary"):
943+
agg["recent_log_summary"] = cls["summary"][:300]
944+
# Compose the user message: terse JSON dump of the aggregate.
945+
user_msg = (
946+
"Aggregate MiOS state (last ~30 min):\n\n"
947+
f"```json\n{json.dumps(agg, indent=2, default=str)[:6000]}\n```\n"
948+
"\nProduce the nudges JSON now."
949+
)
950+
parsed = _openai_compat_llm_json(
951+
TASK_SYSTEM, user_msg,
952+
timeout=int(REQUEST_TIMEOUT_S),
953+
)
954+
if not parsed:
955+
parsed = {"nudges": [], "digest": "(nudger LLM unreachable)"}
956+
# Atomic write of both the markdown digest + structured JSON
957+
md = _render_nudges_md(parsed)
958+
for target, content in ((NUDGES_MD, md),
959+
(NUDGES_JSON, json.dumps(parsed, indent=2))):
960+
tmp = target.with_suffix(target.suffix + ".tmp")
961+
tmp.write_text(content, encoding="utf-8")
962+
os.chmod(tmp, 0o644)
963+
tmp.replace(target)
964+
_update_state("nudges", {
965+
"ts": datetime.datetime.now().isoformat(timespec="seconds"),
966+
"count": len(parsed.get("nudges") or []),
967+
"digest_preview": (parsed.get("digest") or "")[:160],
968+
"sources": {
969+
"kanban": len(agg["kanban"]),
970+
"sessions": len(agg["recent_sessions"]),
971+
"scratch": len(agg["scratch"]),
972+
"launch_failures": len(agg["launch_failures"]),
973+
},
974+
})
975+
except Exception as e:
976+
log.warning("task_collector: %s: %s", type(e).__name__, e)
977+
for _ in range(int(TASK_COLLECT_TICK_S)):
978+
if _stop_event.is_set(): return
979+
time.sleep(1)
980+
981+
657982
def suggestions_loop() -> None:
658983
# Boot delay so OWUI + ollama warm before first generation
659984
time.sleep(45)
@@ -751,6 +1076,7 @@ def main() -> int:
7511076
threading.Thread(target=cron_loop, daemon=True, name="cron"),
7521077
threading.Thread(target=suggestions_loop, daemon=True, name="suggestions"),
7531078
threading.Thread(target=launch_verifier_loop, daemon=True, name="launch_verifier"),
1079+
threading.Thread(target=task_collector_loop, daemon=True, name="task_collector"),
7541080
]
7551081
for t in threads:
7561082
t.start()

usr/share/mios/ai/hermes-soul.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,16 @@ State paths (read freely):
217217
file FIRST** before re-attempting -- it lists which earlier
218218
verifications you skipped, with the user prompt + claim sentence
219219
+ verifier verdict.
220+
- `/var/lib/mios/scratch/agent-nudges.md` (+ `.json`) — the
221+
**mios-daemon task_collector nudger digest**, refreshed every 5
222+
min by a micro-LLM on the iGPU lane. Aggregates active kanban
223+
tasks, recent agent sessions, scratchpad files, launch failures,
224+
and the journal classify summary into a ground-truth status the
225+
next chat turn should know. **At the start of any non-trivial
226+
turn, `cat /var/lib/mios/scratch/agent-nudges.md` first** --
227+
saves you from re-walking the logs yourself, and surfaces nudges
228+
(a stalled task, an unverified launch, a scratchpad note flagged
229+
by another agent) you'd otherwise miss.
220230
- `/var/lib/mios/daemon/state.json` — unified daemon state
221231
(classify, refusal, cron, suggestions, launch_verifier sections)
222232

0 commit comments

Comments
 (0)