Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions src/praisonai-agents/praisonaiagents/agent/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,27 @@ async def acquire(self, agent_name: str) -> None:
def acquire_sync(self, agent_name: str) -> None:
"""Synchronous acquire — for non-async code paths.

Note: This creates/reuses an event loop internally.
Prefer async acquire() when possible.
If called while an event loop is already running in the current thread,
this method raises RuntimeError to avoid deadlock.
"""
sem = self._get_semaphore(agent_name)
if sem is None:
return
try:
asyncio.get_running_loop()
# If we're in an async context, we can't block
# Just try_acquire or no-op with warning
if not sem._value > 0:
logger.warning(
f"Sync acquire for '{agent_name}' while async loop running and semaphore full. "
f"Consider using async acquire() instead."
)
# Decrement manually for sync context
sem._value = max(0, sem._value - 1)
except RuntimeError:
# No running loop — safe to use asyncio.run
asyncio.get_event_loop().run_until_complete(sem.acquire())
# No running loop — safe to create one
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(sem.acquire())
finally:
loop.close()
Comment on lines 92 to +114
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm release() path and any existing sync/async mixing callers.
rg -nP -C3 '\bacquire_sync\b|registry\.release\b|ConcurrencyRegistry\b'

Repository: MervinPraison/PraisonAI

Length of output: 11770


🏁 Script executed:

cat -n src/praisonai-agents/praisonaiagents/agent/concurrency.py

Repository: MervinPraison/PraisonAI

Length of output: 6363


🏁 Script executed:

# Find actual usage of acquire_sync and release in test/agent code
rg -n "acquire_sync|\.release\(" src/praisonai-agents/praisonaiagents/ src/praisonai-agents/tests/ -A 2

Repository: MervinPraison/PraisonAI

Length of output: 4024


🏁 Script executed:

# Check the test more carefully - is it in async context?
sed -n '101,110p' src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py

# Also check if acquire_sync is called from actual async code anywhere
rg -n "acquire_sync" --type py

Repository: MervinPraison/PraisonAI

Length of output: 760


🏁 Script executed:

# Check Python version and asyncio.Semaphore behavior
python3 --version

# Let's create a test script to see what actually happens with cross-loop semaphore usage
cat > /tmp/test_semaphore.py << 'EOF'
import asyncio
import threading

# Test 1: Semaphore created, then acquire in separate loop
sem = asyncio.Semaphore(1)
print(f"1. Created sem: {sem}")
print(f"   sem._loop initially: {getattr(sem, '_loop', 'not set')}")

# Acquire in a new loop (the worker thread approach)
def acquire_in_new_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print(f"2. Worker thread loop: {loop}")
    try:
        loop.run_until_complete(sem.acquire())
        print(f"3. After acquire in worker, sem._loop: {getattr(sem, '_loop', 'not set')}")
    finally:
        loop.close()
    print(f"4. After loop.close() in worker")

t = threading.Thread(target=acquire_in_new_loop)
t.start()
t.join()

print(f"5. Main thread, after worker finishes")
print(f"   sem._loop now: {getattr(sem, '_loop', 'not set')}")

# Try to release from main thread (no loop running)
try:
    sem.release()
    print(f"6. Release succeeded from main thread")
except Exception as e:
    print(f"6. Release failed: {type(e).__name__}: {e}")

# Test 2: Create loop, acquire, close immediately
print("\n--- Test 2: Close loop immediately ---")
sem2 = asyncio.Semaphore(1)
loop2 = asyncio.new_event_loop()
asyncio.set_event_loop(loop2)
try:
    loop2.run_until_complete(sem2.acquire())
    print(f"Acquired in loop2, sem2._loop: {getattr(sem2, '_loop', 'not set')}")
finally:
    loop2.close()
    asyncio.set_event_loop(None)

print(f"After loop2.close(), sem2._loop: {getattr(sem2, '_loop', 'not set')}")
try:
    sem2.release()
    print(f"Release succeeded from main thread after loop close")
except Exception as e:
    print(f"Release failed: {type(e).__name__}: {e}")
EOF
python3 /tmp/test_semaphore.py

Repository: MervinPraison/PraisonAI

Length of output: 624


🏁 Script executed:

