forked from HKUDS/OpenSpace
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmcp_tools.py
More file actions
372 lines (310 loc) · 12.6 KB
/
Copy pathmcp_tools.py
File metadata and controls
372 lines (310 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
"""Viking operations exposed as MCP tools for host agents.
The host agent (OpenClaw, Claude Code, Codex, Cursor, nanobot, …) sees
these as ordinary MCP tools and can invoke them directly — without ever
delegating a task to OpenSpace's full execution engine. This closes the
biggest gap in the Viking integration: previously the host agent's
direct chat surface could not benefit from cross-session memories.
Five tools are exposed:
openviking_retrieve_memory(query, category?, limit?)
— fetch cross-session abstracts for a query
openviking_remember(content, category, polarity?)
— record a new memory from an explicit user/agent observation
openviking_forget_memory(uri, reason?)
— mark a specific memory as stale / delete it
openviking_report_stale_memory(uri, reason)
— lighter-weight variant that never deletes, just flags
openviking_memory_status()
— health + config introspection (namespace, user_id, push toggle)
Every tool is best-effort. When Viking is offline or the client is
not configured, tools return a structured error string instead of
raising, so the host LLM can recover gracefully.
Registration: call :func:`register_viking_mcp_tools(mcp, get_client)`
from the MCP server bootstrap (``openspace/mcp_server.py``). The
``get_client`` callable is used to lazily fetch the shared
``OpenVikingClient`` so the MCP server does not need to import the
Viking module at startup.
"""
from __future__ import annotations
import json
from typing import Any, Awaitable, Callable, Dict, List, Optional
from openspace.utils.logging import Logger
logger = Logger.get_logger(__name__)
# Category aliases the host LLM might use — mapped to the canonical
# memory category names understood by the client's URI builder.
_CATEGORY_ALIASES = {
"tools": "tools",
"tool": "tools",
"tool_knowledge": "tools",
"patterns": "patterns",
"pattern": "patterns",
"skills": "skills",
"skill": "skills",
"cases": "cases",
"case": "cases",
"preferences": "preferences",
"preference": "preferences",
"user_preferences": "preferences",
"antipatterns": "antipatterns",
"antipattern": "antipatterns",
"failures": "antipatterns",
"anti_patterns": "antipatterns",
}
def _normalize_category(category: Optional[str]) -> Optional[str]:
if not category:
return None
key = category.strip().lower().replace("-", "_")
return _CATEGORY_ALIASES.get(key)
def _resolve_target_uri(client: Any, category: Optional[str]) -> str:
"""Return the target URI for a normalized category, or '' for all."""
norm = _normalize_category(category)
if norm is None:
return ""
if norm in ("tools", "patterns", "skills", "cases", "antipatterns"):
return client.agent_memory_uri(norm)
if norm == "preferences":
return client.user_memory_uri("preferences")
return ""
def _error(message: str, **extra: Any) -> str:
return json.dumps({"status": "error", "error": message, **extra}, ensure_ascii=False)
def _ok(data: Any) -> str:
return json.dumps({"status": "ok", "data": data}, ensure_ascii=False)
# ---------------------------------------------------------------------------
# Tool implementations
# ---------------------------------------------------------------------------
async def tool_retrieve_memory(
get_client: Callable[[], Awaitable[Any]],
query: str,
category: Optional[str] = None,
limit: int = 5,
) -> str:
"""Retrieve cross-session memory abstracts for a query.
This is how a host agent (OpenClaw chat surface, etc.) pulls user
preferences / tool knowledge / past solutions *without* invoking
the full OpenSpace execution engine.
"""
if not query or not isinstance(query, str):
return _error("query is required and must be a non-empty string")
limit = max(1, min(int(limit or 5), 20))
client = await get_client()
if client is None:
return _error("OpenViking client is not configured")
if not await client.is_available():
return _error("OpenViking server is not available", code="unavailable")
target_uri = _resolve_target_uri(client, category)
try:
results = await client.find_memories(
query,
target_uri=target_uri,
limit=limit,
)
except Exception as exc:
return _error(f"find_memories failed: {exc}")
abstracts: List[Dict[str, Any]] = []
for m in results or []:
if not isinstance(m, dict):
continue
abstracts.append({
"uri": m.get("uri", ""),
"abstract": m.get("abstract") or m.get("summary") or m.get("content") or "",
"score": m.get("score", 0.0),
"category": m.get("category", ""),
})
return _ok({
"query": query,
"category": _normalize_category(category),
"target_uri": target_uri,
"count": len(abstracts),
"results": abstracts,
})
async def tool_remember(
get_client: Callable[[], Awaitable[Any]],
content: str,
category: str = "cases",
polarity: str = "positive",
) -> str:
"""Record an explicit user observation as a Viking memory.
The host agent can surface a "save this" intent from the user and
forward the observation here. A lightweight session is created and
committed so Viking's extraction pipeline writes it into the right
memory bucket.
"""
if not content or not isinstance(content, str):
return _error("content is required")
norm = _normalize_category(category) or "cases"
pol = (polarity or "positive").strip().lower()
if pol not in ("positive", "negative"):
return _error("polarity must be 'positive' or 'negative'")
client = await get_client()
if client is None:
return _error("OpenViking client is not configured")
if not await client.is_available():
return _error("OpenViking server is not available", code="unavailable")
# Use enrichment.feedback_negative for polarity=negative so the
# session is written to the antipatterns bucket with the correct
# polarity marker. For positive we compose a minimal session here.
import time as _time
session_id = f"openspace-remember-{int(_time.time() * 1000)}"
try:
await client.create_session(session_id)
if pol == "negative":
await client.add_session_message(
session_id,
"assistant",
"POLARITY: negative — explicit anti-pattern recorded by host agent.",
)
await client.add_session_message(session_id, "user", content[:4000])
await client.add_session_message(
session_id,
"assistant",
f"Category hint: {norm}",
)
await client.commit_session(session_id)
except Exception as exc:
return _error(f"remember failed: {exc}")
return _ok({
"session_id": session_id,
"category": norm,
"polarity": pol,
})
async def tool_forget_memory(
get_client: Callable[[], Awaitable[Any]],
uri: str,
reason: str = "",
) -> str:
"""Delete (or deprecate) a specific memory by URI."""
if not uri:
return _error("uri is required")
client = await get_client()
if client is None:
return _error("OpenViking client is not configured")
if not await client.is_available():
return _error("OpenViking server is not available", code="unavailable")
try:
resp = await client.delete_resource(uri)
except Exception as exc:
return _error(f"forget_memory failed: {exc}")
if resp:
return _ok({"uri": uri, "deleted": True, "reason": reason[:500]})
return _ok({"uri": uri, "deleted": False, "reason": reason[:500],
"note": "Viking did not confirm deletion — stale-report fallback may still run"})
async def tool_report_stale_memory(
get_client: Callable[[], Awaitable[Any]],
uri: str,
reason: str,
) -> str:
"""Flag a memory as stale without forcing deletion.
Writes a dedicated report session so Viking's extraction pipeline
can later decide to prune or deprioritize the target memory.
"""
if not uri or not reason:
return _error("both uri and reason are required")
client = await get_client()
if client is None:
return _error("OpenViking client is not configured")
if not await client.is_available():
return _error("OpenViking server is not available", code="unavailable")
try:
from .enrichment import VikingEnrichment
enricher = VikingEnrichment(client)
ok = await enricher.report_stale_memory(uri, reason)
except Exception as exc:
return _error(f"report_stale_memory failed: {exc}")
return _ok({"uri": uri, "reported": bool(ok), "reason": reason[:500]})
async def tool_memory_status(
get_client: Callable[[], Awaitable[Any]],
) -> str:
"""Return integration health, namespace, and config visibility.
Useful for host agents to decide whether to rely on Viking for a
given turn and to surface status in UI.
"""
client = await get_client()
if client is None:
return _ok({
"configured": False,
"available": False,
"namespace": "",
"user_id": "",
})
available = await client.is_available()
return _ok({
"configured": True,
"available": bool(available),
"base_url": client.base_url,
"namespace": client.namespace,
"user_id": client.user_id,
"agent_memory_uri_tools": client.agent_memory_uri("tools"),
"user_memory_uri_prefs": client.user_memory_uri("preferences"),
})
# ---------------------------------------------------------------------------
# Registration helper
# ---------------------------------------------------------------------------
def register_viking_mcp_tools(
mcp: Any,
get_client: Callable[[], Awaitable[Any]],
) -> int:
"""Register the Viking tools on an MCP server instance.
``mcp`` is the FastMCP-compatible server object exposing a
``@mcp.tool()`` decorator. ``get_client`` is an async callable
returning the shared ``OpenVikingClient`` (or ``None``).
Returns the number of tools registered. Logs a warning and returns
0 if the MCP object does not support tool registration.
"""
if not hasattr(mcp, "tool") or not callable(mcp.tool):
logger.debug("MCP object does not support .tool() decorator")
return 0
@mcp.tool()
async def openviking_retrieve_memory(
query: str,
category: Optional[str] = None,
limit: int = 5,
) -> str:
"""Retrieve cross-session memory abstracts from OpenViking.
Args:
query: Natural language search for relevant memories.
category: Optional filter: tools, patterns, skills, cases,
preferences, antipatterns. Omit to search all.
limit: Maximum number of abstracts to return (1-20, default 5).
"""
return await tool_retrieve_memory(get_client, query, category, limit)
@mcp.tool()
async def openviking_remember(
content: str,
category: str = "cases",
polarity: str = "positive",
) -> str:
"""Record an explicit observation as a Viking memory.
Args:
content: The observation or knowledge to remember.
category: One of tools, patterns, skills, cases,
preferences, antipatterns.
polarity: "positive" (default) or "negative" for anti-patterns.
"""
return await tool_remember(get_client, content, category, polarity)
@mcp.tool()
async def openviking_forget_memory(
uri: str,
reason: str = "",
) -> str:
"""Delete or deprecate a specific memory by its Viking URI.
Args:
uri: The viking:// URI of the memory to forget.
reason: Optional explanation for the forget action.
"""
return await tool_forget_memory(get_client, uri, reason)
@mcp.tool()
async def openviking_report_stale_memory(
uri: str,
reason: str,
) -> str:
"""Flag a memory as stale without deleting it.
Args:
uri: The viking:// URI of the stale memory.
reason: Why the memory is stale (e.g. "tool X updated to v2").
"""
return await tool_report_stale_memory(get_client, uri, reason)
@mcp.tool()
async def openviking_memory_status() -> str:
"""Return OpenViking integration health and configuration."""
return await tool_memory_status(get_client)
logger.info("Registered 5 OpenViking MCP tools for host agent access")
return 5