-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix: concurrency safety gaps in Core SDK (fixes #1458) #1459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -202,25 +202,19 @@ def execute_with_context(): | |
| with with_injection_context(state): | ||
| return self._execute_tool_impl(function_name, arguments) | ||
|
|
||
| # Use explicit executor lifecycle to actually bound execution time | ||
| executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) | ||
| # Use reusable executor to prevent resource leaks | ||
| if not hasattr(self, '_tool_executor'): | ||
| self._tool_executor = concurrent.futures.ThreadPoolExecutor( | ||
| max_workers=2, thread_name_prefix=f"tool-{self.name}" | ||
| ) | ||
|
|
||
| future = self._tool_executor.submit(ctx.run, execute_with_context) | ||
| try: | ||
| future = executor.submit(ctx.run, execute_with_context) | ||
| try: | ||
| result = future.result(timeout=tool_timeout) | ||
| except concurrent.futures.TimeoutError: | ||
| # Cancel and shutdown immediately to avoid blocking | ||
| future.cancel() | ||
| executor.shutdown(wait=False, cancel_futures=True) | ||
| logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") | ||
| result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} | ||
| else: | ||
| # Normal completion - shutdown gracefully | ||
| executor.shutdown(wait=False) | ||
| finally: | ||
| # Ensure executor is always cleaned up | ||
| if not executor._shutdown: | ||
| executor.shutdown(wait=False) | ||
| result = future.result(timeout=tool_timeout) | ||
| except concurrent.futures.TimeoutError: | ||
| future.cancel() | ||
| logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") | ||
| result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} | ||
|
Comment on lines
+205
to
+217
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

The new per-agent executor has three concurrency / lifecycle gaps worth addressing before this "concurrency fix" lands.
🔒 Suggested direction (illustrative, not a drop-in)

- # Use reusable executor to prevent resource leaks
- if not hasattr(self, '_tool_executor'):
- self._tool_executor = concurrent.futures.ThreadPoolExecutor(
- max_workers=2, thread_name_prefix=f"tool-{self.name}"
- )
-
- future = self._tool_executor.submit(ctx.run, execute_with_context)
+ # Reusable executor; created lazily under a lock to avoid
+ # double-initialization races on concurrent first use.
+ executor = getattr(self, '_tool_executor', None)
+ if executor is None:
+ import threading
+ lock = self.__dict__.setdefault('_tool_executor_lock', threading.Lock())
+ with lock:
+ executor = getattr(self, '_tool_executor', None)
+ if executor is None:
+ executor = concurrent.futures.ThreadPoolExecutor(
+ max_workers=2,
+ thread_name_prefix=f"tool-{self.name}",
+ )
+ self._tool_executor = executor
+
+ future = executor.submit(ctx.run, execute_with_context)
try:
result = future.result(timeout=tool_timeout)
except concurrent.futures.TimeoutError:
- future.cancel()
+ if not future.cancel():
+ logging.warning(
+ f"Tool {function_name} timed out after {tool_timeout}s "
+ "and is still running in the executor thread; worker "
+ "slot will remain occupied until the tool returns."
+ )
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}And in # Tool executor cleanup
try:
executor = getattr(self, '_tool_executor', None)
if executor is not None:
executor.shutdown(wait=False, cancel_futures=True)
self._tool_executor = None
except Exception as e:
logger.warning(f"Tool executor cleanup failed: {e}")🤖 Prompt for AI Agents |
||
| else: | ||
| with with_injection_context(state): | ||
| result = self._execute_tool_impl(function_name, arguments) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,6 +78,9 @@ def my_plugin_func(hook_type, *args, **kwargs): | |
| # ============================================================================ | ||
|
|
||
| # Global state for plugin system (lazy initialized) | ||
| import threading | ||
|
|
||
| _plugins_lock = threading.Lock() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Make plugin enablement updates one synchronized transaction.
🔒 Proposed synchronization tightening

