diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index ab9932bb9..30ab480e2 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -169,6 +169,16 @@ def _is_file_path(value: str) -> bool: _registered_agents = {} # Dict of port -> Dict of path -> agent_id _shared_apps = {} # Dict of port -> FastAPI app +def _get_registered_agents_for_port(port: int) -> Dict[str, str]: + """Safely get registered agents for a port (thread-safe).""" + with _server_lock: + return _registered_agents.get(port, {}).copy() + +def _get_shared_app_for_port(port: int): + """Safely get shared app for a port (thread-safe).""" + with _server_lock: + return _shared_apps.get(port) + # Don't import FastAPI dependencies here - use lazy loading instead if TYPE_CHECKING: @@ -8789,8 +8799,10 @@ def run_server(): try: print(f"โœ… FastAPI server started at http://{host}:{port}") print(f"๐Ÿ“š API documentation available at http://{host}:{port}/docs") - print(f"๐Ÿ”Œ Available endpoints: {', '.join(list(_registered_agents[port].keys()))}") - uvicorn.run(_shared_apps[port], host=host, port=port, log_level="debug" if debug else "info") + endpoints = _get_registered_agents_for_port(port) + print(f"๐Ÿ”Œ Available endpoints: {', '.join(list(endpoints.keys()))}") + app = _get_shared_app_for_port(port) + uvicorn.run(app, host=host, port=port, log_level="debug" if debug else "info") except Exception as e: logging.error(f"Error starting server: {str(e)}", exc_info=True) print(f"โŒ Error starting server: {str(e)}") @@ -8804,7 +8816,8 @@ def run_server(): else: # If server is already running, wait a moment to make sure the endpoint is registered time.sleep(0.1) - print(f"๐Ÿ”Œ Available endpoints on port {port}: {', '.join(list(_registered_agents[port].keys()))}") + endpoints = _get_registered_agents_for_port(port) + print(f"๐Ÿ”Œ Available endpoints on port {port}: {', '.join(list(endpoints.keys()))}") # Get the stack frame to check if this is the last launch() call in the script import inspect diff --git a/src/praisonai-agents/praisonaiagents/agent/context_agent.py b/src/praisonai-agents/praisonaiagents/agent/context_agent.py index 06ee9da5a..929b5adcc 100644 --- a/src/praisonai-agents/praisonaiagents/agent/context_agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/context_agent.py @@ -19,41 +19,52 @@ from typing import Optional, Any, Dict, Union, List, TYPE_CHECKING # Lazy imports for performance - these are only loaded when needed +# Thread-safe lazy loading with double-checked locking pattern +import threading +_lazy_lock = threading.Lock() _subprocess = None _glob = None _ast = None _asyncio = None def _get_subprocess(): - """Lazy import subprocess to avoid import-time overhead.""" + """Lazy import subprocess to avoid import-time overhead (thread-safe).""" global _subprocess if _subprocess is None: - import subprocess - _subprocess = subprocess + with _lazy_lock: + if _subprocess is None: + import subprocess + _subprocess = subprocess return _subprocess def _get_glob(): - """Lazy import glob to avoid import-time overhead.""" + """Lazy import glob to avoid import-time overhead (thread-safe).""" global _glob if _glob is None: - import glob - _glob = glob + with _lazy_lock: + if _glob is None: + import glob + _glob = glob return _glob def _get_ast(): - """Lazy import ast to avoid import-time overhead.""" + """Lazy import ast to avoid import-time overhead (thread-safe).""" global _ast if _ast is None: - import ast - _ast = ast + with _lazy_lock: + if _ast is None: + import ast + _ast = ast return _ast def _get_asyncio(): - """Lazy import asyncio to avoid import-time overhead.""" + """Lazy import asyncio to avoid import-time overhead (thread-safe).""" global _asyncio if _asyncio is None: - import asyncio - _asyncio = asyncio + with _lazy_lock: + if _asyncio is None: + import asyncio + _asyncio = asyncio return _asyncio async def _async_subprocess_run(cmd: list, timeout: int = 60) -> tuple: diff --git a/test_agentic_functionality.py b/test_agentic_functionality.py new file mode 100644 index 000000000..87f0ca8d8 --- /dev/null +++ b/test_agentic_functionality.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +""" +Real agentic test as required by AGENTS.md section 9.4. + +This test verifies that the Agent actually runs end-to-end and calls the LLM, +not just smoke tests. This ensures our thread safety fixes don't break actual +agent functionality. +""" + +import sys +import os + +# Add the path to find praisonaiagents +sys.path.insert(0, '/home/runner/work/PraisonAI/PraisonAI/src/praisonai-agents') + +def test_real_agentic_functionality(): + """Test that Agent actually runs and calls LLM end-to-end.""" + print("๐Ÿงช Running real agentic test (Agent must call LLM)...") + + try: + from praisonaiagents import Agent + + # Create agent and run a real task + agent = Agent(name="test", instructions="You are a helpful assistant") + print(f"๐Ÿ“‹ Agent created: {agent.name}") + + # This MUST call the LLM and produce actual output + print("๐Ÿš€ Starting agent with real prompt...") + result = agent.start("Say hello in one sentence") + + # Print full output for verification + print("๐Ÿ“„ Agent output:") + print(f"Result: {result}") + + # Verify we got actual output + if not result or not isinstance(result, str) or len(result.strip()) == 0: + print("โŒ Agent did not produce valid output") + return False + + print("โœ… Agent successfully called LLM and produced output!") + return True + + except Exception as e: + print(f"โŒ Agent test failed with exception: {e}") + import traceback + traceback.print_exc() + return False + + +def main(): + """Run the real agentic test.""" + print("๐Ÿค– Testing real agentic functionality after thread safety fixes...\n") + + success = test_real_agentic_functionality() + + print("\n" + "=" * 60) + if success: + print("๐ŸŽ‰ Real agentic test PASSED!") + print("โœ… Thread safety fixes do not break agent functionality") + else: + print("โŒ Real agentic test FAILED!") + print("โš ๏ธ Agent functionality may be broken") + + return success + + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1) \ No newline at end of file diff --git a/test_thread_safety_fixes.py b/test_thread_safety_fixes.py new file mode 100644 index 000000000..f4dc6288c --- /dev/null +++ b/test_thread_safety_fixes.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +""" +Comprehensive thread safety test for Issue #1145 fixes. + +This test verifies that all thread-unsafe global mutable state issues have been fixed: +1. Context agent lazy loaders are thread-safe +2. Agent counter is thread-safe (already fixed) +3. Tools instance cache is thread-safe (already fixed) +4. HTTP server globals access is thread-safe +5. Agent lazy loaders are thread-safe (already fixed) +""" + +import threading +import time +import concurrent.futures +from typing import List, Any +import sys +import os + +# Add the path to find praisonaiagents +sys.path.insert(0, '/home/runner/work/PraisonAI/PraisonAI/src/praisonai-agents') + +def test_context_agent_lazy_loaders(): + """Test that context agent lazy loaders are thread-safe.""" + print("๐Ÿงช Testing context agent lazy loaders thread safety...") + + # Import the functions from context_agent + from praisonaiagents.agent.context_agent import _get_subprocess, _get_glob, _get_ast, _get_asyncio + + results = [] + errors = [] + + def worker(func, results_list, error_list): + try: + result = func() + results_list.append(result) + except Exception as e: + error_list.append(e) + + # Test each lazy loader with concurrent access + for func_name, func in [ + ("_get_subprocess", _get_subprocess), + ("_get_glob", _get_glob), + ("_get_ast", _get_ast), + ("_get_asyncio", _get_asyncio), + ]: + print(f" โ€ข Testing {func_name}...") + results.clear() + errors.clear() + + # Run 10 concurrent calls to the same lazy loader + threads = [] + for i in range(10): + thread = threading.Thread(target=worker, args=(func, results, errors)) + threads.append(thread) + + # Start all threads at once + for thread in threads: + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + # Check results + if errors: + print(f" โŒ Errors in {func_name}: {errors}") + return False + + if len(results) != 10: + print(f" โŒ Expected 10 results, got {len(results)} for {func_name}") + return False + + # All results should be the same object (same module) + first_result = results[0] + if not all(r is first_result for r in results): + print(f" โŒ Different objects returned by {func_name} - not thread-safe!") + return False + + print(f" โœ… {func_name} is thread-safe") + + print("โœ… Context agent lazy loaders thread safety test passed!") + return True + + +def test_agent_counter_thread_safety(): + """Test that agent counter increment is thread-safe.""" + print("๐Ÿงช Testing agent counter thread safety...") + + from praisonaiagents import Agent + + # Create many agents concurrently to test counter + agents = [] + errors = [] + + def create_agent(): + try: + # Create nameless agent to trigger counter increment + agent = Agent(instructions="test assistant") + agents.append(agent) + except Exception as e: + errors.append(e) + + # Create 50 agents concurrently + threads = [] + for i in range(50): + thread = threading.Thread(target=create_agent) + threads.append(thread) + + # Start all threads + for thread in threads: + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + if errors: + print(f" โŒ Errors creating agents: {errors}") + return False + + # Check that all agents have unique indices + indices = [agent._agent_index for agent in agents] + if len(set(indices)) != len(indices): + print(f" โŒ Duplicate agent indices found: {len(set(indices))} unique out of {len(indices)}") + return False + + print(f"โœ… Agent counter thread safety test passed! Created {len(agents)} agents with unique indices.") + return True + + +def test_tools_instance_cache(): + """Test that tools instance cache is thread-safe.""" + print("๐Ÿงช Testing tools instance cache thread safety...") + + # This test would be complex to set up properly, but we can at least + # verify the _instances_lock exists and the code pattern is correct + try: + from praisonaiagents.tools import _tools_lock + print("โœ… Tools lock exists and is accessible") + return True + except ImportError: + print("โŒ Could not import _tools_lock") + return False + + +def test_agent_lazy_loaders(): + """Test that agent lazy loaders are thread-safe.""" + print("๐Ÿงช Testing agent lazy loaders thread safety...") + + # Import the lazy loader functions from agent.py + from praisonaiagents.agent.agent import _get_console, _get_live, _get_llm_functions + + results = [] + errors = [] + + def worker(func, results_list, error_list): + try: + result = func() + results_list.append(result) + except Exception as e: + error_list.append(e) + + # Test each lazy loader with concurrent access + for func_name, func in [ + ("_get_console", _get_console), + ("_get_live", _get_live), + ("_get_llm_functions", _get_llm_functions), + ]: + print(f" โ€ข Testing {func_name}...") + results.clear() + errors.clear() + + # Run 10 concurrent calls to the same lazy loader + threads = [] + for i in range(10): + thread = threading.Thread(target=worker, args=(func, results, errors)) + threads.append(thread) + + # Start all threads at once + for thread in threads: + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + # Check results + if errors: + print(f" โŒ Errors in {func_name}: {errors}") + return False + + if len(results) != 10: + print(f" โŒ Expected 10 results, got {len(results)} for {func_name}") + return False + + # All results should be the same object (same module/function) + first_result = results[0] + if not all(r is first_result for r in results): + print(f" โŒ Different objects returned by {func_name} - not thread-safe!") + return False + + print(f" โœ… {func_name} is thread-safe") + + print("โœ… Agent lazy loaders thread safety test passed!") + return True + + +def test_concurrent_imports(): + """Test that concurrent imports work correctly.""" + print("๐Ÿงช Testing concurrent imports...") + + import importlib + import sys + + # Remove the modules if they're already loaded + modules_to_test = [ + 'praisonaiagents.agent.context_agent', + 'praisonaiagents.agent.agent', + ] + + for module_name in modules_to_test: + if module_name in sys.modules: + del sys.modules[module_name] + + errors = [] + + def import_worker(module_name): + try: + importlib.import_module(module_name) + except Exception as e: + errors.append(f"{module_name}: {e}") + + # Import modules concurrently + threads = [] + for module_name in modules_to_test: + for _ in range(5): # 5 concurrent imports per module + thread = threading.Thread(target=import_worker, args=(module_name,)) + threads.append(thread) + + # Start all threads + for thread in threads: + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + if errors: + print(f" โŒ Import errors: {errors}") + return False + + print("โœ… Concurrent imports test passed!") + return True + + +def main(): + """Run all thread safety tests.""" + print("๐Ÿš€ Running thread safety tests for Issue #1145 fixes...\n") + + tests = [ + ("Context Agent Lazy Loaders", test_context_agent_lazy_loaders), + ("Agent Counter Thread Safety", test_agent_counter_thread_safety), + ("Tools Instance Cache", test_tools_instance_cache), + ("Agent Lazy Loaders", test_agent_lazy_loaders), + ("Concurrent Imports", test_concurrent_imports), + ] + + passed = 0 + failed = 0 + + for test_name, test_func in tests: + try: + if test_func(): + passed += 1 + else: + failed += 1 + print(f"โŒ {test_name} FAILED\n") + except Exception as e: + failed += 1 + print(f"โŒ {test_name} FAILED with exception: {e}\n") + print() + + print("=" * 60) + print(f"๐Ÿ“Š Test Results: {passed} passed, {failed} failed") + + if failed == 0: + print("๐ŸŽ‰ All thread safety tests passed!") + return True + else: + print(f"๐Ÿ˜ž {failed} test(s) failed") + return False + + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1) \ No newline at end of file