Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions examples/mcp_server_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Register MemU as a Model Context Protocol server with Claude Code.

Two integrations are shipped: a FastMCP-based one (recommended) and a
low-level one using the official `mcp` SDK directly. Both expose the same
five tools (memorize, retrieve, list_items, list_categories, clear_memory).

Install one of the optional extras and register the server::

# FastMCP (recommended)
pip install "memu-py[mcp]"
claude mcp add memu -- python -m memu.integrations.mcp_server

# Low-level (official `mcp` SDK)
pip install "memu-py[mcp-lowlevel]"
claude mcp add memu -- python -m memu.integrations.mcp_server_lowlevel

The default CLI entry reads ``MEMU_API_KEY`` (or ``OPENAI_API_KEY``) and the
optional overrides ``MEMU_BASE_URL`` / ``MEMU_CHAT_MODEL`` /
``MEMU_EMBED_MODEL``, so it works against any OpenAI-compatible provider
(DeepSeek, Qwen DashScope, OpenRouter, Together, local Ollama, ...). For
separate chat and embedding endpoints, or a non-default storage backend,
construct the service programmatically as shown below.
"""

from __future__ import annotations

import os

from memu.app.service import MemoryService
from memu.integrations.mcp_server import build_server


def main() -> None:
service = MemoryService(
llm_profiles={"default": {"api_key": os.environ["OPENAI_API_KEY"]}},
database_config={"metadata_store": {"provider": "inmemory"}},
)
build_server(service).run()


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ dev = [
{include-group = "docs"},
{include-group = "lint"},
{include-group = "test"},
"fastmcp>=3.3.0",
"langchain-openai>=1.1.7",
"langgraph>=1.0.6",
"mcp>=1.0.0",
]
docs = [
"mkdocs>=1.6.1",
Expand All @@ -70,6 +72,8 @@ test = [
postgres = ["pgvector>=0.3.4", "sqlalchemy[postgresql-psycopgbinary]>=2.0.36"]
langgraph = ["langgraph>=0.0.10", "langchain-core>=0.1.0"]
claude = ["claude-agent-sdk>=0.1.24"]
mcp = ["fastmcp>=2.7.0"]
mcp-lowlevel = ["mcp>=1.0.0"]

[project.urls]
"Homepage" = "https://github.com/NevaMind-AI/MemU"
Expand Down
15 changes: 15 additions & 0 deletions src/memu/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
from typing import Any

from .langgraph import MemULangGraphTools

__all__ = ["MemULangGraphTools"]


def __getattr__(name: str) -> Any:
# Defer SDK imports so listing the package never requires fastmcp/mcp.
if name == "build_mcp_server":
from .mcp_server import build_server as _fastmcp_build

return _fastmcp_build
if name == "build_mcp_server_lowlevel":
from .mcp_server_lowlevel import build_server as _lowlevel_build

return _lowlevel_build
raise AttributeError(name)
154 changes: 154 additions & 0 deletions src/memu/integrations/mcp_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""FastMCP server exposing MemU as Model Context Protocol tools.

Run as a stdio server:

python -m memu.integrations.mcp_server

The companion module `mcp_server_lowlevel` exposes the same tools through the
official `mcp` SDK directly for users who prefer it over FastMCP.
"""

from __future__ import annotations

import contextlib
import logging
import os
import tempfile
import uuid
from typing import TYPE_CHECKING

from memu.app.service import MemoryService

if TYPE_CHECKING:
from fastmcp import FastMCP

logger = logging.getLogger("memu.integrations.mcp")


class _MemUTools:
"""Async tool implementations shared by both FastMCP and low-level servers."""

def __init__(self, service: MemoryService):
self.service = service