-_plugins_lock = threading.Lock()
+_plugins_lock = threading.RLock()
_plugins_enabled: bool = False
_enabled_plugin_names: list = None # None = all, list = specific
@@
def enable(plugins: list = None) -> None:
@@
global _plugins_enabled, _enabled_plugin_names
-
- with _plugins_lock:
- _plugins_enabled = True
- _enabled_plugin_names = plugins # None = all, list = specific
+ target_plugins = list(plugins) if plugins is not None else None
@@
- # Snapshot the names under lock to avoid TOCTOU
- with _plugins_lock:
- target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None
-
- # Enable specific plugins or all
- if target_plugins is not None:
- # Enable only specified plugins
- for name in target_plugins:
- manager.enable(name)
- else:
- # Enable all discovered plugins
- for plugin_info in manager.list_plugins():
- manager.enable(plugin_info.get("name", ""))
+ with _plugins_lock:
+ # Enable specific plugins or all
+ if target_plugins is not None:
+ # Enable only specified plugins
+ for name in target_plugins:
+ manager.enable(name)
+ else:
+ # Enable all discovered plugins
+ for plugin_info in manager.list_plugins():
+ manager.enable(plugin_info.get("name", ""))
+
+ _plugins_enabled = True
+ _enabled_plugin_names = target_plugins # None = all, list = specific
@@
if plugins is not None:
# Disable specific plugins
- for name in plugins:
- manager.disable(name)
+ with _plugins_lock:
+ for name in plugins:
+ manager.disable(name)
else:
# Disable all plugins
with _plugins_lock:
+ for plugin_info in manager.list_plugins():
+ manager.disable(plugin_info.get("name", ""))
_plugins_enabled = False
_enabled_plugin_names = None
- for plugin_info in manager.list_plugins():
- manager.disable(plugin_info.get("name", ""))
@@
def is_enabled(name: str = None) -> bool:
@@
with _plugins_lock:
if name is None:
return _plugins_enabled
-
+
from .manager import get_plugin_manager
manager = get_plugin_manager()
- return manager.is_enabled(name)
+ with _plugins_lock:
+ return manager.is_enabled(name)If def enable(self, name: str) -> bool:
"""Enable a plugin."""
- if name in self._enabled:
- self._enabled[name] = True
- return True
- return False
+ with self._lock:
+ if name in self._enabled:
+ self._enabled[name] = True
+ return True
+ return False
def disable(self, name: str) -> bool:
"""Disable a plugin."""
- if name in self._enabled:
- self._enabled[name] = False
- return True
- return False
+ with self._lock:
+ if name in self._enabled:
+ self._enabled[name] = False
+ return True
+ return False
def is_enabled(self, name: str) -> bool:
"""Check if a plugin is enabled."""
- return self._enabled.get(name, False)
+ with self._lock:
+ return self._enabled.get(name, False)Also applies to: 114-138, 163-173, 237-243 🤖 Prompt for AI Agents |
||
| _plugins_enabled: bool = False | ||
| _enabled_plugin_names: list = None # None = all, list = specific | ||
|
|
||
|
|
@@ -108,8 +111,9 @@ def enable(plugins: list = None) -> None: | |
| """ | ||
| global _plugins_enabled, _enabled_plugin_names | ||
|
|
||
| _plugins_enabled = True | ||
| _enabled_plugin_names = plugins # None = all, list = specific | ||
| with _plugins_lock: | ||
| _plugins_enabled = True | ||
| _enabled_plugin_names = plugins # None = all, list = specific | ||
|
|
||
| # Get plugin manager and auto-discover | ||
| from .manager import get_plugin_manager | ||
|
|
@@ -119,10 +123,14 @@ def enable(plugins: list = None) -> None: | |
| manager.auto_discover_plugins() | ||
| manager.discover_entry_points() | ||
|
|
||
| # Snapshot the names under lock to avoid TOCTOU | ||
| with _plugins_lock: | ||
| target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None | ||
|
|
||
| # Enable specific plugins or all | ||
| if plugins is not None: | ||
| if target_plugins is not None: | ||
| # Enable only specified plugins | ||
| for name in plugins: | ||
| for name in target_plugins: | ||
| manager.enable(name) | ||
| else: | ||
| # Enable all discovered plugins | ||
|
|
@@ -158,8 +166,9 @@ def disable(plugins: list = None) -> None: | |
| manager.disable(name) | ||
| else: | ||
| # Disable all plugins | ||
| _plugins_enabled = False | ||
| _enabled_plugin_names = None | ||
| with _plugins_lock: | ||
| _plugins_enabled = False | ||
| _enabled_plugin_names = None | ||
| for plugin_info in manager.list_plugins(): | ||
| manager.disable(plugin_info.get("name", "")) | ||
|
|
||
|
|
@@ -225,10 +234,9 @@ def is_enabled(name: str = None) -> bool: | |
| Returns: | ||
| True if enabled, False otherwise. | ||
| """ | ||
| global _plugins_enabled | ||
|
|
||
| if name is None: | ||
| return _plugins_enabled | ||
| with _plugins_lock: | ||
| if name is None: | ||
| return _plugins_enabled | ||
|
|
||
| from .manager import get_plugin_manager | ||
| manager = get_plugin_manager() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 11770
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 6363
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 4024
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 760
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 624
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 115
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 153
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 153
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 1260
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 451
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 412
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 153
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 126
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 399
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 809
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 362
🏁 Script executed:
Repository: MervinPraison/PraisonAI
Length of output: 3324
Cross-loop `asyncio.Semaphore` binding will cause `RuntimeError`, and the timeout guarantee is defeated.

`asyncio.Semaphore` binds itself to the first event loop that calls `_get_loop()` (during `acquire()`). This rewrite introduces subtle issues:

Lines 109–114 (no running loop branch): Creates a loop, acquires (which binds the semaphore to that loop), then immediately closes it. The semaphore retains a reference to the closed loop in `_loop`. Any subsequent `await acquire()` from a different event loop will raise `RuntimeError: Semaphore object is bound to a different event loop` because `_get_loop()` enforces loop identity.

Lines 96–107 (inside running loop branch): Acquires in a worker thread's new loop, binding the semaphore to that short-lived loop. If `release()` is later called from the main async context (which has a different loop), the same `RuntimeError` occurs. Even if it doesn't, the separated loop contexts defeat the purpose of a semaphore.

Timeout is illusory: `with ThreadPoolExecutor(...) as pool` calls `shutdown(wait=True)` on `__exit__`, blocking indefinitely until the worker finishes. If `result(timeout=30)` raises `TimeoutError`, the `with` block still waits for the hung `acquire()`, defeating the 30-second guarantee.

Violates coding guidelines: "use asyncio primitives for coordination, not threading" — this bridges an asyncio primitive via threading, which breaks loop-binding semantics. The pattern "One event loop per thread; never nest event loops" is intended to prevent exactly this scenario.

Recommended fix: Don't use `asyncio.Semaphore` for sync contexts. Instead:

- Use a `threading.BoundedSemaphore` for the sync path, created alongside the async one in `_get_semaphore()`.
- In `acquire_sync()`, use the sync semaphore with a proper `acquire(timeout=...)` call.
- Have `release()` release whichever semaphore was acquired (or split into `release_sync()`).

Alternatively, make `acquire_sync()` raise a clear error when called from within a running loop, directing users to `await acquire()` instead (fail fast per guidelines).

🤖 Prompt for AI Agents