fix: concurrency safety gaps in Core SDK (fixes #1458) #1459

MervinPraison merged 3 commits into main from
Conversation
- Replace unsafe asyncio.Semaphore private attribute manipulation with a proper sync-to-async bridge in concurrency.py
- Fix ThreadPoolExecutor resource leak by using a reusable agent-level executor in tool_execution.py
- Add thread lock protection for unprotected plugin state in plugins/__init__.py

Fixes #1458

Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
@coderabbitai review

/review

✅ Actions performed: Review triggered.
📝 Walkthrough

Three concurrency and execution safety updates: (1) Changes
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/c9e7c913-a80d-474a-bd5d-731c129ad614 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com>
Completed a full PR review and applied a follow-up fix in

Findings by area:

Suggested next improvement (optional): add a small dedicated unit test around

Screenshot: N/A (no UI changes; backend-only changes).
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues

Phase 3: Final Verdict
Claude finished @MervinPraison's task — View job

Final Architecture Review

Phase 1: Architecture Validation per AGENTS.md
Phase 2: Issue Analysis & Fixes
Phase 3: Final Verdict
🏗️ Architecture Review Summary

✅ COMPLIANT - Protocol-Driven Design

All changes follow AGENTS.md protocols correctly:
✅ COMPLIANT - Backward Compatibility
✅ COMPLIANT - Performance
🔧 Gap Fixes Implemented

Gap 1: Unsafe
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/praisonai-agents/praisonaiagents/agent/concurrency.py`:
- Around line 92-114: The current acquire_sync implementation binds an
asyncio.Semaphore to different event loops (via new_event_loop or thread worker)
which causes RuntimeError and defeats the timeout; replace this by never
touching asyncio.Semaphore from sync code: update _get_semaphore to create and
return a pair (asyncio.Semaphore, threading.BoundedSemaphore) or store a
thread-safe BoundedSemaphore alongside the async one, remove the
ThreadPoolExecutor/new_event_loop code in acquire_sync and call the
threading.BoundedSemaphore.acquire(timeout=30) there instead, and update
release() (or add release_sync/release_async) to release the corresponding
semaphore; alternatively, make acquire_sync raise a clear RuntimeError directing
callers to await acquire() if you prefer to fail-fast.
In `@src/praisonai-agents/praisonaiagents/agent/tool_execution.py`:
- Around line 205-217: The per-agent executor introduces three issues: racey
lazy init, no shutdown, and ineffective future.cancel(); fix by (1) moving
_tool_executor creation into the Agent constructor (or guard it with a
threading.Lock / cached_property) so execute_tool (or the method containing the
shown code) never does a racy hasattr check, (2) add guarded shutdown logic in
Agent.close() (and the async close counterpart) to call
self._tool_executor.shutdown(wait=False, cancel_futures=True) and clear the
attribute, and (3) handle cancel() returning False after ctx.run has started by
logging the failure and tracking in-flight/orphaned futures (or refusing new
submissions / growing the pool) so hung tasks don’t permanently exhaust the
max_workers=2 pool.
In `@src/praisonai-agents/praisonaiagents/plugins/__init__.py`:
- Line 83: The enable/disable flow is not atomic: _plugins_lock only guards
_plugins_enabled/_enabled_plugin_names while PluginManager._enabled is modified
outside that lock, causing inconsistent is_enabled(None)/is_enabled(name) states
and leaving globals partially set if discovery fails; fix by making plugin
enablement a single synchronized transaction—acquire a single lock that covers
both the globals (_plugins_enabled, _enabled_plugin_names, _plugins_lock) and
the PluginManager._enabled mutations (either by moving the lock into
PluginManager as PluginManager._lock and using it inside
PluginManager.enable/disable/is_enabled, or by always acquiring both locks in a
defined order before making changes), perform discovery and all
PluginManager.enable/disable calls while holding the lock, and only update the
globals after successful enablement (or roll back on failure) so functions like
PluginManager.enable/disable and is_enabled are consistently protected.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6ae6330d-311c-4ba3-ab1e-56f4fcf712fe
📒 Files selected for processing (3)
- src/praisonai-agents/praisonaiagents/agent/concurrency.py
- src/praisonai-agents/praisonaiagents/agent/tool_execution.py
- src/praisonai-agents/praisonaiagents/plugins/__init__.py
```diff
 sem = self._get_semaphore(agent_name)
 if sem is None:
     return
 try:
     asyncio.get_running_loop()
-    # If we're in an async context, we can't block
-    # Just try_acquire or no-op with warning
-    if not sem._value > 0:
-        logger.warning(
-            f"Sync acquire for '{agent_name}' while async loop running and semaphore full. "
-            f"Consider using async acquire() instead."
-        )
-    # Decrement manually for sync context
-    sem._value = max(0, sem._value - 1)
+    # We're inside an async loop — cannot block. Run acquire in a thread
+    # with its own loop to go through the semaphore's proper acquire path.
+    import concurrent.futures
+    def _acquire_in_new_loop():
+        loop = asyncio.new_event_loop()
+        try:
+            loop.run_until_complete(sem.acquire())
+        finally:
+            loop.close()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
+        pool.submit(_acquire_in_new_loop).result(timeout=30)
 except RuntimeError:
-    # No running loop — safe to use asyncio.run
-    asyncio.get_event_loop().run_until_complete(sem.acquire())
+    # No running loop — safe to create one
+    loop = asyncio.new_event_loop()
+    try:
+        loop.run_until_complete(sem.acquire())
+    finally:
+        loop.close()
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm release() path and any existing sync/async mixing callers.
rg -nP -C3 '\bacquire_sync\b|registry\.release\b|ConcurrencyRegistry\b'

Repository: MervinPraison/PraisonAI
Length of output: 11770
🏁 Script executed:
cat -n src/praisonai-agents/praisonaiagents/agent/concurrency.py

Repository: MervinPraison/PraisonAI
Length of output: 6363
🏁 Script executed:
# Find actual usage of acquire_sync and release in test/agent code
rg -n "acquire_sync|\.release\(" src/praisonai-agents/praisonaiagents/ src/praisonai-agents/tests/ -A 2

Repository: MervinPraison/PraisonAI
Length of output: 4024
🏁 Script executed:
# Check the test more carefully - is it in async context?
sed -n '101,110p' src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py
# Also check if acquire_sync is called from actual async code anywhere
rg -n "acquire_sync" --type py

Repository: MervinPraison/PraisonAI
Length of output: 760
🏁 Script executed:
# Check Python version and asyncio.Semaphore behavior
python3 --version
# Let's create a test script to see what actually happens with cross-loop semaphore usage
cat > /tmp/test_semaphore.py << 'EOF'
import asyncio
import threading
# Test 1: Semaphore created, then acquire in separate loop
sem = asyncio.Semaphore(1)
print(f"1. Created sem: {sem}")
print(f" sem._loop initially: {getattr(sem, '_loop', 'not set')}")
# Acquire in a new loop (the worker thread approach)
def acquire_in_new_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(f"2. Worker thread loop: {loop}")
try:
loop.run_until_complete(sem.acquire())
print(f"3. After acquire in worker, sem._loop: {getattr(sem, '_loop', 'not set')}")
finally:
loop.close()
print(f"4. After loop.close() in worker")
t = threading.Thread(target=acquire_in_new_loop)
t.start()
t.join()
print(f"5. Main thread, after worker finishes")
print(f" sem._loop now: {getattr(sem, '_loop', 'not set')}")
# Try to release from main thread (no loop running)
try:
sem.release()
print(f"6. Release succeeded from main thread")
except Exception as e:
print(f"6. Release failed: {type(e).__name__}: {e}")
# Test 2: Create loop, acquire, close immediately
print("\n--- Test 2: Close loop immediately ---")
sem2 = asyncio.Semaphore(1)
loop2 = asyncio.new_event_loop()
asyncio.set_event_loop(loop2)
try:
loop2.run_until_complete(sem2.acquire())
print(f"Acquired in loop2, sem2._loop: {getattr(sem2, '_loop', 'not set')}")
finally:
loop2.close()
asyncio.set_event_loop(None)
print(f"After loop2.close(), sem2._loop: {getattr(sem2, '_loop', 'not set')}")
try:
sem2.release()
print(f"Release succeeded from main thread after loop close")
except Exception as e:
print(f"Release failed: {type(e).__name__}: {e}")
EOF
python3 /tmp/test_semaphore.py

Repository: MervinPraison/PraisonAI
Length of output: 624
🏁 Script executed:
# Let's test the timeout behavior specifically
cat > /tmp/test_timeout.py << 'EOF'
import asyncio
import concurrent.futures
import time
import threading
# Create a semaphore that will never acquire (locked)
sem = asyncio.Semaphore(0) # Start at 0, so acquire will block
def acquire_in_new_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(f"[Worker] Starting acquire in new loop at {time.time():.2f}")
try:
# This will hang forever since sem is at 0
loop.run_until_complete(sem.acquire())
print(f"[Worker] Acquired (should not happen)")
except asyncio.CancelledError:
print(f"[Worker] Cancelled")
finally:
loop.close()
print(f"[Worker] Finished at {time.time():.2f}")
start = time.time()
print(f"Starting at {start:.2f}")
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
print(f"Pool created at {time.time() - start:.2f}s")
future = pool.submit(acquire_in_new_loop)
try:
result = future.result(timeout=2)
print(f"Got result: {result}")
except concurrent.futures.TimeoutError:
print(f"[Main] result() raised TimeoutError at {time.time() - start:.2f}s")
print(f"[Main] Now exiting 'with' block (this will block waiting for worker)...")
except Exception as e:
print(f"[Main] Exception: {e}")
print(f"[Main] Exited 'with' block at {time.time() - start:.2f}s")
print(f"Total time elapsed: {time.time() - start:.2f}s")
EOF
timeout 10 python3 /tmp/test_timeout.py

Repository: MervinPraison/PraisonAI
Length of output: 115
🏁 Script executed:
# Test the timeout behavior without timeout command - use shorter timeout
cat > /tmp/test_timeout.py << 'EOF'
import asyncio
import concurrent.futures
import time
import threading
# Create a semaphore that will never acquire (locked)
sem = asyncio.Semaphore(0) # Start at 0, so acquire will block
def acquire_in_new_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(f"[Worker] Starting acquire in new loop at {time.time():.2f}")
try:
# This will hang forever since sem is at 0
loop.run_until_complete(sem.acquire())
print(f"[Worker] Acquired (should not happen)")
except asyncio.CancelledError:
print(f"[Worker] Cancelled")
finally:
loop.close()
print(f"[Worker] Finished at {time.time():.2f}")
start = time.time()
print(f"Starting at {start:.2f}")
# Test without hanging forever - use very short timeout
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
print(f"Pool created at {time.time() - start:.2f}s")
future = pool.submit(acquire_in_new_loop)
try:
result = future.result(timeout=1)
print(f"Got result: {result}")
except concurrent.futures.TimeoutError:
print(f"[Main] result(timeout=1) raised TimeoutError at {time.time() - start:.2f}s")
print(f"[Main] Now about to exit 'with' block (ThreadPoolExecutor.__exit__ will do shutdown(wait=True))...")
print(f"[Main] Exited 'with' block at {time.time() - start:.2f}s")
print(f"Total time elapsed: {time.time() - start:.2f}s (should be >> 1 second due to shutdown wait)")
EOF
# Run with a shell timeout
( sleep 15; kill $$ 2>/dev/null ) & python3 /tmp/test_timeout.py ; pkill -P $$

Repository: MervinPraison/PraisonAI
Length of output: 153
🏁 Script executed:
# Check if Python 3.11 asyncio.Semaphore stores loop ref differently
python3 << 'EOF'
import asyncio
import sys
print(f"Python version: {sys.version}")
# Check Semaphore internals
sem = asyncio.Semaphore(1)
print(f"\nSemaphore attributes: {dir(sem)}")
print(f"\nSemaphore __dict__: {sem.__dict__}")
# After creating a semaphore, check what happens when we await it in different loops
import asyncio
loop1 = asyncio.new_event_loop()
asyncio.set_event_loop(loop1)
async def test1():
print(f"\nIn loop1, sem.__dict__ before acquire: {sem.__dict__}")
await sem.acquire()
print(f"In loop1, sem.__dict__ after acquire: {sem.__dict__}")
loop1.run_until_complete(test1())
# Now try from a different loop
loop2 = asyncio.new_event_loop()
asyncio.set_event_loop(loop2)
async def test2():
print(f"\nIn loop2, sem.__dict__ before acquire: {sem.__dict__}")
await sem.acquire()
print(f"In loop2, sem.__dict__ after acquire: {sem.__dict__}")
try:
loop2.run_until_complete(test2())
except RuntimeError as e:
print(f"In loop2, got error: {e}")
loop1.close()
loop2.close()
EOF

Repository: MervinPraison/PraisonAI
Length of output: 153
🏁 Script executed:
# Simpler check - just look at asyncio.Semaphore source behavior without executing problematic code
python3 -c "
import asyncio
import inspect
# Get the source of asyncio.Semaphore to understand how it handles loops
print('=== asyncio.Semaphore.acquire source ===')
print(inspect.getsource(asyncio.Semaphore.acquire))
"

Repository: MervinPraison/PraisonAI
Length of output: 1260
🏁 Script executed:
python3 -c "
import asyncio
import inspect
# Get the source of asyncio.Semaphore._get_loop to understand loop binding
print('=== asyncio.Semaphore._get_loop source ===')
try:
print(inspect.getsource(asyncio.Semaphore._get_loop))
except:
print('_get_loop not found in Semaphore, checking base Lock...')
print(inspect.getsource(asyncio.Lock._get_loop))
"

Repository: MervinPraison/PraisonAI
Length of output: 451
🏁 Script executed:
python3 -c "
import asyncio
import inspect
# Get the source of asyncio.Semaphore.release to understand how it handles the loop
print('=== asyncio.Semaphore.release source ===')
try:
print(inspect.getsource(asyncio.Semaphore.release))
except:
print('Checking base Lock.release...')
print(inspect.getsource(asyncio.Lock.release))
"

Repository: MervinPraison/PraisonAI
Length of output: 412
🏁 Script executed:
# Now test the actual critical issue: RuntimeError when semaphore is bound to wrong loop
python3 << 'EOF'
import asyncio
import concurrent.futures
sem = asyncio.Semaphore(1)
print("Initial state: sem._loop =", sem._loop)
# Simulate the problematic path: acquire in worker thread
def acquire_in_worker():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print("Worker: before acquire, sem._loop =", sem._loop)
loop.run_until_complete(sem.acquire())
print("Worker: after acquire, sem._loop =", sem._loop)
loop.close()
print("Worker: loop closed")
# Run it
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
pool.submit(acquire_in_worker).result()
print("Main: after worker finished, sem._loop =", sem._loop)
# Now try to use semaphore from async context
async def use_semaphore():
print("Async: attempting acquire with sem._loop =", sem._loop)
try:
await sem.acquire()
print("Async: acquire succeeded")
except RuntimeError as e:
print(f"Async: RuntimeError: {e}")
# Create a new loop in the main thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(use_semaphore())
finally:
loop.close()
EOF

Repository: MervinPraison/PraisonAI
Length of output: 153
🏁 Script executed:
# Minimal test without actual event loop execution
python3 << 'EOF'
import asyncio
# Test what _get_loop does when called from non-running context
sem = asyncio.Semaphore(1)
print("Semaphore._loop initially:", sem._loop)
# Check if calling _get_loop outside running loop works
try:
result = sem._get_loop()
print("_get_loop() succeeded:", result)
except RuntimeError as e:
print("_get_loop() raises RuntimeError:", e)
EOF

Repository: MervinPraison/PraisonAI
Length of output: 126
🏁 Script executed:
python3 << 'EOF'
import asyncio
# Check _wake_up_next implementation
import inspect
print("=== Semaphore._wake_up_next source ===")
try:
print(inspect.getsource(asyncio.Semaphore._wake_up_next))
except AttributeError:
# Try base class
print(inspect.getsource(asyncio.Lock._wake_up_next))
EOF

Repository: MervinPraison/PraisonAI
Length of output: 399
🏁 Script executed:
# Check if the test actually runs the problematic code path
# The test_sync_acquire_release is NOT in an async test, so it won't hit the RuntimeError branch
sed -n '101,115p' src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py
# Verify: is it decorated with `@pytest.mark.asyncio`?
sed -n '100,115p' src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py

Repository: MervinPraison/PraisonAI
Length of output: 809
🏁 Script executed:
# Check if any test calls acquire_sync from within an `@pytest.mark.asyncio` test
rg -B 5 "acquire_sync" src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py

Repository: MervinPraison/PraisonAI
Length of output: 362
🏁 Script executed:
# Search for actual usage of acquire_sync in agent code (not tests)
rg -n "acquire_sync" src/praisonai-agents/praisonaiagents/ --type py --not-tests
# Also check if there are any agent implementations that mix sync and async
rg -B5 -A5 "def.*acquire" src/praisonai-agents/praisonaiagents/agent/ --type py | head -50

Repository: MervinPraison/PraisonAI
Length of output: 3324
Cross-loop asyncio.Semaphore binding will cause RuntimeError and the timeout guarantee is defeated.
asyncio.Semaphore binds itself to the first event loop that calls _get_loop() (during acquire()). This rewrite introduces subtle issues:
- Lines 109–114 (no running loop branch): Creates a loop, acquires (which binds the semaphore to that loop), then immediately closes it. The semaphore retains a reference to the closed loop in `_loop`. Any subsequent `await acquire()` from a different event loop will raise `RuntimeError: Semaphore object is bound to a different event loop` because `_get_loop()` enforces loop identity.
- Lines 96–107 (inside running loop branch): Acquires in a worker thread's new loop, binding the semaphore to that short-lived loop. If `release()` is later called from the main async context (which has a different loop), the same `RuntimeError` occurs. Even if it doesn't, the separated loop contexts defeat the purpose of a semaphore.
- Timeout is illusory: `with ThreadPoolExecutor(...) as pool` calls `shutdown(wait=True)` on `__exit__`, blocking indefinitely until the worker finishes. If `result(timeout=30)` raises `TimeoutError`, the `with` block still waits for the hung `acquire()`, defeating the 30-second guarantee.
- Violates coding guidelines: "use asyncio primitives for coordination, not threading" — this bridges an asyncio primitive via threading, which breaks loop-binding semantics. The pattern "One event loop per thread; never nest event loops" is intended to prevent exactly this scenario.
Recommended fix: Don't use asyncio.Semaphore for sync contexts. Instead:
- Maintain a parallel `threading.BoundedSemaphore` for the sync path, created alongside the async one in `_get_semaphore()`
- In `acquire_sync()`, use the sync semaphore with a proper `acquire(timeout=...)` call
- Update `release()` to release whichever semaphore was acquired (or split into `release_sync()`)
Alternatively, make acquire_sync() raise a clear error when called from within a running loop, directing users to await acquire() instead (fail fast per guidelines).
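As an illustrative sketch of the recommended pairing (all names here are hypothetical, not the SDK's actual API): the async path awaits an `asyncio.Semaphore`, while the sync path blocks on a parallel `threading.BoundedSemaphore` with a real timeout, so sync code never touches the asyncio primitive and no event-loop binding can occur.

```python
import asyncio
import threading


class DualSemaphore:
    """Illustrative pairing of an async semaphore and a sync one."""

    def __init__(self, limit: int):
        self._async_sem = asyncio.Semaphore(limit)
        self._sync_sem = threading.BoundedSemaphore(limit)

    async def acquire_async(self) -> None:
        await self._async_sem.acquire()

    def release_async(self) -> None:
        self._async_sem.release()

    def acquire_sync(self, timeout: float = 30.0) -> bool:
        # Blocks the calling thread with a genuine timeout; returns
        # False if the slot could not be obtained in time.
        return self._sync_sem.acquire(timeout=timeout)

    def release_sync(self) -> None:
        self._sync_sem.release()
```

Note that the two counters are independent: async and sync callers only contend within their own path unless the design unifies them, which is one reason the fail-fast alternative may be the simpler choice.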
```diff
-future = executor.submit(ctx.run, execute_with_context)
-try:
-    result = future.result(timeout=tool_timeout)
-except concurrent.futures.TimeoutError:
-    # Cancel and shutdown immediately to avoid blocking
-    future.cancel()
-    executor.shutdown(wait=False, cancel_futures=True)
-    logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
-    result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
-else:
-    # Normal completion - shutdown gracefully
-    executor.shutdown(wait=False)
-finally:
-    # Ensure executor is always cleaned up
-    if not executor._shutdown:
-        executor.shutdown(wait=False)
+# Use reusable executor to prevent resource leaks
+if not hasattr(self, '_tool_executor'):
+    self._tool_executor = concurrent.futures.ThreadPoolExecutor(
+        max_workers=2, thread_name_prefix=f"tool-{self.name}"
+    )
+
+future = self._tool_executor.submit(ctx.run, execute_with_context)
+try:
+    result = future.result(timeout=tool_timeout)
+except concurrent.futures.TimeoutError:
+    future.cancel()
+    logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
+    result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
```
New per-agent executor has three concurrency / lifecycle gaps worth addressing before this "concurrency fix" lands.
- Race in lazy initialization (Line 206-209). `hasattr(self, '_tool_executor')` followed by assignment is not atomic. Two threads invoking `execute_tool` concurrently on the same agent can both observe the attribute as missing and each construct a `ThreadPoolExecutor`; the loser is orphaned (its worker thread lingers until GC). For a PR whose stated goal is concurrency safety this is the exact failure mode being patched elsewhere — initialize the executor eagerly in `__init__` (or guard creation with a `threading.Lock` / `functools.cached_property`-style idiom) so first-use contention cannot double-spawn pools.
- Executor is never shut down → thread leak on agent close. The `Agent.close()` implementation in `src/praisonai-agents/praisonaiagents/agent/agent.py` (lines 4570-4640, provided as context) cleans up memory, LLM clients, MCP clients, server registrations and background tasks but has no branch for `_tool_executor`. Each `Agent` instance that executes a timed-out-or-not tool will retain up to 2 worker threads for the life of the process. This converts the prior "per-call executor leak" into a "per-agent executor leak" rather than fixing it. Add a `self._tool_executor.shutdown(wait=False, cancel_futures=True)` step (guarded by `hasattr`) to `close()` (and the async counterpart if any).
- `future.cancel()` does not stop an already-running tool (Line 215). `concurrent.futures.Future.cancel()` only succeeds while the task is still in the queue; once `ctx.run(execute_with_context)` has started, `cancel()` returns `False` and the worker thread keeps running the tool body to completion. With `max_workers=2`, two consecutive timeouts from hung tools will wedge the whole pool, and every subsequent tool call on this agent will block inside `future.result(timeout=tool_timeout)` waiting for a free worker before its own timeout even starts counting. At minimum, log when `cancel()` returns `False` so the leak is observable; better, track the number of orphaned in-flight futures and either refuse further submissions or grow the pool.
🔒 Suggested direction (illustrative, not a drop-in)
```diff
-# Use reusable executor to prevent resource leaks
-if not hasattr(self, '_tool_executor'):
-    self._tool_executor = concurrent.futures.ThreadPoolExecutor(
-        max_workers=2, thread_name_prefix=f"tool-{self.name}"
-    )
-
-future = self._tool_executor.submit(ctx.run, execute_with_context)
+# Reusable executor; created lazily under a lock to avoid
+# double-initialization races on concurrent first use.
+executor = getattr(self, '_tool_executor', None)
+if executor is None:
+    import threading
+    lock = self.__dict__.setdefault('_tool_executor_lock', threading.Lock())
+    with lock:
+        executor = getattr(self, '_tool_executor', None)
+        if executor is None:
+            executor = concurrent.futures.ThreadPoolExecutor(
+                max_workers=2,
+                thread_name_prefix=f"tool-{self.name}",
+            )
+            self._tool_executor = executor
+
+future = executor.submit(ctx.run, execute_with_context)
 try:
     result = future.result(timeout=tool_timeout)
 except concurrent.futures.TimeoutError:
-    future.cancel()
+    if not future.cancel():
+        logging.warning(
+            f"Tool {function_name} timed out after {tool_timeout}s "
+            "and is still running in the executor thread; worker "
+            "slot will remain occupied until the tool returns."
+        )
     logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
     result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
```

And in `Agent.close()` (agent.py):
```python
# Tool executor cleanup
try:
    executor = getattr(self, '_tool_executor', None)
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)
        self._tool_executor = None
except Exception as e:
    logger.warning(f"Tool executor cleanup failed: {e}")
```
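The cancellation caveat above is easy to reproduce in isolation: `concurrent.futures.Future.cancel()` succeeds only while a task is still queued, never once a worker thread has started it. A self-contained demonstration:

```python
import concurrent.futures
import time

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

running = pool.submit(time.sleep, 0.5)  # occupies the single worker immediately
queued = pool.submit(time.sleep, 0.5)   # waits behind it in the queue
time.sleep(0.1)                         # give the worker time to start the first task

print(queued.cancel())   # True: still queued, so it is removed before running
print(running.cancel())  # False: already executing; the thread keeps running it

pool.shutdown(wait=True)
```

This is why a hung tool permanently occupies a worker slot: the pool has no way to interrupt the thread, only to drop tasks that have not yet started.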
```python
# Global state for plugin system (lazy initialized)
import threading

_plugins_lock = threading.Lock()
```
Make plugin enablement updates one synchronized transaction.
`_plugins_lock` protects only `_plugins_enabled` / `_enabled_plugin_names`; the actual `PluginManager._enabled` reads and writes still happen outside that lock, and manager.py:104-120 accesses that dict without using `PluginManager._lock`. Concurrent `enable()` / `disable()` can leave `is_enabled(None)` and `is_enabled(name)` disagreeing. Also, if discovery fails after Line 115, the globals remain enabled even though no plugins were enabled.
🔒 Proposed synchronization tightening
```diff
-_plugins_lock = threading.Lock()
+_plugins_lock = threading.RLock()
 _plugins_enabled: bool = False
 _enabled_plugin_names: list = None  # None = all, list = specific
@@
 def enable(plugins: list = None) -> None:
@@
     global _plugins_enabled, _enabled_plugin_names
-
-    with _plugins_lock:
-        _plugins_enabled = True
-        _enabled_plugin_names = plugins  # None = all, list = specific
+    target_plugins = list(plugins) if plugins is not None else None
@@
-    # Snapshot the names under lock to avoid TOCTOU
-    with _plugins_lock:
-        target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None
-
-    # Enable specific plugins or all
-    if target_plugins is not None:
-        # Enable only specified plugins
-        for name in target_plugins:
-            manager.enable(name)
-    else:
-        # Enable all discovered plugins
-        for plugin_info in manager.list_plugins():
-            manager.enable(plugin_info.get("name", ""))
+    with _plugins_lock:
+        # Enable specific plugins or all
+        if target_plugins is not None:
+            # Enable only specified plugins
+            for name in target_plugins:
+                manager.enable(name)
+        else:
+            # Enable all discovered plugins
+            for plugin_info in manager.list_plugins():
+                manager.enable(plugin_info.get("name", ""))
+
+        _plugins_enabled = True
+        _enabled_plugin_names = target_plugins  # None = all, list = specific
@@
     if plugins is not None:
         # Disable specific plugins
-        for name in plugins:
-            manager.disable(name)
+        with _plugins_lock:
+            for name in plugins:
+                manager.disable(name)
     else:
         # Disable all plugins
         with _plugins_lock:
+            for plugin_info in manager.list_plugins():
+                manager.disable(plugin_info.get("name", ""))
             _plugins_enabled = False
             _enabled_plugin_names = None
-            for plugin_info in manager.list_plugins():
-                manager.disable(plugin_info.get("name", ""))
@@
 def is_enabled(name: str = None) -> bool:
@@
     with _plugins_lock:
         if name is None:
             return _plugins_enabled

     from .manager import get_plugin_manager
     manager = get_plugin_manager()
-    return manager.is_enabled(name)
+    with _plugins_lock:
+        return manager.is_enabled(name)
```

If PluginManager remains publicly usable, also move the lock into src/praisonai-agents/praisonaiagents/plugins/manager.py so direct manager calls are protected too:
```diff
 def enable(self, name: str) -> bool:
     """Enable a plugin."""
-    if name in self._enabled:
-        self._enabled[name] = True
-        return True
-    return False
+    with self._lock:
+        if name in self._enabled:
+            self._enabled[name] = True
+            return True
+        return False

 def disable(self, name: str) -> bool:
     """Disable a plugin."""
-    if name in self._enabled:
-        self._enabled[name] = False
-        return True
-    return False
+    with self._lock:
+        if name in self._enabled:
+            self._enabled[name] = False
+            return True
+        return False

 def is_enabled(self, name: str) -> bool:
     """Check if a plugin is enabled."""
-    return self._enabled.get(name, False)
+    with self._lock:
+        return self._enabled.get(name, False)
```

Also applies to: 114-138, 163-173, 237-243
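A toy, self-contained version of the synchronized-transaction idea (a hypothetical `ToyPluginManager`, not the SDK's actual `PluginManager`) shows how a single lock keeps the global flag and per-plugin state consistent:

```python
import threading


class ToyPluginManager:
    """Toy illustration: one lock guards both the global flag and per-plugin state."""

    def __init__(self):
        self._lock = threading.RLock()
        self._enabled_flag = False  # analogue of the module-level _plugins_enabled
        self._enabled = {}          # analogue of PluginManager._enabled

    def register(self, name: str) -> None:
        with self._lock:
            self._enabled.setdefault(name, False)

    def enable(self, names=None) -> None:
        # One transaction: per-plugin state and the global flag change
        # under the same lock, so readers never observe a mixed state.
        with self._lock:
            targets = list(self._enabled) if names is None else names
            for name in targets:
                if name in self._enabled:
                    self._enabled[name] = True
            self._enabled_flag = True

    def disable_all(self) -> None:
        with self._lock:
            for name in self._enabled:
                self._enabled[name] = False
            self._enabled_flag = False

    def is_enabled(self, name=None) -> bool:
        with self._lock:
            if name is None:
                return self._enabled_flag
            return self._enabled.get(name, False)
```

An `RLock` is used so that a method holding the lock can call another locked method without deadlocking, matching the direction sketched in the diff above.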
PR #1459 — ✅ Approve

Local validation on pr-1459 branch:

Review against AGENTS.md

§4.5 Async-Safe / Multi-Agent Safe — fix is correct
§4.6 Invariants
Minor observation (non-blocking)
Verdict

Scope-focused, tests green, root-cause fixes (not workarounds). Ready to merge.
Summary: Fixes three critical concurrency gaps in PraisonAI Core SDK. Gap 1: Replace unsafe asyncio.Semaphore private attribute manipulation with proper sync-to-async bridge. Gap 2: Fix ThreadPoolExecutor resource leak by using reusable agent-level executor. Gap 3: Add thread lock protection for plugin state races. All changes preserve API contracts and follow protocol-driven architecture.
Summary by CodeRabbit
Release Notes