async def memorize(self, content: str, user_id: str, modality: str = "conversation") -> str:
path = os.path.join(tempfile.gettempdir(), f"memu_mcp_{uuid.uuid4()}.txt")
try:
with open(path, "w", encoding="utf-8") as f:
f.write(content)
await self.service.memorize(resource_url=path, modality=modality, user={"user_id": user_id})
except Exception as e:
logger.exception("memorize failed for user %s", user_id)
return f"Failed to save memory: {e!s}"
finally:
with contextlib.suppress(OSError):
os.unlink(path)
return "Memory saved."

async def retrieve(self, query: str, user_id: str, limit: int = 5) -> str:
try:
result = await self.service.retrieve(
queries=[{"role": "user", "content": query}],
where={"user_id": user_id},
)
except Exception as e:
logger.exception("retrieve failed for user %s", user_id)
return f"Failed to retrieve memory: {e!s}"
items = (result.get("items") or [])[:limit]
if not items:
return "No relevant memories found."
return "\n".join(f"- {it.get('summary', '')}" for it in items)

async def list_items(self, user_id: str) -> str:
result = await self.service.list_memory_items(where={"user_id": user_id})
items = result.get("items") or []
if not items:
return "No memory items."
return "\n".join(f"- [{it.get('memory_type')}] {it.get('summary', '')}" for it in items)

async def list_categories(self, user_id: str) -> str:
result = await self.service.list_memory_categories(where={"user_id": user_id})
cats = result.get("categories") or []
if not cats:
return "No categories."
return "\n".join(f"- {c.get('name')}: {c.get('summary') or c.get('description', '')}" for c in cats)

async def clear_memory(self, user_id: str) -> str:
await self.service.clear_memory(where={"user_id": user_id})
return f"Cleared memory for user {user_id}."


def build_server(service: MemoryService) -> FastMCP:
"""Build a FastMCP server exposing the five MemU tools.

Requires `pip install memu-py[mcp]` for the `fastmcp` dependency.
"""
try:
from fastmcp import FastMCP
except ImportError as e:
msg = "Install 'memu-py[mcp]' (which pulls in fastmcp) to use this integration."
raise ImportError(msg) from e

tools = _MemUTools(service)
mcp = FastMCP("memu")

@mcp.tool
async def memu_memorize(content: str, user_id: str, modality: str = "conversation") -> str:
"""Save content to long-term memory for a user."""
return await tools.memorize(content, user_id, modality)

@mcp.tool
async def memu_retrieve(query: str, user_id: str, limit: int = 5) -> str:
"""Search memories relevant to the query."""
return await tools.retrieve(query, user_id, limit)

@mcp.tool
async def memu_list_items(user_id: str) -> str:
"""List all memory items for a user."""
return await tools.list_items(user_id)

@mcp.tool
async def memu_list_categories(user_id: str) -> str:
"""List all memory categories with their summaries for a user."""
return await tools.list_categories(user_id)

@mcp.tool
async def memu_clear_memory(user_id: str) -> str:
"""Delete every memory item, category, and resource for a user."""
return await tools.clear_memory(user_id)

return mcp


def _service_from_env() -> MemoryService:
"""Build a default MemoryService from environment variables.

Reads ``MEMU_API_KEY`` (or ``OPENAI_API_KEY`` as a fallback) and, optionally,
``MEMU_BASE_URL`` / ``MEMU_CHAT_MODEL`` / ``MEMU_EMBED_MODEL`` so the CLI
entry point works against any OpenAI-compatible provider (DeepSeek, Qwen
DashScope, OpenRouter, Together, local Ollama, ...). Users who need
separate chat and embedding endpoints should construct a MemoryService
explicitly and pass it to :func:`build_server`.
"""
api_key = os.environ.get("MEMU_API_KEY") or os.environ.get("OPENAI_API_KEY")
if not api_key:
msg = "Set MEMU_API_KEY (or OPENAI_API_KEY) before launching the MCP server."
raise RuntimeError(msg)
profile: dict[str, str] = {"api_key": api_key}
for env, key in (
("MEMU_BASE_URL", "base_url"),
("MEMU_CHAT_MODEL", "chat_model"),
("MEMU_EMBED_MODEL", "embed_model"),
):
if value := os.environ.get(env):
profile[key] = value
return MemoryService(llm_profiles={"default": profile})