# Let's test the timeout behavior specifically
cat > /tmp/test_timeout.py << 'EOF'
import asyncio
import concurrent.futures
import time
import threading

# Create a semaphore that will never acquire (locked)
sem = asyncio.Semaphore(0)  # Start at 0, so acquire will block

def acquire_in_new_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print(f"[Worker] Starting acquire in new loop at {time.time():.2f}")
    try:
        # This will hang forever since sem is at 0
        loop.run_until_complete(sem.acquire())
        print(f"[Worker] Acquired (should not happen)")
    except asyncio.CancelledError:
        print(f"[Worker] Cancelled")
    finally:
        loop.close()
    print(f"[Worker] Finished at {time.time():.2f}")

start = time.time()
print(f"Starting at {start:.2f}")

try:
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        print(f"Pool created at {time.time() - start:.2f}s")
        future = pool.submit(acquire_in_new_loop)
        try:
            result = future.result(timeout=2)
            print(f"Got result: {result}")
        except concurrent.futures.TimeoutError:
            print(f"[Main] result() raised TimeoutError at {time.time() - start:.2f}s")
            print(f"[Main] Now exiting 'with' block (this will block waiting for worker)...")
except Exception as e:
    print(f"[Main] Exception: {e}")

print(f"[Main] Exited 'with' block at {time.time() - start:.2f}s")
print(f"Total time elapsed: {time.time() - start:.2f}s")
EOF
timeout 10 python3 /tmp/test_timeout.py

Repository: MervinPraison/PraisonAI

Length of output: 115


🏁 Script executed:

# Test the timeout behavior without timeout command - use shorter timeout
cat > /tmp/test_timeout.py << 'EOF'
import asyncio
import concurrent.futures
import time
import threading

# Create a semaphore that will never acquire (locked)
sem = asyncio.Semaphore(0)  # Start at 0, so acquire will block

def acquire_in_new_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print(f"[Worker] Starting acquire in new loop at {time.time():.2f}")
    try:
        # This will hang forever since sem is at 0
        loop.run_until_complete(sem.acquire())
        print(f"[Worker] Acquired (should not happen)")
    except asyncio.CancelledError:
        print(f"[Worker] Cancelled")
    finally:
        loop.close()
    print(f"[Worker] Finished at {time.time():.2f}")

start = time.time()
print(f"Starting at {start:.2f}")

# Test without hanging forever - use very short timeout
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    print(f"Pool created at {time.time() - start:.2f}s")
    future = pool.submit(acquire_in_new_loop)
    try:
        result = future.result(timeout=1)
        print(f"Got result: {result}")
    except concurrent.futures.TimeoutError:
        print(f"[Main] result(timeout=1) raised TimeoutError at {time.time() - start:.2f}s")
        print(f"[Main] Now about to exit 'with' block (ThreadPoolExecutor.__exit__ will do shutdown(wait=True))...")
        
print(f"[Main] Exited 'with' block at {time.time() - start:.2f}s")
print(f"Total time elapsed: {time.time() - start:.2f}s (should be >> 1 second due to shutdown wait)")
EOF

# Run with a shell timeout
( sleep 15; kill $$ 2>/dev/null ) & python3 /tmp/test_timeout.py ; pkill -P $$

Repository: MervinPraison/PraisonAI

Length of output: 153


🏁 Script executed:

# Check if Python 3.11 asyncio.Semaphore stores loop ref differently
python3 << 'EOF'
import asyncio
import sys

print(f"Python version: {sys.version}")

# Check Semaphore internals
sem = asyncio.Semaphore(1)
print(f"\nSemaphore attributes: {dir(sem)}")
print(f"\nSemaphore __dict__: {sem.__dict__}")

# After creating a semaphore, check what happens when we await it in different loops
import asyncio
loop1 = asyncio.new_event_loop()
asyncio.set_event_loop(loop1)

async def test1():
    print(f"\nIn loop1, sem.__dict__ before acquire: {sem.__dict__}")
    await sem.acquire()
    print(f"In loop1, sem.__dict__ after acquire: {sem.__dict__}")

loop1.run_until_complete(test1())

# Now try from a different loop
loop2 = asyncio.new_event_loop()
asyncio.set_event_loop(loop2)

