
Concurrency safety gaps in Core SDK: semaphore manipulation, tool executor leak, and unprotected plugin state #1458

@MervinPraison


Summary

Deep analysis of src/praisonai-agents/praisonaiagents identified three concurrency and resilience gaps that can cause deadlocks, resource exhaustion, and state corruption under concurrent multi-agent workloads. Each is validated with exact code locations and reproducible scenarios.


Gap 1: Unsafe asyncio.Semaphore Private State Manipulation in ConcurrencyRegistry

File: praisonaiagents/agent/concurrency.py, lines 86–108

The Problem

acquire_sync() directly reads and writes asyncio.Semaphore._value, a private CPython implementation detail, bypassing the semaphore's acquire/release protocol and waiter queue. This creates a time-of-check-to-time-of-use (TOCTOU) race and can cause permanent deadlocks.

Current Code

# concurrency.py lines 86-108
def acquire_sync(self, agent_name: str) -> None:
    sem = self._get_semaphore(agent_name)
    if sem is None:
        return
    try:
        asyncio.get_running_loop()
        # PROBLEM 1: Reading private attribute without synchronization
        if not sem._value > 0:                    # line 99
            logger.warning(...)
        # PROBLEM 2: Directly mutating private attribute — bypasses internal lock + waiter queue
        sem._value = max(0, sem._value - 1)       # line 105
    except RuntimeError:
        asyncio.get_event_loop().run_until_complete(sem.acquire())

Why It Breaks

  1. TOCTOU race: The check on line 99 and the mutation on line 105 are not atomic. Between the check and the write, another coroutine can await sem.acquire(), legitimately decrementing _value. The manual write then applies max(0, stale_value - 1), silently undoing that decrement (a lost update), so the counter no longer matches the number of actual permit holders.

  2. Waiter queue bypass: asyncio.Semaphore.acquire() manages an internal _waiters deque. Manually decrementing _value means coroutines blocked on await sem.acquire() are never woken up, causing a permanent deadlock (see the standalone repro after this list).

  3. CPython coupling: _value is not part of the public API. It can change or be removed in any Python release.
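
A standalone repro of the waiter-queue bypass in point 2, using only the stdlib: a coroutine parked in sem._waiters is never woken when the counter is bumped directly instead of going through release().

import asyncio

async def main():
    sem = asyncio.Semaphore(1)
    await sem.acquire()            # _value: 1 -> 0, semaphore fully held

    async def waiter():
        await sem.acquire()        # parks a Future in sem._waiters
        print("waiter woke up")    # never reached

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)         # let waiter() reach the blocked await

    # Simulate the sync path's manual "release": bump the private counter
    # without calling release(), so _wake_up_first() never runs.
    sem._value += 1

    done, pending = await asyncio.wait({task}, timeout=1)
    print("waiter still blocked:", task in pending)   # True -> permanent deadlock
    task.cancel()

asyncio.run(main())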

Concrete Deadlock Scenario

Setup: registry.set_limit("agent1", 2)  →  sem._value = 2

Thread A (sync):                     Coroutine B (async):
  reads sem._value == 2 → check passes
  [context switch]                   await sem.acquire()  →  _value: 2 → 1
                                     await sem.acquire()  →  _value: 1 → 0, _locked=True
  sem._value = max(0, 2-1) = 1
  ↑ STALE WRITE: the true count is 0, yet three holders now exist for limit=2

Later: Coroutine B calls sem.release() twice  →  _value: 1 → 2 → 3
  Thread A's permit was never tracked by the semaphore, so a phantom
  permit leaks: the semaphore now admits three concurrent holders for limit=2.

  Worse: when the sync path "releases" by incrementing _value directly,
  _wake_up_first() never runs, so any coroutine parked in _waiters blocks
  forever.

Suggested Fix

Replace the private-attribute manipulation with a proper sync-to-async bridge:

def acquire_sync(self, agent_name: str) -> None:
    """Synchronous acquire using a dedicated event loop in a worker thread."""
    sem = self._get_semaphore(agent_name)
    if sem is None:
        return
    try:
        asyncio.get_running_loop()
        # We're inside an async loop — cannot block. Run acquire in a thread
        # with its own loop to go through the semaphore's proper acquire path.
        import concurrent.futures
        def _acquire_in_new_loop():
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(sem.acquire())
            finally:
                loop.close()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(_acquire_in_new_loop).result(timeout=30)
    except RuntimeError:
        # No running loop — safe to create one
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(sem.acquire())
        finally:
            loop.close()

This ensures every acquisition goes through sem.acquire(), preserving the waiter queue and the semaphore's wake-up protocol.


Gap 2: ThreadPoolExecutor Resource Leak on Tool Timeout

File: praisonaiagents/agent/tool_execution.py, lines 205–223

The Problem

Every tool call with a configured timeout creates a new ThreadPoolExecutor(max_workers=1). When the tool times out, future.cancel() is called, but Python's Future.cancel() cannot interrupt a thread that is already running — it only prevents a queued task from starting. The executor is shut down with wait=False, which returns immediately but leaves the worker thread alive. Additionally, the cleanup code accesses executor._shutdown, a private attribute.

Current Code

# tool_execution.py lines 205-223
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
try:
    future = executor.submit(ctx.run, execute_with_context)
    try:
        result = future.result(timeout=tool_timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()                                    # line 213: no-op if thread is running
        executor.shutdown(wait=False, cancel_futures=True)  # line 214: returns immediately
        logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
        result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
    else:
        executor.shutdown(wait=False)                       # line 219
finally:
    if not executor._shutdown:                              # line 222: private attribute access
        executor.shutdown(wait=False)                       # line 223: redundant

Why It Breaks

  1. Zombie threads accumulate: If a tool does blocking I/O (HTTP request, file read, subprocess), future.cancel() cannot stop it. The thread continues running in the background. Each subsequent timeout creates another zombie thread (demonstrated by the snippet after this list).

  2. Resource exhaustion: Each zombie thread holds: a stack (~8MB default on Linux), a file descriptor for any open socket/file, and any memory allocated by the tool. After 10 timeouts on I/O-bound tools, that's ~80MB of leaked stacks plus leaked connections.

  3. Private attribute access: executor._shutdown is a CPython implementation detail. It's also redundant — calling shutdown() on an already-shut-down executor is a safe no-op.
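
A quick stdlib-only demonstration of point 1: once the worker thread is running, cancel() returns False, and shutdown(wait=False) leaves the thread alive.

import concurrent.futures
import threading
import time

def blocking_tool():
    time.sleep(2)   # stands in for a socket.recv() that outlives the timeout

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(blocking_tool)
try:
    future.result(timeout=0.1)
except concurrent.futures.TimeoutError:
    print("cancel() ->", future.cancel())              # False: already running
    executor.shutdown(wait=False, cancel_futures=True)

time.sleep(0.2)   # well past the "cancellation", the worker is still alive
print("zombie worker:", any(t.name.startswith("ThreadPoolExecutor")
                            for t in threading.enumerate()))   # True
# Note: the interpreter still joins this thread at exit, so the script
# lingers ~2s; a leaked thread blocked on real I/O can linger far longer.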

Concrete Leak Scenario

Tool: web_search(query="...") with tool_timeout=5s
Tool internally: requests.get(url, timeout=300)  # 300s socket timeout

Iteration 1: Executor thread starts → socket.recv() blocks → 5s timeout fires
  → future.cancel(): no-op (thread is running)
  → executor.shutdown(wait=False): returns, thread still blocked on recv()
  → Thread 1: LEAKED (holding socket + 8MB stack)

Iteration 2: New executor → same pattern → Thread 2: LEAKED
...
Iteration N: N zombie threads, N leaked sockets

After ~1000 iterations: OS hits thread limit (ulimit), fd limit, or OOM

Suggested Fix

Reuse a single agent-level executor so each call stops creating (and leaking) its own pool, and bound the total worker count:

# In Agent.__init__ or lazy property:
self._tool_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=2, thread_name_prefix=f"tool-{self.name}"
)

# In _execute_tool_with_context:
tool_timeout = getattr(self, '_tool_timeout', None)
if tool_timeout and tool_timeout > 0:
    ctx = contextvars.copy_context()
    
    def execute_with_context():
        with with_injection_context(state):
            return self._execute_tool_impl(function_name, arguments)
    
    future = self._tool_executor.submit(ctx.run, execute_with_context)
    try:
        result = future.result(timeout=tool_timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()
        logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
        result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
else:
    with with_injection_context(state):
        result = self._execute_tool_impl(function_name, arguments)

This eliminates per-call executor creation, removes the private _shutdown access, and bounds the total thread count. It does not interrupt a blocked tool (the worker still runs to completion), so for full cancellation of blocking I/O, consider wrapping tools in a subprocess and terminating it on timeout, as sketched below.
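
One possible shape for that subprocess fallback, as a sketch: run_tool_with_hard_timeout is a hypothetical helper (not part of the SDK), and it assumes the tool function and its arguments are picklable.

import multiprocessing

def run_tool_with_hard_timeout(fn, args, timeout):
    """Run fn(*args) in a child process so a timeout can actually kill it."""
    with multiprocessing.Pool(processes=1) as pool:
        async_result = pool.apply_async(fn, args)
        try:
            return async_result.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            # Pool.__exit__ calls terminate(), which SIGTERMs the worker:
            # the blocked tool is actually torn down instead of leaked.
            return {"error": f"Tool timed out after {timeout}s", "timeout": True}

The trade-off is per-call process startup overhead plus the pickling constraint, so this fits best as an opt-in mode for known-blocking tools.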


Gap 3: Unprotected Module-Level State in Plugin System

File: praisonaiagents/plugins/__init__.py, lines 80–82, 109–112, 150–162, 219–231

The Problem

The plugin system uses two module-level globals (_plugins_enabled and _enabled_plugin_names) that are read and written from enable(), disable(), and is_enabled() without any lock protection. In multi-agent scenarios where agents run in different threads, concurrent calls to these functions cause torn reads and state corruption.

Current Code

# plugins/__init__.py lines 80-82
_plugins_enabled: bool = False
_enabled_plugin_names: list = None  # None = all, list = specific

# lines 109-112 — enable()
def enable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    _plugins_enabled = True                    # UNPROTECTED WRITE
    _enabled_plugin_names = plugins            # UNPROTECTED WRITE
    # ... discovery + enable logic follows ...

# lines 150, 161-162 — disable()
def disable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    # ...
    _plugins_enabled = False                   # UNPROTECTED WRITE
    _enabled_plugin_names = None               # UNPROTECTED WRITE

# lines 219-231 — is_enabled()
def is_enabled(name: str = None) -> bool:
    global _plugins_enabled
    if name is None:
        return _plugins_enabled                # UNPROTECTED READ

Why It Breaks

  1. Torn state between two variables: _plugins_enabled and _enabled_plugin_names are written in two separate statements. A reader can observe _plugins_enabled = True while _enabled_plugin_names is still None (from a previous disable() call), or vice versa (simulated in the snippet after this list).

  2. List mutation race: If one thread calls enable(["logging"]) while another calls enable(["metrics"]), the final value of _enabled_plugin_names is unpredictable — last writer wins, but the PluginManager may have already loaded the first set.

  3. is_enabled() TOCTOU: Code that checks is_enabled() then reads _enabled_plugin_names separately has a classic race — the state can change between the two reads.
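
A standalone simulation of point 1 (it mimics the two globals rather than importing the real module) that reliably surfaces the torn window:

import threading
import time

# Stand-ins for the real module-level globals.
_plugins_enabled = False
_enabled_plugin_names = None

def enable(plugins):
    global _plugins_enabled, _enabled_plugin_names
    _plugins_enabled = True            # write 1
    time.sleep(0)                      # yield: widens the race window
    _enabled_plugin_names = plugins    # write 2

def disable():
    global _plugins_enabled, _enabled_plugin_names
    _plugins_enabled = False
    _enabled_plugin_names = None

def reader(stop):
    while not stop.is_set():
        if _plugins_enabled and _enabled_plugin_names is None:
            print("torn state: enabled=True but names=None")
            stop.set()

stop = threading.Event()
threading.Thread(target=reader, args=(stop,), daemon=True).start()
for _ in range(100_000):
    enable(["logging"])
    disable()
stop.set()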

Concrete Corruption Scenario

Thread A: plugins.enable(["logging", "metrics"])    Thread B: plugins.disable()
  _plugins_enabled = True                             
  _enabled_plugin_names = ["logging", "metrics"]      _plugins_enabled = False
  [context switch before discovery]                    _enabled_plugin_names = None
  manager.auto_discover_plugins()                     
  for name in _enabled_plugin_names:                  # ← iterates None:
    manager.enable(name)                              #   TypeError: 'NoneType' object is not iterable

Suggested Fix

Guard all reads and writes with a module-level lock, and make the two-variable update atomic:

import threading

_plugins_lock = threading.Lock()
_plugins_enabled: bool = False
_enabled_plugin_names: list = None

def enable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    with _plugins_lock:
        _plugins_enabled = True
        _enabled_plugin_names = plugins
    
    from .manager import get_plugin_manager
    manager = get_plugin_manager()
    manager.auto_discover_plugins()
    manager.discover_entry_points()
    
    # Snapshot the names under lock to avoid TOCTOU
    with _plugins_lock:
        target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None
    
    if target_plugins is not None:
        for name in target_plugins:
            manager.enable(name)
    else:
        for plugin_info in manager.list_plugins():
            manager.enable(plugin_info.get("name", ""))

def disable(plugins: list = None) -> None:
    global _plugins_enabled, _enabled_plugin_names
    from .manager import get_plugin_manager
    manager = get_plugin_manager()
    
    if plugins is not None:
        for name in plugins:
            manager.disable(name)
    else:
        with _plugins_lock:
            _plugins_enabled = False
            _enabled_plugin_names = None
        for plugin_info in manager.list_plugins():
            manager.disable(plugin_info.get("name", ""))

def is_enabled(name: str = None) -> bool:
    with _plugins_lock:
        if name is None:
            return _plugins_enabled
    from .manager import get_plugin_manager
    return get_plugin_manager().is_enabled(name)
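
A lighter-weight alternative worth considering if lock contention on hot read paths matters: publish both fields through one immutable tuple, since a single attribute assignment can never be observed half-applied under the GIL. A sketch with hypothetical names (it elides the manager delegation, and writers doing read-modify-write sequences would still need the lock):

# (plugins_enabled, enabled_plugin_names) published as one atomic reference
_plugin_state = (False, None)

def _set_plugin_state(enabled, names):
    global _plugin_state
    _plugin_state = (enabled, tuple(names) if names is not None else None)

def _get_plugin_state():
    enabled, names = _plugin_state   # one read -> the pair is never torn
    return enabled, names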

Additional Validated Finding: Bare except in Memory Storage

File: praisonaiagents/memory/storage.py, lines 96–104

While not a concurrency issue, this is a resilience gap worth noting:

# storage.py lines 96-104
try:
    self.stm_collection = self.chroma_client.get_collection("short_term_memory")
except:  # ← catches SystemExit, KeyboardInterrupt, GeneratorExit
    self.stm_collection = self.chroma_client.create_collection("short_term_memory")
    
try:
    self.ltm_collection = self.chroma_client.get_collection("long_term_memory")
except:  # ← same issue
    self.ltm_collection = self.chroma_client.create_collection("long_term_memory")

A bare except: catches SystemExit, KeyboardInterrupt, and GeneratorExit, preventing graceful process shutdown. It should be except Exception as e: with a debug log.
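
A minimal version of that change for the first block (and, if the installed chromadb version provides get_or_create_collection, that would sidestep the try/except entirely):

try:
    self.stm_collection = self.chroma_client.get_collection("short_term_memory")
except Exception as e:
    logging.debug(f"get_collection('short_term_memory') failed ({e}); creating it")
    self.stm_collection = self.chroma_client.create_collection("short_term_memory")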


Impact Summary

Gap                     Severity  Trigger Condition                                         Impact
Semaphore manipulation  Critical  Sync acquire_sync() racing a concurrent async acquire()  Deadlock, phantom permits
Tool executor leak      High      Tool timeout on I/O-bound tools                           Memory/thread/FD exhaustion
Plugin state race       High      enable()/disable() called from multiple threads          TypeError crash, inconsistent config

All three gaps are latent in single-agent, single-threaded usage but surface under the multi-agent concurrent workloads that the framework is designed to support.
