Summary
Deep analysis of src/praisonai-agents/praisonaiagents identified three concurrency and resilience gaps that can cause deadlocks, resource exhaustion, and state corruption under concurrent multi-agent workloads. Each is validated with exact code locations and reproducible scenarios.
Gap 1: Unsafe asyncio.Semaphore Private State Manipulation in ConcurrencyRegistry
File: praisonaiagents/agent/concurrency.py, lines 86–108
The Problem
acquire_sync() directly reads and writes asyncio.Semaphore._value, a private CPython implementation detail, bypassing the semaphore's internal lock and waiter queue. This creates a TOCTOU (time-of-check-time-of-use) race and can cause permanent deadlocks.
Current Code
```python
# concurrency.py lines 86-108
def acquire_sync(self, agent_name: str) -> None:
    sem = self._get_semaphore(agent_name)
    if sem is None:
        return
    try:
        asyncio.get_running_loop()
        # PROBLEM 1: reading a private attribute without synchronization
        if not sem._value > 0:  # line 99
            logger.warning(...)
        # PROBLEM 2: directly mutating a private attribute — bypasses internal lock + waiter queue
        sem._value = max(0, sem._value - 1)  # line 105
    except RuntimeError:
        asyncio.get_event_loop().run_until_complete(sem.acquire())
```
Why It Breaks
- TOCTOU race: The check on line 99 and the mutation on line 105 are not atomic. Between the check and the write, another coroutine can await sem.acquire(), decrementing _value legitimately. The manual decrement then drives the count negative, which is an invalid state for a semaphore.
- Waiter queue bypass: asyncio.Semaphore.acquire() manages an internal _waiters deque. Manually decrementing _value means coroutines blocked on await sem.acquire() are never woken up, causing a permanent deadlock.
- CPython coupling: _value is not part of the public API. It can change or be removed in any Python release.
Concrete Deadlock Scenario
```
Setup: registry.set_limit("agent1", 2)  →  sem._value == 2

1. Thread A (sync):     reads sem._value == 2; the check on line 99 passes
2. Coroutine B (async): await sem.acquire()  →  _value becomes 1
3. Coroutine B (async): await sem.acquire()  →  _value becomes 0; a third
                        acquire() would now park in sem._waiters
4. Thread A (sync):     sem._value = max(0, 2 - 1) = 1   ← STALE READ; the
                        true value is 0

Result: three logical holders (Thread A plus two in Coroutine B) against a
limit of 2, yet _value == 1, so the semaphore advertises a phantom permit.
And because Thread A's matching "release" also mutates _value directly
instead of calling sem.release(), any coroutine parked in _waiters is never
signaled: it blocks forever.
```
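The waiter-bypass failure reproduces with the stdlib alone. A minimal sketch (the setup is illustrative, not the registry's code): mutating _value directly leaves a parked waiter unsignaled, where sem.release() would have woken it.

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(1)
    await sem.acquire()  # _value: 1 -> 0

    async def waiter():
        await sem.acquire()  # parks a future in sem._waiters
        print("waiter woke up")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter enqueue itself

    # "Release" the way the manual-mutation path does: bump _value directly.
    sem._value += 1  # the waiter queue is never signaled

    done, pending = await asyncio.wait({task}, timeout=1.0)
    print("waiter still blocked:", task in pending)  # True: a permanent stall
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
```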
Suggested Fix
Replace the private-attribute manipulation with a proper sync-to-async bridge:
```python
def acquire_sync(self, agent_name: str) -> None:
    """Synchronous acquire using a dedicated event loop in a worker thread."""
    sem = self._get_semaphore(agent_name)
    if sem is None:
        return
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread, so it is safe to create one and block.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(sem.acquire())
        finally:
            loop.close()
        return
    # We're inside an async loop and must not block it. Run the acquire in a
    # worker thread with its own loop so it goes through the semaphore's
    # proper acquire path (internal lock + waiter queue). Handling the
    # "no running loop" case first, with an early return, keeps a RuntimeError
    # raised by this path from being misrouted into the no-loop branch.
    import concurrent.futures

    def _acquire_in_new_loop():
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(sem.acquire())
        finally:
            loop.close()

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(_acquire_in_new_loop).result(timeout=30)
```
This ensures every acquisition goes through sem.acquire(), preserving the waiter queue and internal lock. One caveat: since Python 3.10, asyncio primitives bind to the first event loop that awaits them, so a contended acquire from the worker thread's fresh loop raises RuntimeError instead of waiting; if the registry must serve multiple loops, it needs per-loop semaphores or a thread-safe primitive (see the sketch below).
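If callers routinely mix threads and event loops, a simpler design sidesteps the loop-binding question entirely. A hypothetical sketch (not the registry's current API) backed by a threading.Semaphore, which is safe from any thread:

```python
import asyncio
import threading

class AgentLimiter:
    """Hypothetical per-agent limiter usable from both threads and coroutines."""

    def __init__(self, limit: int):
        self._sem = threading.Semaphore(limit)

    def acquire_sync(self, timeout: float = 30.0) -> bool:
        # Safe from any plain thread; never call inside a running event loop.
        return self._sem.acquire(timeout=timeout)

    async def acquire_async(self) -> None:
        # Park the blocking acquire on a worker thread so the loop stays free.
        await asyncio.to_thread(self._sem.acquire)

    def release(self) -> None:
        self._sem.release()
```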
Gap 2: ThreadPoolExecutor Resource Leak on Tool Timeout
File: praisonaiagents/agent/tool_execution.py, lines 205–223
The Problem
Every tool call with a configured timeout creates a new ThreadPoolExecutor(max_workers=1). When the tool times out, future.cancel() is called, but Python's Future.cancel() cannot interrupt a thread that is already running — it only prevents a queued task from starting. The executor is shut down with wait=False, which returns immediately but leaves the worker thread alive. Additionally, the cleanup code accesses executor._shutdown, a private attribute.
Current Code
```python
# tool_execution.py lines 205-223
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
try:
    future = executor.submit(ctx.run, execute_with_context)
    try:
        result = future.result(timeout=tool_timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()  # line 213: no-op if thread is running
        executor.shutdown(wait=False, cancel_futures=True)  # line 214: returns immediately
        logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
        result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
    else:
        executor.shutdown(wait=False)  # line 219
finally:
    if not executor._shutdown:  # line 222: private attribute access
        executor.shutdown(wait=False)  # line 223: redundant
```
Why It Breaks
- Zombie threads accumulate: If a tool does blocking I/O (an HTTP request, file read, or subprocess), future.cancel() cannot stop it. The thread continues running in the background, and each subsequent timeout creates another zombie thread.
- Resource exhaustion: Each zombie thread holds a stack (~8 MB default on Linux), a file descriptor for any open socket or file, and any memory allocated by the tool. After 10 timeouts on I/O-bound tools, that is roughly 80 MB of leaked stacks plus leaked connections.
- Private attribute access: executor._shutdown is a CPython implementation detail. The check is also redundant: calling shutdown() on an already-shut-down executor is a safe no-op.
Concrete Leak Scenario
```
Tool: web_search(query="...") with tool_timeout=5s
Tool internally: requests.get(url, timeout=300)   # 300 s socket timeout

Iteration 1: executor thread starts → socket.recv() blocks → 5 s timeout fires
             → future.cancel(): no-op (thread is running)
             → executor.shutdown(wait=False): returns; thread still blocked on recv()
             → Thread 1: LEAKED (holding a socket + ~8 MB stack)
Iteration 2: new executor → same pattern → Thread 2: LEAKED
...
Iteration N: N zombie threads, N leaked sockets

After ~1000 iterations: the OS hits its thread limit (ulimit), fd limit, or OOM.
```
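The underlying limitation is easy to confirm in isolation. A standalone sketch, using time.sleep as a stand-in for a blocked socket read:

```python
import concurrent.futures
import threading
import time

def blocking_tool():
    time.sleep(10)  # stands in for a long, uninterruptible socket read
    return "done"

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(blocking_tool)
time.sleep(0.1)  # give the worker time to start running the task

print(future.cancel())  # False: a future that is already running cannot be cancelled
executor.shutdown(wait=False, cancel_futures=True)  # returns immediately

# The worker thread is still alive, holding its stack and any open resources.
workers = [t for t in threading.enumerate() if "ThreadPoolExecutor" in t.name]
print("leaked worker threads:", len(workers))  # 1
# The interpreter still joins non-daemon workers at exit, so this script
# lingers ~10 s before exiting; a long-lived agent process never gets that chance.
```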
Suggested Fix
Use a single, bounded executor at the agent level so a timeout no longer mints a fresh thread pool per call; true interruption of blocking I/O is addressed separately below:
```python
# In Agent.__init__ or a lazy property:
self._tool_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=2, thread_name_prefix=f"tool-{self.name}"
)

# In _execute_tool_with_context:
tool_timeout = getattr(self, '_tool_timeout', None)
if tool_timeout and tool_timeout > 0:
    ctx = contextvars.copy_context()

    def execute_with_context():
        with with_injection_context(state):
            return self._execute_tool_impl(function_name, arguments)

    future = self._tool_executor.submit(ctx.run, execute_with_context)
    try:
        result = future.result(timeout=tool_timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()
        logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
        result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
else:
    with with_injection_context(state):
        result = self._execute_tool_impl(function_name, arguments)
```
This eliminates per-call executor creation, removes the private _shutdown access, and bounds the total thread count. For full cancellation of blocking I/O, consider wrapping tools in subprocess with SIGTERM on timeout.
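A sketch of that subprocess escape hatch. Here run_tool_in_subprocess and the -c payload are illustrative (a real integration would marshal the tool name and arguments, e.g. via multiprocessing or a JSON pipe), but the termination logic is the point: the OS can always reclaim a child process, threads and sockets included.

```python
import subprocess
import sys

def run_tool_in_subprocess(code: str, timeout: float) -> str:
    """Run `code` in a child interpreter; on timeout, reclaim it forcibly."""
    proc = subprocess.Popen(
        [sys.executable, "-c", code],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    try:
        out, _err = proc.communicate(timeout=timeout)
        return out
    except subprocess.TimeoutExpired:
        proc.terminate()  # SIGTERM: unlike Future.cancel(), this stops blocking I/O
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()  # escalate to SIGKILL if the tool ignores SIGTERM
        raise
```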
Gap 3: Unprotected Module-Level State in Plugin System
File: praisonaiagents/plugins/__init__.py, lines 80–82, 109–112, 150–162, 219–231
The Problem
The plugin system uses two module-level globals (_plugins_enabled and _enabled_plugin_names) that are read and written from enable(), disable(), and is_enabled() without any lock protection. In multi-agent scenarios where agents run in different threads, concurrent calls to these functions cause torn reads and state corruption.
Current Code
```python
# plugins/__init__.py lines 80-82
_plugins_enabled: bool = False
_enabled_plugin_names: list = None   # None = all, list = specific

# lines 109-112 — enable()
def enable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    _plugins_enabled = True          # UNPROTECTED WRITE
    _enabled_plugin_names = plugins  # UNPROTECTED WRITE
    # ... discovery + enable logic follows ...

# lines 150, 161-162 — disable()
def disable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    # ...
    _plugins_enabled = False         # UNPROTECTED WRITE
    _enabled_plugin_names = None     # UNPROTECTED WRITE

# lines 219-231 — is_enabled()
def is_enabled(name: str = None) -> bool:
    global _plugins_enabled
    if name is None:
        return _plugins_enabled      # UNPROTECTED READ
```
Why It Breaks
- Torn state between two variables: _plugins_enabled and _enabled_plugin_names are written in two separate statements. A reader can observe _plugins_enabled = True while _enabled_plugin_names is still None (from a previous disable() call), or vice versa.
- List mutation race: If one thread calls enable(["logging"]) while another calls enable(["metrics"]), the final value of _enabled_plugin_names is unpredictable; the last writer wins, but the PluginManager may have already loaded the first set.
- is_enabled() TOCTOU: Code that checks is_enabled() and then reads _enabled_plugin_names separately has a classic race; the state can change between the two reads.
Concrete Corruption Scenario
```
Thread A: plugins.enable(["logging", "metrics"])
Thread B: plugins.disable()

1. A: _plugins_enabled = True
2. A: _enabled_plugin_names = ["logging", "metrics"]
3.    [context switch before discovery]
4. B: _plugins_enabled = False
5. B: _enabled_plugin_names = None
6. A: manager.auto_discover_plugins()
7. A: for name in _enabled_plugin_names:  # ← reads None!
          manager.enable(name)            # TypeError: 'NoneType' object is not iterable
```
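The interleaving above can be forced deterministically with a barrier. A self-contained repro against stand-in globals (not the real module, which would also pull in the plugin manager):

```python
import threading

# Stand-in globals mirroring plugins/__init__.py
_plugins_enabled = False
_enabled_plugin_names = None

gate = threading.Barrier(2)  # forces the Thread A / Thread B interleaving

def enable_path():  # Thread A
    global _plugins_enabled, _enabled_plugin_names
    _plugins_enabled = True
    _enabled_plugin_names = ["logging", "metrics"]
    gate.wait()  # "context switch before discovery"
    gate.wait()  # disable_path has now overwritten the state
    for name in _enabled_plugin_names:  # TypeError: 'NoneType' object is not iterable
        print("enabling", name)

def disable_path():  # Thread B
    global _plugins_enabled, _enabled_plugin_names
    gate.wait()  # wait for enable_path to write its state
    _plugins_enabled = False
    _enabled_plugin_names = None
    gate.wait()  # let enable_path resume

t1 = threading.Thread(target=enable_path)
t2 = threading.Thread(target=disable_path)
t1.start(); t2.start()
t1.join(); t2.join()  # t1 dies with the TypeError traceback
```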
Suggested Fix
Guard all reads and writes with a module-level lock, and make the two-variable update atomic:
```python
import threading

_plugins_lock = threading.Lock()
_plugins_enabled: bool = False
_enabled_plugin_names: list = None


def enable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    with _plugins_lock:
        _plugins_enabled = True
        _enabled_plugin_names = plugins

    from .manager import get_plugin_manager
    manager = get_plugin_manager()
    manager.auto_discover_plugins()
    manager.discover_entry_points()

    # Snapshot the names under the lock to avoid TOCTOU
    with _plugins_lock:
        target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None

    if target_plugins is not None:
        for name in target_plugins:
            manager.enable(name)
    else:
        for plugin_info in manager.list_plugins():
            manager.enable(plugin_info.get("name", ""))


def disable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    from .manager import get_plugin_manager
    manager = get_plugin_manager()
    if plugins is not None:
        for name in plugins:
            manager.disable(name)
    else:
        with _plugins_lock:
            _plugins_enabled = False
            _enabled_plugin_names = None
        for plugin_info in manager.list_plugins():
            manager.disable(plugin_info.get("name", ""))


def is_enabled(name: str = None) -> bool:
    with _plugins_lock:
        if name is None:
            return _plugins_enabled
    from .manager import get_plugin_manager
    return get_plugin_manager().is_enabled(name)
```
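A complementary hardening worth considering (a design sketch, not the module's current layout): collapse the two globals into a single immutable tuple. The reference swap is one atomic store under the GIL, so even a reader that skips the lock can never observe a torn pair.

```python
import threading

_plugins_lock = threading.Lock()
# (_plugins_enabled, _enabled_plugin_names) live in ONE immutable tuple.
_plugin_state: tuple = (False, None)

def _set_state(enabled: bool, names) -> None:
    global _plugin_state
    with _plugins_lock:  # serialize writers; readers need no lock
        _plugin_state = (enabled, tuple(names) if names is not None else None)

def _get_state() -> tuple:
    # A single reference read: callers see either the old pair or the new
    # one in full, never a mix of the two.
    return _plugin_state
```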
Additional Validated Finding: Bare except in Memory Storage
File: praisonaiagents/memory/storage.py, lines 96–104
While not a concurrency issue, this is a resilience gap worth noting:
```python
# storage.py lines 96-104
try:
    self.stm_collection = self.chroma_client.get_collection("short_term_memory")
except:  # ← catches SystemExit, KeyboardInterrupt, GeneratorExit
    self.stm_collection = self.chroma_client.create_collection("short_term_memory")

try:
    self.ltm_collection = self.chroma_client.get_collection("long_term_memory")
except:  # ← same issue
    self.ltm_collection = self.chroma_client.create_collection("long_term_memory")
```
Bare except: catches SystemExit and KeyboardInterrupt, preventing graceful process shutdown. Should use except Exception as e: with a debug log.
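A minimal corrected sketch of the same fragment: catch Exception rather than everything, and log the fallback. (Recent chromadb releases also expose get_or_create_collection, which would remove the try/except entirely; whether it is available depends on the pinned version.)

```python
import logging

logger = logging.getLogger(__name__)

try:
    self.stm_collection = self.chroma_client.get_collection("short_term_memory")
except Exception as e:  # Exception, not BaseException: Ctrl-C still works
    logger.debug("get_collection(short_term_memory) failed (%s); creating it", e)
    self.stm_collection = self.chroma_client.create_collection("short_term_memory")
```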
Impact Summary
| Gap | Severity | Trigger Condition | Impact |
| --- | --- | --- | --- |
| Semaphore manipulation | Critical | Any sync acquire_sync() + concurrent async acquire() | Deadlock, phantom permits |
| Tool executor leak | High | Tool timeout on I/O-bound tools | Memory/thread/FD exhaustion |
| Plugin state race | High | enable()/disable() from multiple threads | TypeError crash, inconsistent config |
All three gaps are latent in single-agent, single-threaded usage but surface under the multi-agent concurrent workloads that the framework is designed to support.