From e9d86025504802baa20d1231b9cf72e5649aff1b Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Sat, 18 Apr 2026 09:28:39 +0000 Subject: [PATCH 1/3] fix: concurrency safety gaps in Core SDK - Replace unsafe asyncio.Semaphore private attribute manipulation with proper sync-to-async bridge in concurrency.py - Fix ThreadPoolExecutor resource leak by using reusable agent-level executor in tool_execution.py - Add thread lock protection for unprotected plugin state in plugins/__init__.py Fixes #1458 Co-authored-by: MervinPraison --- .../praisonaiagents/agent/concurrency.py | 30 +++++++++++-------- .../praisonaiagents/agent/tool_execution.py | 30 ++++++++----------- .../praisonaiagents/plugins/__init__.py | 28 ++++++++++------- 3 files changed, 48 insertions(+), 40 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/agent/concurrency.py b/src/praisonai-agents/praisonaiagents/agent/concurrency.py index 0bc586889..afe7351be 100644 --- a/src/praisonai-agents/praisonaiagents/agent/concurrency.py +++ b/src/praisonai-agents/praisonaiagents/agent/concurrency.py @@ -86,7 +86,7 @@ async def acquire(self, agent_name: str) -> None: def acquire_sync(self, agent_name: str) -> None: """Synchronous acquire — for non-async code paths. - Note: This creates/reuses an event loop internally. + Uses a proper sync-to-async bridge to avoid private attribute manipulation. Prefer async acquire() when possible. """ sem = self._get_semaphore(agent_name) @@ -94,18 +94,24 @@ def acquire_sync(self, agent_name: str) -> None: return try: asyncio.get_running_loop() - # If we're in an async context, we can't block - # Just try_acquire or no-op with warning - if not sem._value > 0: - logger.warning( - f"Sync acquire for '{agent_name}' while async loop running and semaphore full. " - f"Consider using async acquire() instead." - ) - # Decrement manually for sync context - sem._value = max(0, sem._value - 1) + # We're inside an async loop — cannot block. Run acquire in a thread + # with its own loop to go through the semaphore's proper acquire path. + import concurrent.futures + def _acquire_in_new_loop(): + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(sem.acquire()) + finally: + loop.close() + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + pool.submit(_acquire_in_new_loop).result(timeout=30) except RuntimeError: - # No running loop — safe to use asyncio.run - asyncio.get_event_loop().run_until_complete(sem.acquire()) + # No running loop — safe to create one + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(sem.acquire()) + finally: + loop.close() def release(self, agent_name: str) -> None: """Release concurrency slot for agent. No-op if unlimited.""" diff --git a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py index d1009cd7a..54b3b30c0 100644 --- a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py +++ b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py @@ -202,25 +202,19 @@ def execute_with_context(): with with_injection_context(state): return self._execute_tool_impl(function_name, arguments) - # Use explicit executor lifecycle to actually bound execution time - executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + # Use reusable executor to prevent resource leaks + if not hasattr(self, '_tool_executor'): + self._tool_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=2, thread_name_prefix=f"tool-{self.name}" + ) + + future = self._tool_executor.submit(ctx.run, execute_with_context) try: - future = executor.submit(ctx.run, execute_with_context) - try: - result = future.result(timeout=tool_timeout) - except concurrent.futures.TimeoutError: - # Cancel and shutdown immediately to avoid blocking - future.cancel() - executor.shutdown(wait=False, cancel_futures=True) - logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") - result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} - else: - # Normal completion - shutdown gracefully - executor.shutdown(wait=False) - finally: - # Ensure executor is always cleaned up - if not executor._shutdown: - executor.shutdown(wait=False) + result = future.result(timeout=tool_timeout) + except concurrent.futures.TimeoutError: + future.cancel() + logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") + result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} else: with with_injection_context(state): result = self._execute_tool_impl(function_name, arguments) diff --git a/src/praisonai-agents/praisonaiagents/plugins/__init__.py b/src/praisonai-agents/praisonaiagents/plugins/__init__.py index d1636f8d7..b428c440d 100644 --- a/src/praisonai-agents/praisonaiagents/plugins/__init__.py +++ b/src/praisonai-agents/praisonaiagents/plugins/__init__.py @@ -78,6 +78,9 @@ def my_plugin_func(hook_type, *args, **kwargs): # ============================================================================ # Global state for plugin system (lazy initialized) +import threading + +_plugins_lock = threading.Lock() _plugins_enabled: bool = False _enabled_plugin_names: list = None # None = all, list = specific @@ -108,8 +111,9 @@ def enable(plugins: list = None) -> None: """ global _plugins_enabled, _enabled_plugin_names - _plugins_enabled = True - _enabled_plugin_names = plugins # None = all, list = specific + with _plugins_lock: + _plugins_enabled = True + _enabled_plugin_names = plugins # None = all, list = specific # Get plugin manager and auto-discover from .manager import get_plugin_manager @@ -119,10 +123,14 @@ def enable(plugins: list = None) -> None: manager.auto_discover_plugins() manager.discover_entry_points() + # Snapshot the names under lock to avoid TOCTOU + with _plugins_lock: + target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None + # Enable specific plugins or all - if plugins is not None: + if target_plugins is not None: # Enable only specified plugins - for name in plugins: + for name in target_plugins: manager.enable(name) else: # Enable all discovered plugins @@ -158,8 +166,9 @@ def disable(plugins: list = None) -> None: manager.disable(name) else: # Disable all plugins - _plugins_enabled = False - _enabled_plugin_names = None + with _plugins_lock: + _plugins_enabled = False + _enabled_plugin_names = None for plugin_info in manager.list_plugins(): manager.disable(plugin_info.get("name", "")) @@ -225,10 +234,9 @@ def is_enabled(name: str = None) -> bool: Returns: True if enabled, False otherwise. """ - global _plugins_enabled - - if name is None: - return _plugins_enabled + with _plugins_lock: + if name is None: + return _plugins_enabled from .manager import get_plugin_manager manager = get_plugin_manager() From ec2c519cfb97e179c0572de393205be75a972d8a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 18 Apr 2026 09:32:41 +0000 Subject: [PATCH 2/3] fix: avoid acquire_sync deadlock under running event loop Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/c9e7c913-a80d-474a-bd5d-731c129ad614 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com> --- .../praisonaiagents/agent/concurrency.py | 18 ++++++------------ .../tests/unit/agent/test_agent_concurrency.py | 10 ++++++++++ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/agent/concurrency.py b/src/praisonai-agents/praisonaiagents/agent/concurrency.py index afe7351be..18ccd64a8 100644 --- a/src/praisonai-agents/praisonaiagents/agent/concurrency.py +++ b/src/praisonai-agents/praisonaiagents/agent/concurrency.py @@ -86,7 +86,6 @@ async def acquire(self, agent_name: str) -> None: def acquire_sync(self, agent_name: str) -> None: """Synchronous acquire — for non-async code paths. - Uses a proper sync-to-async bridge to avoid private attribute manipulation. Prefer async acquire() when possible. """ sem = self._get_semaphore(agent_name) @@ -94,17 +93,12 @@ def acquire_sync(self, agent_name: str) -> None: return try: asyncio.get_running_loop() - # We're inside an async loop — cannot block. Run acquire in a thread - # with its own loop to go through the semaphore's proper acquire path. - import concurrent.futures - def _acquire_in_new_loop(): - loop = asyncio.new_event_loop() - try: - loop.run_until_complete(sem.acquire()) - finally: - loop.close() - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - pool.submit(_acquire_in_new_loop).result(timeout=30) + # Running loop in current thread: blocking here can deadlock. + logger.warning( + f"acquire_sync('{agent_name}') called with a running event loop; " + "use async acquire() in async contexts." + ) + return except RuntimeError: # No running loop — safe to create one loop = asyncio.new_event_loop() diff --git a/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py b/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py index 0806f3b6d..f3afadaea 100644 --- a/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py +++ b/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py @@ -106,3 +106,13 @@ def test_sync_acquire_release(self): # Sync acquire should work reg.acquire_sync("sync_agent") reg.release("sync_agent") + + @pytest.mark.asyncio + async def test_sync_acquire_running_loop_noop(self): + """Sync acquire in async context should not block event loop.""" + from praisonaiagents.agent.concurrency import ConcurrencyRegistry + reg = ConcurrencyRegistry() + reg.set_limit("loop_agent", 1) + await reg.acquire("loop_agent") + reg.acquire_sync("loop_agent") + reg.release("loop_agent") From 28791793c298581d03a47f579647832df3c638f7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 18 Apr 2026 09:34:35 +0000 Subject: [PATCH 3/3] test: enforce fail-fast acquire_sync behavior in async contexts Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/c9e7c913-a80d-474a-bd5d-731c129ad614 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com> --- .../praisonaiagents/agent/concurrency.py | 13 +++++++------ .../tests/unit/agent/test_agent_concurrency.py | 9 +++++++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/praisonai-agents/praisonaiagents/agent/concurrency.py b/src/praisonai-agents/praisonaiagents/agent/concurrency.py index 18ccd64a8..5c31e7da3 100644 --- a/src/praisonai-agents/praisonaiagents/agent/concurrency.py +++ b/src/praisonai-agents/praisonaiagents/agent/concurrency.py @@ -87,18 +87,14 @@ def acquire_sync(self, agent_name: str) -> None: """Synchronous acquire — for non-async code paths. Prefer async acquire() when possible. + If called while an event loop is already running in the current thread, + this method raises RuntimeError to avoid deadlock. """ sem = self._get_semaphore(agent_name) if sem is None: return try: asyncio.get_running_loop() - # Running loop in current thread: blocking here can deadlock. - logger.warning( - f"acquire_sync('{agent_name}') called with a running event loop; " - "use async acquire() in async contexts." - ) - return except RuntimeError: # No running loop — safe to create one loop = asyncio.new_event_loop() @@ -106,6 +102,11 @@ def acquire_sync(self, agent_name: str) -> None: loop.run_until_complete(sem.acquire()) finally: loop.close() + else: + raise RuntimeError( + f"acquire_sync('{agent_name}') cannot be called with a running event loop; " + "use async acquire() in async contexts." + ) def release(self, agent_name: str) -> None: """Release concurrency slot for agent. No-op if unlimited.""" diff --git a/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py b/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py index f3afadaea..c6c37c99b 100644 --- a/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py +++ b/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py @@ -109,10 +109,15 @@ def test_sync_acquire_release(self): @pytest.mark.asyncio async def test_sync_acquire_running_loop_noop(self): - """Sync acquire in async context should not block event loop.""" + """Sync acquire in async context should fail fast without changing state.""" from praisonaiagents.agent.concurrency import ConcurrencyRegistry reg = ConcurrencyRegistry() reg.set_limit("loop_agent", 1) await reg.acquire("loop_agent") - reg.acquire_sync("loop_agent") + with pytest.raises(RuntimeError, match="running event loop"): + reg.acquire_sync("loop_agent") + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05) + reg.release("loop_agent") + await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05) reg.release("loop_agent")