-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix: address critical concurrency, memory, and resource lifecycle gaps #1366
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,26 +35,8 @@ class DualLock: | |
| """ | ||
|
|
||
| def __init__(self): | ||
| self._thread_lock = threading.Lock() | ||
| self._async_lock: Optional[asyncio.Lock] = None | ||
| self._loop_id: Optional[int] = None | ||
|
|
||
| def _get_async_lock(self) -> asyncio.Lock: | ||
| """Get or create asyncio.Lock for current event loop.""" | ||
| try: | ||
| current_loop = asyncio.get_running_loop() | ||
| current_loop_id = id(current_loop) | ||
|
|
||
| # Create new lock if loop changed or first time | ||
| if self._loop_id != current_loop_id: | ||
| self._async_lock = asyncio.Lock() | ||
| self._loop_id = current_loop_id | ||
|
|
||
| return self._async_lock | ||
| except RuntimeError: | ||
| # No event loop running, fall back to thread lock in a new loop | ||
| self._async_lock = asyncio.Lock() | ||
| return self._async_lock | ||
| """Initialize with unified thread-safe locking.""" | ||
| self._thread_lock = threading.Lock() # Single canonical lock for all contexts | ||
|
|
||
| @contextmanager | ||
| def sync(self): | ||
|
|
@@ -64,10 +46,13 @@ def sync(self): | |
|
|
||
| @asynccontextmanager | ||
| async def async_lock(self): | ||
| """Acquire lock in asynchronous context using asyncio.Lock.""" | ||
| async_lock = self._get_async_lock() | ||
| async with async_lock: | ||
| """Acquire lock in asynchronous context using threading.Lock via asyncio.to_thread().""" | ||
| # Use asyncio.to_thread to acquire the thread lock without blocking the event loop | ||
| await asyncio.to_thread(self._thread_lock.acquire) | ||
| try: | ||
| yield | ||
| finally: | ||
| self._thread_lock.release() | ||
|
Comment on lines
47
to
+55
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When a coroutine awaiting `asyncio.to_thread(self._thread_lock.acquire)` is cancelled, the worker thread keeps running and may still acquire the lock after the cancellation, leaving `self._thread_lock` permanently held and deadlocking every future caller.

Task cancellation is not an edge case in this codebase — the workflow timeout path cancels in-flight tasks routinely. The safest repair is to catch the cancellation and arrange for the lock to be released once the still-running thread finally acquires it:

@asynccontextmanager
async def async_lock(self):
"""Acquire lock in asynchronous context using threading.Lock via asyncio.to_thread()."""
acquired = False
try:
await asyncio.to_thread(self._thread_lock.acquire)
acquired = True
yield
except asyncio.CancelledError:
if not acquired:
# Thread worker is still running and will acquire the lock.
# Schedule a release so no future caller deadlocks.
def _release_when_acquired():
# Worker already holds the lock at this point (or will momentarily).
# Just release it.
try:
self._thread_lock.release()
except RuntimeError:
pass # Was never acquired; nothing to do.
asyncio.get_event_loop().run_in_executor(None, _release_when_acquired)
raise
finally:
if acquired:
            self._thread_lock.release()

Alternatively, consider switching back to a per-loop `asyncio.Lock`.
||
|
|
||
| def is_async_context(self) -> bool: | ||
| """Check if we're currently in an async context.""" | ||
|
|
@@ -129,14 +114,12 @@ def __exit__(self, exc_type, exc_val, exc_tb): | |
|
|
||
| async def __aenter__(self): | ||
| """Support for asynchronous context manager protocol.""" | ||
| async_lock = self._lock._get_async_lock() | ||
| await async_lock.acquire() | ||
| await asyncio.to_thread(self._lock._thread_lock.acquire) | ||
| return self.value | ||
|
|
||
| async def __aexit__(self, exc_type, exc_val, exc_tb): | ||
| """Support for asynchronous context manager protocol.""" | ||
| async_lock = self._lock._get_async_lock() | ||
| async_lock.release() | ||
| self._lock._thread_lock.release() | ||
| return None | ||
|
|
||
| def get(self) -> Any: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| import logging | ||
| import asyncio | ||
| import inspect | ||
| import contextvars | ||
| import concurrent.futures | ||
| from typing import List, Optional, Any, Dict, Union, TYPE_CHECKING | ||
|
|
||
|
|
@@ -190,18 +191,37 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_ | |
| if res.output and res.output.modified_data: | ||
| arguments.update(res.output.modified_data) | ||
|
|
||
| with with_injection_context(state): | ||
| # P8/G11: Apply tool timeout if configured | ||
| tool_timeout = getattr(self, '_tool_timeout', None) | ||
| if tool_timeout and tool_timeout > 0: | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: | ||
| future = executor.submit(self._execute_tool_impl, function_name, arguments) | ||
| try: | ||
| result = future.result(timeout=tool_timeout) | ||
| except concurrent.futures.TimeoutError: | ||
| logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") | ||
| result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} | ||
| else: | ||
| # P8/G11: Apply tool timeout if configured | ||
| tool_timeout = getattr(self, '_tool_timeout', None) | ||
| if tool_timeout and tool_timeout > 0: | ||
| # Use copy_context to preserve injection context in executor thread | ||
| ctx = contextvars.copy_context() | ||
|
|
||
| def execute_with_context(): | ||
| with with_injection_context(state): | ||
| return self._execute_tool_impl(function_name, arguments) | ||
|
|
||
| # Use explicit executor lifecycle to actually bound execution time | ||
| executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) | ||
| try: | ||
| future = executor.submit(ctx.run, execute_with_context) | ||
| try: | ||
| result = future.result(timeout=tool_timeout) | ||
| except concurrent.futures.TimeoutError: | ||
| # Cancel and shutdown immediately to avoid blocking | ||
| future.cancel() | ||
| executor.shutdown(wait=False, cancel_futures=True) | ||
| logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") | ||
| result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} | ||
|
Comment on lines
+194
to
+215
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
python - <<'PY'
import concurrent.futures
import time
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 2)
try:
future.result(timeout=0.1)
except concurrent.futures.TimeoutError:
print(f"timeout raised at {time.time() - start:.2f}s")
print(f"context manager exited at {time.time() - start:.2f}s")
PY

