-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: address top 3 architectural gaps - async safety, guardrails, and memory pipeline #1354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0c663e6
70116ad
13811df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,9 +15,11 @@ | |||||||||||||||||||
| from .chat_mixin import ChatMixin | ||||||||||||||||||||
| from .execution_mixin import ExecutionMixin | ||||||||||||||||||||
| from .memory_mixin import MemoryMixin | ||||||||||||||||||||
| from .async_memory_mixin import AsyncMemoryMixin | ||||||||||||||||||||
| from .tool_execution import ToolExecutionMixin | ||||||||||||||||||||
| from .chat_handler import ChatHandlerMixin | ||||||||||||||||||||
| from .session_manager import SessionManagerMixin | ||||||||||||||||||||
| from .async_safety import AsyncSafeState | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Module-level logger for thread safety errors and debugging | ||||||||||||||||||||
| logger = get_logger(__name__) | ||||||||||||||||||||
|
|
@@ -190,7 +192,7 @@ def _is_file_path(value: str) -> bool: | |||||||||||||||||||
| # Import structured error from central errors module | ||||||||||||||||||||
| from ..errors import BudgetExceededError | ||||||||||||||||||||
|
|
||||||||||||||||||||
| class Agent(ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin): | ||||||||||||||||||||
| class Agent(ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin, AsyncMemoryMixin): | ||||||||||||||||||||
| # Class-level counter for generating unique display names for nameless agents | ||||||||||||||||||||
| _agent_counter = 0 | ||||||||||||||||||||
| _agent_counter_lock = threading.Lock() | ||||||||||||||||||||
|
|
@@ -1569,12 +1571,11 @@ def __init__( | |||||||||||||||||||
| self.embedder_config = embedder_config | ||||||||||||||||||||
| self.knowledge = knowledge | ||||||||||||||||||||
| self.use_system_prompt = use_system_prompt | ||||||||||||||||||||
| # Thread-safe chat_history with eager lock initialization | ||||||||||||||||||||
| self.chat_history = [] | ||||||||||||||||||||
| self.__history_lock = threading.Lock() # Eager initialization to prevent race conditions | ||||||||||||||||||||
| # Async-safe chat_history with dual-lock protection | ||||||||||||||||||||
| self.__chat_history_state = AsyncSafeState([]) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Thread-safe snapshot/redo stack lock - always available even when autonomy is disabled | ||||||||||||||||||||
| self.__snapshot_lock = threading.Lock() | ||||||||||||||||||||
| # Async-safe snapshot/redo stack lock - always available even when autonomy is disabled | ||||||||||||||||||||
| self.__snapshot_state = AsyncSafeState(None) | ||||||||||||||||||||
| self.markdown = markdown | ||||||||||||||||||||
| self.stream = stream | ||||||||||||||||||||
| self.metrics = metrics | ||||||||||||||||||||
|
|
@@ -1813,10 +1814,20 @@ def _telemetry(self): | |||||||||||||||||||
| self.__telemetry_initialized = True | ||||||||||||||||||||
| return self.__telemetry | ||||||||||||||||||||
|
|
||||||||||||||||||||
| @property | ||||||||||||||||||||
| def chat_history(self): | ||||||||||||||||||||
| """Get chat history (read-only access, use context managers for modifications).""" | ||||||||||||||||||||
| return self.__chat_history_state.get() | ||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
|
||||||||||||||||||||
|
|
||||||||||||||||||||
|
Comment on lines
+1817
to
+1821
|
||||||||||||||||||||
| @chat_history.setter | ||||||||||||||||||||
| def chat_history(self, value): | ||||||||||||||||||||
| """Set chat history (updates the underlying async-safe state).""" | ||||||||||||||||||||
| self.__chat_history_state.value = value | ||||||||||||||||||||
|
Comment on lines
+1822
to
+1825
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The setter writes directly to The setter should acquire the thread lock before replacing the value:
Suggested change
Alternatively, expose a dedicated |
||||||||||||||||||||
|
|
||||||||||||||||||||
| @property | ||||||||||||||||||||
| def _history_lock(self): | ||||||||||||||||||||
| """Thread-safe chat history lock.""" | ||||||||||||||||||||
| return self.__history_lock | ||||||||||||||||||||
| """Get appropriate lock for chat history based on execution context.""" | ||||||||||||||||||||
| return self.__chat_history_state | ||||||||||||||||||||
|
Comment on lines
1827
to
+1830
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @property | ||||||||||||||||||||
| def _cache_lock(self): | ||||||||||||||||||||
|
|
@@ -1825,8 +1836,8 @@ def _cache_lock(self): | |||||||||||||||||||
|
|
||||||||||||||||||||
| @property | ||||||||||||||||||||
| def _snapshot_lock(self): | ||||||||||||||||||||
| """Thread-safe snapshot/redo stack lock.""" | ||||||||||||||||||||
| return self.__snapshot_lock | ||||||||||||||||||||
| """Async-safe snapshot/redo stack lock.""" | ||||||||||||||||||||
| return self.__snapshot_state | ||||||||||||||||||||
|
Comment on lines
1837
to
+1840
|
||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @property | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,219 @@ | ||
| """ | ||
| Async memory operations mixin for the Agent class. | ||
|
|
||
| Provides async-safe memory operations that can be used in async contexts | ||
| without blocking the event loop. This extends the base MemoryMixin with | ||
| async capabilities following the AsyncMemoryProtocol. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from typing import List, Dict, Any, Optional, Union | ||
| from praisonaiagents._logging import get_logger | ||
| from ..memory.protocols import AsyncMemoryProtocol | ||
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
|
|
||
| class AsyncMemoryMixin: | ||
| """ | ||
| Mixin providing async-safe memory operations for agents. | ||
|
|
||
| This mixin adds async memory methods that can be used in async contexts | ||
| like async agent execution methods (achat, arun, astart) without | ||
| blocking the event loop. | ||
| """ | ||
|
|
||
| async def astore_memory( | ||
| self, | ||
| content: str, | ||
| memory_type: str = "short_term", | ||
| metadata: Optional[Dict[str, Any]] = None, | ||
| **kwargs | ||
| ) -> Optional[str]: | ||
| """ | ||
| Async store content in agent memory. | ||
|
|
||
| Args: | ||
| content: Content to store | ||
| memory_type: "short_term" or "long_term" | ||
| metadata: Optional metadata | ||
| **kwargs: Additional parameters | ||
|
|
||
| Returns: | ||
| Memory ID if successful, None otherwise | ||
| """ | ||
| if not hasattr(self, '_memory_instance') or self._memory_instance is None: | ||
| logger.debug("No memory configured for async storage") | ||
| return None | ||
|
|
||
| # Check if memory adapter supports async operations | ||
| if isinstance(self._memory_instance, AsyncMemoryProtocol): | ||
| try: | ||
| if memory_type == "long_term": | ||
| return await self._memory_instance.astore_long_term(content, metadata, **kwargs) | ||
| else: | ||
| return await self._memory_instance.astore_short_term(content, metadata, **kwargs) | ||
| except Exception as e: | ||
| logger.error(f"Error in async memory storage: {e}") | ||
| return None | ||
| else: | ||
| # Fallback: run sync memory operations in thread pool | ||
| return await self._run_memory_in_thread( | ||
| "store", content, memory_type, metadata, **kwargs | ||
| ) | ||
|
|
||
| async def asearch_memory( | ||
| self, | ||
| query: str, | ||
| memory_type: str = "short_term", | ||
| limit: int = 5, | ||
| **kwargs | ||
| ) -> List[Dict[str, Any]]: | ||
| """ | ||
| Async search agent memory. | ||
|
|
||
| Args: | ||
| query: Search query | ||
| memory_type: "short_term" or "long_term" | ||
| limit: Maximum results | ||
| **kwargs: Additional parameters | ||
|
|
||
| Returns: | ||
| List of memory entries | ||
| """ | ||
| if not hasattr(self, '_memory_instance') or self._memory_instance is None: | ||
| logger.debug("No memory configured for async search") | ||
| return [] | ||
|
|
||
| # Check if memory adapter supports async operations | ||
| if isinstance(self._memory_instance, AsyncMemoryProtocol): | ||
| try: | ||
| if memory_type == "long_term": | ||
| return await self._memory_instance.asearch_long_term(query, limit, **kwargs) | ||
| else: | ||
| return await self._memory_instance.asearch_short_term(query, limit, **kwargs) | ||
| except Exception as e: | ||
| logger.error(f"Error in async memory search: {e}") | ||
| return [] | ||
| else: | ||
| # Fallback: run sync memory operations in thread pool | ||
| return await self._run_memory_in_thread( | ||
| "search", query, memory_type, limit=limit, **kwargs | ||
| ) | ||
|
|
||
| async def _run_memory_in_thread( | ||
| self, | ||
| operation: str, | ||
| content: str, | ||
| memory_type: str, | ||
| metadata: Optional[Dict[str, Any]] = None, | ||
| limit: int = 5, | ||
| **kwargs | ||
| ) -> Union[str, List[Dict[str, Any]], None]: | ||
| """ | ||
| Run synchronous memory operations in a thread pool to avoid blocking. | ||
|
|
||
| Args: | ||
| operation: "store" or "search" | ||
| content: Content to store or query to search | ||
| memory_type: "short_term" or "long_term" | ||
| metadata: Optional metadata for store operations | ||
| limit: Limit for search operations | ||
| **kwargs: Additional parameters | ||
|
|
||
| Returns: | ||
| Result of the memory operation | ||
| """ | ||
| loop = asyncio.get_running_loop() | ||
|
|
||
| try: | ||
| if operation == "store": | ||
| if memory_type == "long_term" and hasattr(self._memory_instance, 'store_long_term'): | ||
| return await loop.run_in_executor( | ||
| None, | ||
| lambda: self._memory_instance.store_long_term(content, metadata, **kwargs) | ||
| ) | ||
| elif hasattr(self._memory_instance, 'store_short_term'): | ||
| return await loop.run_in_executor( | ||
| None, | ||
| lambda: self._memory_instance.store_short_term(content, metadata, **kwargs) | ||
| ) | ||
| elif operation == "search": | ||
| if memory_type == "long_term" and hasattr(self._memory_instance, 'search_long_term'): | ||
| return await loop.run_in_executor( | ||
| None, | ||
| lambda: self._memory_instance.search_long_term(content, limit, **kwargs) | ||
| ) | ||
| elif hasattr(self._memory_instance, 'search_short_term'): | ||
| return await loop.run_in_executor( | ||
| None, | ||
| lambda: self._memory_instance.search_short_term(content, limit, **kwargs) | ||
| ) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error in threaded memory operation {operation}: {e}") | ||
|
|
||
| return [] if operation == "search" else None | ||
|
|
||
| async def _async_build_memory_context( | ||
| self, | ||
| query: str, | ||
| max_memories: int = 10, | ||
| memory_types: List[str] = None | ||
| ) -> str: | ||
| """ | ||
| Async version of _build_memory_context that doesn't block the event loop. | ||
|
|
||
| Args: | ||
| query: Query to search for relevant memories | ||
| max_memories: Maximum number of memories to include | ||
| memory_types: Types of memory to search ("short_term", "long_term") | ||
|
|
||
| Returns: | ||
| Formatted memory context string | ||
| """ | ||
| if memory_types is None: | ||
| memory_types = ["short_term", "long_term"] | ||
|
|
||
| all_memories = [] | ||
|
|
||
| # Search each memory type asynchronously | ||
| for memory_type in memory_types: | ||
| memories = await self.asearch_memory( | ||
| query, | ||
| memory_type=memory_type, | ||
| limit=max_memories // len(memory_types) | ||
| ) | ||
| all_memories.extend(memories) | ||
|
Comment on lines
+181
to
+188
|
||
|
|
||
| # Sort by relevance/recency and limit total | ||
| all_memories = all_memories[:max_memories] | ||
|
|
||
| if not all_memories: | ||
| return "" | ||
|
|
||
| # Build context string | ||
| context_lines = ["Relevant memories:"] | ||
| for i, memory in enumerate(all_memories, 1): | ||
| text = memory.get('text', str(memory)) | ||
| context_lines.append(f"{i}. {text}") | ||
|
|
||
| return "\n".join(context_lines) | ||
|
|
||
| def _ensure_async_memory_compatibility(self): | ||
| """ | ||
| Ensure memory adapter is compatible with async operations. | ||
|
|
||
| Logs warnings if memory adapter doesn't support async operations | ||
| and will fall back to thread pool execution. | ||
| """ | ||
| if hasattr(self, '_memory_instance') and self._memory_instance is not None: | ||
| if not isinstance(self._memory_instance, AsyncMemoryProtocol): | ||
| logger.info( | ||
| f"Memory adapter {type(self._memory_instance).__name__} doesn't implement " | ||
| f"AsyncMemoryProtocol, falling back to thread pool execution" | ||
| ) | ||
| return False | ||
| return True | ||
| return False | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
chat_historyproperty is defined as read-only (getter only). However, theAgentclass inherits fromMemoryMixin, which contains several methods (e.g.,clear_history,prune_history,delete_history_matching) that attempt to assign a new list toself.chat_history. This will result in anAttributeError: can't set attributeat runtime. A setter should be added to update the underlyingAsyncSafeStatevalue to maintain compatibility with the mixin's functionality.