Skip to content

Commit d80e621

Browse files
fix: resolve wrapper gaps - nested asyncio.run, hot-path logging.basicConfig, and fragmented approval schema (fixes #1456)
- Add _async_bridge.py: Safe async/sync bridging with background loop reuse instead of per-call asyncio.run - Add _logging.py: Namespaced logging without root logger mutation to prevent hot-path basicConfig calls - Add _approval_spec.py: Unified approval configuration spec with validation across CLI, YAML, Python surfaces Updated 7 files to use new modules: - agents_generator.py: Fixed logging init and asyncio.run in _run_autogen_v4, unified approval mapping - bots/_approval_base.py: Replaced ThreadPoolExecutor + asyncio.run pattern with bridge - integrations/base.py: Fixed tool wrapper asyncio.run - integrations/managed_local.py: Fixed 4 asyncio.run instances - integrations/registry.py: Fixed registry async calls - endpoints/a2u_server.py: Fixed event publishing async calls - cli/main.py: Use centralized CLI logging configuration Fixes: 1. Eliminates nested event loop crashes in multi-agent/Jupyter/Chainlit/FastAPI contexts 2. Removes per-instance basicConfig hot-path overhead and root logger mutation 3. Provides single source of truth for approval config with typo validation Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
1 parent e1642ee commit d80e621

File tree

10 files changed

+312
-134
lines changed

10 files changed

+312
-134
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
"""
2+
Approval specification module - unified approval configuration across CLI, YAML, Python.
3+
4+
This module provides a single canonical ApprovalSpec dataclass that all three
5+
surfaces (CLI, YAML, Python) normalize into, preventing fragmentation and
6+
ensuring consistent behavior across all entry points.
7+
"""
8+
from dataclasses import dataclass
9+
from typing import Optional, Literal, Union, Dict, Any
10+
11+
Backend = Literal["console", "slack", "telegram", "discord", "webhook", "http", "agent", "auto", "none"]
12+
ApprovalLevel = Literal["low", "medium", "high", "critical"]
13+
14+
15+
def _parse_timeout(timeout_val: Optional[Union[str, int, float]]) -> Optional[float]:
16+
"""Parse timeout value to float, handling 'none' case."""
17+
if timeout_val is None:
18+
return None
19+
if isinstance(timeout_val, str) and timeout_val.lower() == 'none':
20+
return None
21+
try:
22+
return float(timeout_val)
23+
except (ValueError, TypeError):
24+
raise ValueError(f"Invalid timeout value: {timeout_val}")
25+
26+
27+
@dataclass(frozen=True)
28+
class ApprovalSpec:
29+
"""
30+
Unified approval specification for CLI, YAML, and Python APIs.
31+
32+
This replaces the fragmented approval configuration scattered across
33+
multiple fields and provides consistent behavior across all surfaces.
34+
"""
35+
enabled: bool = False
36+
backend: Backend = "console"
37+
approve_all_tools: bool = False
38+
timeout: Optional[float] = None
39+
approve_level: Optional[ApprovalLevel] = None
40+
guardrails: Optional[str] = None
41+
42+
@classmethod
43+
def from_cli(cls, args) -> "ApprovalSpec":
44+
"""
45+
Create ApprovalSpec from CLI arguments.
46+
47+
Handles --trust, --approval, --approve-all-tools, --approval-timeout,
48+
--approve-level, and --guardrail flags.
49+
"""
50+
# Determine if approval is enabled from any of the CLI flags
51+
enabled = bool(
52+
getattr(args, 'trust', False) or
53+
getattr(args, 'approval', None) or
54+
getattr(args, 'approve_all_tools', False) or
55+
getattr(args, 'approve_level', None)
56+
)
57+
58+
# Determine backend
59+
if getattr(args, 'trust', False):
60+
backend = "auto" # --trust means auto-approve
61+
elif getattr(args, 'approval', None):
62+
backend = args.approval
63+
else:
64+
backend = "console" if enabled else "none"
65+
66+
return cls(
67+
enabled=enabled,
68+
backend=backend, # type: ignore[arg-type]
69+
approve_all_tools=bool(getattr(args, 'approve_all_tools', False)),
70+
timeout=_parse_timeout(getattr(args, 'approval_timeout', None)),
71+
approve_level=getattr(args, 'approve_level', None),
72+
guardrails=getattr(args, 'guardrail', None),
73+
)
74+
75+
@classmethod
76+
def from_yaml(cls, node: Union[None, bool, str, Dict[str, Any]]) -> "ApprovalSpec":
77+
"""
78+
Create ApprovalSpec from YAML approval configuration.
79+
80+
Accepts:
81+
- None/False: disabled
82+
- True: enabled with console backend
83+
- str: enabled with specified backend
84+
- dict: full configuration
85+
86+
Validates keys to prevent silent typos.
87+
"""
88+
if node is None or node is False:
89+
return cls(enabled=False, backend="none")
90+
if node is True:
91+
return cls(enabled=True, backend="console")
92+
if isinstance(node, str):
93+
return cls(enabled=True, backend=node) # type: ignore[arg-type]
94+
if isinstance(node, dict):
95+
# Validate allowed keys to catch typos early
96+
allowed = {
97+
"enabled", "backend", "approve_all_tools", "timeout",
98+
"approve_level", "guardrails",
99+
# Legacy aliases for backward compatibility
100+
"backend_name", "all_tools", "approval_timeout"
101+
}
102+
unknown = set(node) - allowed
103+
if unknown:
104+
raise ValueError(f"Unknown approval keys: {sorted(unknown)}. Allowed: {sorted(allowed)}")
105+
106+
# Handle legacy aliases
107+
backend = node.get("backend") or node.get("backend_name", "console")
108+
approve_all_tools = node.get("approve_all_tools") or node.get("all_tools", False)
109+
timeout_val = node.get("timeout") or node.get("approval_timeout")
110+
111+
return cls(
112+
enabled=node.get("enabled", True),
113+
backend=backend, # type: ignore[arg-type]
114+
approve_all_tools=bool(approve_all_tools),
115+
timeout=_parse_timeout(timeout_val) if timeout_val is not None else None,
116+
approve_level=node.get("approve_level"),
117+
guardrails=node.get("guardrails"),
118+
)
119+
raise TypeError(f"Unsupported approval node type: {type(node).__name__}")
120+
121+
def to_dict(self) -> Dict[str, Any]:
122+
"""Convert to dictionary for backward compatibility with existing code."""
123+
result = {
124+
"enabled": self.enabled,
125+
"backend": self.backend,
126+
"approve_all_tools": self.approve_all_tools,
127+
}
128+
if self.timeout is not None:
129+
result["timeout"] = self.timeout
130+
if self.approve_level is not None:
131+
result["approve_level"] = self.approve_level
132+
if self.guardrails is not None:
133+
result["guardrails"] = self.guardrails
134+
return result
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""
2+
Async bridge module - single source of truth for running coroutines synchronously.
3+
4+
This module provides a safe way to run async functions from sync contexts,
5+
handling nested event loop scenarios without creating a new event loop
6+
on every call (which is expensive and breaks multi-agent workflows).
7+
"""
8+
import asyncio
9+
import threading
10+
from concurrent.futures import Future
11+
from typing import Awaitable, TypeVar
12+
13+
T = TypeVar("T")
14+
15+
_loop: asyncio.AbstractEventLoop | None = None
16+
_loop_lock = threading.Lock()
17+
18+
19+
def _ensure_background_loop() -> asyncio.AbstractEventLoop:
20+
"""Ensure a background event loop exists and return it."""
21+
global _loop
22+
with _loop_lock:
23+
if _loop is None or _loop.is_closed():
24+
_loop = asyncio.new_event_loop()
25+
t = threading.Thread(target=_loop.run_forever, name="praisonai-async", daemon=True)
26+
t.start()
27+
return _loop
28+
29+
30+
def run_sync(coro: Awaitable[T], *, timeout: float | None = None) -> T:
31+
"""
32+
Run a coroutine synchronously, safe inside a running loop.
33+
34+
This function automatically detects if there's already a running event loop
35+
and handles the execution appropriately:
36+
- If no loop is running: uses asyncio.run() (fastest path)
37+
- If a loop is running: schedules on background loop (safe path)
38+
39+
Args:
40+
coro: The coroutine to run
41+
timeout: Maximum time to wait for completion (seconds)
42+
43+
Returns:
44+
The result of the coroutine
45+
46+
Raises:
47+
TimeoutError: If timeout is exceeded
48+
Any exception raised by the coroutine
49+
"""
50+
try:
51+
running = asyncio.get_running_loop()
52+
except RuntimeError:
53+
running = None
54+
55+
if running is None:
56+
# Cheap path: no outer loop, just run.
57+
return asyncio.run(coro)
58+
59+
# Outer loop exists -> schedule on background loop, do NOT nest asyncio.run.
60+
fut: Future = asyncio.run_coroutine_threadsafe(coro, _ensure_background_loop())
61+
return fut.result(timeout=timeout)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""
2+
Logging configuration module - single source of truth for PraisonAI logging.
3+
4+
This module ensures that:
5+
1. Only the CLI configures the root logger (no library-side mutation)
6+
2. Library code uses namespaced loggers
7+
3. No hot-path basicConfig() calls on every instance creation
8+
4. Embedders keep their own logging configuration intact
9+
"""
10+
import logging
11+
import os
12+
13+
_PKG_LOGGER = "praisonai"
14+
_configured = False
15+
16+
17+
def configure_cli_logging(level: str | int | None = None) -> None:
18+
"""
19+
Configure root logging. Must only be called from the CLI entrypoint.
20+
21+
Args:
22+
level: Log level (string like 'INFO' or logging constant)
23+
"""
24+
global _configured
25+
if _configured:
26+
return
27+
lvl = level or os.environ.get("LOGLEVEL", "WARNING")
28+
logging.basicConfig(level=lvl, format="%(asctime)s - %(levelname)s - %(message)s")
29+
_configured = True
30+
31+
32+
def get_logger(name: str | None = None) -> logging.Logger:
33+
"""
34+
Return a namespaced logger; never touches root logger.
35+
36+
Args:
37+
name: Optional logger name suffix
38+
39+
Returns:
40+
A logger with the praisonai namespace
41+
"""
42+
return logging.getLogger(f"{_PKG_LOGGER}.{name}" if name else _PKG_LOGGER)

src/praisonai/praisonai/agents_generator.py

Lines changed: 45 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,18 @@ def __init__(self, agent_file, framework, config_list, log_level=None, agent_cal
174174
self.agent_yaml = agent_yaml
175175
self.tools = tools or [] # Store tool class names as a list
176176
self.cli_config = cli_config or {} # Store CLI configuration overrides
177-
self.log_level = log_level or logging.getLogger().getEffectiveLevel()
178-
if self.log_level == logging.NOTSET:
179-
self.log_level = os.environ.get('LOGLEVEL', 'INFO').upper()
177+
# Use namespaced logger - no hot-path basicConfig calls
178+
from ._logging import get_logger
179+
self.logger = get_logger("agents_generator")
180180

181-
logging.basicConfig(level=self.log_level, format='%(asctime)s - %(levelname)s - %(message)s')
182-
self.logger = logging.getLogger(__name__)
183-
self.logger.setLevel(self.log_level)
181+
# Set level if provided, but don't mutate root logger
182+
if log_level:
183+
if isinstance(log_level, str):
184+
self.logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
185+
else:
186+
self.logger.setLevel(log_level)
187+
elif os.environ.get('LOGLEVEL'):
188+
self.logger.setLevel(getattr(logging, os.environ.get('LOGLEVEL', 'INFO').upper(), logging.INFO))
184189

185190
# Initialize tool registry (replaces globals() pattern)
186191
self.tool_registry = ToolRegistry()
@@ -238,26 +243,29 @@ def _merge_cli_config(self, config, cli_config):
238243
config['config']['lsp'] = cli_config['lsp']
239244
self.logger.debug(f"CLI override: lsp = {cli_config['lsp']}")
240245

241-
# Handle agent-level overrides (trust, tool_timeout, planning_tools, autonomy, guardrail, approval)
242-
agent_level_fields = ['trust', 'tool_timeout', 'planning_tools', 'autonomy', 'guardrail', 'approval', 'approve_all_tools', 'approval_timeout']
246+
# Handle agent-level overrides using unified approach
247+
agent_level_fields = ['tool_timeout', 'planning_tools', 'autonomy']
243248
agent_overrides = {k: v for k, v in cli_config.items() if k in agent_level_fields}
244249

245-
# Map CLI field names to YAML field names
246-
field_mappings = {
247-
'guardrail': 'guardrails', # CLI uses --guardrail, YAML uses guardrails
248-
'trust': 'approval' # --trust maps to approval=True
249-
}
250-
251-
# Apply field mappings and special handling
252-
for cli_field in field_mappings:
253-
if cli_field in agent_overrides:
254-
value = agent_overrides.pop(cli_field)
255-
if cli_field == 'trust' and value:
256-
# --trust flag maps to approval=True for auto-approval
257-
agent_overrides['approval'] = True
258-
elif cli_field == 'guardrail':
259-
# --guardrail "description" maps to guardrails config
260-
agent_overrides['guardrails'] = value
250+
# Handle approval configuration using unified spec
251+
approval_fields = ['trust', 'approval', 'approve_all_tools', 'approval_timeout', 'approve_level']
252+
if any(field in cli_config for field in approval_fields):
253+
from ._approval_spec import ApprovalSpec
254+
255+
# Create a mock args object for CLI parsing
256+
class MockArgs:
257+
def __init__(self, cli_config):
258+
for field in approval_fields:
259+
setattr(self, field, cli_config.get(field))
260+
self.guardrail = cli_config.get('guardrail')
261+
262+
spec = ApprovalSpec.from_cli(MockArgs(cli_config))
263+
if spec.enabled:
264+
agent_overrides['approval'] = spec.to_dict()
265+
266+
# Handle guardrail separately
267+
if 'guardrail' in cli_config:
268+
agent_overrides['guardrails'] = cli_config['guardrail']
261269

262270
if agent_overrides:
263271
# Apply to all agents in the config
@@ -761,9 +769,10 @@ async def run_autogen_v4_async():
761769
# Close the model client
762770
await model_client.close()
763771

764-
# Run the async function
772+
# Run the async function using safe bridge
773+
from ._async_bridge import run_sync
765774
try:
766-
return asyncio.run(run_autogen_v4_async())
775+
return run_sync(run_autogen_v4_async())
767776
except Exception as e:
768777
self.logger.error(f"Error running AutoGen v0.4: {str(e)}")
769778
return f"### AutoGen v0.4 Error ###\n{str(e)}"
@@ -1192,30 +1201,18 @@ def _run_praisonai(self, config, topic, tools_dict):
11921201
stream_enabled = cli_config.get('stream', False)
11931202
stream_metrics = cli_config.get('stream_metrics', False)
11941203

1195-
# Reconstruct approval config from potentially scattered settings
1196-
approval_val = details.get('approval')
1197-
approve_all = details.get('approve_all_tools')
1198-
approval_timeout = details.get('approval_timeout')
1199-
1204+
# Use unified approval specification
12001205
approval_config = None
1201-
if approval_val is not None or approve_all is not None or approval_timeout is not None:
1202-
if isinstance(approval_val, dict):
1203-
approval_dict = approval_val
1204-
else:
1205-
approval_dict = {'backend': approval_val}
1206-
1207-
if approve_all is not None:
1208-
approval_dict['approve_all_tools'] = approve_all
1209-
if approval_timeout is not None:
1210-
approval_dict['approval_timeout'] = approval_timeout
1211-
1206+
if 'approval' in details:
1207+
from ._approval_spec import ApprovalSpec
12121208
try:
1213-
from .cli.features.approval import resolve_approval_config
1214-
# Map common YAML fields to resolve_approval_config parameters
1215-
approval_config = resolve_approval_config(
1216-
backend_name=approval_dict.get('backend') or approval_dict.get('backend_name'),
1217-
all_tools=approval_dict.get('approve_all_tools') or approval_dict.get('all_tools', False),
1218-
timeout=approval_dict.get('approval_timeout') or approval_dict.get('timeout')
1209+
spec = ApprovalSpec.from_yaml(details.get('approval'))
1210+
if spec.enabled:
1211+
from .cli.features.approval import resolve_approval_config
1212+
approval_config = resolve_approval_config(
1213+
backend_name=spec.backend,
1214+
all_tools=spec.approve_all_tools,
1215+
timeout=spec.timeout
12191216
)
12201217
except ImportError:
12211218
# Fallback: Create ApprovalConfig directly if resolve_approval_config isn't available

src/praisonai/praisonai/bots/_approval_base.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,5 @@ async def classify_with_llm(
125125

126126
def sync_wrapper(async_fn, timeout: float):
127127
"""Run *async_fn* (a coroutine) synchronously, handling nested loops."""
128-
try:
129-
loop = asyncio.get_running_loop()
130-
except RuntimeError:
131-
loop = None
132-
133-
if loop and loop.is_running():
134-
import concurrent.futures
135-
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
136-
future = pool.submit(asyncio.run, async_fn)
137-
return future.result(timeout=timeout + 10)
138-
else:
139-
return asyncio.run(async_fn)
128+
from .._async_bridge import run_sync
129+
return run_sync(async_fn, timeout=timeout)

src/praisonai/praisonai/cli/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ def _get_autogen():
221221
import autogen
222222
return autogen
223223

224-
logging.basicConfig(level=os.environ.get('LOGLEVEL', 'WARNING') or 'WARNING', format='%(asctime)s - %(levelname)s - %(message)s')
224+
# Configure root logging only at CLI entrypoint
225+
from .._logging import configure_cli_logging
226+
configure_cli_logging(os.environ.get('LOGLEVEL', 'WARNING') or 'WARNING')
225227
logging.getLogger('alembic').setLevel(logging.ERROR)
226228
logging.getLogger('gradio').setLevel(logging.ERROR)
227229
logging.getLogger('gradio').setLevel(os.environ.get('GRADIO_LOGLEVEL', 'WARNING'))

0 commit comments

Comments
 (0)