Repository: MervinPraison/PraisonAI
Length of output: 122

🏁 Script executed: `cat -n src/praisonai-agents/praisonaiagents/agent/tool_execution.py | sed -n '190,215p'`

Repository: MervinPraison/PraisonAI
Length of output: 1672

Tool timeout does not bound execution — the context manager blocks until the worker thread exits. Replace the context manager with explicit lifecycle control:

Suggested fix:

- with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
- future = executor.submit(ctx.run, execute_with_context)
- try:
- result = future.result(timeout=tool_timeout)
- except concurrent.futures.TimeoutError:
- logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
- result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+ future = executor.submit(ctx.run, execute_with_context)
+ try:
+ result = future.result(timeout=tool_timeout)
+ except concurrent.futures.TimeoutError:
+ executor.shutdown(wait=False, cancel_futures=True)
+ logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
+ result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
+ else:
+ executor.shutdown(wait=False)

🤖 Prompt for AI Agents
||
| else: | ||
| # Normal completion - shutdown gracefully | ||
| executor.shutdown(wait=False) | ||
| finally: | ||
| # Ensure executor is always cleaned up | ||
| if not executor._shutdown: | ||
| executor.shutdown(wait=False) | ||
| else: | ||
| with with_injection_context(state): | ||
| result = self._execute_tool_impl(function_name, arguments) | ||
|
|
||
| # Apply tool output truncation to prevent context overflow | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,16 +62,14 @@ def store_short_term(self, content: str, metadata: Optional[Dict] = None, qualit | |
| except Exception as e: | ||
| self._log_verbose(f"Failed to store in {self.provider} STM: {e}", logging.WARNING) | ||
|
|
||
| # Backward compatibility: Also store in SQLite if not using SQLite adapter | ||
| if hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): | ||
| # Only use SQLite fallback if primary storage failed completely | ||
| if not memory_id and hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): | ||
| try: | ||
| fallback_id = self._sqlite_adapter.store_short_term(content, metadata=clean_metadata, **kwargs) | ||
| if not memory_id: | ||
| memory_id = fallback_id | ||
| memory_id = self._sqlite_adapter.store_short_term(content, metadata=clean_metadata, **kwargs) | ||
| self._log_verbose(f"Stored in SQLite STM as fallback: {content[:100]}...") | ||
| except Exception as e: | ||
| logging.error(f"Failed to store in SQLite STM fallback: {e}") | ||
| if not memory_id: | ||
| return "" | ||
| return "" | ||
|
Comment on lines
+65
to
+72
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Mirror this fallback policy into the other STM entry points.
🧰 Tools

🪛 Ruff (0.15.9)

[warning] 70-70: Do not catch blind exception: `Exception` (BLE001)

