# Forked from AzureCosmosDB/AgentMemoryToolkit — advanced_memory_lifecycle.py
"""Advanced memory lifecycle — create → process → archive (delete raw turns).

Demonstrates the typical long-term-memory flow:

    1. Add raw conversation turns
    2. Extract structured memories (facts / procedural / episodic)
    3. Generate a thread summary
    4. Delete raw turns — keeping only the compact derived memories

Uses the in-process ProcessingPipeline (same code as the Azure Function
change-feed trigger). No Function deployment required.

Required env vars (.env supported):
    COSMOS_DB_ENDPOINT, AI_FOUNDRY_ENDPOINT

Optional env vars (the default in parentheses is used when unset):
    AI_FOUNDRY_EMBEDDING_DEPLOYMENT_NAME (text-embedding-3-large)
    AI_FOUNDRY_CHAT_DEPLOYMENT_NAME (gpt-4o-mini)
    COSMOS_DB_DATABASE (ai_memory)
    COSMOS_DB_CONTAINER (memories)
    COSMOS_DB_KEY (optional fallback)
    AI_FOUNDRY_API_KEY (optional)
"""
from __future__ import annotations

import os
import sys
import uuid
from collections import Counter

from dotenv import load_dotenv

from agent_memory_toolkit import CosmosMemoryClient

load_dotenv()
def _header(step: int, title: str, width: int = 60) -> None:
    """Print a banner announcing a numbered step of the demo.

    The banner is a blank line, a rule of ``=`` characters, the line
    `` STEP <step>: <title>``, and a second rule.

    Args:
        step: Step number shown after ``STEP``.
        title: Short description of the step.
        width: Number of ``=`` characters in each rule. Defaults to 60, the
            width the banner used before this parameter existed.
    """
    rule = "=" * width
    print(f"\n{rule}\n STEP {step}: {title}\n{rule}")
def _print_memories(mem: CosmosMemoryClient, user_id: str, thread_id: str) -> None:
    """Print how many memories a thread holds, tallied by type, then list them.

    The first line shows the total and a ``{type: count}`` mapping; each
    following line shows one memory's type (padded to 11 columns), the first
    80 characters of its content, and its comma-joined tags.

    A memory whose ``type`` is missing or empty is shown as ``?`` and one
    whose ``content`` is missing or empty gets a blank preview, so a single
    incomplete document cannot abort the inventory.

    Args:
        mem: Client whose ``get_memories(user_id=..., thread_id=...)`` result
            is printed; each returned item is read like a dict.
        user_id: User whose memories are listed.
        thread_id: Thread whose memories are listed.
    """
    items = mem.get_memories(user_id=user_id, thread_id=thread_id)

    # dict() keeps the printed form a plain ``{...}`` instead of ``Counter({...})``.
    by_type = dict(Counter(m.get("type") or "?" for m in items))
    print(f" total memories: {len(items)} :: {by_type}")

    for m in items:
        # ``or`` fallbacks also cover keys that are present but null/empty;
        # formatting None with a width spec would raise TypeError.
        kind = m.get("type") or "?"
        preview = (m.get("content") or "")[:80]
        tags = ", ".join(m.get("tags") or [])
        print(f" • [{kind:11}] {preview} [tags={tags}]")
def main() -> None:
    """Run the seven-step memory lifecycle demo.

    Reads connection settings from the environment, builds a
    ``CosmosMemoryClient``, and then, for a freshly generated user id and
    thread id: adds raw turns, extracts memories, generates a thread summary,
    deletes the raw turns, and runs a few searches, printing the inventory
    along the way.

    Raises:
        SystemExit: With exit code 1 when ``COSMOS_DB_ENDPOINT`` or
            ``AI_FOUNDRY_ENDPOINT`` is unset or empty; the missing names are
            written to stderr.
    """
    required = ("COSMOS_DB_ENDPOINT", "AI_FOUNDRY_ENDPOINT")
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        # Diagnostics go to stderr so they never mix with piped demo output.
        print(f"ERROR: missing env vars: {', '.join(missing)}", file=sys.stderr)
        sys.exit(1)

    mem = CosmosMemoryClient(
        cosmos_endpoint=os.environ["COSMOS_DB_ENDPOINT"],
        # ``or None`` passes None for an empty-string value, same as unset.
        cosmos_key=os.environ.get("COSMOS_DB_KEY") or None,
        cosmos_database=os.environ.get("COSMOS_DB_DATABASE", "ai_memory"),
        cosmos_container=os.environ.get("COSMOS_DB_CONTAINER", "memories"),
        ai_foundry_endpoint=os.environ["AI_FOUNDRY_ENDPOINT"],
        ai_foundry_api_key=os.environ.get("AI_FOUNDRY_API_KEY") or None,
        embedding_deployment_name=os.environ.get(
            "AI_FOUNDRY_EMBEDDING_DEPLOYMENT_NAME", "text-embedding-3-large"
        ),
        chat_deployment_name=os.environ.get(
            "AI_FOUNDRY_CHAT_DEPLOYMENT_NAME", "gpt-4o-mini"
        ),
    )
    print("✅ Connected to Cosmos DB")

    # Fresh ids on every run keep this demo's documents apart from earlier runs.
    user_id = f"demo-user-{uuid.uuid4().hex[:6]}"
    thread_id = str(uuid.uuid4())

    _header(1, "Add raw conversation turns")
    conversation = [
        ("user", "I'm building a recommendation engine for an online bookstore."),
        ("agent", "Great! Are you using collaborative filtering or content-based?"),
        ("user", "Hybrid. I want to use embeddings on book descriptions and reviews."),
        ("agent", "Cosmos DB for NoSQL with the vector index works well for that."),
        ("user", "I prefer Python and want to use FastAPI for the API layer."),
        ("agent", "FastAPI is a great choice — fast, type-safe, async-native."),
        ("user", "Last quarter I tried doing this with Pinecone and the costs blew up."),
    ]
    for role, content in conversation:
        mem.add_cosmos(user_id=user_id, role=role, content=content, thread_id=thread_id)
    _print_memories(mem, user_id, thread_id)

    _header(2, "Extract structured memories (facts / procedural / episodic)")
    stats = mem.extract_memories(user_id=user_id, thread_id=thread_id)
    print(f" extraction stats: {stats}")

    _header(3, "Generate thread summary")
    summary_doc = mem.generate_thread_summary(user_id=user_id, thread_id=thread_id)
    print(f" summary id: {summary_doc['id']}")
    print(f" summary : {summary_doc['content'][:200]}…")

    _header(4, "Inventory before archiving raw turns")
    _print_memories(mem, user_id, thread_id)

    _header(5, "Archive: delete raw turns, keep derived memories")
    deleted = 0
    raw_turns = mem.get_memories(
        user_id=user_id, thread_id=thread_id, memory_types=["turn"]
    )
    for turn in raw_turns:
        mem.delete_cosmos(memory_id=turn["id"], thread_id=thread_id, user_id=user_id)
        deleted += 1
    print(f" deleted {deleted} raw turn(s)")

    _header(6, "Final inventory — only compact long-term memory remains")
    _print_memories(mem, user_id, thread_id)

    _header(7, "Search still works against the archived knowledge")
    queries = [
        "what is the user building",
        "what programming language does the user prefer",
        "what previous experience does the user have with vector databases",
    ]
    for query in queries:
        print(f'\n query: "{query}"')
        for hit in mem.search_cosmos(search_terms=query, user_id=user_id, top_k=3):
            print(f" → [{hit.get('type')}] {hit['content'][:80]}")

    print("\nDone.")
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    main()