Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/praisonai-agents/praisonaiagents/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4501,6 +4501,39 @@ def close(self) -> None:
except Exception as e:
logger.warning(f"Memory cleanup failed: {e}")

# LLM client cleanup - target actual live clients, not model strings
try:
# Primary cleanup targets - actual live clients
if hasattr(self, 'llm_instance') and self.llm_instance:
if hasattr(self.llm_instance, 'aclose'):
# Try async close first
try:
import asyncio
if asyncio.iscoroutinefunction(self.llm_instance.aclose):
# We're in sync context, so use asyncio.run() for the cleanup
asyncio.run(self.llm_instance.aclose())
else:
self.llm_instance.aclose()
except Exception:
# Fall back to sync close if async fails
if hasattr(self.llm_instance, 'close'):
self.llm_instance.close()
Comment on lines +4508 to +4520
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 asyncio.run() inside close() will always fail silently in async contexts

asyncio.run() raises RuntimeError: This event loop is already running when called from within a running event loop (e.g., if close() is called from an async with block's __aexit__, from an asyncio.gather, or from a framework like FastAPI/Starlette). That exception is silently swallowed by the surrounding except Exception block, meaning aclose() is never actually called from async callers — the entire intent of the code is defeated.

The synchronous close() method should only attempt synchronous cleanup. The existing aclose() method is the correct place for async LLM teardown. A safe fallback for the sync path is the already-present close() call:

# Try sync close only; async cleanup belongs in aclose()
if hasattr(self.llm_instance, 'close'):
    self.llm_instance.close()

If you need a best-effort async close from a sync method when no loop is running, probe with asyncio.get_running_loop() first (it raises RuntimeError when no loop is active) and only then call asyncio.run(self.llm_instance.aclose()) — note that asyncio.get_event_loop() is deprecated for this purpose since Python 3.10. The cleanest fix, however, is simply to remove the asyncio.run() attempt here and rely on callers to call aclose() in async contexts.

elif hasattr(self.llm_instance, 'close'):
self.llm_instance.close()

# Check for OpenAI client (common pattern in agents)
if hasattr(self, '_Agent__openai_client') and self._Agent__openai_client:
if hasattr(self._Agent__openai_client, 'close'):
self._Agent__openai_client.close()

# Legacy fallback - check self.llm._client (but less likely to work)
if hasattr(self, 'llm') and self.llm and not isinstance(self.llm, str):
llm_client = getattr(self.llm, '_client', None)
if llm_client and hasattr(llm_client, 'close'):
llm_client.close()
except Exception as e:
logger.warning(f"LLM client cleanup failed: {e}")

# MCP cleanup
try:
if hasattr(self, '_mcp_clients') and self._mcp_clients:
Expand Down
37 changes: 10 additions & 27 deletions src/praisonai-agents/praisonaiagents/agent/async_safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,8 @@ class DualLock:
"""

def __init__(self):
self._thread_lock = threading.Lock()
self._async_lock: Optional[asyncio.Lock] = None
self._loop_id: Optional[int] = None

def _get_async_lock(self) -> asyncio.Lock:
"""Get or create asyncio.Lock for current event loop."""
try:
current_loop = asyncio.get_running_loop()
current_loop_id = id(current_loop)

# Create new lock if loop changed or first time
if self._loop_id != current_loop_id:
self._async_lock = asyncio.Lock()
self._loop_id = current_loop_id

return self._async_lock
except RuntimeError:
# No event loop running, fall back to thread lock in a new loop
self._async_lock = asyncio.Lock()
return self._async_lock
"""Initialize with unified thread-safe locking."""
self._thread_lock = threading.Lock() # Single canonical lock for all contexts

@contextmanager
def sync(self):
Expand All @@ -64,10 +46,13 @@ def sync(self):

@asynccontextmanager
async def async_lock(self):
"""Acquire lock in asynchronous context using asyncio.Lock."""
async_lock = self._get_async_lock()
async with async_lock:
"""Acquire lock in asynchronous context using threading.Lock via asyncio.to_thread()."""
# Use asyncio.to_thread to acquire the thread lock without blocking the event loop
await asyncio.to_thread(self._thread_lock.acquire)
try:
yield
finally:
self._thread_lock.release()
Comment on lines 47 to +55
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 DualLock.async_lock leaves an orphaned, unreleased lock on cancellation

When a coroutine awaiting async_lock() is cancelled while the thread-pool worker is still blocked waiting on the contended _thread_lock, the result is a permanent deadlock:

  1. CancelledError propagates from await asyncio.to_thread(self._thread_lock.acquire).
  2. The try/finally block is never entered — execution jumps straight to the caller.
  3. The worker thread eventually acquires the lock, but nobody calls release().
  4. All future callers of async_lock() or sync() will block forever.

Task cancellation is not an edge case in this codebase — the workflow timeout in process.py sets workflow_cancelled = True and breaks the loop, which can cancel pending tasks mid-wait.

The safest repair is to catch the cancellation and arrange for the lock to be released once the still-running thread finally acquires it:

@asynccontextmanager
async def async_lock(self):
    """Acquire lock in asynchronous context using threading.Lock via asyncio.to_thread()."""
    acquired = False
    try:
        await asyncio.to_thread(self._thread_lock.acquire)
        acquired = True
        yield
    except asyncio.CancelledError:
        if not acquired:
            # Thread worker is still running and will acquire the lock.
            # Schedule a release so no future caller deadlocks.
            def _release_when_acquired():
                # Worker already holds the lock at this point (or will momentarily).
                # Just release it.
                try:
                    self._thread_lock.release()
                except RuntimeError:
                    pass  # Was never acquired; nothing to do.
            asyncio.get_running_loop().run_in_executor(None, _release_when_acquired)
        raise
    finally:
        if acquired:
            self._thread_lock.release()

Alternatively, consider switching back to a per-loop asyncio.Lock (guarded with the thread-lock during creation) to avoid mixing thread and async primitives entirely.


def is_async_context(self) -> bool:
"""Check if we're currently in an async context."""
Expand Down Expand Up @@ -129,14 +114,12 @@ def __exit__(self, exc_type, exc_val, exc_tb):

async def __aenter__(self):
"""Support for asynchronous context manager protocol."""
async_lock = self._lock._get_async_lock()
await async_lock.acquire()
await asyncio.to_thread(self._lock._thread_lock.acquire)
return self.value

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Support for asynchronous context manager protocol."""
async_lock = self._lock._get_async_lock()
async_lock.release()
self._lock._thread_lock.release()
return None

def get(self) -> Any:
Expand Down
44 changes: 32 additions & 12 deletions src/praisonai-agents/praisonaiagents/agent/tool_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import asyncio
import inspect
import contextvars
import concurrent.futures
from typing import List, Optional, Any, Dict, Union, TYPE_CHECKING

Expand Down Expand Up @@ -190,18 +191,37 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_
if res.output and res.output.modified_data:
arguments.update(res.output.modified_data)

with with_injection_context(state):
# P8/G11: Apply tool timeout if configured
tool_timeout = getattr(self, '_tool_timeout', None)
if tool_timeout and tool_timeout > 0:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(self._execute_tool_impl, function_name, arguments)
try:
result = future.result(timeout=tool_timeout)
except concurrent.futures.TimeoutError:
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
else:
# P8/G11: Apply tool timeout if configured
tool_timeout = getattr(self, '_tool_timeout', None)
if tool_timeout and tool_timeout > 0:
# Use copy_context to preserve injection context in executor thread
ctx = contextvars.copy_context()

def execute_with_context():
with with_injection_context(state):
return self._execute_tool_impl(function_name, arguments)

# Use explicit executor lifecycle to actually bound execution time
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
try:
future = executor.submit(ctx.run, execute_with_context)
try:
result = future.result(timeout=tool_timeout)
except concurrent.futures.TimeoutError:
# Cancel and shutdown immediately to avoid blocking
future.cancel()
executor.shutdown(wait=False, cancel_futures=True)
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
Comment on lines +194 to +215
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
python - <<'PY'
import concurrent.futures
import time

start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(time.sleep, 2)
    try:
        future.result(timeout=0.1)
    except concurrent.futures.TimeoutError:
        print(f"timeout raised at {time.time() - start:.2f}s")

print(f"context manager exited at {time.time() - start:.2f}s")
PY

Repository: MervinPraison/PraisonAI

Length of output: 122


🏁 Script executed:

cat -n src/praisonai-agents/praisonaiagents/agent/tool_execution.py | sed -n '190,215p'

Repository: MervinPraison/PraisonAI

Length of output: 1672


Tool timeout does not bound execution—the context manager blocks until the worker thread exits.

The with ThreadPoolExecutor(...) context manager calls shutdown(wait=True) on exit, which blocks the caller until the worker completes even after future.result(timeout=...) raises TimeoutError. This means the configured tool_timeout does not actually bound the caller's wait: the caller still blocks until the tool function itself returns, however long that takes.

Replace the context manager with explicit lifecycle control:

Suggested fix
-                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
-                    future = executor.submit(ctx.run, execute_with_context)
-                    try:
-                        result = future.result(timeout=tool_timeout)
-                    except concurrent.futures.TimeoutError:
-                        logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
-                        result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
+                executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+                future = executor.submit(ctx.run, execute_with_context)
+                try:
+                    result = future.result(timeout=tool_timeout)
+                except concurrent.futures.TimeoutError:
+                    executor.shutdown(wait=False, cancel_futures=True)
+                    logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
+                    result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
+                else:
+                    executor.shutdown(wait=False)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/agent/tool_execution.py` around lines
193 - 210, The current use of "with concurrent.futures.ThreadPoolExecutor"
blocks on exit (shutdown(wait=True)) even after future.result(timeout=...)
raises, so replace the context manager with an explicit ThreadPoolExecutor()
instance (e.g., executor =
concurrent.futures.ThreadPoolExecutor(max_workers=1)), submit the task via
executor.submit(ctx.run, execute_with_context) and on
concurrent.futures.TimeoutError call executor.shutdown(wait=False) (and
optionally future.cancel()) to avoid waiting for the worker to finish; keep
using contextvars.copy_context(), the execute_with_context wrapper,
with_injection_context(state), and self._execute_tool_impl(function_name,
arguments) as-is, and ensure executor.shutdown() is called in finally to avoid
leaked threads.

else:
# Normal completion - shutdown gracefully
executor.shutdown(wait=False)
finally:
# Ensure executor is always cleaned up
if not executor._shutdown:
executor.shutdown(wait=False)
else:
with with_injection_context(state):
result = self._execute_tool_impl(function_name, arguments)

# Apply tool output truncation to prevent context overflow
Expand Down
14 changes: 11 additions & 3 deletions src/praisonai-agents/praisonaiagents/checkpoints/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,17 @@ async def _prune_checkpoints(self):
if len(self._checkpoints) <= self.config.max_checkpoints:
return

# Keep only the most recent checkpoints
# Note: This doesn't actually delete git history, just our tracking
self._checkpoints = self._checkpoints[:self.config.max_checkpoints]
# Calculate how many to remove
num_to_remove = len(self._checkpoints) - self.config.max_checkpoints

# Keep only the most recent checkpoints in memory (newest-last semantics)
# Since save() appends (newest last), keep the last N entries
self._checkpoints = self._checkpoints[-self.config.max_checkpoints:]

logger.info(f"Pruned {num_to_remove} old checkpoints to stay under limit of {self.config.max_checkpoints}")

# Emit pruning event for any cleanup hooks
self._emit(CheckpointEvent.CHECKPOINTS_PRUNED, {"action": "pruned", "removed_count": num_to_remove})

async def get_checkpoint(self, checkpoint_id: str) -> Optional[Checkpoint]:
"""Get a specific checkpoint by ID."""
Expand Down
1 change: 1 addition & 0 deletions src/praisonai-agents/praisonaiagents/checkpoints/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class CheckpointEvent(str, Enum):
INITIALIZED = "initialized"
CHECKPOINT_CREATED = "checkpoint_created"
CHECKPOINT_RESTORED = "checkpoint_restored"
CHECKPOINTS_PRUNED = "checkpoints_pruned"
ERROR = "error"


Expand Down
79 changes: 45 additions & 34 deletions src/praisonai-agents/praisonaiagents/memory/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ def store_short_term(self, content: str, metadata: Optional[Dict] = None, qualit
except Exception as e:
self._log_verbose(f"Failed to store in {self.provider} STM: {e}", logging.WARNING)

# Backward compatibility: Also store in SQLite if not using SQLite adapter
if hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None):
# Only use SQLite fallback if primary storage failed completely
if not memory_id and hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None):
try:
fallback_id = self._sqlite_adapter.store_short_term(content, metadata=clean_metadata, **kwargs)
if not memory_id:
memory_id = fallback_id
memory_id = self._sqlite_adapter.store_short_term(content, metadata=clean_metadata, **kwargs)
self._log_verbose(f"Stored in SQLite STM as fallback: {content[:100]}...")
except Exception as e:
logging.error(f"Failed to store in SQLite STM fallback: {e}")
if not memory_id:
return ""
return ""
Comment on lines +65 to +72
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Apply this same fallback policy to the other STM entry points.

store_short_term() now treats a falsy memory_id as a failed primary write, but Lines 143-150 in store_short_term_structured() still return success_result(memory_id=None), and Lines 451-456 in store_short_term_async() still bypass memory_adapter and write straight to SQLite. The sync, structured, and async APIs now disagree on what “stored” means.

🧰 Tools
🪛 Ruff (0.15.9)

[warning] 70-70: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/memory/core.py` around lines 65 - 72,
The structured and async STM entrypoints must mirror the fallback policy in
store_short_term: treat a falsy memory_id as a failed primary write and only
attempt the SQLite fallback when hasattr(self, '_sqlite_adapter') and
self._sqlite_adapter != getattr(self, 'memory_adapter', None); in
store_short_term_structured() and store_short_term_async() add the same
try/except that calls self._sqlite_adapter.store_short_term(...) when memory_id
is falsy, log the verbose SQLite success with self._log_verbose and log failures
with logging.error, and return the same failure sentinel used by
store_short_term (i.e., propagate the empty/failed memory_id result rather than
returning success_result(memory_id=None) or unconditionally writing to SQLite).


# Auto-promote to long-term memory if quality is high
if auto_promote and quality_score >= 7.5: # High quality threshold
Expand Down Expand Up @@ -122,38 +120,39 @@ def store_short_term_structured(self, content: str, metadata: Optional[Dict] = N
clean_metadata = self._sanitize_metadata(metadata)

# Protocol-driven storage: Try primary adapter first
memory_id = ""
primary_error = None
memory_id = None

try:
if hasattr(self, 'memory_adapter') and self.memory_adapter:
memory_id = self.memory_adapter.store_short_term(content, metadata=clean_metadata, **kwargs)
self._log_verbose(f"Stored in {self.provider} STM via adapter: {content[:100]}...")

# Auto-promote to long-term memory if quality is high
if auto_promote and quality_score >= 7.5:
try:
self.store_long_term(content, clean_metadata, quality_score, user_id, **kwargs)
self._log_verbose(f"Auto-promoted STM content to LTM (score: {quality_score:.2f})")
except Exception as e:
# Auto-promotion failure doesn't affect the primary storage result
logging.warning(f"Failed to auto-promote to LTM: {e}")

# Emit memory event for successful storage
self._emit_memory_event("store", "short_term", content, clean_metadata)

return MemoryResult.success_result(
memory_id=memory_id,
adapter_used=self.provider,
context={
"quality_score": quality_score,
"auto_promoted": auto_promote and quality_score >= 7.5
}
)
except Exception as e:
primary_error = str(e)
self._log_verbose(f"Failed to store in {self.provider} STM: {e}", logging.WARNING)

# Only proceed with success if we got a valid memory_id
if memory_id:
# Auto-promote to long-term memory if quality is high
if auto_promote and quality_score >= 7.5:
try:
self.store_long_term(content, clean_metadata, quality_score, user_id, **kwargs)
self._log_verbose(f"Auto-promoted STM content to LTM (score: {quality_score:.2f})")
except Exception as e:
# Auto-promotion failure doesn't affect the primary storage result
logging.warning(f"Failed to auto-promote to LTM: {e}")

# Emit memory event for successful storage
self._emit_memory_event("store", "short_term", content, clean_metadata)

return MemoryResult.success_result(
memory_id=memory_id,
adapter_used=self.provider,
context={
"quality_score": quality_score,
"auto_promoted": auto_promote and quality_score >= 7.5
}
)

# Fallback to SQLite if available and different from primary adapter
fallback_error = None
if hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None):
Expand Down Expand Up @@ -450,13 +449,25 @@ async def store_short_term_async(self, content: str, metadata: Optional[Dict] =
raw_metadata["user_id"] = user_id
clean_metadata = self._sanitize_metadata(raw_metadata)

# Store in SQLite STM
# Try primary adapter first (async version)
memory_id = ""
try:
memory_id = await asyncio.to_thread(self._store_sqlite_stm, content, clean_metadata, quality_score)
if hasattr(self, 'memory_adapter') and self.memory_adapter:
memory_id = await asyncio.to_thread(
self.memory_adapter.store_short_term, content, metadata=clean_metadata, **kwargs
)
self._log_verbose(f"Stored in {self.provider} async STM via adapter: {content[:100]}...")
except Exception as e:
logging.error(f"Failed to store in SQLite STM: {e}")
return ""
self._log_verbose(f"Failed to store in {self.provider} async STM: {e}", logging.WARNING)

# Only use SQLite fallback if primary storage failed completely
if not memory_id and hasattr(self, '_sqlite_adapter') and self._sqlite_adapter != getattr(self, 'memory_adapter', None):
try:
memory_id = await asyncio.to_thread(self._store_sqlite_stm, content, clean_metadata, quality_score)
self._log_verbose(f"Stored in SQLite async STM as fallback: {content[:100]}...")
except Exception as e:
logging.error(f"Failed to store in SQLite async STM fallback: {e}")
return ""

# Auto-promote to long-term memory if quality is high (async)
if auto_promote and quality_score >= 7.5: # High quality threshold
Expand Down
25 changes: 22 additions & 3 deletions src/praisonai-agents/praisonaiagents/process/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(
self.workflow_timeout = workflow_timeout
self.task_retry_counter: Dict[str, int] = {} # Initialize retry counter
self.workflow_finished = False # ADDED: Workflow finished flag
self.workflow_cancelled = False # ADDED: Workflow cancellation flag for timeout
self._state_lock_init = threading.Lock() # Thread lock for async lock creation
self._state_lock = None # Lazy-initialized async lock for shared state protection

# Resolve verbose from output= param (takes precedence) or legacy verbose= param
Expand Down Expand Up @@ -255,7 +257,10 @@ def _find_next_not_started_task(self) -> Optional[Task]:
continue # Skip if no valid path exists

if self.task_retry_counter.get(task_candidate.id, 0) < self.max_retries:
self.task_retry_counter[task_candidate.id] = self.task_retry_counter.get(task_candidate.id, 0) + 1
# Atomic increment using thread lock to prevent race conditions
with self._state_lock_init:
current_count = self.task_retry_counter.get(task_candidate.id, 0)
self.task_retry_counter[task_candidate.id] = current_count + 1
temp_current_task = task_candidate
logging.debug(f"Fallback attempt {fallback_attempts}: Found 'not started' task: {temp_current_task.name}, retry count: {self.task_retry_counter[temp_current_task.id]}")
return temp_current_task # Return the found task immediately
Expand Down Expand Up @@ -429,13 +434,19 @@ async def aworkflow(self) -> AsyncGenerator[str, None]:
if self.workflow_timeout is not None:
elapsed = time.monotonic() - workflow_start
if elapsed > self.workflow_timeout:
logging.warning(f"Workflow timeout ({self.workflow_timeout}s) exceeded after {elapsed:.1f}s, stopping.")
logging.warning(f"Workflow timeout ({self.workflow_timeout}s) exceeded after {elapsed:.1f}s, cancelling workflow.")
self.workflow_cancelled = True
break

# ADDED: Check workflow finished flag at the start of each cycle
if self.workflow_finished:
logging.info("Workflow finished early as all tasks are completed.")
break

# ADDED: Check workflow cancellation flag
if self.workflow_cancelled:
logging.warning("Workflow has been cancelled, stopping task execution.")
break

# Add task summary at start of each cycle
logging.debug(f"""
Expand Down Expand Up @@ -597,8 +608,11 @@ async def aworkflow(self) -> AsyncGenerator[str, None]:
break

# Reset completed task to "not started" so it can run again (atomic operation)
# Atomic state lock initialization
if self._state_lock is None:
self._state_lock = asyncio.Lock()
with self._state_lock_init:
if self._state_lock is None: # Double-checked locking pattern
self._state_lock = asyncio.Lock()
async with self._state_lock:
if self.tasks[task_id].status == "completed":
# Never reset loop tasks, decision tasks, or their subtasks if rerun is False
Expand Down Expand Up @@ -1031,6 +1045,11 @@ def workflow(self):
if self.workflow_finished:
logging.info("Workflow finished early as all tasks are completed.")
break

# ADDED: Check workflow cancellation flag
if self.workflow_cancelled:
logging.warning("Workflow has been cancelled, stopping task execution.")
break

# Add task summary at start of each cycle
logging.debug(f"""
Expand Down
Loading