async def test2():
    print(f"\nIn loop2, sem.__dict__ before acquire: {sem.__dict__}")
    await sem.acquire()
    print(f"In loop2, sem.__dict__ after acquire: {sem.__dict__}")

try:
    loop2.run_until_complete(test2())
except RuntimeError as e:
    print(f"In loop2, got error: {e}")

loop1.close()
loop2.close()
EOF

Repository: MervinPraison/PraisonAI

Length of output: 153


🏁 Script executed:

# Simpler check - just look at asyncio.Semaphore source behavior without executing problematic code
python3 -c "
import asyncio
import inspect

# Get the source of asyncio.Semaphore to understand how it handles loops
print('=== asyncio.Semaphore.acquire source ===')
print(inspect.getsource(asyncio.Semaphore.acquire))
"

Repository: MervinPraison/PraisonAI

Length of output: 1260


🏁 Script executed:

python3 -c "
import asyncio
import inspect

# Get the source of asyncio.Semaphore._get_loop to understand loop binding
print('=== asyncio.Semaphore._get_loop source ===')
try:
    print(inspect.getsource(asyncio.Semaphore._get_loop))
except:
    print('_get_loop not found in Semaphore, checking base Lock...')
    print(inspect.getsource(asyncio.Lock._get_loop))
"

Repository: MervinPraison/PraisonAI

Length of output: 451


🏁 Script executed:

python3 -c "
import asyncio
import inspect

# Get the source of asyncio.Semaphore.release to understand how it handles the loop
print('=== asyncio.Semaphore.release source ===')
try:
    print(inspect.getsource(asyncio.Semaphore.release))
except:
    print('Checking base Lock.release...')
    print(inspect.getsource(asyncio.Lock.release))
"

Repository: MervinPraison/PraisonAI

Length of output: 412


🏁 Script executed:

# Now test the actual critical issue: RuntimeError when semaphore is bound to wrong loop
python3 << 'EOF'
import asyncio
import concurrent.futures

sem = asyncio.Semaphore(1)
print("Initial state: sem._loop =", sem._loop)

# Simulate the problematic path: acquire in worker thread
def acquire_in_worker():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print("Worker: before acquire, sem._loop =", sem._loop)
    loop.run_until_complete(sem.acquire())
    print("Worker: after acquire, sem._loop =", sem._loop)
    loop.close()
    print("Worker: loop closed")

# Run it
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    pool.submit(acquire_in_worker).result()

print("Main: after worker finished, sem._loop =", sem._loop)

# Now try to use semaphore from async context
async def use_semaphore():
    print("Async: attempting acquire with sem._loop =", sem._loop)
    try:
        await sem.acquire()
        print("Async: acquire succeeded")
    except RuntimeError as e:
        print(f"Async: RuntimeError: {e}")

# Create a new loop in the main thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(use_semaphore())
finally:
    loop.close()
EOF

Repository: MervinPraison/PraisonAI

Length of output: 153


🏁 Script executed:

# Minimal test without actual event loop execution
python3 << 'EOF'
import asyncio

# Test what _get_loop does when called from non-running context
sem = asyncio.Semaphore(1)
print("Semaphore._loop initially:", sem._loop)

# Check if calling _get_loop outside running loop works
try:
    result = sem._get_loop()
    print("_get_loop() succeeded:", result)
except RuntimeError as e:
    print("_get_loop() raises RuntimeError:", e)
EOF

Repository: MervinPraison/PraisonAI

Length of output: 126


🏁 Script executed:

python3 << 'EOF'
import asyncio

# Check _wake_up_next implementation
import inspect
print("=== Semaphore._wake_up_next source ===")
try:
    print(inspect.getsource(asyncio.Semaphore._wake_up_next))
except AttributeError:
    # Try base class
    print(inspect.getsource(asyncio.Lock._wake_up_next))
EOF

Repository: MervinPraison/PraisonAI

Length of output: 399


🏁 Script executed:

# Check if the test actually runs the problematic code path
# The test_sync_acquire_release is NOT in an async test, so it won't hit the RuntimeError branch
sed -n '101,115p' src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py

# Verify: is it decorated with `@pytest.mark.asyncio`?
sed -n '100,115p' src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py

Repository: MervinPraison/PraisonAI

Length of output: 809


🏁 Script executed:

