
Commit e87df99

Aayush Kataria and Copilot authored and committed
Add procedural synthesis, strip per-turn procedural, fact_count rename
Brings forward the procedural-synthesis feature on top of the post-PR-#13 baseline and applies hygiene cleanup.

- Adds synthesize_procedural() on pipeline + processors (sync/async) and the Durable Functions orchestrator. Procedural memory is now produced via this dedicated reflection flow over a user's full history (system-prompt-style), not as a per-turn extraction.
- Strips procedural emission from extract_memories: removes the procedural type from the extraction prompt, drops procedural_count from result dicts and docstrings, and trims procedural from _load_existing_memories.
- Renames facts_count -> fact_count across pipeline, processors, FA orchestrator docstring, and all tests.
- Cosmetic enum rename: supersede_reason value 'contradiction' -> 'contradict' for verb-tense consistency with 'update'. README and Docs/concepts.md updated accordingly.

Unit suite: 531 passed.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent ebdcd18 commit e87df99

30 files changed

Lines changed: 1649 additions & 473 deletions

Docs/concepts.md

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ Prompts for summarization and fact extraction live in `azure_functions/prompts/`
 The `reconcile_memories(user_id, n=50)` pipeline step reads up to N most-recent active facts for a user and asks the LLM to identify two orthogonal outcomes in one pass:

 - **Duplicates** — two or more facts that restate the same claim in different words. Resolution: collapse into one merged fact; the originals are soft-deleted with `supersede_reason="duplicate"` and `superseded_by` set to the merged fact's id.
-- **Contradictions** — two facts that assert opposing claims about the same subject. Resolution: keep the winner (more recent first, higher confidence as tiebreaker), soft-delete the loser with `supersede_reason="contradiction"` and `superseded_by` set to the winner.
+- **Contradictions** — two facts that assert opposing claims about the same subject. Resolution: keep the winner (more recent first, higher confidence as tiebreaker), soft-delete the loser with `supersede_reason="contradict"` and `superseded_by` set to the winner.

 ### Why one pass
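The contradiction rule in the Docs/concepts.md hunk above (more recent first, higher confidence as tiebreaker) can be sketched as a small comparison. This is an illustration only, not the toolkit's implementation: the `Fact` dataclass, its `created_at` field, and both helper functions are hypothetical names, and the diff does not show where the ordering is actually applied.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Fact:
    # Hypothetical stand-in for a stored fact; field names are assumptions.
    id: str
    created_at: str  # ISO-8601, same format and zone, so strings sort chronologically
    confidence: Optional[float] = None


def pick_winner(a: Fact, b: Fact) -> tuple[Fact, Fact]:
    """Return (winner, loser): more recent first, higher confidence breaks ties."""

    def key(f: Fact) -> tuple[str, float]:
        return (f.created_at, f.confidence if f.confidence is not None else 0.0)

    return (a, b) if key(a) >= key(b) else (b, a)


def supersede_fields(loser: Fact, winner: Fact) -> dict:
    """Fields a soft-delete would set on the loser, per the documentation text."""
    return {"id": loser.id, "supersede_reason": "contradict", "superseded_by": winner.id}
```

With two facts created at the same instant, the one with the higher confidence wins; otherwise the newer fact wins regardless of confidence.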

README.md

Lines changed: 2 additions & 2 deletions
@@ -170,14 +170,14 @@ high_conf_facts = memory.get_memories(user_id="u1", memory_types=["fact"], min_c

 ### Memory Reconciliation

-`reconcile(user_id, n=50)` (on the public client; underlying pipeline method is `ProcessingPipeline.reconcile_memories`) collapses paraphrased duplicates and resolves semantic contradictions in a single LLM pass over the N most-recent active facts. Both outcomes soft-delete the loser with a `supersede_reason` of `"duplicate"` or `"contradiction"`. See [Docs/concepts.md](Docs/concepts.md#memory-reconciliation) for details.
+`reconcile(user_id, n=50)` (on the public client; underlying pipeline method is `ProcessingPipeline.reconcile_memories`) collapses paraphrased duplicates and resolves semantic contradictions in a single LLM pass over the N most-recent active facts. Both outcomes soft-delete the loser with a `supersede_reason` of `"duplicate"` or `"contradict"`. See [Docs/concepts.md](Docs/concepts.md#memory-reconciliation) for details.

 > **Cost note.** Each reconciliation makes one LLM call covering up to `n` facts (default 50, hard cap 500). With auto-trigger, this fires every `FACT_EXTRACTION_EVERY_N × DEDUP_EVERY_N` turns per user, with `n` taken from `DEDUP_POOL_SIZE`. The previous cosine-cluster pre-filter was removed deliberately — it could not catch semantic contradictions like "vegetarian" vs "ribeye steak" — so the LLM is now invoked whenever there are ≥ 2 active facts. To bound LLM cost more tightly: raise `DEDUP_EVERY_N` (lower frequency — reconcile fires every Nth extraction, so a *higher* N means *less often*), lower `DEDUP_POOL_SIZE` (smaller per-call pool), or override `n` per call when invoking `reconcile()` directly.

 | New `MemoryRecord` field | Meaning |
 |---|---|
 | `content_hash` | SHA-256 of normalized content; enables write-time exact-dedup short-circuit |
-| `supersede_reason` | `"duplicate"` or `"contradiction"` (None for live records) |
+| `supersede_reason` | `"duplicate"` or `"contradict"` (None for live records) |
 | `superseded_at` | ISO timestamp when the supersede happened (None for live records) |
 | `superseded_by` | Id of the record that replaced this one (existing field) |
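The cadence in the README cost note above is plain arithmetic: auto-triggered reconciliation fires once every `FACT_EXTRACTION_EVERY_N × DEDUP_EVERY_N` turns per user. A rough sketch of that calculation follows. The function name and the setting values used in the example are made up for illustration, since the real defaults are not shown in this diff.

```python
def reconcile_calls(turns: int, fact_extraction_every_n: int, dedup_every_n: int) -> int:
    """Auto-triggered reconcile LLM calls for one user over `turns` turns."""
    period = fact_extraction_every_n * dedup_every_n  # turns between reconcile passes
    return turns // period


# Hypothetical settings: extraction every 5 turns, reconcile every 4th extraction.
baseline = reconcile_calls(200, fact_extraction_every_n=5, dedup_every_n=4)
# Raising DEDUP_EVERY_N to 10 lowers the frequency, as the cost note describes.
reduced = reconcile_calls(200, fact_extraction_every_n=5, dedup_every_n=10)
```

Under these assumed values the period grows from 20 to 50 turns, so the same 200 turns trigger fewer LLM calls.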

agent_memory_toolkit/aio/cosmos_memory_client.py

Lines changed: 82 additions & 9 deletions
@@ -1179,6 +1179,58 @@ async def remove_tags(self, memory_id: str, user_id: str, thread_id: str, tags:
     # Procedural and episodic memory retrieval
     # ------------------------------------------------------------------

+    async def get_procedural_prompt(self, user_id: str) -> Optional[str]:
+        """Return the active synthesized procedural prompt for a user."""
+        await self._require_cosmos()
+
+        qb = _QueryBuilder()
+        qb.add_filter("c.user_id", "@user_id", user_id)
+        qb.add_filter("c.thread_id", "@thread_id", "__procedural__")
+        qb.add_filter("c.type", "@type", "procedural")
+        qb.add_is_null_or_undefined("c.superseded_by")
+
+        query = f"SELECT TOP 1 c.content, c.version FROM c{qb.build_where()} ORDER BY c.version DESC"
+        try:
+            items_iter = self._container_client.query_items(query=query, parameters=qb.get_parameters())
+            items = [item async for item in items_iter]
+        except Exception as exc:
+            raise CosmosOperationError(f"async get_procedural_prompt query failed: {exc}") from exc
+
+        if not items:
+            return None
+        return items[0].get("content")
+
+    async def get_procedural_history(self, user_id: str, limit: int = 10) -> list[dict[str, Any]]:
+        """Return synthesized procedural docs for a user, newest first."""
+        await self._require_cosmos()
+        if limit <= 0:
+            return []
+
+        qb = _QueryBuilder()
+        qb.add_filter("c.user_id", "@user_id", user_id)
+        qb.add_filter("c.thread_id", "@thread_id", "__procedural__")
+        qb.add_filter("c.type", "@type", "procedural")
+
+        query = f"SELECT * FROM c{qb.build_where()} ORDER BY c.version DESC"
+        try:
+            items_iter = self._container_client.query_items(query=query, parameters=qb.get_parameters())
+            items = [item async for item in items_iter]
+        except Exception as exc:
+            raise CosmosOperationError(f"async get_procedural_history query failed: {exc}") from exc
+
+        def _is_active(doc: dict[str, Any]) -> bool:
+            return not doc.get("superseded_by")
+
+        items.sort(
+            key=lambda doc: (
+                1 if _is_active(doc) else 0,
+                int(doc.get("version") or 0),
+                int(doc.get("_ts") or 0),
+            ),
+            reverse=True,
+        )
+        return items[:limit]
+
     async def get_procedural_memories(
         self,
         user_id: str,
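The ordering applied in `get_procedural_history` above is easier to see on sample data: active documents (no `superseded_by`) sort first, then higher `version`, then higher `_ts`. The sketch below replays the same sort key on an in-memory list. The `order_history` helper and the sample documents are hypothetical and exist only for illustration.

```python
from typing import Any


def order_history(items: list[dict[str, Any]], limit: int = 10) -> list[dict[str, Any]]:
    """Replay the sort key from get_procedural_history on in-memory docs."""
    if limit <= 0:
        return []
    ordered = sorted(
        items,
        key=lambda doc: (
            1 if not doc.get("superseded_by") else 0,  # active docs first
            int(doc.get("version") or 0),              # then highest version
            int(doc.get("_ts") or 0),                  # then latest write time
        ),
        reverse=True,
    )
    return ordered[:limit]


# Hypothetical documents; ids and values are made up.
docs = [
    {"id": "v1", "version": 1, "_ts": 100, "superseded_by": "v2"},
    {"id": "v3", "version": 3, "_ts": 300},
    {"id": "v2", "version": 2, "_ts": 200, "superseded_by": "v3"},
]
ordered_ids = [d["id"] for d in order_history(docs)]
```

Because the active flag leads the key, an active document sorts ahead of a superseded one even when the superseded one carries a higher version number.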
@@ -1244,14 +1296,7 @@ async def search_episodic_memories(

     async def build_procedural_context(self, user_id: str) -> str:
         """Build formatted text for system prompt injection."""
-        memories = await self.get_procedural_memories(user_id)
-        if not memories:
-            return ""
-        lines = ["## Learned User Preferences"]
-        for m in memories:
-            priority = m.get("metadata", {}).get("priority", "should")
-            lines.append(f"- {m['content']} [{priority}]")
-        return "\n".join(lines)
+        return await self.get_procedural_prompt(user_id) or ""

     async def build_episodic_context(self, user_id: str, query: str, top_k: int = 3) -> str:
         """Build formatted context of relevant past experiences."""
@@ -1680,7 +1725,7 @@ async def extract_memories(
         thread_id: str,
         recent_k: Optional[int] = None,
     ) -> dict[str, int]:
-        """Extract facts, procedural, and episodic memories from a thread.
+        """Extract facts and episodic memories from a thread.

         Pipeline calls are dispatched to a worker thread via
         :func:`asyncio.to_thread` to avoid blocking the event loop on
@@ -1690,6 +1735,34 @@ async def extract_memories(
         self._require_pipeline()
         return await asyncio.to_thread(self._pipeline.extract_memories, user_id, thread_id, recent_k)

+    async def synthesize_procedural(
+        self,
+        user_id: str,
+        *,
+        force: bool = False,
+    ) -> dict[str, Any]:
+        """Trigger synthesized procedural prompt generation for a user.
+
+        For DurableFunctionProcessor this returns a deferred status; synthesis
+        is auto-driven inside the Function App. ``force=True`` is only honored
+        by AsyncInProcessProcessor.
+        """
+        await self._require_cosmos()
+        processor = self._get_processor()
+        if not isinstance(processor, AsyncInProcessProcessor):
+            logger.debug("synthesize_procedural deferred to Function App auto-trigger user_id=%s", user_id)
+            return {
+                "status": "deferred",
+                "reason": "durable_auto_trigger",
+                "message": (
+                    "Procedural synthesis runs reactively in the Function App after each "
+                    "ExtractMemoriesOrchestrator pass. Use get_procedural_prompt() to read "
+                    "the synthesized prompt once it has been generated."
+                ),
+            }
+
+        return await processor.synthesize_procedural(user_id=user_id, force=force)
+
     async def generate_thread_summary(
         self,
         user_id: str,
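A caller-side sketch of the two outcomes described in the `synthesize_procedural` docstring above: a `deferred` status when synthesis is left to the Function App, and a direct result otherwise. `FakeClient` and `refresh_prompt` are hypothetical stand-ins so the snippet runs without Cosmos DB. The shape of the non-deferred result is not shown in this diff, so the `{"status": "ok"}` payload is an assumption.

```python
import asyncio
from typing import Any, Optional


class FakeClient:
    """Hypothetical stand-in for the async memory client (not the real class)."""

    def __init__(self, durable: bool) -> None:
        self._durable = durable
        self._prompt: Optional[str] = None

    async def synthesize_procedural(self, user_id: str, *, force: bool = False) -> dict[str, Any]:
        if self._durable:
            # Mirrors the deferred payload returned in the diff above.
            return {"status": "deferred", "reason": "durable_auto_trigger"}
        self._prompt = f"Prompt for {user_id}"  # assumed in-process side effect
        return {"status": "ok"}  # assumed result shape

    async def get_procedural_prompt(self, user_id: str) -> Optional[str]:
        return self._prompt


async def refresh_prompt(client: FakeClient, user_id: str) -> tuple[bool, str]:
    """Return (deferred, prompt); the prompt may be empty if nothing exists yet."""
    result = await client.synthesize_procedural(user_id, force=True)
    deferred = result.get("status") == "deferred"
    prompt = await client.get_procedural_prompt(user_id) or ""
    return deferred, prompt
```

Checking `status` before relying on the prompt matters with a durable processor, where the call returns immediately and the prompt only appears after the Function App has run.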

agent_memory_toolkit/aio/processors/durable.py

Lines changed: 13 additions & 0 deletions
@@ -93,6 +93,19 @@ async def generate_user_summary(
         )
         return UserSummaryResult(summary=None)

+    async def synthesize_procedural(
+        self,
+        *,
+        user_id: str,
+        force: bool = False,
+    ) -> dict[str, Any]:
+        logger.debug(
+            "AsyncDurableFunctionProcessor.synthesize_procedural deferred user_id=%s force=%s",
+            user_id,
+            force,
+        )
+        return {"status": "deferred", "reason": "durable_auto_trigger"}
+
     async def close(self) -> None:
         logger.debug("AsyncDurableFunctionProcessor.close no-op")
         return None

agent_memory_toolkit/aio/processors/inprocess.py

Lines changed: 9 additions & 0 deletions
@@ -146,6 +146,15 @@ async def generate_user_summary(
         summary = await asyncio.to_thread(self._pipeline.generate_user_summary, user_id, thread_ids)
         return UserSummaryResult(summary=summary if isinstance(summary, dict) else None)

+    async def synthesize_procedural(
+        self,
+        *,
+        user_id: str,
+        force: bool = False,
+    ) -> dict[str, Any]:
+        """Run procedural prompt synthesis through the in-process pipeline."""
+        return await asyncio.to_thread(self._pipeline.synthesize_procedural, user_id, force=force)
+
     async def close(self) -> None:
         return None
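The in-process processor above hands the blocking pipeline call to a worker thread with `asyncio.to_thread`, which forwards both positional and keyword arguments to the callable. A minimal sketch of that pattern follows, with a hypothetical `SlowPipeline` standing in for the real pipeline (which would call an LLM and Cosmos DB).

```python
import asyncio
import time
from typing import Any


class SlowPipeline:
    """Hypothetical blocking pipeline used only for this illustration."""

    def synthesize_procedural(self, user_id: str, force: bool = False) -> dict[str, Any]:
        time.sleep(0.05)  # simulate blocking I/O
        return {"user_id": user_id, "force": force}


async def main() -> list[dict[str, Any]]:
    pipeline = SlowPipeline()
    # Each call blocks inside a worker thread, so the event loop stays free
    # and the two calls can overlap.
    return await asyncio.gather(
        asyncio.to_thread(pipeline.synthesize_procedural, "u1", force=True),
        asyncio.to_thread(pipeline.synthesize_procedural, "u2"),
    )


results = asyncio.run(main())
```

`asyncio.gather` returns results in the order the awaitables were passed, regardless of which thread finishes first.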

agent_memory_toolkit/cosmos_memory_client.py

Lines changed: 92 additions & 9 deletions
@@ -1429,6 +1429,68 @@ def remove_tags(self, memory_id: str, user_id: str, thread_id: str, tags: list[s
     # Procedural and episodic memory retrieval
     # ------------------------------------------------------------------

+    def get_procedural_prompt(self, user_id: str) -> Optional[str]:
+        """Return the active synthesized procedural prompt for a user."""
+        self._require_cosmos()
+
+        qb = _QueryBuilder()
+        qb.add_filter("c.user_id", "@user_id", user_id)
+        qb.add_filter("c.thread_id", "@thread_id", "__procedural__")
+        qb.add_filter("c.type", "@type", "procedural")
+        qb.add_is_null_or_undefined("c.superseded_by")
+
+        query = f"SELECT TOP 1 c.content, c.version FROM c{qb.build_where()} ORDER BY c.version DESC"
+        try:
+            items = list(
+                self._container_client.query_items(
+                    query=query,
+                    parameters=qb.get_parameters(),
+                    enable_cross_partition_query=True,
+                )
+            )
+        except Exception as exc:
+            raise CosmosOperationError(f"get_procedural_prompt query failed: {exc}") from exc
+
+        if not items:
+            return None
+        return items[0].get("content")
+
+    def get_procedural_history(self, user_id: str, limit: int = 10) -> list[dict[str, Any]]:
+        """Return synthesized procedural docs for a user, newest first."""
+        self._require_cosmos()
+        if limit <= 0:
+            return []
+
+        qb = _QueryBuilder()
+        qb.add_filter("c.user_id", "@user_id", user_id)
+        qb.add_filter("c.thread_id", "@thread_id", "__procedural__")
+        qb.add_filter("c.type", "@type", "procedural")
+
+        query = f"SELECT * FROM c{qb.build_where()} ORDER BY c.version DESC"
+        try:
+            items = list(
+                self._container_client.query_items(
+                    query=query,
+                    parameters=qb.get_parameters(),
+                    enable_cross_partition_query=True,
+                )
+            )
+        except Exception as exc:
+            raise CosmosOperationError(f"get_procedural_history query failed: {exc}") from exc
+
+        def _is_active(doc: dict[str, Any]) -> bool:
+            return not doc.get("superseded_by")
+
+        items.sort(
+            key=lambda doc: (
+                1 if _is_active(doc) else 0,
+                int(doc.get("version") or 0),
+                int(doc.get("_ts") or 0),
+            ),
+            reverse=True,
+        )
+        return items[:limit]
+
     def get_procedural_memories(
         self,
         user_id: str,
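The query in the sync `get_procedural_prompt` above selects the highest-`version` procedural document whose `superseded_by` is null or undefined. The same selection over an in-memory list is sketched below, which may help when reasoning about edge cases or writing unit tests. `pick_active_prompt` and the sample documents are hypothetical and not part of the toolkit.

```python
from typing import Any, Optional


def pick_active_prompt(docs: list[dict[str, Any]]) -> Optional[str]:
    """In-memory analogue of: TOP 1 where superseded_by is unset, ordered by version DESC."""
    # A missing key and an explicit None both count as "not superseded".
    active = [d for d in docs if d.get("superseded_by") is None]
    if not active:
        return None
    newest = max(active, key=lambda d: d.get("version") or 0)
    return newest.get("content")


# Hypothetical documents for illustration.
sample = [
    {"content": "old prompt", "version": 1, "superseded_by": "doc-2"},
    {"content": "new prompt", "version": 2},
]
active_prompt = pick_active_prompt(sample)
```

When every document has been superseded the helper returns `None`, matching the empty-result branch of the method in the diff.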
@@ -1496,14 +1558,7 @@ def search_episodic_memories(

     def build_procedural_context(self, user_id: str) -> str:
         """Build formatted text for system prompt injection."""
-        memories = self.get_procedural_memories(user_id)
-        if not memories:
-            return ""
-        lines = ["## Learned User Preferences"]
-        for m in memories:
-            priority = m.get("metadata", {}).get("priority", "should")
-            lines.append(f"- {m['content']} [{priority}]")
-        return "\n".join(lines)
+        return self.get_procedural_prompt(user_id) or ""

     def build_episodic_context(self, user_id: str, query: str, top_k: int = 3) -> str:
         """Build formatted context of relevant past experiences."""
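After this change `build_procedural_context` returns the synthesized prompt verbatim, or an empty string when none exists, instead of assembling a "Learned User Preferences" bullet list. A sketch of how a caller might fold that value into a system prompt is shown below. `BASE_PROMPT` and `compose_system_prompt` are hypothetical names, not part of the toolkit.

```python
BASE_PROMPT = "You are a helpful assistant."  # hypothetical base instructions


def compose_system_prompt(procedural_context: str) -> str:
    """Append the procedural context only when it is non-empty."""
    if not procedural_context.strip():
        return BASE_PROMPT
    return f"{BASE_PROMPT}\n\n{procedural_context}"
```

Callers that previously relied on the `## Learned User Preferences` heading would need to add their own heading if they still want one, since the method no longer emits it.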
@@ -1527,10 +1582,38 @@ def extract_memories(
         thread_id: str,
         recent_k: Optional[int] = None,
     ) -> dict[str, int]:
-        """Extract facts, procedural, and episodic memories from a thread."""
+        """Extract facts and episodic memories from a thread."""
         self._require_cosmos()
         return self._pipeline.extract_memories(user_id, thread_id, recent_k)

+    def synthesize_procedural(
+        self,
+        user_id: str,
+        *,
+        force: bool = False,
+    ) -> dict[str, Any]:
+        """Trigger synthesized procedural prompt generation for a user.
+
+        For DurableFunctionProcessor this returns a deferred status; synthesis
+        is auto-driven inside the Function App. ``force=True`` is only honored
+        by InProcessProcessor.
+        """
+        self._require_cosmos()
+        processor = self._get_processor()
+        if not isinstance(processor, InProcessProcessor):
+            logger.debug("synthesize_procedural deferred to Function App auto-trigger user_id=%s", user_id)
+            return {
+                "status": "deferred",
+                "reason": "durable_auto_trigger",
+                "message": (
+                    "Procedural synthesis runs reactively in the Function App after each "
+                    "ExtractMemoriesOrchestrator pass. Use get_procedural_prompt() to read "
+                    "the synthesized prompt once it has been generated."
+                ),
+            }
+
+        return processor.synthesize_procedural(user_id=user_id, force=force)
+
     def generate_thread_summary(
         self,
         user_id: str,

agent_memory_toolkit/models.py

Lines changed: 1 addition & 1 deletion
@@ -95,7 +95,7 @@ class MemoryRecord(BaseModel):
     confidence: Optional[float] = None
     content_hash: Optional[str] = None
     superseded_by: Optional[str] = None
-    supersede_reason: Optional[Literal["duplicate", "contradiction", "update"]] = None
+    supersede_reason: Optional[Literal["duplicate", "contradict", "update"]] = None
     superseded_at: Optional[str] = None
     supersedes_ids: list[str] = Field(default_factory=list)
     source_memory_ids: list[str] = Field(default_factory=list)
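Because `supersede_reason` is typed as a `Literal`, a record stored before this commit with the old value `"contradiction"` would no longer match the allowed set when validated against the model. Whether the commit ships a data migration is not visible in this excerpt. The helper below is a hypothetical read-time shim using only the standard library, not code from the repository.

```python
from typing import Literal, Optional, get_args

SupersedeReason = Literal["duplicate", "contradict", "update"]

# Assumed legacy spelling from before the rename.
_LEGACY_VALUES = {"contradiction": "contradict"}


def normalize_supersede_reason(value: Optional[str]) -> Optional[str]:
    """Map the legacy value to the new one and reject anything unknown."""
    if value is None:
        return None  # live record, nothing to normalize
    value = _LEGACY_VALUES.get(value, value)
    if value not in get_args(SupersedeReason):
        raise ValueError(f"unknown supersede_reason: {value!r}")
    return value
```

Applying such a shim before constructing a `MemoryRecord` would let old and new documents coexist until stored values are rewritten.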
