diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index d726f7da1..001abe2fe 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -14,7 +14,8 @@ # This reduces import time from ~420ms to ~20ms for silent mode # ============================================================================ -# Lazy-loaded modules (populated on first use) +# Lazy-loaded modules (populated on first use, protected by _lazy_import_lock) +_lazy_import_lock = threading.Lock() _rich_console = None _rich_live = None _llm_module = None @@ -23,75 +24,87 @@ _stream_emitter_class = None def _get_console(): - """Lazy load rich.console.Console.""" + """Lazy load rich.console.Console (thread-safe).""" global _rich_console if _rich_console is None: - from rich.console import Console - _rich_console = Console + with _lazy_import_lock: + if _rich_console is None: + from rich.console import Console + _rich_console = Console return _rich_console def _get_live(): - """Lazy load rich.live.Live.""" + """Lazy load rich.live.Live (thread-safe).""" global _rich_live if _rich_live is None: - from rich.live import Live - _rich_live = Live + with _lazy_import_lock: + if _rich_live is None: + from rich.live import Live + _rich_live = Live return _rich_live def _get_llm_functions(): - """Lazy load LLM functions.""" + """Lazy load LLM functions (thread-safe).""" global _llm_module if _llm_module is None: - from ..llm import get_openai_client, process_stream_chunks - _llm_module = { - 'get_openai_client': get_openai_client, - 'process_stream_chunks': process_stream_chunks, - } + with _lazy_import_lock: + if _llm_module is None: + from ..llm import get_openai_client, process_stream_chunks + _llm_module = { + 'get_openai_client': get_openai_client, + 'process_stream_chunks': process_stream_chunks, + } return _llm_module def _get_display_functions(): - """Lazy load display functions from main module.""" + """Lazy load display functions from main module (thread-safe).""" global _main_module if _main_module is None: - from ..main import ( - display_error, - display_instruction, - display_interaction, - display_generating, - display_self_reflection, - ReflectionOutput, - adisplay_instruction, - execute_sync_callback - ) - _main_module = { - 'display_error': display_error, - 'display_instruction': display_instruction, - 'display_interaction': display_interaction, - 'display_generating': display_generating, - 'display_self_reflection': display_self_reflection, - 'ReflectionOutput': ReflectionOutput, - 'adisplay_instruction': adisplay_instruction, - 'execute_sync_callback': execute_sync_callback, - } + with _lazy_import_lock: + if _main_module is None: + from ..main import ( + display_error, + display_instruction, + display_interaction, + display_generating, + display_self_reflection, + ReflectionOutput, + adisplay_instruction, + execute_sync_callback + ) + _main_module = { + 'display_error': display_error, + 'display_instruction': display_instruction, + 'display_interaction': display_interaction, + 'display_generating': display_generating, + 'display_self_reflection': display_self_reflection, + 'ReflectionOutput': ReflectionOutput, + 'adisplay_instruction': adisplay_instruction, + 'execute_sync_callback': execute_sync_callback, + } return _main_module def _get_hooks_module(): - """Lazy load hooks module for HookRunner and HookRegistry.""" + """Lazy load hooks module for HookRunner and HookRegistry (thread-safe).""" global _hooks_module if _hooks_module is None: - from ..hooks import HookRunner, HookRegistry - _hooks_module = { - 'HookRunner': HookRunner, - 'HookRegistry': HookRegistry, - } + with _lazy_import_lock: + if _hooks_module is None: + from ..hooks import HookRunner, HookRegistry + _hooks_module = { + 'HookRunner': HookRunner, + 'HookRegistry': HookRegistry, + } return _hooks_module def _get_stream_emitter(): - """Lazy load StreamEventEmitter class.""" + """Lazy load StreamEventEmitter class (thread-safe).""" global _stream_emitter_class if _stream_emitter_class is None: - from ..streaming.events import StreamEventEmitter - _stream_emitter_class = StreamEventEmitter + with _lazy_import_lock: + if _stream_emitter_class is None: + from ..streaming.events import StreamEventEmitter + _stream_emitter_class = StreamEventEmitter return _stream_emitter_class # File extensions that indicate a file path (for output parameter detection) @@ -159,6 +172,8 @@ class Agent: _agent_counter = 0 _agent_counter_lock = threading.Lock() # Class-level cache for environment variables (avoid repeated os.environ.get) + # Protected by _env_cache_lock for thread safety + _env_cache_lock = threading.Lock() _env_output_mode = None _env_output_checked = False _default_model = None @@ -189,18 +204,22 @@ def stream_emitter(self, value): @classmethod def _get_env_output_mode(cls): - """Get cached PRAISONAI_OUTPUT env var value.""" + """Get cached PRAISONAI_OUTPUT env var value (thread-safe).""" if not cls._env_output_checked: - cls._env_output_mode = os.environ.get('PRAISONAI_OUTPUT', '').lower() - cls._env_output_checked = True + with cls._env_cache_lock: + if not cls._env_output_checked: + cls._env_output_mode = os.environ.get('PRAISONAI_OUTPUT', '').lower() + cls._env_output_checked = True return cls._env_output_mode @classmethod def _get_default_model(cls): - """Get cached default model name from OPENAI_MODEL_NAME env var.""" + """Get cached default model name from OPENAI_MODEL_NAME env var (thread-safe).""" if not cls._default_model_checked: - cls._default_model = os.getenv('OPENAI_MODEL_NAME', 'gpt-4o-mini') - cls._default_model_checked = True + with cls._env_cache_lock: + if not cls._default_model_checked: + cls._default_model = os.getenv('OPENAI_MODEL_NAME', 'gpt-4o-mini') + cls._default_model_checked = True return cls._default_model @classmethod diff --git a/src/praisonai-agents/praisonaiagents/llm/llm.py b/src/praisonai-agents/praisonaiagents/llm/llm.py index a6d4bcb9b..3890d428c 100644 --- a/src/praisonai-agents/praisonaiagents/llm/llm.py +++ b/src/praisonai-agents/praisonaiagents/llm/llm.py @@ -71,7 +71,9 @@ def _get_live(): # Logging is already configured in _logging.py via __init__.py # TODO: Include in-build tool calling in LLM class -# TODO: Restructure so that duplicate calls are not made (Sync with agent.py) +# NOTE: The custom-LLM path (Agent.chat → get_response) and OpenAI path +# (Agent.chat → _chat_completion) are separate code paths, not duplicate +# API calls per request. This is a DRY/maintenance concern, not a billing issue. class LLMContextLengthExceededException(Exception): """Raised when LLM context length is exceeded""" def __init__(self, message: str): diff --git a/src/praisonai-agents/praisonaiagents/tools/python_tools.py b/src/praisonai-agents/praisonaiagents/tools/python_tools.py index 0c75984ed..e7af50b8a 100644 --- a/src/praisonai-agents/praisonaiagents/tools/python_tools.py +++ b/src/praisonai-agents/praisonaiagents/tools/python_tools.py @@ -17,276 +17,296 @@ import traceback from ..approval import require_approval -class PythonTools: - """Tools for Python code execution and analysis.""" - - def __init__(self): - """Initialize PythonTools.""" - self._check_dependencies() - - def _check_dependencies(self): - """Check if required packages are installed.""" - missing = [] - for package in ['black', 'pylint', 'autopep8']: - if util.find_spec(package) is None: - missing.append(package) - - if missing: - raise ImportError( - f"Required packages not available. Please install: {', '.join(missing)}\n" - f"Run: pip install {' '.join(missing)}" - ) +def _safe_getattr(obj, name, *default): + """getattr wrapper that blocks access to dunder attributes.""" + if isinstance(name, str) and name.startswith('_'): + raise AttributeError( + f"Access to private/protected attribute '{name}' is restricted" + ) + return getattr(obj, name, *default) if default else getattr(obj, name) - @staticmethod - def _safe_getattr(obj, name, *default): - """getattr wrapper that blocks access to dunder attributes.""" - if isinstance(name, str) and name.startswith('_'): - raise AttributeError( - f"Access to private/protected attribute '{name}' is restricted" - ) - return getattr(obj, name, *default) if default else getattr(obj, name) - @staticmethod - def _validate_code_ast(code: str): - """Validate code using AST — catches attacks that bypass text checks. +def _validate_code_ast(code: str): + """Validate code using AST — catches attacks that bypass text checks. - Returns error message string if dangerous, None if safe. - """ - import ast + Returns error message string if dangerous, None if safe. + """ + import ast - try: - tree = ast.parse(code) - except SyntaxError: - return None # let compile() handle syntax errors later - - # Dangerous dunder attributes attackers use for sandbox escape - _blocked_attrs = frozenset({ - '__subclasses__', '__bases__', '__mro__', '__globals__', - '__code__', '__class__', '__dict__', '__builtins__', - '__import__', '__loader__', '__spec__', '__init_subclass__', - '__set_name__', '__reduce__', '__reduce_ex__', - '__traceback__', '__qualname__', '__module__', - '__wrapped__', '__closure__', '__annotations__', - # Frame/code object introspection - 'gi_frame', 'gi_code', 'cr_frame', 'cr_code', - 'ag_frame', 'ag_code', 'tb_frame', 'tb_next', - 'f_globals', 'f_locals', 'f_builtins', 'f_code', - 'co_consts', 'co_names', - }) - - for node in ast.walk(tree): - # Block import statements - if isinstance(node, (ast.Import, ast.ImportFrom)): - return f"Import statements are not allowed" - - # Block attribute access to dangerous dunders - if isinstance(node, ast.Attribute): - if node.attr in _blocked_attrs: - return ( - f"Access to attribute '{node.attr}' is restricted" - ) + try: + tree = ast.parse(code) + except SyntaxError: + return None # let compile() handle syntax errors later - # Block calls to dangerous builtins by name - if isinstance(node, ast.Call): - func = node.func - if isinstance(func, ast.Name) and func.id in ( - 'exec', 'eval', 'compile', '__import__', - 'open', 'input', 'breakpoint', - 'setattr', 'delattr', 'dir', - ): - return f"Call to '{func.id}' is not allowed" + # Dangerous dunder attributes attackers use for sandbox escape + _blocked_attrs = frozenset({ + '__subclasses__', '__bases__', '__mro__', '__globals__', + '__code__', '__class__', '__dict__', '__builtins__', + '__import__', '__loader__', '__spec__', '__init_subclass__', + '__set_name__', '__reduce__', '__reduce_ex__', + '__traceback__', '__qualname__', '__module__', + '__wrapped__', '__closure__', '__annotations__', + # Frame/code object introspection + 'gi_frame', 'gi_code', 'cr_frame', 'cr_code', + 'ag_frame', 'ag_code', 'tb_frame', 'tb_next', + 'f_globals', 'f_locals', 'f_builtins', 'f_code', + 'co_consts', 'co_names', + }) - return None + for node in ast.walk(tree): + # Block import statements + if isinstance(node, (ast.Import, ast.ImportFrom)): + return f"Import statements are not allowed" - @require_approval(risk_level="critical") - def execute_code( - self, - code: str, - globals_dict: Optional[Dict[str, Any]] = None, - locals_dict: Optional[Dict[str, Any]] = None, - timeout: int = 30, - max_output_size: int = 10000 - ) -> Dict[str, Any]: - """Execute Python code safely with restricted builtins.""" - try: - # Create safe builtins - restricted set of functions - safe_builtins = { - # Basic functions - 'print': print, - 'len': len, - 'range': range, - 'enumerate': enumerate, - 'zip': zip, - 'map': map, - 'filter': filter, - 'sum': sum, - 'min': min, - 'max': max, - 'abs': abs, - 'round': round, - 'sorted': sorted, - 'reversed': reversed, - 'any': any, - 'all': all, - # Type constructors - 'int': int, - 'float': float, - 'str': str, - 'bool': bool, - 'list': list, - 'tuple': tuple, - 'dict': dict, - 'set': set, - # Math functions - 'pow': pow, - 'divmod': divmod, - # Exceptions - 'Exception': Exception, - 'ValueError': ValueError, - 'TypeError': TypeError, - 'KeyError': KeyError, - 'IndexError': IndexError, - 'RuntimeError': RuntimeError, - # Safe introspection (dunder-blocked wrapper) - 'isinstance': isinstance, - 'type': type, - 'hasattr': hasattr, - 'getattr': self._safe_getattr, - # Class definition support - '__build_class__': __builtins__['__build_class__'] if isinstance(__builtins__, dict) else getattr(__builtins__, '__build_class__', None), - # Disable dangerous functions - '__import__': None, - 'eval': None, - 'exec': None, - 'compile': None, - 'open': None, - 'input': None, - 'globals': None, - 'locals': None, - 'vars': None, + # Block attribute access to dangerous dunders + if isinstance(node, ast.Attribute): + if node.attr in _blocked_attrs: + return ( + f"Access to attribute '{node.attr}' is restricted" + ) + + # Block calls to dangerous builtins by name + if isinstance(node, ast.Call): + func = node.func + if isinstance(func, ast.Name) and func.id in ( + 'exec', 'eval', 'compile', '__import__', + 'open', 'input', 'breakpoint', + 'setattr', 'delattr', 'dir', + ): + return f"Call to '{func.id}' is not allowed" + + return None + + +# ────────────────────────────────────────────────────────────────────── +# Standalone execute_code — NO optional deps required (no black/pylint) +# ────────────────────────────────────────────────────────────────────── + +@require_approval(risk_level="critical") +def execute_code( + code: str, + globals_dict: Optional[Dict[str, Any]] = None, + locals_dict: Optional[Dict[str, Any]] = None, + timeout: int = 30, + max_output_size: int = 10000 +) -> Dict[str, Any]: + """Execute Python code safely with restricted builtins. + + This function is available without any optional dependencies. + It uses a 3-layer security sandbox: + 1. AST-based validation (blocks imports, dangerous dunders, eval/exec) + 2. Text-pattern blocklist (defense-in-depth) + 3. Restricted __builtins__ (only safe functions exposed) + """ + try: + # Create safe builtins - restricted set of functions + safe_builtins = { + # Basic functions + 'print': print, + 'len': len, + 'range': range, + 'enumerate': enumerate, + 'zip': zip, + 'map': map, + 'filter': filter, + 'sum': sum, + 'min': min, + 'max': max, + 'abs': abs, + 'round': round, + 'sorted': sorted, + 'reversed': reversed, + 'any': any, + 'all': all, + # Type constructors + 'int': int, + 'float': float, + 'str': str, + 'bool': bool, + 'list': list, + 'tuple': tuple, + 'dict': dict, + 'set': set, + # Math functions + 'pow': pow, + 'divmod': divmod, + # Exceptions + 'Exception': Exception, + 'ValueError': ValueError, + 'TypeError': TypeError, + 'KeyError': KeyError, + 'IndexError': IndexError, + 'RuntimeError': RuntimeError, + # Safe introspection (dunder-blocked wrapper) + 'isinstance': isinstance, + 'type': type, + 'hasattr': hasattr, + 'getattr': _safe_getattr, + # Class definition support + '__build_class__': __builtins__['__build_class__'] if isinstance(__builtins__, dict) else getattr(__builtins__, '__build_class__', None), + # Disable dangerous functions + '__import__': None, + 'eval': None, + 'exec': None, + 'compile': None, + 'open': None, + 'input': None, + 'globals': None, + 'locals': None, + 'vars': None, + } + + # Set up execution environment with safe builtins + if globals_dict is None: + globals_dict = {'__builtins__': safe_builtins} + else: + # Override builtins in provided globals + globals_dict['__builtins__'] = safe_builtins + + if locals_dict is None: + locals_dict = {} + + # Security check 1: AST-based validation (cannot be bypassed + # by string concatenation or runtime tricks) + ast_error = _validate_code_ast(code) + if ast_error: + return { + 'result': None, + 'stdout': '', + 'stderr': f'Security Error: {ast_error}', + 'success': False } - - # Set up execution environment with safe builtins - if globals_dict is None: - globals_dict = {'__builtins__': safe_builtins} - else: - # Override builtins in provided globals - globals_dict['__builtins__'] = safe_builtins - - if locals_dict is None: - locals_dict = {} - - # Security check 1: AST-based validation (cannot be bypassed - # by string concatenation or runtime tricks) - ast_error = self._validate_code_ast(code) - if ast_error: + + # Security check 2: text-based patterns (defense-in-depth) + dangerous_patterns = [ + '__import__', 'import ', 'from ', 'exec', 'eval', + 'compile', 'open(', 'file(', 'input(', 'raw_input', + '__subclasses__', '__bases__', '__globals__', '__code__', + '__class__', 'globals(', 'locals(', 'vars(' + ] + + code_lower = code.lower() + for pattern in dangerous_patterns: + if pattern.lower() in code_lower: return { 'result': None, 'stdout': '', - 'stderr': f'Security Error: {ast_error}', + 'stderr': f'Security Error: Code contains restricted pattern: {pattern}', 'success': False } - # Security check 2: text-based patterns (defense-in-depth) - dangerous_patterns = [ - '__import__', 'import ', 'from ', 'exec', 'eval', - 'compile', 'open(', 'file(', 'input(', 'raw_input', - '__subclasses__', '__bases__', '__globals__', '__code__', - '__class__', 'globals(', 'locals(', 'vars(' - ] - - code_lower = code.lower() - for pattern in dangerous_patterns: - if pattern.lower() in code_lower: - return { - 'result': None, - 'stdout': '', - 'stderr': f'Security Error: Code contains restricted pattern: {pattern}', - 'success': False - } - - # Capture output - stdout_buffer = io.StringIO() - stderr_buffer = io.StringIO() - - try: - # Compile code with restricted mode - compiled_code = compile(code, '', 'exec') - - # Execute with output capture - with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer): - exec(compiled_code, globals_dict, locals_dict) - - # Get last expression value if any - import ast - tree = ast.parse(code) - if tree.body and isinstance(tree.body[-1], ast.Expr): - result = eval( - compile(ast.Expression(tree.body[-1].value), '', 'eval'), - globals_dict, - locals_dict - ) - else: - result = None - - # Get output - stdout = stdout_buffer.getvalue() - stderr = stderr_buffer.getvalue() - - # Truncate output if too large (use smart format) - if len(stdout) > max_output_size: - tail_size = min(max_output_size // 5, 500) - stdout = stdout[:max_output_size - tail_size] + f"\n...[{len(stdout):,} chars, showing first/last portions]...\n" + stdout[-tail_size:] - if len(stderr) > max_output_size: - tail_size = min(max_output_size // 5, 500) - stderr = stderr[:max_output_size - tail_size] + f"\n...[{len(stderr):,} chars, showing first/last portions]...\n" + stderr[-tail_size:] - - return { - 'result': result, - 'stdout': stdout, - 'stderr': stderr, - 'success': True - } - - except Exception as e: - error_msg = f"Error executing code: {str(e)}" - logging.error(error_msg) - return { - 'result': None, - 'stdout': stdout_buffer.getvalue(), - 'stderr': error_msg, - 'success': False - } - + # Capture output + stdout_buffer = io.StringIO() + stderr_buffer = io.StringIO() + + try: + # Compile code with restricted mode + compiled_code = compile(code, '', 'exec') + + # Execute with output capture + with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer): + exec(compiled_code, globals_dict, locals_dict) + + # Get last expression value if any + import ast + tree = ast.parse(code) + if tree.body and isinstance(tree.body[-1], ast.Expr): + result = eval( + compile(ast.Expression(tree.body[-1].value), '', 'eval'), + globals_dict, + locals_dict + ) + else: + result = None + + # Get output + stdout = stdout_buffer.getvalue() + stderr = stderr_buffer.getvalue() + + # Truncate output if too large (use smart format) + if len(stdout) > max_output_size: + tail_size = min(max_output_size // 5, 500) + stdout = stdout[:max_output_size - tail_size] + f"\n...[{len(stdout):,} chars, showing first/last portions]...\n" + stdout[-tail_size:] + if len(stderr) > max_output_size: + tail_size = min(max_output_size // 5, 500) + stderr = stderr[:max_output_size - tail_size] + f"\n...[{len(stderr):,} chars, showing first/last portions]...\n" + stderr[-tail_size:] + + return { + 'result': result, + 'stdout': stdout, + 'stderr': stderr, + 'success': True + } + except Exception as e: error_msg = f"Error executing code: {str(e)}" logging.error(error_msg) return { 'result': None, - 'stdout': '', + 'stdout': stdout_buffer.getvalue(), 'stderr': error_msg, 'success': False } + except Exception as e: + error_msg = f"Error executing code: {str(e)}" + logging.error(error_msg) + return { + 'result': None, + 'stdout': '', + 'stderr': error_msg, + 'success': False + } + + +# ────────────────────────────────────────────────────────────────────── +# PythonTools class — requires optional deps (black, pylint, autopep8) +# Only instantiated when analyze_code/format_code/lint_code are needed. +# ────────────────────────────────────────────────────────────────────── + +class PythonTools: + """Tools for Python code analysis, formatting, and linting. + + Requires: pip install black pylint autopep8 + For code execution only, use the standalone execute_code() function. + """ + + def __init__(self): + """Initialize PythonTools — checks for required packages.""" + self._check_dependencies() + + def _check_dependencies(self): + """Check if required packages are installed.""" + missing = [] + for package in ['black', 'pylint', 'autopep8']: + if util.find_spec(package) is None: + missing.append(package) + + if missing: + raise ImportError( + f"Required packages not available. Please install: {', '.join(missing)}\n" + f"Run: pip install {' '.join(missing)}" + ) + def analyze_code( self, code: str ) -> Optional[Dict[str, Any]]: """Analyze Python code structure and quality. - + Args: code: Python code to analyze - + Returns: Dictionary with analysis results """ try: # Import ast only when needed import ast - + # Parse code tree = ast.parse(code) - + # Analyze structure analysis = { 'imports': [], @@ -300,7 +320,7 @@ def analyze_code( 'branches': 0 } } - + # Analyze nodes for node in ast.walk(tree): if isinstance(node, ast.Import): @@ -338,7 +358,7 @@ def analyze_code( analysis['variables'].append(target.id) elif isinstance(node, (ast.If, ast.While, ast.For)): analysis['complexity']['branches'] += 1 - + return analysis except Exception as e: error_msg = f"Error analyzing code: {str(e)}" @@ -352,12 +372,12 @@ def format_code( line_length: int = 88 ) -> Optional[str]: """Format Python code according to style guide. - + Args: code: Python code to format style: Formatting style ('black' or 'pep8') line_length: Maximum line length - + Returns: Formatted code """ @@ -393,10 +413,10 @@ def lint_code( code: str ) -> Optional[Dict[str, List[Dict[str, Any]]]]: """Lint Python code for potential issues. - + Args: code: Python code to lint - + Returns: Dictionary with linting results """ @@ -409,7 +429,7 @@ def lint_code( # Import pylint only when needed from pylint.reporters import JSONReporter from pylint.lint.run import Run - + # Create temporary file for pylint import tempfile with tempfile.NamedTemporaryFile( @@ -419,7 +439,7 @@ def lint_code( ) as f: f.write(code) temp_path = f.name - + # Run pylint reporter = JSONReporter() Run( @@ -427,14 +447,14 @@ def lint_code( reporter=reporter, exit=False ) - + # Process results results = { 'errors': [], 'warnings': [], 'conventions': [] } - + for msg in reporter.messages: item = { 'type': msg.category, @@ -447,18 +467,18 @@ def lint_code( 'message': msg.msg, 'message-id': msg.msg_id } - + if msg.category in ['error', 'fatal']: results['errors'].append(item) elif msg.category == 'warning': results['warnings'].append(item) else: results['conventions'].append(item) - + # Clean up import os os.unlink(temp_path) - + return results except Exception as e: error_msg = f"Error linting code: {str(e)}" @@ -474,38 +494,60 @@ def disassemble_code( code: str ) -> Optional[str]: """Disassemble Python code to bytecode. - + Args: code: Python code to disassemble - + Returns: Disassembled bytecode as string """ try: # Import dis only when needed import dis - + # Compile code compiled_code = compile(code, '', 'exec') - + # Capture disassembly output = io.StringIO() with redirect_stdout(output): dis.dis(compiled_code) - + return output.getvalue() except Exception as e: error_msg = f"Error disassembling code: {str(e)}" logging.error(error_msg) return None -# Create instance for direct function access -_python_tools = PythonTools() -execute_code = _python_tools.execute_code -analyze_code = _python_tools.analyze_code -format_code = _python_tools.format_code -lint_code = _python_tools.lint_code -disassemble_code = _python_tools.disassemble_code + +# Lazy accessors for optional-dep tools (PythonTools requires black/pylint/autopep8) +# execute_code is already a standalone function above — always available. +def _get_python_tools(): + """Lazy-init PythonTools (requires black/pylint/autopep8).""" + global _python_tools_instance + try: + return _python_tools_instance + except NameError: + _python_tools_instance = PythonTools() + return _python_tools_instance + +_python_tools_instance = None + +def analyze_code(code: str) -> Optional[Dict[str, Any]]: + """Analyze Python code structure and quality. Requires: pip install black pylint autopep8""" + return _get_python_tools().analyze_code(code) + +def format_code(code: str, style: str = 'black', line_length: int = 88) -> Optional[str]: + """Format Python code. Requires: pip install black pylint autopep8""" + return _get_python_tools().format_code(code, style, line_length) + +def lint_code(code: str) -> Optional[Dict[str, List[Dict[str, Any]]]]: + """Lint Python code. Requires: pip install black pylint autopep8""" + return _get_python_tools().lint_code(code) + +def disassemble_code(code: str) -> Optional[str]: + """Disassemble Python code to bytecode. Requires: pip install black pylint autopep8""" + return _get_python_tools().disassemble_code(code) if __name__ == "__main__": print("\n==================================================") diff --git a/src/praisonai-agents/tests/unit/tools/test_python_tools_sandbox.py b/src/praisonai-agents/tests/unit/tools/test_python_tools_sandbox.py index 50685cd9b..7c22ffddd 100644 --- a/src/praisonai-agents/tests/unit/tools/test_python_tools_sandbox.py +++ b/src/praisonai-agents/tests/unit/tools/test_python_tools_sandbox.py @@ -14,15 +14,15 @@ class _SandboxHarness: - """Directly exercises PythonTools sandbox without installing black/pylint.""" + """Directly exercises execute_code sandbox without installing black/pylint.""" def __init__(self): - # Import the class but bypass _check_dependencies - from praisonaiagents.tools.python_tools import PythonTools - self._pt = PythonTools.__new__(PythonTools) + # Import the standalone execute_code function (no optional deps needed) + from praisonaiagents.tools.python_tools import execute_code + self._execute_code = execute_code def run(self, code: str) -> dict: - return self._pt.execute_code.__wrapped__(self._pt, code) + return self._execute_code.__wrapped__(code) @pytest.fixture