# Check if any test calls acquire_sync from within an `@pytest.mark.asyncio` test
rg -B 5 "acquire_sync" src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py

Repository: MervinPraison/PraisonAI

Length of output: 362


🏁 Script executed:

# Search for actual usage of acquire_sync in agent code (not tests)
rg -n "acquire_sync" src/praisonai-agents/praisonaiagents/ --type py --not-tests

# Also check if there are any agent implementations that mix sync and async
rg -B5 -A5 "def.*acquire" src/praisonai-agents/praisonaiagents/agent/ --type py | head -50

Repository: MervinPraison/PraisonAI

Length of output: 3324


Cross-loop asyncio.Semaphore binding will cause RuntimeError and the timeout guarantee is defeated.

asyncio.Semaphore binds itself to the first event loop that calls _get_loop() (during acquire()). This rewrite introduces subtle issues:

  1. Lines 109–114 (no running loop branch): Creates a loop, acquires (which binds the semaphore to that loop), then immediately closes it. The semaphore retains a reference to the closed loop in _loop. Any subsequent await acquire() from a different event loop will raise RuntimeError: Semaphore object is bound to a different event loop because _get_loop() enforces loop identity.

  2. Lines 96–107 (inside running loop branch): Acquires in a worker thread's new loop, binding the semaphore to that short-lived loop. If release() is later called from the main async context (which has a different loop), the same RuntimeError occurs. Even if it doesn't, the separated loop contexts defeat the purpose of a semaphore.

  3. Timeout is illusory: with ThreadPoolExecutor(...) as pool calls shutdown(wait=True) on __exit__, blocking indefinitely until the worker finishes. If result(timeout=30) raises TimeoutError, the with block still waits for the hung acquire(), defeating the 30-second guarantee.

  4. Violates coding guidelines: "use asyncio primitives for coordination, not threading" — this bridges an asyncio primitive via threading, which breaks loop-binding semantics. The pattern "One event loop per thread; never nest event loops" is intended to prevent exactly this scenario.

Recommended fix: Don't use asyncio.Semaphore for sync contexts. Instead:

  • Maintain a parallel threading.BoundedSemaphore for the sync path, created alongside the async one in _get_semaphore()
  • In acquire_sync(), use the sync semaphore with a proper acquire(timeout=...) call
  • Update release() to release whichever semaphore was acquired (or split into release_sync())