🤖 Prompt for AI Agents
||
|
|
||
| # Auto-promote to long-term memory if quality is high | ||
| if auto_promote and quality_score >= 7.5: # High quality threshold | ||
|
|
@@ -122,38 +120,39 @@ def store_short_term_structured(self, content: str, metadata: Optional[Dict] = N | |
| clean_metadata = self._sanitize_metadata(metadata) | ||
|
|
||
| # Protocol-driven storage: Try primary adapter first | ||
| memory_id = "" | ||
| primary_error = None | ||
| memory_id = None | ||
|
|
||
| try: | ||
| if hasattr(self, 'memory_adapter') and self.memory_adapter: | ||
| memory_id = self.memory_adapter.store_short_term(content, metadata=clean_metadata, **kwargs) | ||
| self._log_verbose(f"Stored in {self.provider} STM via adapter: {content[:100]}...") | ||
|
|
||
| # Auto-promote to long-term memory if quality is high | ||
| if auto_promote and quality_score >= 7.5: | ||
| try: | ||
| self.store_long_term(content, clean_metadata, quality_score, user_id, **kwargs) | ||
| self._log_verbose(f"Auto-promoted STM content to LTM (score: {quality_score:.2f})") | ||
| except Exception as e: | ||
| # Auto-promotion failure doesn't affect the primary storage result | ||
| logging.warning(f"Failed to auto-promote to LTM: {e}") | ||
|
|
||
| # Emit memory event for successful storage | ||
| self._emit_memory_event("store", "short_term", content, clean_metadata) | ||
|
|
||
| return MemoryResult.success_result( | ||
| memory_id=memory_id, | ||
| adapter_used=self.provider, | ||
| context={ | ||
| "quality_score": quality_score, | ||
| "auto_promoted": auto_promote and quality_score >= 7.5 | ||
| } | ||
| ) | ||
| except Exception as e: | ||
| primary_error = str(e) | ||
| self._log_verbose(f"Failed to store in {self.provider} STM: {e}", logging.WARNING) | ||
|
|
||
| # Only proceed with success if we got a valid memory_id | ||
| if memory_id: | ||
| # Auto-promote to long-term memory if quality is high | ||
| if auto_promote and quality_score >= 7.5: | ||
| try: | ||
| self.store_long_term(content, clean_metadata, quality_score, user_id, **kwargs) | ||
| self._log_verbose(f"Auto-promoted STM content to LTM (score: {quality_score:.2f})") | ||
| except Exception as e: | ||
| # Auto-promotion failure doesn't affect the primary storage result | ||
| logging.warning(f"Failed to auto-promote to LTM: {e}") | ||
|
|
||
| # Emit memory event for successful storage | ||
| self._emit_memory_event("store", "short_term", content, clean_metadata) | ||
|
|
||
| return MemoryResult.success_result( | ||
| memory_id=memory_id, | ||
| adapter_used=self.provider, | ||
| context={ | ||
| "quality_score": quality_score, | ||
| "auto_promoted": auto_promote and quality_score >= 7.5 | ||
| } | ||
| ) | ||
|
|
||
| # Fallback to SQLite if available and different from primary adapter | ||
| fallback_error = None | ||
| if hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): | ||
|
|
@@ -450,13 +449,25 @@ async def store_short_term_async(self, content: str, metadata: Optional[Dict] = | |
| raw_metadata["user_id"] = user_id | ||
| clean_metadata = self._sanitize_metadata(raw_metadata) | ||
|
|
||
| # Store in SQLite STM | ||
| # Try primary adapter first (async version) | ||
| memory_id = "" | ||
| try: | ||
| memory_id = await asyncio.to_thread(self._store_sqlite_stm, content, clean_metadata, quality_score) | ||
| if hasattr(self, 'memory_adapter') and self.memory_adapter: | ||
| memory_id = await asyncio.to_thread( | ||
| self.memory_adapter.store_short_term, content, metadata=clean_metadata, **kwargs | ||
| ) | ||
| self._log_verbose(f"Stored in {self.provider} async STM via adapter: {content[:100]}...") | ||
| except Exception as e: | ||
| logging.error(f"Failed to store in SQLite STM: {e}") | ||
| return "" | ||
| self._log_verbose(f"Failed to store in {self.provider} async STM: {e}", logging.WARNING) | ||
|
|
||
| # Only use SQLite fallback if primary storage failed completely | ||
| if not memory_id and hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None): | ||
| try: | ||
| memory_id = await asyncio.to_thread(self._store_sqlite_stm, content, clean_metadata, quality_score) | ||
| self._log_verbose(f"Stored in SQLite async STM as fallback: {content[:100]}...") | ||
| except Exception as e: | ||
| logging.error(f"Failed to store in SQLite async STM fallback: {e}") | ||
| return "" | ||
|
|
||
| # Auto-promote to long-term memory if quality is high (async) | ||
| if auto_promote and quality_score >= 7.5: # High quality threshold | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`asyncio.run()` inside `close()` will always fail silently in async contexts.

`asyncio.run()` raises `RuntimeError: This event loop is already running` when called from within a running event loop (e.g., if `close()` is called from an `async with` block's `__aexit__`, from an `asyncio.gather`, or from a framework like FastAPI/Starlette). That exception is silently swallowed by the surrounding `except Exception` block, meaning `aclose()` is never actually called from async callers — the entire intent of the code is defeated.

The synchronous `close()` method should only attempt synchronous cleanup. The existing `aclose()` method is the correct place for async LLM teardown. A safe fallback for the sync path is the already-present `close()` call.

If you need a best-effort async close from a sync method when no loop is running, use `asyncio.get_event_loop().run_until_complete(...)` guarded by `not loop.is_running()`, but the cleanest fix is simply to remove the `asyncio.run()` attempt here and rely on callers to call `aclose()` in async contexts.