def main() -> None:
"""Entry point for ``python -m memu.integrations.mcp_server``."""
build_server(_service_from_env()).run()


if __name__ == "__main__":
main()
139 changes: 139 additions & 0 deletions src/memu/integrations/mcp_server_lowlevel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Low-level MCP server using the official `mcp` SDK directly.

Exposes the same five tools as :mod:`memu.integrations.mcp_server` but binds
them via :class:`mcp.server.lowlevel.Server`. Useful for users who prefer the
official protocol library over FastMCP. Run as a stdio server:

python -m memu.integrations.mcp_server_lowlevel
"""

from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any

from memu.app.service import MemoryService
from memu.integrations.mcp_server import _MemUTools, _service_from_env

if TYPE_CHECKING:
from mcp.server.lowlevel import Server

_TOOL_SCHEMA: dict[str, dict[str, Any]] = {
"memu_memorize": {
"description": "Save content to long-term memory for a user.",
"inputSchema": {
"type": "object",
"properties": {
"content": {"type": "string"},
"user_id": {"type": "string"},
"modality": {"type": "string", "default": "conversation"},
},
"required": ["content", "user_id"],
},
},
"memu_retrieve": {
"description": "Search memories relevant to the query.",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"user_id": {"type": "string"},
"limit": {"type": "integer", "default": 5},
},
"required": ["query", "user_id"],
},
},
"memu_list_items": {
"description": "List all memory items for a user.",
"inputSchema": {
"type": "object",
"properties": {"user_id": {"type": "string"}},
"required": ["user_id"],
},
},
"memu_list_categories": {
"description": "List all memory categories with their summaries for a user.",
"inputSchema": {
"type": "object",
"properties": {"user_id": {"type": "string"}},
"required": ["user_id"],
},
},
"memu_clear_memory": {
"description": "Delete every memory item, category, and resource for a user.",
"inputSchema": {
"type": "object",
"properties": {"user_id": {"type": "string"}},
"required": ["user_id"],
},
},
}


def build_server(service: MemoryService) -> Server:
"""Build a low-level MCP Server exposing the same five tools as `mcp_server`.

Requires `pip install memu-py[mcp-lowlevel]` for the `mcp` dependency.
"""
try:
import mcp.types as types
from mcp.server.lowlevel import Server
except ImportError as e:
msg = "Install 'memu-py[mcp-lowlevel]' (which pulls in mcp) to use this integration."
raise ImportError(msg) from e

tools = _MemUTools(service)
server = Server("memu")
handlers: dict[str, Callable[..., Awaitable[str]]] = {
"memu_memorize": tools.memorize,
"memu_retrieve": tools.retrieve,
"memu_list_items": tools.list_items,
"memu_list_categories": tools.list_categories,
"memu_clear_memory": tools.clear_memory,
}

@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [types.Tool(name=name, **schema) for name, schema in _TOOL_SCHEMA.items()]

@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent]:
handler = handlers.get(name)
if handler is None:
msg = f"Unknown tool: {name}"
raise ValueError(msg)
result = await handler(**arguments)
return [types.TextContent(type="text", text=result)]

return server


async def _run(service: MemoryService) -> None:
import mcp.server.stdio
from mcp.server.lowlevel import NotificationOptions
from mcp.server.models import InitializationOptions

server = build_server(service)
async with mcp.server.stdio.stdio_server() as (read, write):
await server.run(
read,
write,
InitializationOptions(
server_name="memu",
server_version="1.0.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)


def main() -> None:
"""Entry point for ``python -m memu.integrations.mcp_server_lowlevel``."""
asyncio.run(_run(_service_from_env()))


if __name__ == "__main__":
main()
Loading