Alternatively, make acquire_sync() raise a clear error when called from within a running loop, directing users to await acquire() instead (fail fast per guidelines).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/agent/concurrency.py` around lines 92 -
114, The current acquire_sync implementation binds an asyncio.Semaphore to
different event loops (via new_event_loop or thread worker) which causes
RuntimeError and defeats the timeout; replace this by never touching
asyncio.Semaphore from sync code: update _get_semaphore to create and return a
pair (asyncio.Semaphore, threading.BoundedSemaphore) or store a thread-safe
BoundedSemaphore alongside the async one, remove the
ThreadPoolExecutor/new_event_loop code in acquire_sync and call the
threading.BoundedSemaphore.acquire(timeout=30) there instead, and update
release() (or add release_sync/release_async) to release the corresponding
semaphore; alternatively, make acquire_sync raise a clear RuntimeError directing
callers to await acquire() if you prefer to fail-fast.

else:
raise RuntimeError(
f"acquire_sync('{agent_name}') cannot be called with a running event loop; "
"use async acquire() in async contexts."
)

def release(self, agent_name: str) -> None:
"""Release concurrency slot for agent. No-op if unlimited."""
Expand Down
30 changes: 12 additions & 18 deletions src/praisonai-agents/praisonaiagents/agent/tool_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,19 @@ def execute_with_context():
with with_injection_context(state):
return self._execute_tool_impl(function_name, arguments)

# Use explicit executor lifecycle to actually bound execution time
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# Use reusable executor to prevent resource leaks
if not hasattr(self, '_tool_executor'):
self._tool_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix=f"tool-{self.name}"
)

future = self._tool_executor.submit(ctx.run, execute_with_context)
try:
future = executor.submit(ctx.run, execute_with_context)
try:
result = future.result(timeout=tool_timeout)
except concurrent.futures.TimeoutError:
# Cancel and shutdown immediately to avoid blocking
future.cancel()
executor.shutdown(wait=False, cancel_futures=True)
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
else:
# Normal completion - shutdown gracefully
executor.shutdown(wait=False)
finally:
# Ensure executor is always cleaned up
if not executor._shutdown:
executor.shutdown(wait=False)
result = future.result(timeout=tool_timeout)
except concurrent.futures.TimeoutError:
future.cancel()
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
Comment on lines +205 to +217
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

New per-agent executor has three concurrency / lifecycle gaps worth addressing before this "concurrency fix" lands.

  1. Race in lazy initialization (Line 206-209). hasattr(self, '_tool_executor') followed by assignment is not atomic. Two threads invoking execute_tool concurrently on the same agent can both observe the attribute as missing and each construct a ThreadPoolExecutor; the loser is orphaned (its worker thread lingers until GC). For a PR whose stated goal is concurrency safety this is the exact failure mode being patched elsewhere — initialize the executor eagerly in __init__ (or guard creation with a threading.Lock / functools.cached_property-style idiom) so first-use contention cannot double-spawn pools.

  2. Executor is never shut down → thread leak on agent close. The Agent.close() implementation in src/praisonai-agents/praisonaiagents/agent/agent.py (lines 4570-4640, provided as context) cleans up memory, LLM clients, MCP clients, server registrations and background tasks but has no branch for _tool_executor. Each Agent instance that executes a timed-out-or-not tool will retain up to 2 worker threads for the life of the process. This converts the prior "per-call executor leak" into a "per-agent executor leak" rather than fixing it. Add a self._tool_executor.shutdown(wait=False, cancel_futures=True) step (guarded by hasattr) to close() (and the async counterpart if any).

  3. future.cancel() does not stop an already-running tool (Line 215). concurrent.futures.Future.cancel() only succeeds while the task is still in the queue; once ctx.run(execute_with_context) has started, cancel() returns False and the worker thread keeps running the tool body to completion. With max_workers=2, two consecutive timeouts from hung tools will wedge the whole pool and every subsequent tool call on this agent will block inside future.result(timeout=tool_timeout) waiting for a free worker before its own timeout even starts counting. At minimum, log when cancel() returns False so the leak is observable; better, track the number of orphaned in-flight futures and either refuse further submissions or grow the pool.

🔒 Suggested direction (illustrative, not a drop-in)
-                # Use reusable executor to prevent resource leaks
-                if not hasattr(self, '_tool_executor'):
-                    self._tool_executor = concurrent.futures.ThreadPoolExecutor(
-                        max_workers=2, thread_name_prefix=f"tool-{self.name}"
-                    )
-
-                future = self._tool_executor.submit(ctx.run, execute_with_context)
+                # Reusable executor; created lazily under a lock to avoid
+                # double-initialization races on concurrent first use.
+                executor = getattr(self, '_tool_executor', None)
+                if executor is None:
+                    import threading
+                    lock = self.__dict__.setdefault('_tool_executor_lock', threading.Lock())
+                    with lock:
+                        executor = getattr(self, '_tool_executor', None)
+                        if executor is None:
+                            executor = concurrent.futures.ThreadPoolExecutor(
+                                max_workers=2,
+                                thread_name_prefix=f"tool-{self.name}",
+                            )
+                            self._tool_executor = executor
+
+                future = executor.submit(ctx.run, execute_with_context)
                 try:
                     result = future.result(timeout=tool_timeout)
                 except concurrent.futures.TimeoutError:
-                    future.cancel()
+                    if not future.cancel():
+                        logging.warning(
+                            f"Tool {function_name} timed out after {tool_timeout}s "
+                            "and is still running in the executor thread; worker "
+                            "slot will remain occupied until the tool returns."
+                        )
                     logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
                     result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}

And in Agent.close() (agent.py):

# Tool executor cleanup
try:
    executor = getattr(self, '_tool_executor', None)
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)
        self._tool_executor = None
except Exception as e:
    logger.warning(f"Tool executor cleanup failed: {e}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/agent/tool_execution.py` around lines
