Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/user_guide/en/modules/memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,23 @@ memory:
model: text-embedding-3-small
```

### Mem0 Memory Config
```yaml
memory:
- name: agent_memory
type: mem0
config:
api_key: ${MEM0_API_KEY}
agent_id: my-agent
```

## 3. Built-in Store Comparison
| Type | Path | Highlights | Best for |
| --- | --- | --- | --- |
| `simple` | `node/agent/memory/simple_memory.py` | Optional disk persistence (JSON) after runs; FAISS + semantic rerank; read/write capable. | Small conversation history, prototypes. |
| `file` | `node/agent/memory/file_memory.py` | Chunks files/dirs into a vector index, read-only, auto rebuilds when files change. | Knowledge bases, doc QA. |
| `blackboard` | `node/agent/memory/blackboard_memory.py` | Lightweight append-only log trimmed by time/count; no vector search. | Broadcast boards, pipeline debugging. |
| `mem0` | `node/agent/memory/mem0_memory.py` | Cloud-managed by Mem0; semantic search + graph relationships; no local embeddings or persistence needed. Requires `mem0ai` package. | Production memory, cross-session persistence, multi-agent memory sharing. |

All stores register through `register_memory_store()` so summaries show up in UI via `MemoryStoreConfig.field_specs()`.

Expand Down Expand Up @@ -98,6 +109,14 @@ This schema lets multimodal outputs flow into Memory/Thinking modules without ex
- **Retrieval** – Returns the latest `top_k` entries ordered by time.
- **Write** – `update()` appends the latest snapshot (input/output blocks, attachments, previews). No embeddings are generated, so retrieval is purely recency-based.

### 5.4 Mem0Memory
- **Config** – Requires `api_key` (from [app.mem0.ai](https://app.mem0.ai)). Optional `user_id`, `agent_id`, `org_id`, `project_id` for scoping.
- **Important**: `user_id` and `agent_id` are mutually exclusive in Mem0 API calls. If both are configured, retrieval issues a single search with an `OR` filter that matches either scope. For writes, `agent_id` takes precedence. If neither is configured, the agent role is used as the `agent_id`. Agent-generated content is stored with `role: "assistant"`.
- **Retrieval** – Uses Mem0's server-side semantic search. Supports `top_k` and `similarity_threshold` via `MemoryAttachmentConfig`.
- **Write** – `update()` sends conversation messages to Mem0 via the SDK. Agent outputs use `role: "assistant"`, user inputs use `role: "user"`.
- **Persistence** – Fully cloud-managed. `load()` and `save()` are no-ops. Memories persist across runs and sessions automatically.
- **Dependencies** – Requires `mem0ai` package (`pip install mem0ai`).

## 6. EmbeddingConfig Notes
- Fields: `provider`, `model`, `api_key`, `base_url`, `params`.
- `provider=openai` uses the official client; override `base_url` for compatibility layers.
Expand Down
2 changes: 2 additions & 0 deletions entity/configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
EmbeddingConfig,
FileMemoryConfig,
FileSourceConfig,
Mem0MemoryConfig,
MemoryAttachmentConfig,
MemoryStoreConfig,
SimpleMemoryConfig,
Expand Down Expand Up @@ -43,6 +44,7 @@
"FunctionToolConfig",
"GraphDefinition",
"HumanConfig",
"Mem0MemoryConfig",
"MemoryAttachmentConfig",
"MemoryStoreConfig",
"McpLocalConfig",
Expand Down
69 changes: 69 additions & 0 deletions entity/configs/node/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,75 @@ def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "BlackboardMemoryCo
}


@dataclass
class Mem0MemoryConfig(BaseConfig):
    """Settings for a memory store backed by the Mem0 managed service.

    ``api_key`` is the only entry that ``from_dict`` requires; the four ID
    fields are optional and default to ``None``.
    """

    api_key: str = ""
    org_id: str | None = None
    project_id: str | None = None
    user_id: str | None = None
    agent_id: str | None = None

    @classmethod
    def from_dict(cls, data: Mapping[str, Any], *, path: str) -> "Mem0MemoryConfig":
        """Build a config from the raw mapping found at ``path``.

        ``api_key`` is read with ``require_str``; every ID field is read
        with ``optional_str``, in declaration order.
        """
        source = require_mapping(data, path)
        required_key = require_str(source, "api_key", path)
        optional_ids = {
            field_name: optional_str(source, field_name, path)
            for field_name in ("org_id", "project_id", "user_id", "agent_id")
        }
        return cls(api_key=required_key, path=path, **optional_ids)

    # Per-field specs, keyed by field name. org_id and project_id are the
    # only entries flagged with ``advance=True``.
    FIELD_SPECS = {
        "api_key": ConfigFieldSpec(
            name="api_key",
            display_name="Mem0 API Key",
            type_hint="str",
            required=True,
            description="Mem0 API key (get one from app.mem0.ai)",
            default="${MEM0_API_KEY}",
        ),
        "org_id": ConfigFieldSpec(
            name="org_id",
            display_name="Organization ID",
            type_hint="str",
            required=False,
            description="Mem0 organization ID for scoping",
            advance=True,
        ),
        "project_id": ConfigFieldSpec(
            name="project_id",
            display_name="Project ID",
            type_hint="str",
            required=False,
            description="Mem0 project ID for scoping",
            advance=True,
        ),
        "user_id": ConfigFieldSpec(
            name="user_id",
            display_name="User ID",
            type_hint="str",
            required=False,
            description="User ID for user-scoped memories. Mutually exclusive with agent_id in API calls.",
        ),
        "agent_id": ConfigFieldSpec(
            name="agent_id",
            display_name="Agent ID",
            type_hint="str",
            required=False,
            description="Agent ID for agent-scoped memories. Mutually exclusive with user_id in API calls.",
        ),
    }


@dataclass
class MemoryStoreConfig(BaseConfig):
name: str
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"filelock>=3.20.1",
"markdown>=3.10",
"xhtml2pdf>=0.2.17",
"mem0ai>=1.0.9",
]

[build-system]
Expand Down
14 changes: 14 additions & 0 deletions runtime/node/agent/memory/builtin_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from entity.configs.node.memory import (
BlackboardMemoryConfig,
FileMemoryConfig,
Mem0MemoryConfig,
SimpleMemoryConfig,
MemoryStoreConfig,
)
Expand Down Expand Up @@ -34,6 +35,19 @@
)


def _create_mem0_memory(store):
    """Factory for the ``mem0`` store type.

    The implementation module is imported inside the function, so it is
    only loaded when a Mem0 store is actually created.
    """
    from runtime.node.agent.memory import mem0_memory

    return mem0_memory.Mem0Memory(store)


# Register the "mem0" store type: its config class, the lazily-importing
# factory defined above, and the one-line summary for this store type.
register_memory_store(
    "mem0",
    config_cls=Mem0MemoryConfig,
    factory=_create_mem0_memory,
    summary="Mem0 managed memory with semantic search and graph relationships",
)


class MemoryFactory:
@staticmethod
def create_memory(store: MemoryStoreConfig) -> MemoryBase:
Expand Down
203 changes: 203 additions & 0 deletions runtime/node/agent/memory/mem0_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Mem0 managed memory store implementation."""

import logging
import time
import uuid
from typing import Any, Dict, List

from entity.configs import MemoryStoreConfig
from entity.configs.node.memory import Mem0MemoryConfig
from runtime.node.agent.memory.memory_base import (
MemoryBase,
MemoryContentSnapshot,
MemoryItem,
MemoryWritePayload,
)

logger = logging.getLogger(__name__)


def _get_mem0_client(config: Mem0MemoryConfig):
"""Lazy-import mem0ai and create a MemoryClient."""
try:
from mem0 import MemoryClient
except ImportError:
raise ImportError(
"mem0ai is required for Mem0Memory. Install it with: pip install mem0ai"
)

client_kwargs: Dict[str, Any] = {}
if config.api_key:
client_kwargs["api_key"] = config.api_key
if config.org_id:
client_kwargs["org_id"] = config.org_id
if config.project_id:
client_kwargs["project_id"] = config.project_id

return MemoryClient(**client_kwargs)


class Mem0Memory(MemoryBase):
    """Memory store backed by Mem0's managed cloud service.

    Mem0 handles embeddings, storage, and semantic search server-side.
    No local persistence or embedding computation is needed.

    Important API constraints:
    - Agent memories use role="assistant" + agent_id
    - user_id and agent_id are stored as separate records in Mem0;
      if both are configured, an OR filter is used to search across both scopes.
    - search() uses filters dict; add() uses top-level kwargs.
    - SDK returns {"memories": [...]} from search.
    """

    def __init__(self, store: MemoryStoreConfig):
        """Create the Mem0 client from the store's ``Mem0MemoryConfig``.

        Raises:
            ValueError: If ``store`` does not carry a Mem0 configuration.
        """
        config = store.as_config(Mem0MemoryConfig)
        if not config:
            raise ValueError("Mem0Memory requires a Mem0 memory store configuration")
        super().__init__(store)
        self.config = config
        self.client = _get_mem0_client(config)
        self.user_id = config.user_id
        self.agent_id = config.agent_id

    # -------- Persistence (no-ops for cloud-managed store) --------

    def load(self) -> None:
        """No-op: Mem0 manages persistence server-side."""

    def save(self) -> None:
        """No-op: Mem0 manages persistence server-side."""

    # -------- Retrieval --------

    def _build_search_filters(self, agent_role: str) -> Dict[str, Any]:
        """Build the filters dict for Mem0 search.

        Mem0 search requires a filters dict for entity scoping.
        user_id and agent_id are stored as separate records, so
        when both are configured an OR filter matches either one.
        When neither is configured, ``agent_role`` is used as the agent_id.
        """
        if self.user_id and self.agent_id:
            return {
                "OR": [
                    {"user_id": self.user_id},
                    {"agent_id": self.agent_id},
                ]
            }
        if self.user_id:
            return {"user_id": self.user_id}
        if self.agent_id:
            return {"agent_id": self.agent_id}
        # Fallback: use agent_role as agent_id
        return {"agent_id": agent_role}

    @staticmethod
    def _extract_results(response: Any) -> List[Dict[str, Any]]:
        """Return the list of memory dicts contained in a search response.

        Accepts either a dict carrying the list under ``"memories"`` (or
        ``"results"`` as a fallback) or a bare list. Anything that is not
        a list/tuple yields ``[]`` and non-dict entries are dropped, so a
        malformed response cannot raise while items are being built.
        """
        if isinstance(response, dict):
            raw = response.get("memories")
            if raw is None:
                raw = response.get("results")
        else:
            raw = response
        if not isinstance(raw, (list, tuple)):
            return []
        return [entry for entry in raw if isinstance(entry, dict)]

    def retrieve(
        self,
        agent_role: str,
        query: MemoryContentSnapshot,
        top_k: int,
        similarity_threshold: float,
    ) -> List[MemoryItem]:
        """Search Mem0 for relevant memories.

        Uses the filters dict to scope by user_id, agent_id, or both
        (via OR filter).

        Args:
            agent_role: Recorded in each item's metadata; also used as the
                agent_id filter when no user_id/agent_id is configured.
            query: Snapshot whose ``text`` is sent as the search query.
            top_k: Forwarded to the search call as ``top_k``.
            similarity_threshold: Forwarded as ``threshold`` when >= 0;
                negative values omit the threshold.

        Returns:
            One ``MemoryItem`` per usable result. An empty list is returned
            for a blank query, a failed search, or a malformed response.
        """
        if not query.text.strip():
            return []

        search_kwargs: Dict[str, Any] = {
            "query": query.text,
            "top_k": top_k,
            "filters": self._build_search_filters(agent_role),
        }
        if similarity_threshold >= 0:
            search_kwargs["threshold"] = similarity_threshold

        try:
            response = self.client.search(**search_kwargs)
        except Exception as e:
            # Best-effort retrieval: log with traceback and return nothing
            # instead of propagating a remote failure to the caller.
            logger.exception("Mem0 search failed: %s", e)
            return []

        # Items are stamped with the retrieval time, not a Mem0-side time.
        retrieved_at = time.time()
        return [
            MemoryItem(
                # `or` also covers an explicit null id/memory/categories.
                id=entry.get("id") or f"mem0_{uuid.uuid4().hex}",
                content_summary=entry.get("memory") or "",
                metadata={
                    "agent_role": agent_role,
                    "score": entry.get("score"),
                    "categories": entry.get("categories") or [],
                    "source": "mem0",
                },
                timestamp=retrieved_at,
            )
            for entry in self._extract_results(response)
        ]

    # -------- Update --------

    def update(self, payload: MemoryWritePayload) -> None:
        """Store a memory in Mem0.

        Nothing is written when the payload's output snapshot (or, if that
        is missing, its input snapshot) has no text, or when no messages
        can be built. Scoping for the write is chosen in this order:
        configured agent_id, configured user_id, then ``payload.agent_role``
        as agent_id. Failures from the client are logged, not raised.
        """
        snapshot = payload.output_snapshot or payload.input_snapshot
        if not snapshot or not snapshot.text.strip():
            return

        messages = self._build_messages(payload)
        if not messages:
            return

        add_kwargs: Dict[str, Any] = {"messages": messages}

        # Determine scoping: agent_id takes precedence for agent-generated content
        if self.agent_id:
            add_kwargs["agent_id"] = self.agent_id
        elif self.user_id:
            add_kwargs["user_id"] = self.user_id
        else:
            # Default: use agent_role as agent_id
            add_kwargs["agent_id"] = payload.agent_role

        try:
            self.client.add(**add_kwargs)
        except Exception as e:
            # Best-effort write: keep the traceback in the log, do not raise.
            logger.exception("Mem0 add failed: %s", e)

    def _build_messages(self, payload: MemoryWritePayload) -> List[Dict[str, str]]:
        """Build Mem0-compatible message list from write payload.

        ``payload.inputs_text`` becomes a role="user" message and the
        output snapshot text becomes a role="assistant" message; blank
        parts are skipped and content is stripped of surrounding whitespace.
        """
        messages: List[Dict[str, str]] = []

        if payload.inputs_text and payload.inputs_text.strip():
            messages.append({
                "role": "user",
                "content": payload.inputs_text.strip(),
            })

        if payload.output_snapshot and payload.output_snapshot.text.strip():
            messages.append({
                "role": "assistant",
                "content": payload.output_snapshot.text.strip(),
            })

        return messages
Loading