Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions src/praisonai-agents/praisonaiagents/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
from .chat_mixin import ChatMixin
from .execution_mixin import ExecutionMixin
from .memory_mixin import MemoryMixin
from .async_memory_mixin import AsyncMemoryMixin
from .tool_execution import ToolExecutionMixin
from .chat_handler import ChatHandlerMixin
from .session_manager import SessionManagerMixin
from .async_safety import AsyncSafeState

# Module-level logger for thread safety errors and debugging
logger = get_logger(__name__)
Expand Down Expand Up @@ -190,7 +192,7 @@ def _is_file_path(value: str) -> bool:
# Import structured error from central errors module
from ..errors import BudgetExceededError

class Agent(ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin):
class Agent(ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin, AsyncMemoryMixin):
# Class-level counter for generating unique display names for nameless agents
_agent_counter = 0
_agent_counter_lock = threading.Lock()
Expand Down Expand Up @@ -1569,12 +1571,11 @@ def __init__(
self.embedder_config = embedder_config
self.knowledge = knowledge
self.use_system_prompt = use_system_prompt
# Thread-safe chat_history with eager lock initialization
self.chat_history = []
self.__history_lock = threading.Lock() # Eager initialization to prevent race conditions
# Async-safe chat_history with dual-lock protection
self.__chat_history_state = AsyncSafeState([])

# Thread-safe snapshot/redo stack lock - always available even when autonomy is disabled
self.__snapshot_lock = threading.Lock()
# Async-safe snapshot/redo stack lock - always available even when autonomy is disabled
self.__snapshot_state = AsyncSafeState(None)
self.markdown = markdown
self.stream = stream
self.metrics = metrics
Expand Down Expand Up @@ -1813,10 +1814,20 @@ def _telemetry(self):
self.__telemetry_initialized = True
return self.__telemetry

@property
def chat_history(self):
"""Get chat history (read-only access, use context managers for modifications)."""
return self.__chat_history_state.get()
Comment on lines +1817 to +1820
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The chat_history property is defined as read-only (getter only). However, the Agent class inherits from MemoryMixin, which contains several methods (e.g., clear_history, prune_history, delete_history_matching) that attempt to assign a new list to self.chat_history. This will result in an AttributeError: can't set attribute at runtime. A setter should be added to update the underlying AsyncSafeState value to maintain compatibility with the mixin's functionality.

Suggested change
@property
def chat_history(self):
"""Get chat history (read-only access, use context managers for modifications)."""
return self.__chat_history_state.get()
@property
def chat_history(self):
"""Get chat history (read-only access, use context managers for modifications)."""
return self.__chat_history_state.get()
@chat_history.setter
def chat_history(self, value):
"""Set chat history (updates the underlying async-safe state)."""
self.__chat_history_state.value = value

Comment thread
coderabbitai[bot] marked this conversation as resolved.

Comment on lines +1817 to +1821
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chat_history is now a read-only @property with no setter, but there are multiple existing call sites that assign to self.chat_history (e.g. rollback/truncation in chat_mixin.py and memory_mixin.py). This will raise AttributeError at runtime. Consider either restoring chat_history as a mutable attribute, or adding a setter (and/or explicit mutation APIs) that update the underlying async-safe state under the appropriate lock.

Copilot uses AI. Check for mistakes.
@chat_history.setter
def chat_history(self, value):
"""Set chat history (updates the underlying async-safe state)."""
self.__chat_history_state.value = value
Comment on lines +1822 to +1825
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 chat_history setter bypasses the async-safe lock

The setter writes directly to self.__chat_history_state.value without acquiring any lock, undermining the entire AsyncSafeState abstraction. Any concurrent thread that is inside with self._history_lock: (e.g. _add_to_chat_history) will hold a reference to the old list object. If another thread simultaneously calls agent.chat_history = [], the setter silently replaces the underlying list while the first thread appends to the now-orphaned old list — data loss with no error.

The setter should acquire the thread lock before replacing the value:

Suggested change
@chat_history.setter
def chat_history(self, value):
"""Set chat history (updates the underlying async-safe state)."""
self.__chat_history_state.value = value
@chat_history.setter
def chat_history(self, value):
"""Set chat history (updates the underlying async-safe state)."""
with self.__chat_history_state._lock.sync():
self.__chat_history_state.value = value

Alternatively, expose a dedicated set(value) method on AsyncSafeState that acquires the lock internally, and call that from the setter.


@property
def _history_lock(self):
"""Thread-safe chat history lock."""
return self.__history_lock
"""Get appropriate lock for chat history based on execution context."""
return self.__chat_history_state
Comment on lines 1827 to +1830
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_history_lock is now returning an AsyncSafeState instance, but existing code uses with self._history_lock: expecting a lock/context-manager (e.g. MemoryMixin._add_to_chat_history, _truncate_chat_history, etc.). AsyncSafeState doesn’t implement __enter__/__exit__, so this will break those code paths. Either return an object that supports with directly (e.g. a dedicated dual-lock type), or implement sync/async context manager methods on the returned object to preserve the current locking API.

Copilot uses AI. Check for mistakes.

@property
def _cache_lock(self):
Expand All @@ -1825,8 +1836,8 @@ def _cache_lock(self):

@property
def _snapshot_lock(self):
"""Thread-safe snapshot/redo stack lock."""
return self.__snapshot_lock
"""Async-safe snapshot/redo stack lock."""
return self.__snapshot_state
Comment on lines 1837 to +1840
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_snapshot_lock is now an AsyncSafeState, but internal methods still use with self._snapshot_lock: (e.g. undo/redo/diff code). Since AsyncSafeState is not a lock context manager, those with blocks will fail. Consider keeping _snapshot_lock as an actual lock abstraction (sync+async), separate from any stored state container.

Copilot uses AI. Check for mistakes.


@property
Expand Down
219 changes: 219 additions & 0 deletions src/praisonai-agents/praisonaiagents/agent/async_memory_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
"""
Async memory operations mixin for the Agent class.

Provides async-safe memory operations that can be used in async contexts
without blocking the event loop. This extends the base MemoryMixin with
async capabilities following the AsyncMemoryProtocol.
"""

import asyncio
import logging
from typing import List, Dict, Any, Optional, Union
from praisonaiagents._logging import get_logger
from ..memory.protocols import AsyncMemoryProtocol

logger = get_logger(__name__)


class AsyncMemoryMixin:
    """
    Mixin providing async-safe memory operations for agents.

    Adds async memory methods usable from async agent execution paths
    (achat, arun, astart) without blocking the event loop. Adapters that
    implement AsyncMemoryProtocol are awaited natively; all other adapters
    are driven through the default thread-pool executor as a fallback.

    Expects the host class to expose a ``_memory_instance`` attribute
    holding the configured memory adapter (or None when memory is disabled).
    """

    async def astore_memory(
        self,
        content: str,
        memory_type: str = "short_term",
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> Optional[str]:
        """
        Async store content in agent memory.

        Args:
            content: Content to store
            memory_type: "short_term" or "long_term"
            metadata: Optional metadata attached to the entry
            **kwargs: Additional adapter-specific parameters

        Returns:
            Memory ID if successful, None otherwise (including when no
            memory adapter is configured or the adapter raised).
        """
        if not hasattr(self, '_memory_instance') or self._memory_instance is None:
            logger.debug("No memory configured for async storage")
            return None

        # Native async path when the adapter implements the async protocol.
        # NOTE(review): isinstance() against a typing.Protocol requires the
        # protocol to be decorated with @runtime_checkable — confirm
        # AsyncMemoryProtocol is, or this branch is never taken.
        if isinstance(self._memory_instance, AsyncMemoryProtocol):
            try:
                if memory_type == "long_term":
                    return await self._memory_instance.astore_long_term(content, metadata, **kwargs)
                return await self._memory_instance.astore_short_term(content, metadata, **kwargs)
            except Exception as e:
                logger.error(f"Error in async memory storage: {e}")
                return None

        # Fallback: run the sync memory operation in a thread pool so the
        # event loop is not blocked.
        return await self._run_memory_in_thread(
            "store", content, memory_type, metadata, **kwargs
        )

    async def asearch_memory(
        self,
        query: str,
        memory_type: str = "short_term",
        limit: int = 5,
        **kwargs
    ) -> List[Dict[str, Any]]:
        """
        Async search agent memory.

        Args:
            query: Search query
            memory_type: "short_term" or "long_term"
            limit: Maximum results
            **kwargs: Additional adapter-specific parameters

        Returns:
            List of memory entries (empty on error or when no memory is
            configured).
        """
        if not hasattr(self, '_memory_instance') or self._memory_instance is None:
            logger.debug("No memory configured for async search")
            return []

        # Native async path when the adapter implements the async protocol.
        if isinstance(self._memory_instance, AsyncMemoryProtocol):
            try:
                if memory_type == "long_term":
                    return await self._memory_instance.asearch_long_term(query, limit, **kwargs)
                return await self._memory_instance.asearch_short_term(query, limit, **kwargs)
            except Exception as e:
                logger.error(f"Error in async memory search: {e}")
                return []

        # Fallback: run the sync memory operation in a thread pool.
        return await self._run_memory_in_thread(
            "search", query, memory_type, limit=limit, **kwargs
        )

    async def _run_memory_in_thread(
        self,
        operation: str,
        content: str,
        memory_type: str,
        metadata: Optional[Dict[str, Any]] = None,
        limit: int = 5,
        **kwargs
    ) -> Union[str, List[Dict[str, Any]], None]:
        """
        Run synchronous memory operations in a thread pool to avoid blocking.

        Args:
            operation: "store" or "search"
            content: Content to store, or the query string for searches
            memory_type: "short_term" or "long_term"
            metadata: Optional metadata for store operations
            limit: Result limit for search operations
            **kwargs: Additional adapter-specific parameters

        Returns:
            The adapter's result; [] for failed/unsupported searches and
            None for failed/unsupported stores.
        """
        loop = asyncio.get_running_loop()

        try:
            if operation == "store":
                # Prefer the type-specific method; fall back to short-term
                # storage when the adapter lacks a long-term store.
                if memory_type == "long_term" and hasattr(self._memory_instance, 'store_long_term'):
                    return await loop.run_in_executor(
                        None,
                        lambda: self._memory_instance.store_long_term(content, metadata, **kwargs)
                    )
                elif hasattr(self._memory_instance, 'store_short_term'):
                    return await loop.run_in_executor(
                        None,
                        lambda: self._memory_instance.store_short_term(content, metadata, **kwargs)
                    )
            elif operation == "search":
                if memory_type == "long_term" and hasattr(self._memory_instance, 'search_long_term'):
                    return await loop.run_in_executor(
                        None,
                        lambda: self._memory_instance.search_long_term(content, limit, **kwargs)
                    )
                elif hasattr(self._memory_instance, 'search_short_term'):
                    return await loop.run_in_executor(
                        None,
                        lambda: self._memory_instance.search_short_term(content, limit, **kwargs)
                    )

        except Exception as e:
            logger.error(f"Error in threaded memory operation {operation}: {e}")

        # Unknown operation, unsupported adapter, or failure above.
        return [] if operation == "search" else None

    async def _async_build_memory_context(
        self,
        query: str,
        max_memories: int = 10,
        memory_types: Optional[List[str]] = None
    ) -> str:
        """
        Async version of _build_memory_context that doesn't block the event loop.

        Args:
            query: Query to search for relevant memories
            max_memories: Maximum number of memories to include overall
            memory_types: Types of memory to search ("short_term", "long_term");
                defaults to both

        Returns:
            Formatted memory context string ("" when nothing was found)
        """
        if memory_types is None:
            memory_types = ["short_term", "long_term"]

        all_memories: List[Dict[str, Any]] = []

        # Guarantee at least one result slot per type: plain integer division
        # could yield 0 (e.g. max_memories=1 with two types) and suppress all
        # results. The combined list is trimmed back to max_memories below.
        per_type_limit = max(1, max_memories // len(memory_types)) if memory_types else 0

        # Search each memory type asynchronously
        for memory_type in memory_types:
            memories = await self.asearch_memory(
                query,
                memory_type=memory_type,
                limit=per_type_limit
            )
            all_memories.extend(memories)

        # Trim the combined results to the requested cap.
        all_memories = all_memories[:max_memories]

        if not all_memories:
            return ""

        # Build a simple numbered context string.
        context_lines = ["Relevant memories:"]
        for i, memory in enumerate(all_memories, 1):
            text = memory.get('text', str(memory))
            context_lines.append(f"{i}. {text}")

        return "\n".join(context_lines)

    def _ensure_async_memory_compatibility(self) -> bool:
        """
        Ensure the memory adapter is compatible with async operations.

        Logs an informational message when the adapter doesn't implement
        AsyncMemoryProtocol (it will fall back to thread-pool execution).

        Returns:
            True if the adapter natively supports async operations,
            False when it doesn't or when no memory is configured.
        """
        if hasattr(self, '_memory_instance') and self._memory_instance is not None:
            if not isinstance(self._memory_instance, AsyncMemoryProtocol):
                logger.info(
                    f"Memory adapter {type(self._memory_instance).__name__} doesn't implement "
                    f"AsyncMemoryProtocol, falling back to thread pool execution"
                )
                return False
            return True
        return False
Loading
Loading