205 - 217, The per-agent executor introduces three issues: racey lazy init, no
shutdown, and ineffective future.cancel(); fix by (1) moving _tool_executor
creation into the Agent constructor (or guard it with a threading.Lock /
cached_property) so execute_tool (or the method containing the shown code) never
does a racy hasattr check, (2) add guarded shutdown logic in Agent.close() (and
the async close counterpart) to call self._tool_executor.shutdown(wait=False,
cancel_futures=True) and clear the attribute, and (3) handle cancel() returning
False after ctx.run has started by logging the failure and tracking
in-flight/orphaned futures (or refusing new submissions / growing the pool) so
hung tasks don’t permanently exhaust the max_workers=2 pool.

else:
with with_injection_context(state):
result = self._execute_tool_impl(function_name, arguments)
Expand Down
28 changes: 18 additions & 10 deletions src/praisonai-agents/praisonaiagents/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ def my_plugin_func(hook_type, *args, **kwargs):
# ============================================================================

# Global state for plugin system (lazy initialized)
import threading

_plugins_lock = threading.Lock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make plugin enablement updates one synchronized transaction.

_plugins_lock protects only _plugins_enabled / _enabled_plugin_names; the actual PluginManager._enabled reads/writes still happen outside that lock, and manager.py:104-120 accesses that dict without using PluginManager._lock. Concurrent enable() / disable() can leave is_enabled(None) and is_enabled(name) disagreeing. Also, if discovery fails after Line 115, globals remain enabled even though plugins were not enabled.

🔒 Proposed synchronization tightening
-_plugins_lock = threading.Lock()
+_plugins_lock = threading.RLock()
 _plugins_enabled: bool = False
 _enabled_plugin_names: list = None  # None = all, list = specific
@@
 def enable(plugins: list = None) -> None:
@@
     global _plugins_enabled, _enabled_plugin_names
-    
-    with _plugins_lock:
-        _plugins_enabled = True
-        _enabled_plugin_names = plugins  # None = all, list = specific
+    target_plugins = list(plugins) if plugins is not None else None
@@
-    # Snapshot the names under lock to avoid TOCTOU
-    with _plugins_lock:
-        target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None
-    
-    # Enable specific plugins or all
-    if target_plugins is not None:
-        # Enable only specified plugins
-        for name in target_plugins:
-            manager.enable(name)
-    else:
-        # Enable all discovered plugins
-        for plugin_info in manager.list_plugins():
-            manager.enable(plugin_info.get("name", ""))
+    with _plugins_lock:
+        # Enable specific plugins or all
+        if target_plugins is not None:
+            # Enable only specified plugins
+            for name in target_plugins:
+                manager.enable(name)
+        else:
+            # Enable all discovered plugins
+            for plugin_info in manager.list_plugins():
+                manager.enable(plugin_info.get("name", ""))
+
+        _plugins_enabled = True
+        _enabled_plugin_names = target_plugins  # None = all, list = specific
@@
     if plugins is not None:
         # Disable specific plugins
-        for name in plugins:
-            manager.disable(name)
+        with _plugins_lock:
+            for name in plugins:
+                manager.disable(name)
     else:
         # Disable all plugins
         with _plugins_lock:
+            for plugin_info in manager.list_plugins():
+                manager.disable(plugin_info.get("name", ""))
             _plugins_enabled = False
             _enabled_plugin_names = None
-        for plugin_info in manager.list_plugins():
-            manager.disable(plugin_info.get("name", ""))
@@
 def is_enabled(name: str = None) -> bool:
@@
     with _plugins_lock:
         if name is None:
             return _plugins_enabled
-    
+
     from .manager import get_plugin_manager
     manager = get_plugin_manager()
-    return manager.is_enabled(name)
+    with _plugins_lock:
+        return manager.is_enabled(name)

If PluginManager remains publicly usable, also move the lock into src/praisonai-agents/praisonaiagents/plugins/manager.py so direct manager calls are protected too:

 def enable(self, name: str) -> bool:
     """Enable a plugin."""
-    if name in self._enabled:
-        self._enabled[name] = True
-        return True
-    return False
+    with self._lock:
+        if name in self._enabled:
+            self._enabled[name] = True
+            return True
+        return False
 
 def disable(self, name: str) -> bool:
     """Disable a plugin."""
