Skip to content

Commit 6baed2f

Browse files
praisonai-triage-agent[bot]MervinPraisonCopilot
authored
fix: Core SDK - resolve unsafe async patterns and global mutable state (#1450)
* fix: replace unsafe async callback dispatch and module-level mutable globals - Replace unsafe asyncio.run() inside running event loops with run_coroutine_safely() utility - Replace module-level mutable globals with thread-safe _AgentServerRegistry class - Fixes race conditions in concurrent launch() calls - Addresses issue #1449 points 2 and 3 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> * fix: replace unsafe asyncio.Lock() creation with async context method - Add _get_state_lock() method to create asyncio.Lock within async context - Fixes unsafe lazy lock creation that could bind to wrong event loop - Addresses issue #1449 point 4 (asyncio.Lock creation issue) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> * fix: make agent server route registration atomic and lock-safe Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/b3b60c87-cf60-4e59-8852-07721eb9e9f6 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com> * docs: clarify reserve_route return tuple semantics Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/b3b60c87-cf60-4e59-8852-07721eb9e9f6 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com> * fix: resolve critical server startup and thread-safety issues - Fix uvicorn.Config duplicate log_level parameter causing TypeError - Add thread-safe accessor methods (has_route, list_routes) to registry - Move hot-path run_coroutine_safely import to module scope - Add proper type annotations and noqa comment for host parameter - Hoist threading import to module level for consistency Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com> --------- Co-authored-by: praisonai-triage-agent[bot] <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com>
1 parent 2bc3ffd commit 6baed2f

File tree

2 files changed

+151
-97
lines changed

2 files changed

+151
-97
lines changed

src/praisonai-agents/praisonaiagents/agents/agents.py

Lines changed: 140 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import time
33
import json
44
import logging
5+
import threading
56
from praisonaiagents._logging import get_logger
6-
from typing import Any, Dict, Optional, List
7+
from typing import Any, Dict, Optional, List, Tuple
78
from ..main import display_error, TaskOutput
89
from ..agent.agent import Agent
910
from ..task.task import Task
@@ -18,6 +19,12 @@
1819
except ImportError:
1920
_token_collector = None
2021

22+
# Import async utility for hot-path usage
23+
try:
24+
from ..approval.utils import run_coroutine_safely
25+
except ImportError:
26+
run_coroutine_safely = None
27+
2128
# Task status constants
2229
class TaskStatus(Enum):
2330
"""Enumeration for task status values to ensure consistency"""
@@ -30,12 +37,101 @@ class TaskStatus(Enum):
3037
# Set up logger
3138
logger = get_logger(__name__)
3239

33-
# Global variables for managing the shared servers with thread-safety
34-
import threading
35-
_agents_server_lock = threading.Lock() # Protect all global server state mutations
36-
_agents_server_started = {} # Dict of port -> started boolean
37-
_agents_registered_endpoints = {} # Dict of port -> Dict of path -> endpoint_id
38-
_agents_shared_apps = {} # Dict of port -> FastAPI app
40+
# Agent server registry for thread-safe server management
41+
42+
43+
class _AgentServerRegistry:
44+
"""Encapsulates all shared HTTP server state with proper synchronization."""
45+
46+
def __init__(self):
47+
self._lock = threading.Lock()
48+
self._started: Dict[int, bool] = {}
49+
self._endpoints: Dict[int, Dict[str, str]] = {}
50+
self._apps: Dict[int, Any] = {} # FastAPI apps
51+
self._ready_events: Dict[int, threading.Event] = {}
52+
53+
def get_or_create_app(self, port: int, title: str = "AgentTeam API") -> Tuple[Any, bool]:
54+
"""Thread-safe app creation. Returns (app, is_new)."""
55+
with self._lock:
56+
if port not in self._apps:
57+
# Lazy import to avoid optional dependency at module level
58+
from fastapi import FastAPI
59+
self._apps[port] = FastAPI(title=title)
60+
self._endpoints[port] = {}
61+
self._ready_events[port] = threading.Event()
62+
return self._apps[port], True
63+
return self._apps[port], False
64+
65+
def register_route(self, port: int, path: str, endpoint_id: str = "registered") -> None:
66+
"""Thread-safe route registration tracking."""
67+
with self._lock:
68+
if port not in self._endpoints:
69+
self._endpoints[port] = {}
70+
self._endpoints[port][path] = endpoint_id
71+
72+
def reserve_route(self, port: int, path: str, endpoint_id: str) -> tuple[str, Optional[str]]:
73+
"""Atomically reserve and register a route.
74+
75+
Returns:
76+
tuple[str, Optional[str]]: (final_path, original_path_if_collided).
77+
If there is no collision, final_path equals the requested path and
78+
original_path_if_collided is None.
79+
"""
80+
with self._lock:
81+
if port not in self._endpoints:
82+
self._endpoints[port] = {}
83+
84+
original_path = path
85+
while path in self._endpoints[port]:
86+
path = f"{original_path}_{str(uuid.uuid4())[:6]}"
87+
88+
self._endpoints[port][path] = endpoint_id
89+
return path, (original_path if path != original_path else None)
90+
91+
def list_routes(self, port: int) -> List[str]:
92+
"""Return a snapshot list of registered routes for a port."""
93+
with self._lock:
94+
return list(self._endpoints.get(port, {}).keys())
95+
96+
def has_route(self, port: int, path: str) -> bool:
97+
"""Thread-safe check whether a path is registered on a port."""
98+
with self._lock:
99+
return path in self._endpoints.get(port, {})
100+
101+
def is_server_started(self, port: int) -> bool:
102+
"""Check if server is started for this port."""
103+
with self._lock:
104+
return self._started.get(port, False)
105+
106+
def start_server_if_needed(self, port: int, host: str = "0.0.0.0", **kwargs) -> bool: # noqa: S104
107+
"""Start server with proper readiness signaling. Returns True if server was started."""
108+
with self._lock:
109+
if self._started.get(port, False):
110+
return False # Already started
111+
self._started[port] = True
112+
app = self._apps.get(port)
113+
114+
if not app:
115+
raise ValueError(f"No app registered for port {port}")
116+
117+
ready_event = self._ready_events[port]
118+
119+
def run_server():
120+
import uvicorn
121+
# Remove hardcoded log_level to avoid conflict with kwargs
122+
config = uvicorn.Config(app, host=host, port=port, **kwargs)
123+
server = uvicorn.Server(config)
124+
ready_event.set() # Signal readiness
125+
server.run()
126+
127+
thread = threading.Thread(target=run_server, daemon=True)
128+
thread.start()
129+
ready_event.wait(timeout=5.0) # Deterministic wait instead of sleep(0.5)
130+
return True
131+
132+
133+
# Module level — single registry instance
134+
_server_registry = _AgentServerRegistry()
39135

40136
def encode_file_to_base64(file_path: str) -> str:
41137
"""Base64-encode a file."""
@@ -933,12 +1029,10 @@ async def arun_task(self, task_id):
9331029
if task.callback:
9341030
try:
9351031
if asyncio.iscoroutinefunction(task.callback):
936-
try:
937-
loop = asyncio.get_running_loop()
938-
loop.create_task(task.callback(task_output))
939-
except RuntimeError:
940-
# No event loop running, create new one
941-
asyncio.run(task.callback(task_output))
1032+
if run_coroutine_safely:
1033+
run_coroutine_safely(task.callback(task_output))
1034+
else:
1035+
logger.warning("run_coroutine_safely not available, skipping async callback")
9421036
else:
9431037
task.callback(task_output)
9441038
except Exception as e:
@@ -1164,12 +1258,10 @@ def run_task(self, task_id):
11641258
if task.callback:
11651259
try:
11661260
if asyncio.iscoroutinefunction(task.callback):
1167-
try:
1168-
loop = asyncio.get_running_loop()
1169-
loop.create_task(task.callback(task_output))
1170-
except RuntimeError:
1171-
# No event loop running, create new one
1172-
asyncio.run(task.callback(task_output))
1261+
if run_coroutine_safely:
1262+
run_coroutine_safely(task.callback(task_output))
1263+
else:
1264+
logger.warning("run_coroutine_safely not available, skipping async callback")
11731265
else:
11741266
task.callback(task_output)
11751267
except Exception as e:
@@ -1643,7 +1735,7 @@ def launch(self, path: str = '/agents', port: int = 8000, host: str = '0.0.0.0',
16431735
None
16441736
"""
16451737
if protocol == "http":
1646-
global _agents_server_started, _agents_registered_endpoints, _agents_shared_apps
1738+
# Use centralized server registry
16471739

16481740
if not self.agents:
16491741
logging.warning("No agents to launch for HTTP mode. Add agents to the Agents instance first.")
@@ -1674,58 +1766,43 @@ class AgentQuery(BaseModel):
16741766
print("pip install 'praisonaiagents[api]'")
16751767
return None
16761768

1677-
# Thread-safe initialization of port-specific collections
1678-
with _agents_server_lock:
1679-
# Initialize port-specific collections if needed
1680-
if port not in _agents_registered_endpoints:
1681-
_agents_registered_endpoints[port] = {}
1682-
1683-
# Initialize shared FastAPI app if not already created for this port
1684-
if _agents_shared_apps.get(port) is None:
1685-
_agents_shared_apps[port] = FastAPI(
1686-
title=f"PraisonAI Agents API (Port {port})",
1687-
description="API for interacting with multiple PraisonAI Agents"
1688-
)
1689-
1769+
# Thread-safe initialization of FastAPI app
1770+
app, is_new = _server_registry.get_or_create_app(
1771+
port, f"PraisonAI Agents API (Port {port})"
1772+
)
1773+
1774+
if is_new:
16901775
# Add a root endpoint with a welcome message
1691-
@_agents_shared_apps[port].get("/")
1776+
@app.get("/")
16921777
async def root():
16931778
return {
16941779
"message": f"Welcome to PraisonAI Agents API on port {port}. See /docs for usage.",
1695-
"endpoints": list(_agents_registered_endpoints[port].keys())
1780+
"endpoints": _server_registry.list_routes(port)
16961781
}
16971782

16981783
# Add healthcheck endpoint
1699-
@_agents_shared_apps[port].get("/health")
1784+
@app.get("/health")
17001785
async def healthcheck():
17011786
return {
17021787
"status": "ok",
1703-
"endpoints": list(_agents_registered_endpoints[port].keys())
1788+
"endpoints": _server_registry.list_routes(port)
17041789
}
17051790

17061791
# Normalize path to ensure it starts with /
17071792
if not path.startswith('/'):
17081793
path = f'/{path}'
17091794

1710-
# Thread-safe path registration
1711-
with _agents_server_lock:
1712-
# Check if path is already registered for this port
1713-
if path in _agents_registered_endpoints[port]:
1714-
logging.warning(f"Path '{path}' is already registered on port {port}. Please use a different path.")
1715-
print(f"⚠️ Warning: Path '{path}' is already registered on port {port}.")
1716-
# Use a modified path to avoid conflicts
1717-
original_path = path
1718-
instance_id = str(uuid.uuid4())[:6]
1719-
path = f"{path}_{instance_id}"
1720-
logging.warning(f"Using '{path}' instead of '{original_path}'")
1721-
print(f"🔄 Using '{path}' instead")
1722-
1723-
# Generate a unique ID for this agent group's endpoint
1724-
endpoint_id = str(uuid.uuid4())
1725-
_agents_registered_endpoints[port][path] = endpoint_id
1795+
# Generate a unique ID for this agent group's endpoint and reserve route atomically
1796+
endpoint_id = str(uuid.uuid4())
1797+
path, original_path = _server_registry.reserve_route(port, path, endpoint_id)
1798+
if original_path is not None:
1799+
logging.warning(f"Path '{original_path}' is already registered on port {port}. Please use a different path.")
1800+
print(f"⚠️ Warning: Path '{original_path}' is already registered on port {port}.")
1801+
logging.warning(f"Using '{path}' instead of '{original_path}'")
1802+
print(f"🔄 Using '{path}' instead")
17261803

17271804
# Define the endpoint handler
1728-
@_agents_shared_apps[port].post(path)
1805+
@app.post(path)
17291806
async def handle_query(request: Request, query_data: Optional[AgentQuery] = None):
17301807
# Handle both direct JSON with query field and form data
17311808
if query_data is None:
@@ -1804,7 +1881,7 @@ async def handle_query(request: Request, query_data: Optional[AgentQuery] = None
18041881
agents_dict = {agent.display_name.lower().replace(' ', '_'): agent for agent in self.agents}
18051882

18061883
# Add GET endpoint to list available agents
1807-
@_agents_shared_apps[port].get(f"{path}/list")
1884+
@app.get(f"{path}/list")
18081885
async def list_agents():
18091886
return {
18101887
"agents": [
@@ -1850,45 +1927,19 @@ async def handle_single_agent(request: Request):
18501927
)
18511928
return handle_single_agent
18521929

1853-
# Register the endpoint with thread safety
1854-
_agents_shared_apps[port].post(agent_path)(create_agent_handler(agent_instance))
1855-
with _agents_server_lock:
1856-
_agents_registered_endpoints[port][agent_path] = f"{endpoint_id}_{agent_id}"
1930+
# Register the endpoint
1931+
app.post(agent_path)(create_agent_handler(agent_instance))
1932+
_server_registry.register_route(port, agent_path, f"{endpoint_id}_{agent_id}")
18571933

18581934
print(f"🔗 Per-agent endpoints: {', '.join([f'{path}/{aid}' for aid in agents_dict.keys()])}")
18591935

18601936
# Start the server if it's not already running for this port
1861-
with _agents_server_lock:
1862-
if not _agents_server_started.get(port, False):
1863-
# Mark the server as started first to prevent duplicate starts
1864-
_agents_server_started[port] = True
1865-
should_start_server = True
1866-
else:
1867-
should_start_server = False
1868-
1869-
if should_start_server:
1937+
if _server_registry.start_server_if_needed(port, host, log_level="debug" if debug else "info"):
1938+
print(f"✅ FastAPI server started at http://{host}:{port}")
1939+
print(f"📚 API documentation available at http://{host}:{port}/docs")
18701940

1871-
# Start the server in a separate thread
1872-
def run_server():
1873-
try:
1874-
print(f"✅ FastAPI server started at http://{host}:{port}")
1875-
print(f"📚 API documentation available at http://{host}:{port}/docs")
1876-
print(f"🔌 Registered HTTP endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}")
1877-
uvicorn.run(_agents_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info")
1878-
except Exception as e:
1879-
logging.error(f"Error starting server: {str(e)}", exc_info=True)
1880-
print(f"❌ Error starting server: {str(e)}")
1881-
1882-
# Run server in a background thread
1883-
server_thread = threading.Thread(target=run_server, daemon=True)
1884-
server_thread.start()
1885-
1886-
# Wait for a moment to allow the server to start and register endpoints
1887-
time.sleep(0.5)
1888-
else:
1889-
# If server is already running, wait a moment to make sure the endpoint is registered
1890-
time.sleep(0.1)
1891-
print(f"🔌 Registered HTTP endpoints on port {port}: {', '.join(list(_agents_registered_endpoints[port].keys()))}")
1941+
endpoints = _server_registry.list_routes(port)
1942+
print(f"🔌 Registered HTTP endpoints on port {port}: {', '.join(endpoints)}")
18921943

18931944
# Get the stack frame to check if this is the last launch() call in the script
18941945
import inspect

src/praisonai-agents/praisonaiagents/process/process.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(
4646
self.task_retry_counter: Dict[str, int] = {} # Initialize retry counter
4747
self.workflow_finished = False # ADDED: Workflow finished flag
4848
self.workflow_cancelled = False # ADDED: Workflow cancellation flag for timeout
49-
self._state_lock_init = threading.Lock() # Thread lock for async lock creation
49+
self._state_lock_init = threading.Lock() # Thread lock for synchronous shared-state updates
5050
self._state_lock = None # Lazy-initialized async lock for shared state protection
5151

5252
# Resolve verbose from output= param (takes precedence) or legacy verbose= param
@@ -64,6 +64,12 @@ def __init__(
6464

6565
logging.debug(f"Verbose mode: {self._verbose}")
6666

67+
async def _get_state_lock(self):
68+
"""Get or create the async state lock (must be called from async context)."""
69+
if self._state_lock is None:
70+
self._state_lock = asyncio.Lock()
71+
return self._state_lock
72+
6773
def _create_llm_instance(self):
6874
"""Create and return a configured LLM instance for manager tasks."""
6975
return LLM(model=self.manager_llm, temperature=0.7)
@@ -608,12 +614,9 @@ async def aworkflow(self) -> AsyncGenerator[str, None]:
608614
break
609615

610616
# Reset completed task to "not started" so it can run again (atomic operation)
611-
# Atomic state lock initialization
612-
if self._state_lock is None:
613-
with self._state_lock_init:
614-
if self._state_lock is None: # Double-checked locking pattern
615-
self._state_lock = asyncio.Lock()
616-
async with self._state_lock:
617+
# Get async state lock (created within async context)
618+
lock = await self._get_state_lock()
619+
async with lock:
617620
if self.tasks[task_id].status == "completed":
618621
# Never reset loop tasks, decision tasks, or their subtasks if rerun is False
619622
subtask_name = self.tasks[task_id].name
@@ -1547,4 +1550,4 @@ class ManagerInstructions(BaseModel):
15471550
self.tasks[manager_task.id].status = "completed"
15481551
if self.verbose >= 1:
15491552
logging.info("All tasks completed under manager supervision.")
1550-
logging.info("Hierarchical task execution finished")
1553+
logging.info("Hierarchical task execution finished")

0 commit comments

Comments
 (0)