Commit 0bf30ad

fix(memmachine): preserve conversation message order in add_items (#1856)
## Summary

- Fixes a race condition in `MemMachineEditor.add_items` that caused non-deterministic ordering of conversation messages and flaky CI failures on Python 3.13

## Root Cause

`add_items` wrapped each conversation message in `asyncio.to_thread(add_memory)` and dispatched all tasks concurrently via `asyncio.gather(*tasks)`. Thread pool tasks complete in nondeterministic order, so the API spy recorded calls by completion order rather than insertion order, causing `test_conversation_messages_preserved_in_order` to flip assertions under CI load.

## Fix

Refactored `add_items` to introduce an inner `add_item` coroutine per `MemoryItem`. Within that coroutine, conversation messages are `await`ed sequentially via `asyncio.to_thread`, preserving chronological order. Multiple `MemoryItem`s are still dispatched concurrently via `asyncio.gather`, so there is no performance regression on batch inserts.

```python
async def add_item(memory_item: MemoryItem) -> None:
    ...
    for msg in conversation:
        await asyncio.to_thread(add_memory)  # sequential within one item

if items:
    await asyncio.gather(*(add_item(item) for item in items))  # concurrent across items
```

The index-based assertions in the test are preserved as-is; they are now correct because the implementation guarantees order.

## Test plan

- [x] `test_conversation_messages_preserved_in_order` passes 50 consecutive runs on Python 3.11, 3.12, and 3.13 locally
- [x] Full test suite: 37 passed, 6 skipped (integration) on all three Python versions

Closes #1855

## Summary by CodeRabbit

* **Refactor**
  * Updated memory item upload processing to preserve the sequential order of conversation messages while maintaining concurrent processing of multiple memory items.

Authors:
  - Federico Kamelhar (https://github.com/fede-kamel)

Approvers:
  - Will Killian (https://github.com/willkill07)

URL: #1856
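The root cause and the fix described in the commit message can be reproduced in isolation. The following is a minimal sketch, not the plugin's code: `add_memory` here is a hypothetical stand-in that sleeps and appends to a list, and the labels and delays are made up for illustration. It assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


def add_memory(log: list[str], label: str, delay: float) -> None:
    """Hypothetical stand-in for a blocking MemMachine client call."""
    time.sleep(delay)  # simulate blocking I/O of varying duration
    log.append(label)  # record the order in which calls finish


async def add_all_at_once(messages: list[tuple[str, float]]) -> list[str]:
    """Pre-fix pattern: dispatch every message to the thread pool at once."""
    log: list[str] = []
    await asyncio.gather(
        *(asyncio.to_thread(add_memory, log, label, delay) for label, delay in messages)
    )
    return log  # completion order, not necessarily insertion order


async def add_items_in_order(items: list[list[tuple[str, float]]]) -> list[list[str]]:
    """Post-fix pattern: sequential within an item, concurrent across items."""
    logs: list[list[str]] = [[] for _ in items]

    async def add_item(messages: list[tuple[str, float]], log: list[str]) -> None:
        for label, delay in messages:
            # Await each call before starting the next one.
            await asyncio.to_thread(add_memory, log, label, delay)

    if items:
        await asyncio.gather(*(add_item(m, log) for m, log in zip(items, logs)))
    return logs


if __name__ == "__main__":
    conversation = [("m0", 0.2), ("m1", 0.1), ("m2", 0.0)]
    print(asyncio.run(add_all_at_once(conversation)))       # order depends on timing
    print(asyncio.run(add_items_in_order([conversation])))  # always insertion order
```

With the first helper, slower calls tend to finish after faster ones regardless of submission order, which is the kind of behavior a call-recording spy would observe. The second helper trades some latency inside a single conversation for a guaranteed order, while separate items still overlap.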
1 parent 2f82a98 commit 0bf30ad

1 file changed

Lines changed: 13 additions & 11 deletions

packages/nvidia_nat_memmachine/src/nat/plugins/memmachine/memmachine_editor.py

@@ -111,11 +111,14 @@ async def add_items(self, items: list[MemoryItem]) -> None:
         Each MemoryItem is translated and uploaded through the MemMachine API.
 
         All memories are added to both episodic and semantic memory types.
+
+        Conversation messages within a single MemoryItem are added sequentially to
+        preserve chronological order. Separate MemoryItems (and non-conversation
+        memories) are still dispatched concurrently via asyncio.gather.
         """
-        # Run synchronous operations in thread pool to make them async
-        tasks = []
 
-        for memory_item in items:
+        async def add_item(memory_item: MemoryItem) -> None:
+            """Upload a single MemoryItem, adding conversation messages sequentially."""
             # Make a copy of metadata to avoid modifying the original
             item_meta = memory_item.metadata.copy() if memory_item.metadata else {}
             conversation = memory_item.conversation
@@ -139,7 +142,9 @@ async def add_items(self, items: list[MemoryItem]) -> None:
             # If we have a conversation, add each message separately
             # Otherwise, use memory_text or skip if no content
             if conversation:
-                # Add each message in the conversation with its role
+                # Add each message sequentially to preserve conversation order.
+                # asyncio.to_thread tasks dispatched via gather() complete in
+                # nondeterministic order, so we await each one before the next.
                 for msg in conversation:
                     msg_role = msg.get('role', 'user')
                     msg_content = msg.get('content', '')
@@ -154,7 +159,6 @@ async def add_items(self, items: list[MemoryItem]) -> None:
                         # Convert list to comma-separated string
                         metadata["tags"] = ", ".join(tags) if isinstance(tags, list) else str(tags)
 
-                    # Capture variables in closure to avoid late binding issues
                     def add_memory(
                         content=msg_content,
                         role=msg_role,
@@ -173,8 +177,7 @@ def add_memory(
                             episode_type=None # Use default (MESSAGE)
                         )
 
-                    task = asyncio.to_thread(add_memory)
-                    tasks.append(task)
+                    await asyncio.to_thread(add_memory)
             elif memory_text:
                 # Add as a single memory item (direct memory without conversation)
                 # Add tags to metadata if present
@@ -195,11 +198,10 @@ def add_memory(content=memory_text, mem=memory, meta=metadata, mem_types=memory_
                         episode_type=None # Use default (MESSAGE)
                     )
 
-                task = asyncio.to_thread(add_memory)
-                tasks.append(task)
+                await asyncio.to_thread(add_memory)
 
-        if tasks:
-            await asyncio.gather(*tasks)
+        if items:
+            await asyncio.gather(*(add_item(item) for item in items))
 
     async def search(self, query: str, top_k: int = 5, **kwargs) -> list[MemoryItem]:
         """

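The diff drops the comment about late binding but keeps the default arguments on `add_memory` (`content=msg_content`, `role=msg_role`, and so on). As a reminder of what those defaults guard against, here is a small standalone sketch of Python's late-binding closure behavior; the function names and the message list are illustrative and do not come from the plugin.

```python
from typing import Callable


def make_late_bound() -> list[Callable[[], str]]:
    """Closures that read the loop variable when called, not when defined."""
    callbacks = []
    for msg in ["first", "second", "third"]:

        def cb() -> str:
            return msg  # looked up at call time, after the loop has finished

        callbacks.append(cb)
    return callbacks


def make_default_bound() -> list[Callable[[], str]]:
    """Closures that capture the current value through a default argument."""
    callbacks = []
    for msg in ["first", "second", "third"]:

        def cb(content: str = msg) -> str:
            return content  # default was evaluated when cb was defined

        callbacks.append(cb)
    return callbacks


if __name__ == "__main__":
    print([cb() for cb in make_late_bound()])     # ['third', 'third', 'third']
    print([cb() for cb in make_default_bound()])  # ['first', 'second', 'third']
```

In the pre-fix code the callables were collected first and only ran later under `gather`, so binding by default argument mattered. After the change each call is awaited before the loop advances, so the defaults are arguably no longer strictly required, though they remain harmless.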