-    if name in self._enabled:
-        self._enabled[name] = False
-        return True
-    return False
+    with self._lock:
+        if name in self._enabled:
+            self._enabled[name] = False
+            return True
+        return False
 
 def is_enabled(self, name: str) -> bool:
     """Check if a plugin is enabled."""
-    return self._enabled.get(name, False)
+    with self._lock:
+        return self._enabled.get(name, False)

Also applies to: 114-138, 163-173, 237-243

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/plugins/__init__.py` at line 83, The
enable/disable flow is not atomic: _plugins_lock only guards
_plugins_enabled/_enabled_plugin_names while PluginManager._enabled is modified
outside that lock, causing inconsistent is_enabled(None)/is_enabled(name) states
and leaving globals partially set if discovery fails; fix by making plugin
enablement a single synchronized transaction—acquire a single lock that covers
both the globals (_plugins_enabled, _enabled_plugin_names, _plugins_lock) and
the PluginManager._enabled mutations (either by moving the lock into
PluginManager as PluginManager._lock and using it inside
PluginManager.enable/disable/is_enabled, or by always acquiring both locks in a
defined order before making changes), perform discovery and all
PluginManager.enable/disable calls while holding the lock, and only update the
globals after successful enablement (or roll back on failure) so functions like
PluginManager.enable/disable and is_enabled are consistently protected.

_plugins_enabled: bool = False
_enabled_plugin_names: list = None # None = all, list = specific

Expand Down Expand Up @@ -108,8 +111,9 @@ def enable(plugins: list = None) -> None:
"""
global _plugins_enabled, _enabled_plugin_names

_plugins_enabled = True
_enabled_plugin_names = plugins # None = all, list = specific
with _plugins_lock:
_plugins_enabled = True
_enabled_plugin_names = plugins # None = all, list = specific

# Get plugin manager and auto-discover
from .manager import get_plugin_manager
Expand All @@ -119,10 +123,14 @@ def enable(plugins: list = None) -> None:
manager.auto_discover_plugins()
manager.discover_entry_points()

# Snapshot the names under lock to avoid TOCTOU
with _plugins_lock:
target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None

# Enable specific plugins or all
if plugins is not None:
if target_plugins is not None:
# Enable only specified plugins
for name in plugins:
for name in target_plugins:
manager.enable(name)
else:
# Enable all discovered plugins
Expand Down Expand Up @@ -158,8 +166,9 @@ def disable(plugins: list = None) -> None:
manager.disable(name)
else:
# Disable all plugins
_plugins_enabled = False
_enabled_plugin_names = None
with _plugins_lock:
_plugins_enabled = False
_enabled_plugin_names = None
for plugin_info in manager.list_plugins():
manager.disable(plugin_info.get("name", ""))

Expand Down Expand Up @@ -225,10 +234,9 @@ def is_enabled(name: str = None) -> bool:
Returns:
True if enabled, False otherwise.
"""
global _plugins_enabled

if name is None:
return _plugins_enabled
with _plugins_lock:
if name is None:
return _plugins_enabled

from .manager import get_plugin_manager
manager = get_plugin_manager()
Expand Down
15 changes: 15 additions & 0 deletions src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,18 @@ def test_sync_acquire_release(self):
# Sync acquire should work
reg.acquire_sync("sync_agent")
reg.release("sync_agent")

@pytest.mark.asyncio
async def test_sync_acquire_running_loop_noop(self):
"""Sync acquire in async context should fail fast without changing state."""
# NOTE(review): uses the `asyncio` name below — assumes a module-level
# `import asyncio` exists at the top of this test file; confirm.
from praisonaiagents.agent.concurrency import ConcurrencyRegistry
reg = ConcurrencyRegistry()
reg.set_limit("loop_agent", 1)
# Take the single available slot so the agent's semaphore is exhausted.
await reg.acquire("loop_agent")
# acquire_sync must refuse to run while an event loop is active
# (fail-fast branch) instead of blocking or mutating the semaphore.
with pytest.raises(RuntimeError, match="running event loop"):
reg.acquire_sync("loop_agent")
# The failed sync call must not have consumed or freed a slot: a second
# async acquire still times out because the limit of 1 is already held.
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05)
reg.release("loop_agent")
# After an explicit release the slot is obtainable again, proving the
# registry state was left intact by the rejected sync attempt.
await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05)
reg.release("loop_agent")