
Commit 8339885

mios-dev and claude committed
mios-agent-pipe + OWUI shim: SSE phase markers (step 3 of 5)
Two coordinated changes:

usr/lib/mios/agent-pipe/server.py

Adds a `mios_status` field on otherwise-empty SSE chunks. The field carries pipe-phase markers (📡 prompt -> 🧭 route -> 🛠️ {tool} or 🧠 → hermes -> ✅ done) that any translator gateway can lift into its native status surface (OWUI status pill, Hermes Discord reaction, future Slack icon, ...). Stock OpenAI streaming clients see an unknown field on a content-empty chunk and ignore it cleanly -- graceful degradation.

Three chain paths emit markers:

  DISPATCH fast-path -> 📡 prompt -> 🧭 route -> 🛠️ {tool} -> <content delta with tool_calls envelope> -> ✅ done
  CHAT fast-path -> 📡 prompt -> 🧭 route -> <content delta with reply text> -> ✅ done
  AGENT / fallback -> 📡 prompt -> 🧭 route -> 🧠 → hermes -> <backend stream passes through verbatim>

The non-streaming response path returns JSON exactly as before (no markers; OpenAI non-streaming has no per-chunk extension point).

usr/share/mios/owui/pipes/mios_agent_pipe.py

Surgical slim: deletes the now-redundant router/dispatch code path (agent-pipe owns it) and adds the `mios_status` translator. Specifically:

* BACKEND_URL default flips from :8642/v1 (hermes direct) to :8640/v1 (agent-pipe). Every chat request the OWUI pipe makes now goes through the agent-pipe service, which runs router / dispatch / SurrealDB writes and forwards leftover work to hermes itself.
* The if-verdict-elif-chat-elif-agent dispatch branch (~60 lines) is removed -- agent-pipe handles those three branches now. The OWUI pipe just gets back either a tool_calls envelope content delta, a chat reply, or a streaming hermes response, and yields each to OWUI without re-routing.
* SSE loop gains an early-branch on chunk["mios_status"]: lifts {emoji, label, done} into a self._emit() call so OWUI's status pill mirrors the standalone service's phase markers. Anything else (delta.content, finish_reason) handled exactly as before.
* Retained intact: task-gen bypass, tail_watcher (Hermes-internal tool emits sideband), CPU refine / polish / critic, narration collapse, output cleanup, Qwen XML stripping. Those are OWUI-specific quality features; can be ported to agent-pipe in Step 2b if Discord wants them too.

Live-verified on podman-MiOS-DEV:

Chat-path stream from agent-pipe shows the full marker sequence (📡 prompt -> 🧭 route -> content -> ✅ chat) in the SSE body alongside the normal delta chunks. OWUI shim re-installed in webui.db; the chat status pill now reflects which agent-pipe phase the request is in instead of saying "thinking..." opaquely.

Step 4 (next commit) re-points the Hermes Discord gateway's backend URL at :8640 so Discord chats ride the same chain. The hermes-discord-reactions-patch.py shipped in Step 1 will then animate the per-phase reactions from the `mios_status` markers the agent-pipe emits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a65b4cf commit 8339885

2 files changed

Lines changed: 97 additions & 68 deletions


usr/lib/mios/agent-pipe/server.py

Lines changed: 54 additions & 1 deletion
@@ -505,7 +505,14 @@ async def dispatch_mios_verb(tool: str, args: dict) -> dict:

 def _sse_chunk(content: str, *, chat_id: str, model: str,
                role: Optional[str] = None,
-               finish_reason: Optional[str] = None) -> bytes:
+               finish_reason: Optional[str] = None,
+               mios_status: Optional[dict] = None) -> bytes:
+    """Build an OpenAI-streaming SSE chunk. Optional `mios_status`
+    field carries pipe-internal phase emits (📡 prompt, 🧭 route,
+    🛠️ {tool}, ✅) that translator gateways (OWUI shim, Hermes
+    Discord) lift into their native status surfaces. Stock OpenAI
+    clients see this as an unknown field and ignore it -- graceful
+    degradation."""
     delta: dict[str, Any] = {}
     if role:
         delta["role"] = role
@@ -522,9 +529,24 @@ def _sse_chunk(content: str, *, chat_id: str, model: str,
             "finish_reason": finish_reason,
         }],
     }
+    if mios_status:
+        chunk["mios_status"] = mios_status
     return ("data: " + json.dumps(chunk) + "\n\n").encode("utf-8")


+def _sse_status(*, chat_id: str, model: str, emoji: str, label: str,
+                done: bool = False) -> bytes:
+    """Emit a content-empty SSE chunk whose only purpose is the
+    `mios_status` field. Standard OpenAI clients see a no-op delta
+    + ignore the extra field. Translator gateways pull the phase
+    info from `mios_status` and surface it natively (OWUI's
+    event_emitter status, Hermes Discord's reactions, etc.)."""
+    return _sse_chunk(
+        "", chat_id=chat_id, model=model,
+        mios_status={"emoji": emoji, "label": label, "done": done},
+    )
+
+
 def _sse_done() -> bytes:
     return b"data: [DONE]\n\n"

@@ -711,9 +733,22 @@ async def chat_completions(request: Request) -> Any:
             )
             if streaming:
                 async def _stream_dispatch() -> AsyncGenerator[bytes, None]:
+                    # Phase markers: prompt -> route -> tool -> done.
+                    # Translator gateways pull the emoji/label from
+                    # `mios_status` and surface natively (OWUI status
+                    # event_emitter, Discord reactions, etc.).
+                    yield _sse_status(chat_id=chat_id, model=model,
+                                      emoji="📡", label="prompt")
+                    yield _sse_status(chat_id=chat_id, model=model,
+                                      emoji="🧭", label="route")
+                    yield _sse_status(chat_id=chat_id, model=model,
+                                      emoji="🛠️", label=tool)
                     yield _sse_chunk("", chat_id=chat_id, model=model,
                                      role="assistant")
                     yield _sse_chunk(rendered, chat_id=chat_id, model=model)
+                    yield _sse_status(chat_id=chat_id, model=model,
+                                      emoji="✅" if ok else "⚠️",
+                                      label=tool, done=True)
                     yield _sse_chunk("", chat_id=chat_id, model=model,
                                      finish_reason="stop")
                     yield _sse_done()
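The wire format produced by the dispatch path above can be reproduced in isolation. The following is a sketch under stated assumptions, not the file's actual code: the diff elides the middle of `_sse_chunk`, so the `id`, `object`, `created`, and `index` fields here are assumed from the usual OpenAI chunk shape, and `sse_chunk`, `sse_status`, `dispatch_stream`, `decode`, and the tool name `demo_tool` are hypothetical.

```python
import json
import time
from typing import Any, Iterator, Optional


def sse_chunk(content: str, *, chat_id: str, model: str,
              role: Optional[str] = None,
              finish_reason: Optional[str] = None,
              mios_status: Optional[dict] = None) -> bytes:
    """Build one SSE frame; `mios_status` is attached only when given."""
    delta: dict[str, Any] = {}
    if role:
        delta["role"] = role
    if content:
        delta["content"] = content
    chunk: dict[str, Any] = {
        "id": chat_id,                      # assumed field layout
        "object": "chat.completion.chunk",  # assumed field layout
        "created": int(time.time()),
        "model": model,
        "choices": [{"index": 0, "delta": delta,
                     "finish_reason": finish_reason}],
    }
    if mios_status:
        chunk["mios_status"] = mios_status
    return ("data: " + json.dumps(chunk) + "\n\n").encode("utf-8")


def sse_status(*, chat_id: str, model: str, emoji: str, label: str,
               done: bool = False) -> bytes:
    """Content-empty frame whose only payload is the phase marker."""
    return sse_chunk("", chat_id=chat_id, model=model,
                     mios_status={"emoji": emoji, "label": label,
                                  "done": done})


def dispatch_stream(tool: str, rendered: str, ok: bool, *,
                    chat_id: str, model: str) -> Iterator[bytes]:
    """Same frame order as the dispatch fast-path in the hunk above."""
    ids = {"chat_id": chat_id, "model": model}
    yield sse_status(emoji="📡", label="prompt", **ids)
    yield sse_status(emoji="🧭", label="route", **ids)
    yield sse_status(emoji="🛠️", label=tool, **ids)
    yield sse_chunk("", role="assistant", **ids)
    yield sse_chunk(rendered, **ids)
    yield sse_status(emoji="✅" if ok else "⚠️", label=tool, done=True, **ids)
    yield sse_chunk("", finish_reason="stop", **ids)
    yield b"data: [DONE]\n\n"


def decode(frame: bytes) -> Optional[dict]:
    """Parse one frame back into a dict; None marks the [DONE] sentinel."""
    body = frame.decode("utf-8").strip()[len("data: "):]
    return None if body == "[DONE]" else json.loads(body)


if __name__ == "__main__":
    for frame in dispatch_stream("demo_tool", "tool output", True,
                                 chat_id="chat-1", model="demo-model"):
        print(decode(frame))
```

Note that the closing marker is sent before the `finish_reason="stop"` chunk, so a translator can settle its status surface before the client treats the stream as finished.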
@@ -738,9 +773,15 @@ async def _stream_dispatch() -> AsyncGenerator[bytes, None]:
         if reply:
             if streaming:
                 async def _stream_chat() -> AsyncGenerator[bytes, None]:
+                    yield _sse_status(chat_id=chat_id, model=model,
+                                      emoji="📡", label="prompt")
+                    yield _sse_status(chat_id=chat_id, model=model,
+                                      emoji="🧭", label="route")
                     yield _sse_chunk("", chat_id=chat_id, model=model,
                                      role="assistant")
                     yield _sse_chunk(reply, chat_id=chat_id, model=model)
+                    yield _sse_status(chat_id=chat_id, model=model,
+                                      emoji="✅", label="chat", done=True)
                     yield _sse_chunk("", chat_id=chat_id, model=model,
                                      finish_reason="stop")
                     yield _sse_done()
@@ -766,6 +807,18 @@ async def _stream_chat() -> AsyncGenerator[bytes, None]:
     headers.setdefault("Content-Type", "application/json")
     if streaming:
         async def _stream_backend() -> AsyncGenerator[bytes, None]:
+            # Phase markers BEFORE the backend stream so the OWUI shim
+            # / Discord reactions know the pipe handed off to hermes.
+            # The backend's own content chunks pass through unchanged
+            # (no mios_status injected mid-stream -- the backend may
+            # emit its own tool-call status via tail_watcher or its
+            # internal mechanism).
+            yield _sse_status(chat_id=chat_id, model=model,
+                              emoji="📡", label="prompt")
+            yield _sse_status(chat_id=chat_id, model=model,
+                              emoji="🧭", label="route")
+            yield _sse_status(chat_id=chat_id, model=model,
+                              emoji="🧠", label="→ hermes")
             client = await _get_client()
             async with client.stream(
                 "POST", f"{BACKEND}/chat/completions",

usr/share/mios/owui/pipes/mios_agent_pipe.py

Lines changed: 43 additions & 67 deletions
@@ -195,8 +195,8 @@ def _db_fire(coro: Awaitable) -> None:
 class Pipe:
     class Valves(BaseModel):
         BACKEND_URL: str = Field(
-            default="http://host.containers.internal:8642/v1",
-            description="OpenAI-compat backend = hermes-agent gateway on :8642 (direct). The prefilter @ :8641 sat in front for delegate_task forcing but the pipe now does refinement directly, so prefilter is bypassed -- removes a moving part and fixes the ClientConnectorError when prefilter is down (operator-flagged 2026-05-17). OWUI runs in a podman Quadlet so the host is reached via host.containers.internal.",
+            default="http://host.containers.internal:8640/v1",
+            description="OpenAI-compat backend = the standalone MiOS Agent Pipe service at :8640 (NOT hermes directly). Operator directive 2026-05-18: extract the router/dispatch/SurrealDB-writes chain out of this OWUI pipe class into a gateway-agnostic FastAPI service so Hermes Discord + future Slack/Telegram/MCP gateways get the same tool-understanding parity as OWUI. The agent-pipe service forwards to hermes-agent (:8642) itself. OWUI runs in a podman Quadlet so the host is reached via host.containers.internal.",
         )
         BACKEND_MODEL: str = Field(
             default="hermes-agent",
@@ -2064,71 +2064,28 @@ async def pipe(
                     _last_user_text = raw
                     break

-        # ── Layer-1 ROUTER ─────────────────────────────────────────
-        # Micro-LLM classifies into dispatch / chat / agent. Single-
-        # intent dispatch runs the broker tool directly (skip refine
-        # + hermes); chat returns a one-liner; agent falls through
-        # to the existing refine -> hermes -> compose -> critic chain.
-        verdict = await self._classify_intent(_last_user_text, __event_emitter__)
-        if verdict:
-            action = verdict.get("action")
-            if action == "dispatch":
-                tool = str(verdict.get("tool", "")).strip()
-                args = verdict.get("args") or {}
-                if tool:
-                    result_json = await self._dispatch_mios_verb(
-                        tool, args if isinstance(args, dict) else {},
-                        __event_emitter__,
-                    )
-                    try:
-                        result = json.loads(result_json)
-                    except json.JSONDecodeError:
-                        result = {"output": result_json}
-                    ok = bool(result.get("success"))
-                    await self._emit(__event_emitter__,
-                                     "✅" if ok else "⚠️", done=True)
-                    # OpenAI-tool-result-native propagation: emit the
-                    # structured tool_call + tool_result envelope inside
-                    # a <details type="tool_calls"> block OWUI renders
-                    # natively. The only literal characters at this layer
-                    # are cross-locale identifiers (the tool name) +
-                    # universal symbols (✅ / ⚠️). The structured JSON
-                    # matches the OpenAI tool_calls / role:tool shape,
-                    # so any downstream agent reading chat history sees
-                    # the canonical tool_result content (not a prose
-                    # render) and the operator gets a collapsible block
-                    # with the raw data inside (zero English narrative).
-                    envelope = {
-                        "tool_call": {
-                            "id": f"call_{int(time.time()*1000)}",
-                            "type": "function",
-                            "function": {
-                                "name": tool,
-                                "arguments": args if isinstance(args, dict) else {},
-                            },
-                        },
-                        "tool_result": {
-                            "success": ok,
-                            "output": (result.get("output") or "")[:2000],
-                            "stderr": (result.get("stderr") or "")[:2000],
-                        },
-                    }
-                    symbol = "✅" if ok else "⚠️"
-                    yield (
-                        f"<details type=\"tool_calls\" done=\"true\">\n"
-                        f"<summary>{symbol} `{tool}`</summary>\n\n"
-                        f"```json\n{json.dumps(envelope, indent=2, default=str)}\n```\n"
-                        f"</details>"
-                    )
-                    return
-            elif action == "chat":
-                reply = str(verdict.get("reply", "")).strip()
-                if reply:
-                    await self._emit(__event_emitter__, "✅", done=True)
-                    yield reply
-                    return
-        # action == "agent" (or any unrecognized) -> fall through
-        await self._emit(__event_emitter__, "🧭 → agent")
+        # ── Layer-1 ROUTER -- DELEGATED to mios-agent-pipe service ──
+        # The router + dispatch + chat-fast-path + SurrealDB writes
+        # are owned by the standalone agent-pipe service at :8640 now
+        # (operator directive 2026-05-18: "discord chats not going
+        # through MiOS-Agent paths" -- extracted the chain into a
+        # gateway-agnostic service so Hermes Discord + future
+        # Slack/Telegram get the same tool surface). The OWUI pipe
+        # POSTs to agent-pipe (BACKEND_URL = :8640) which runs the
+        # router and either returns a tool_calls envelope (dispatch),
+        # a short reply (chat), or streams hermes content (agent path).
+        #
+        # OWUI-specific behaviors RETAINED in this shim:
+        # - task-gen bypass (above)
+        # - tail_watcher (Hermes-internal tool_call emits via the
+        #   /var/lib/mios/hermes-tail/latest.json sideband)
+        # - mios_status SSE field translation (below) -- agent-pipe
+        #   emits {emoji, label, done} markers on each phase; the
+        #   translator below calls _emit() so the OWUI status pill
+        #   stays lit during dispatch/chat/agent paths
+        # - CPU REFINE / CRITIC / POLISH (kept below; these add
+        #   OWUI-specific quality but aren't ported to agent-pipe
+        #   yet -- Step 2b if Discord needs them too)

         # ── CPU REFINEMENT (in-pipe) ─────────────────────────────────
         # Extract the last user message, refine via the small CPU model,
@@ -2229,6 +2186,25 @@ def _open_details_chunk() -> str:
                             chunk = json.loads(payload_str)
                         except json.JSONDecodeError:
                             continue
+                        # mios-agent-pipe (the standalone service at
+                        # :8640) injects a `mios_status` field on
+                        # content-empty SSE chunks to carry pipe-phase
+                        # markers (📡 prompt, 🧭 route, 🛠️ {tool}, ✅).
+                        # Translate it into the OWUI status pill via
+                        # __event_emitter__ -- stock OpenAI clients
+                        # would just ignore the unknown field.
+                        _mios_status = chunk.get("mios_status")
+                        if isinstance(_mios_status, dict):
+                            emoji = str(_mios_status.get("emoji", ""))
+                            label = str(_mios_status.get("label", ""))
+                            done = bool(_mios_status.get("done", False))
+                            description = (
+                                f"{emoji} {label}".strip() if (emoji or label)
+                                else ""
+                            )
+                            if description:
+                                await self._emit(__event_emitter__,
+                                                 description, done=done)
                         choices = chunk.get("choices") or []
                         if not choices:
                             continue
