---
layout: default
title: "Chapter 2: Memory Architecture & Types"
parent: Mem0 Tutorial
nav_order: 2
---
Welcome to Chapter 2: Memory Architecture & Types. In this part of Mem0 Tutorial: Building Production-Ready AI Agents with Scalable Long-Term Memory, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
Master the multi-level memory architecture that powers intelligent AI agents.
This chapter dives deep into Mem0's memory architecture, exploring the different types of memory, storage mechanisms, and how the system manages context across conversations. You'll understand how Mem0 creates a scalable, intelligent memory layer for AI applications.
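Before diving into configuration options, here is a minimal round trip through the memory layer. This is a hedged sketch assuming Mem0's default configuration, which expects an OpenAI API key in the environment for fact extraction and embeddings:

```python
from mem0 import Memory

# Minimal round trip: store a fact for a user, then retrieve it.
# Assumes default configuration (an OpenAI API key in the environment).
m = Memory()
m.add("I prefer concise answers with code examples.", user_id="alice")
results = m.search("How does alice like her answers?", user_id="alice")
print(results)
```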
from mem0 import Memory
import os
# Initialize Mem0 with different configurations
def initialize_memory_systems():
"""Initialize different memory system configurations"""
# Basic memory system
basic_memory = Memory()
# Memory with vector storage (Qdrant)
vector_memory = Memory.from_config({
"vector_store": {
"provider": "qdrant",
"config": {
"host": "localhost",
"port": 6333,
"collection_name": "mem0_collection"
}
}
})
# Memory with custom embeddings
custom_memory = Memory.from_config({
"embedder": {
"provider": "huggingface",
"config": {
"model": "sentence-transformers/all-MiniLM-L6-v2"
}
}
})
# Memory backed by Chroma; note that user scoping is applied per call
# via the user_id argument to add()/search(), not in the constructor
user_memory = Memory.from_config({
"vector_store": {
"provider": "chroma",
"config": {
"collection_name": "user_memories"
}
}
})
return {
"basic": basic_memory,
"vector": vector_memory,
"custom": custom_memory,
"user": user_memory
}
# Usage
memory_systems = initialize_memory_systems()

class MemoryTypeExploration:
"""Explore different types of memory in Mem0"""
def __init__(self):
self.memory = Memory()
def demonstrate_memory_types(self):
"""Demonstrate different memory types and their characteristics"""
# 1. Episodic Memory - Specific events and experiences
episodic_memories = [
{
"content": "User prefers dark mode interface",
"metadata": {"type": "preference", "timestamp": "2024-01-15"}
},
{
"content": "User asked about machine learning algorithms yesterday",
"metadata": {"type": "interaction", "timestamp": "2024-01-14"}
}
]
# 2. Semantic Memory - Factual knowledge and concepts
semantic_memories = [
{
"content": "User is a software developer specializing in Python",
"metadata": {"type": "profile", "confidence": 0.95}
},
{
"content": "User works at a tech company in San Francisco",
"metadata": {"type": "work", "verified": True}
}
]
# 3. Procedural Memory - How to perform tasks
procedural_memories = [
{
"content": "User typically wants code examples when asking technical questions",
"metadata": {"type": "behavior", "frequency": "high"}
},
{
"content": "User prefers step-by-step explanations for complex topics",
"metadata": {"type": "learning_style", "preference": "detailed"}
}
]
return {
"episodic": episodic_memories,
"semantic": semantic_memories,
"procedural": procedural_memories
}
def demonstrate_memory_hierarchy(self):
"""Show how memory is organized hierarchically"""
# Add memories with different scopes
memories = {
"global": [
{"content": "General AI knowledge and capabilities", "scope": "global"}
],
"user": [
{"content": "User-specific preferences and history", "scope": "user", "user_id": "user_123"}
],
"session": [
{"content": "Current conversation context", "scope": "session", "session_id": "sess_456"}
],
"agent": [
{"content": "Agent-specific learned behaviors", "scope": "agent", "agent_id": "agent_789"}
]
}
# Store memories at different levels
for scope, memory_list in memories.items():
for memory in memory_list:
self.memory.add(
memory["content"],
user_id=memory.get("user_id"),
metadata={
"scope": scope,
"timestamp": "2024-01-15",
**memory
}
)
return memories
# Usage
memory_explorer = MemoryTypeExploration()
memory_types = memory_explorer.demonstrate_memory_types()
memory_hierarchy = memory_explorer.demonstrate_memory_hierarchy()
print("Memory Types:")
for mem_type, memories in memory_types.items():
print(f"\n{mem_type.upper()} Memories:")
for memory in memories:
print(f" - {memory['content']}")from mem0 import Memory
import chromadb
import qdrant_client
def setup_vector_stores():
"""Setup different vector storage backends"""
# 1. ChromaDB Setup
def setup_chromadb():
"""Setup ChromaDB for vector storage"""
chroma_client = chromadb.PersistentClient(path="./chroma_db")
# Create collection with custom configuration
collection = chroma_client.get_or_create_collection(
name="mem0_memories",
metadata={
"hnsw:space": "cosine", # Distance metric
"hnsw:construction_ef": 128, # Construction efficiency
"hnsw:M": 16 # Number of connections per node
}
)
return collection
# 2. Qdrant Setup
def setup_qdrant():
"""Setup Qdrant for vector storage"""
from qdrant_client.models import Distance, OptimizersConfigDiff, VectorParams
# Use a distinct variable name so we don't shadow the qdrant_client module
client = qdrant_client.QdrantClient(
host="localhost",
port=6333
)
# Create collection with vector configuration
client.create_collection(
collection_name="mem0_collection",
vectors_config=VectorParams(
size=384, # Embedding dimension (all-MiniLM-L6-v2 produces 384-d vectors)
distance=Distance.COSINE
),
optimizers_config=OptimizersConfigDiff(
default_segment_number=2,
indexing_threshold=10000
)
)
return client
# 3. Memory with different vector stores
chroma_memory = Memory.from_config({
"vector_store": {
"provider": "chroma",
"config": {
"collection_name": "mem0_chroma",
"path": "./chroma_db"
}
}
})
qdrant_memory = Memory.from_config({
"vector_store": {
"provider": "qdrant",
"config": {
"collection_name": "mem0_qdrant",
"host": "localhost",
"port": 6333
}
}
})
return {
"chroma": chroma_memory,
"qdrant": qdrant_memory
}
# Usage
vector_stores = setup_vector_stores()

import json
import sqlite3
from datetime import datetime, timedelta
class MemoryPersistenceManager:
"""Manage memory persistence across different storage layers"""
def __init__(self):
self.sqlite_db = sqlite3.connect("mem0_memory.db")
self._create_tables()
def _create_tables(self):
"""Create database tables for memory storage"""
self.sqlite_db.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
user_id TEXT,
content TEXT NOT NULL,
metadata TEXT,
embedding TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
access_count INTEGER DEFAULT 0,
last_accessed TIMESTAMP,
importance_score REAL DEFAULT 0.5
)
""")
# Create indexes for performance
self.sqlite_db.execute("CREATE INDEX IF NOT EXISTS idx_user_id ON memories(user_id)")
self.sqlite_db.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON memories(created_at)")
self.sqlite_db.execute("CREATE INDEX IF NOT EXISTS idx_importance ON memories(importance_score)")
self.sqlite_db.commit()
def persist_memory(self, memory_data):
"""Persist memory to database"""
cursor = self.sqlite_db.cursor()
# Insert or update memory
cursor.execute("""
INSERT OR REPLACE INTO memories
(id, user_id, content, metadata, embedding, updated_at, access_count, importance_score)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
memory_data["id"],
memory_data.get("user_id"),
memory_data["content"],
json.dumps(memory_data.get("metadata", {})),
json.dumps(memory_data.get("embedding", [])),
datetime.now(),
memory_data.get("access_count", 0),
memory_data.get("importance_score", 0.5)
))
self.sqlite_db.commit()
def retrieve_memories(self, user_id=None, limit=10, time_filter=None):
"""Retrieve memories with filtering"""
query = "SELECT * FROM memories WHERE 1=1"
params = []
if user_id:
query += " AND user_id = ?"
params.append(user_id)
if time_filter:
query += " AND created_at >= ?"
params.append(time_filter)
query += " ORDER BY importance_score DESC, last_accessed DESC LIMIT ?"
params.append(limit)
cursor = self.sqlite_db.execute(query, params)
columns = [desc[0] for desc in cursor.description]
memories = []
for row in cursor.fetchall():
memory_dict = dict(zip(columns, row))
memory_dict["metadata"] = json.loads(memory_dict["metadata"] or "{}")
memory_dict["embedding"] = json.loads(memory_dict["embedding"] or "[]")
memories.append(memory_dict)
return memories
def update_memory_access(self, memory_id):
"""Update memory access statistics"""
self.sqlite_db.execute("""
UPDATE memories
SET access_count = access_count + 1,
last_accessed = ?
WHERE id = ?
""", (datetime.now(), memory_id))
self.sqlite_db.commit()
def cleanup_old_memories(self, days_old=90):
"""Clean up old, low-importance memories"""
cutoff_date = datetime.now() - timedelta(days=days_old)
# Delete old memories with low importance
cursor = self.sqlite_db.execute("""
DELETE FROM memories
WHERE created_at < ? AND importance_score < 0.3
""", (cutoff_date,))
deleted_count = cursor.rowcount # rows deleted by this statement (total_changes counts the whole connection)
self.sqlite_db.commit()
print(f"Cleaned up {deleted_count} old memories")
return deleted_count
def get_memory_stats(self):
"""Get memory statistics"""
stats = {}
# Total memories
cursor = self.sqlite_db.execute("SELECT COUNT(*) FROM memories")
stats["total_memories"] = cursor.fetchone()[0]
# Memories by user
cursor = self.sqlite_db.execute("""
SELECT user_id, COUNT(*) as count
FROM memories
WHERE user_id IS NOT NULL
GROUP BY user_id
ORDER BY count DESC
LIMIT 10
""")
stats["memories_by_user"] = dict(cursor.fetchall())
# Memory age distribution
cursor = self.sqlite_db.execute("""
SELECT
CASE
WHEN created_at >= datetime('now', '-1 day') THEN 'last_24h'
WHEN created_at >= datetime('now', '-7 day') THEN 'last_week'
WHEN created_at >= datetime('now', '-30 day') THEN 'last_month'
ELSE 'older'
END as age_group,
COUNT(*) as count
FROM memories
GROUP BY age_group
""")
stats["age_distribution"] = dict(cursor.fetchall())
return stats
# Usage
persistence_manager = MemoryPersistenceManager()
# Persist memory
memory_data = {
"id": "mem_123",
"user_id": "user_456",
"content": "User prefers concise explanations",
"metadata": {"type": "preference", "confidence": 0.9},
"embedding": [0.1, 0.2, 0.3], # Example embedding
"importance_score": 0.8
}
persistence_manager.persist_memory(memory_data)
# Retrieve memories
user_memories = persistence_manager.retrieve_memories(user_id="user_456", limit=5)
# Update access
persistence_manager.update_memory_access("mem_123")
# Get stats
stats = persistence_manager.get_memory_stats()
print(f"Memory Stats: {stats}")
# Cleanup
persistence_manager.cleanup_old_memories(days_old=30)

from mem0 import Memory
from typing import Dict, List, Any
import time
class MemoryLifecycleManager:
"""Manage the complete lifecycle of memories"""
def __init__(self):
self.memory = Memory()
self.creation_stats = {}
self.retention_policies = self._define_retention_policies()
def _define_retention_policies(self):
"""Define memory retention policies"""
return {
"ephemeral": {
"max_age_days": 1,
"max_count": 10,
"importance_threshold": 0.1
},
"short_term": {
"max_age_days": 7,
"max_count": 50,
"importance_threshold": 0.3
},
"medium_term": {
"max_age_days": 30,
"max_count": 200,
"importance_threshold": 0.5
},
"long_term": {
"max_age_days": 365,
"max_count": 1000,
"importance_threshold": 0.7
}
}
def create_memory(self, content: str, memory_type: str = "general",
user_id: str = None, metadata: Dict[str, Any] = None) -> str:
"""Create a new memory with lifecycle management"""
# Add timestamp and type to metadata
enhanced_metadata = metadata or {}
enhanced_metadata.update({
"memory_type": memory_type,
"created_at": time.time(),
"importance_score": self._calculate_importance(content, memory_type),
"lifecycle_stage": "active"
})
# Create memory (depending on your mem0 version, add() may return a
# result payload rather than a bare id string; extract the id accordingly)
memory_id = self.memory.add(
content,
user_id=user_id,
metadata=enhanced_metadata
)
# Update creation stats
if memory_type not in self.creation_stats:
self.creation_stats[memory_type] = 0
self.creation_stats[memory_type] += 1
print(f"Created {memory_type} memory: {memory_id}")
return memory_id
def _calculate_importance(self, content: str, memory_type: str) -> float:
"""Calculate memory importance score"""
base_score = 0.5
# Type-based scoring
type_scores = {
"preference": 0.9,
"fact": 0.8,
"experience": 0.7,
"relationship": 0.8,
"skill": 0.7,
"general": 0.5
}
base_score = type_scores.get(memory_type, 0.5)
# Content-based adjustments
content_lower = content.lower()
# High-importance keywords
high_importance_keywords = [
"critical", "important", "urgent", "key", "essential",
"always", "never", "prefer", "hate", "love"
]
if any(keyword in content_lower for keyword in high_importance_keywords):
base_score += 0.2
# Length-based adjustment
word_count = len(content.split())
if word_count > 50:
base_score += 0.1
elif word_count < 10:
base_score -= 0.1
return min(max(base_score, 0.0), 1.0)
def retrieve_memories_with_lifecycle(self, query: str, user_id: str = None,
limit: int = 10) -> List[Dict[str, Any]]:
"""Retrieve memories considering lifecycle stage"""
# Get memories (newer mem0 versions wrap search results as
# {"results": [...]}; unwrap before iterating if needed)
memories = self.memory.search(
query,
user_id=user_id,
limit=limit * 2 # Get more for filtering
)
# Filter and rank based on lifecycle
filtered_memories = []
current_time = time.time()
for memory in memories:
metadata = memory.get("metadata", {})
# Check if memory is still active
if metadata.get("lifecycle_stage") == "archived":
continue
# Check age-based retention
created_at = metadata.get("created_at", current_time)
age_days = (current_time - created_at) / (24 * 3600)
memory_type = metadata.get("memory_type", "general")
policy = self.retention_policies.get(memory_type, self.retention_policies["medium_term"]) # no "general" policy is defined; fall back to medium_term
if age_days > policy["max_age_days"]:
# Mark for archival
self._archive_memory(memory["id"])
continue
# Add lifecycle info
memory["lifecycle_info"] = {
"age_days": age_days,
"retention_policy": policy,
"should_archive": age_days > policy["max_age_days"] * 0.8
}
filtered_memories.append(memory)
# Sort by importance and recency
filtered_memories.sort(
key=lambda x: (
x["metadata"].get("importance_score", 0.5),
-x["lifecycle_info"]["age_days"] # More recent first
),
reverse=True
)
return filtered_memories[:limit]
def _archive_memory(self, memory_id: str):
"""Archive old memory"""
# In practice, this would update the memory metadata
# and potentially move to cheaper storage
print(f"Archiving memory: {memory_id}")
def update_memory_lifecycle(self):
"""Update memory lifecycle stages based on policies"""
current_time = time.time()
updated_count = 0
# Get all memories (in practice, would be batched)
all_memories = self.memory.get_all() # Assuming this method exists
for memory in all_memories:
metadata = memory.get("metadata", {})
created_at = metadata.get("created_at", current_time)
age_days = (current_time - created_at) / (24 * 3600)
memory_type = metadata.get("memory_type", "general")
policy = self.retention_policies.get(memory_type, self.retention_policies["medium_term"]) # no "general" policy is defined; fall back to medium_term
# Determine lifecycle stage
if age_days > policy["max_age_days"] * 0.9:
new_stage = "archiving_soon"
elif age_days > policy["max_age_days"] * 0.5:
new_stage = "aging"
else:
new_stage = "active"
if metadata.get("lifecycle_stage") != new_stage:
# Update lifecycle stage (check your mem0 version: update() may take
# the new text as positional data rather than a metadata keyword)
metadata["lifecycle_stage"] = new_stage
self.memory.update(memory["id"], metadata=metadata)
updated_count += 1
print(f"Updated lifecycle for {updated_count} memories")
return updated_count
# Usage
lifecycle_manager = MemoryLifecycleManager()
# Create different types of memories
memory_types = ["preference", "fact", "experience", "skill"]
for mem_type in memory_types:
content = f"This is a {mem_type} memory for testing lifecycle management"
memory_id = lifecycle_manager.create_memory(
content,
memory_type=mem_type,
user_id="test_user",
metadata={"test": True}
)
# Retrieve with lifecycle consideration
memories = lifecycle_manager.retrieve_memories_with_lifecycle(
"testing lifecycle",
user_id="test_user",
limit=5
)
print(f"Retrieved {len(memories)} memories with lifecycle info")
for memory in memories:
lifecycle = memory["lifecycle_info"]
print(f" Age: {lifecycle['age_days']:.1f} days, Stage: {memory['metadata']['lifecycle_stage']}")
# Update lifecycle stages
updated = lifecycle_manager.update_memory_lifecycle()
print(f"Updated {updated} memories")class MemoryConsolidationEngine:
"""Consolidate and optimize memory storage"""
def __init__(self):
self.memory = Memory()
self.consolidation_rules = self._define_consolidation_rules()
def _define_consolidation_rules(self):
"""Define rules for memory consolidation"""
return {
"duplicate_detection": {
"similarity_threshold": 0.85,
"action": "merge"
},
"redundancy_elimination": {
"overlap_threshold": 0.9,
"action": "remove_redundant"
},
"importance_boost": {
"access_threshold": 5,
"time_window_days": 7,
"boost_factor": 0.1
},
"decay_application": {
"max_age_days": 90,
"decay_factor": 0.05
}
}
def consolidate_memories(self, user_id: str = None):
"""Consolidate memories for a user"""
# Get all memories
memories = self.memory.get_all(user_id=user_id)
consolidated = {
"duplicates_removed": 0,
"redundancies_eliminated": 0,
"importance_updated": 0,
"decayed_applied": 0
}
# Apply consolidation rules, recording how many memories each step removes
before = len(memories)
memories = self._remove_duplicates(memories)
consolidated["duplicates_removed"] = before - len(memories)
before = len(memories)
memories = self._eliminate_redundancy(memories)
consolidated["redundancies_eliminated"] = before - len(memories)
memories = self._update_importance_scores(memories)
memories = self._apply_decay(memories)
# Update consolidated memories
for memory in memories:
self.memory.update(memory["id"], metadata=memory["metadata"])
return consolidated
def _remove_duplicates(self, memories):
"""Remove duplicate memories"""
unique_memories = []
seen_content = set()
for memory in memories:
content_hash = hash(memory["content"].lower().strip())
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_memories.append(memory)
else:
# Could merge metadata or keep the more recent one
pass
return unique_memories
def _eliminate_redundancy(self, memories):
"""Eliminate redundant information"""
# Simple redundancy check based on content overlap
filtered_memories = []
for memory in memories:
is_redundant = False
for existing in filtered_memories:
# Calculate content similarity (simplified)
similarity = self._calculate_content_similarity(
memory["content"],
existing["content"]
)
if similarity > self.consolidation_rules["redundancy_elimination"]["overlap_threshold"]:
is_redundant = True
break
if not is_redundant:
filtered_memories.append(memory)
return filtered_memories
def _calculate_content_similarity(self, text1: str, text2: str) -> float:
"""Calculate similarity between two texts"""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
intersection = len(words1 & words2)
union = len(words1 | words2)
return intersection / union if union > 0 else 0
def _update_importance_scores(self, memories):
"""Update importance scores based on usage patterns"""
for memory in memories:
metadata = memory.get("metadata", {})
access_count = metadata.get("access_count", 0)
last_accessed = metadata.get("last_accessed", 0)
# Boost importance for frequently accessed memories
if access_count > self.consolidation_rules["importance_boost"]["access_threshold"]:
current_importance = metadata.get("importance_score", 0.5)
boost = self.consolidation_rules["importance_boost"]["boost_factor"]
metadata["importance_score"] = min(current_importance + boost, 1.0)
memory["metadata"] = metadata
return memories
def _apply_decay(self, memories):
"""Apply time-based decay to old memories"""
current_time = time.time()
max_age_seconds = self.consolidation_rules["decay_application"]["max_age_days"] * 24 * 3600
decay_factor = self.consolidation_rules["decay_application"]["decay_factor"]
for memory in memories:
metadata = memory.get("metadata", {})
created_at = metadata.get("created_at", current_time)
age_seconds = current_time - created_at
if age_seconds > max_age_seconds:
# Apply decay
current_importance = metadata.get("importance_score", 0.5)
age_factor = age_seconds / max_age_seconds
decayed_importance = current_importance * (1 - decay_factor * age_factor)
metadata["importance_score"] = max(decayed_importance, 0.1) # Minimum importance
memory["metadata"] = metadata
return memories
# Usage
consolidation_engine = MemoryConsolidationEngine()
# Run memory consolidation
consolidation_results = consolidation_engine.consolidate_memories(user_id="test_user")
print("Consolidation Results:")
for operation, count in consolidation_results.items():
print(f" {operation}: {count}")- Multi-Level Storage: Use different storage tiers for different memory types
- Scalable Indexing: Implement efficient indexing for fast retrieval
- Metadata Enrichment: Add comprehensive metadata for better organization
- Version Control: Track memory versions and changes over time
- Batch Operations: Process multiple memories together for efficiency
- Caching Strategies: Cache frequently accessed memories (see the caching sketch after this list)
- Async Processing: Use async operations for memory-intensive tasks
- Resource Management: Monitor and limit memory usage
- Consistency Checks: Validate memory content and relationships
- Duplicate Detection: Implement robust duplicate detection mechanisms
- Importance Scoring: Use ML models to score memory importance
- Lifecycle Management: Implement proper memory lifecycle policies
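As a concrete illustration of the caching bullet above, here is a minimal sketch of a TTL cache in front of memory search. The `CachedMemorySearch` class and its parameters are illustrative scaffolding, not part of Mem0's API:

```python
import time
from mem0 import Memory

class CachedMemorySearch:
    """Illustrative TTL cache in front of Memory.search (not a Mem0 API)."""

    def __init__(self, memory: Memory, ttl_seconds: float = 60.0, max_entries: int = 256):
        self.memory = memory
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self._cache = {}  # (query, user_id, limit) -> (timestamp, results)

    def search(self, query: str, user_id: str = None, limit: int = 10):
        key = (query, user_id, limit)
        cached = self._cache.get(key)
        if cached and time.time() - cached[0] < self.ttl:
            return cached[1]  # fresh cache hit, skip the vector store entirely
        results = self.memory.search(query, user_id=user_id, limit=limit)
        if len(self._cache) >= self.max_entries:
            # Evict the oldest entry to bound cache size
            oldest = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest]
        self._cache[key] = (time.time(), results)
        return results
```

A short TTL keeps cached results from drifting too far from the underlying store while still absorbing repeated queries within a conversation.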
With memory architecture mastered, you're ready to:
- Chapter 3: Core Memory Operations - Adding, retrieving, and managing memories effectively
- Chapter 4: Advanced Memory Features - Semantic search, memory consolidation, and optimization
- Chapter 5: Integrating with LLMs - Connecting Mem0 with various language models
Ready to work with memory operations? Continue to Chapter 3: Core Memory Operations! 🚀
Most teams struggle here because the hard part is not writing more code, but drawing clear boundaries between the `Memory` instance, individual memory records, and their scopes so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about the memory architecture described here as an operating subsystem of your agent stack, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around `metadata`, `user_id`, and `content` as your checklist when adapting these patterns to your own repository.
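One way to make that checklist executable is to pin down the record contract explicitly. The following sketch uses a `TypedDict` plus a small validator; the field set mirrors the examples in this chapter and is an assumption, not a schema mandated by Mem0:

```python
from typing import Any, Dict, Optional, TypedDict

class MemoryRecord(TypedDict, total=False):
    # Assumed contract mirroring this chapter's examples (not a Mem0 schema)
    content: str               # the memory text itself
    user_id: Optional[str]     # required for user-scoped memories
    metadata: Dict[str, Any]   # type, timestamps, importance_score, ...

def validate_record(record: MemoryRecord) -> None:
    """Fail fast if a record violates the assumed contract."""
    if not record.get("content", "").strip():
        raise ValueError("memory record requires non-empty 'content'")
    score = record.get("metadata", {}).get("importance_score", 0.5)
    if not 0.0 <= score <= 1.0:
        raise ValueError("importance_score must be within [0.0, 1.0]")
```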
Under the hood, the memory flow described in this chapter usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for `self`.
- Input normalization: shape incoming data so `memory` receives stable contracts.
- Core execution: run the main logic branch and propagate intermediate state through `memories`.
- Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
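To make that walk concrete, the sketch below wraps each stage in a helper that logs an explicit success or failure before the next stage runs. The stage names and the `run_stage` helper are illustrative scaffolding, not Mem0 internals:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("memory-pipeline")

def run_stage(name, fn, payload):
    """Run one pipeline stage, logging explicit success/failure."""
    try:
        result = fn(payload)
        log.info("stage=%s status=ok", name)
        return result
    except Exception:
        log.exception("stage=%s status=failed", name)
        raise  # fail fast so the broken stage is obvious

def normalize(payload):
    return {**payload, "content": payload["content"].strip()}

def execute(payload):
    return {"result": f"stored: {payload['content']}"}

payload = {"content": "  User prefers dark mode  "}
for stage_name, stage_fn in [("input_normalization", normalize), ("core_execution", execute)]:
    payload = run_stage(stage_name, stage_fn, payload)
```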
Use the following upstream sources to verify implementation details while reading this chapter:
- View Repo: the Mem0 source repository on GitHub (github.com). Why it matters: it is the authoritative reference for the `Memory` APIs and configuration options used in this chapter.
Suggested trace strategy:
- search upstream code for `self` and `memory` to map concrete implementation paths
- compare docs claims against actual runtime/config code before reusing patterns in production