1717# Set up logger
1818logger = logging .getLogger (__name__ )
1919
20+ # Global variables for managing the shared servers
21+ _agents_server_started = {} # Dict of port -> started boolean
22+ _agents_registered_endpoints = {} # Dict of port -> Dict of path -> endpoint_id
23+ _agents_shared_apps = {} # Dict of port -> FastAPI app
24+
2025def encode_file_to_base64 (file_path : str ) -> str :
2126 """Base64-encode a file."""
2227 import base64
@@ -878,4 +883,230 @@ def update_state(self, updates: Dict) -> None:
878883
879884 def clear_state (self ) -> None :
880885 """Clear all state values"""
881- self ._state .clear ()
886+ self ._state .clear ()
887+
888+ def launch (self , path : str = '/agents' , port : int = 8000 , host : str = '0.0.0.0' , debug : bool = False ):
889+ """
890+ Launch all agents as a single API endpoint. The endpoint accepts a query and processes it through
891+ all agents in sequence, with the output of each agent feeding into the next.
892+
893+ Args:
894+ path: API endpoint path (default: '/agents')
895+ port: Server port (default: 8000)
896+ host: Server host (default: '0.0.0.0')
897+ debug: Enable debug mode for uvicorn (default: False)
898+
899+ Returns:
900+ None
901+ """
902+ global _agents_server_started , _agents_registered_endpoints , _agents_shared_apps
903+
904+ if not self .agents :
905+ logging .warning ("No agents to launch. Add agents to the Agents instance first." )
906+ return
907+
908+ # Try to import FastAPI dependencies - lazy loading
909+ try :
910+ import uvicorn
911+ from fastapi import FastAPI , HTTPException , Request
912+ from fastapi .responses import JSONResponse
913+ from pydantic import BaseModel
914+ import threading
915+ import time
916+
917+ # Define the request model here since we need pydantic
918+ class AgentQuery (BaseModel ):
919+ query : str
920+
921+ except ImportError as e :
922+ # Check which specific module is missing
923+ missing_module = str (e ).split ("No module named '" )[- 1 ].rstrip ("'" )
924+ display_error (f"Missing dependency: { missing_module } . Required for launch() method." )
925+ logging .error (f"Missing dependency: { missing_module } . Required for launch() method." )
926+ print (f"\n To add API capabilities, install the required dependencies:" )
927+ print (f"pip install { missing_module } " )
928+ print ("\n Or install all API dependencies with:" )
929+ print ("pip install 'praisonaiagents[api]'" )
930+ return None
931+
932+ # Initialize port-specific collections if needed
933+ if port not in _agents_registered_endpoints :
934+ _agents_registered_endpoints [port ] = {}
935+
936+ # Initialize shared FastAPI app if not already created for this port
937+ if _agents_shared_apps .get (port ) is None :
938+ _agents_shared_apps [port ] = FastAPI (
939+ title = f"PraisonAI Agents API (Port { port } )" ,
940+ description = "API for interacting with multiple PraisonAI Agents"
941+ )
942+
943+ # Add a root endpoint with a welcome message
944+ @_agents_shared_apps [port ].get ("/" )
945+ async def root ():
946+ return {
947+ "message" : f"Welcome to PraisonAI Agents API on port { port } . See /docs for usage." ,
948+ "endpoints" : list (_agents_registered_endpoints [port ].keys ())
949+ }
950+
951+ # Add healthcheck endpoint
952+ @_agents_shared_apps [port ].get ("/health" )
953+ async def healthcheck ():
954+ return {
955+ "status" : "ok" ,
956+ "endpoints" : list (_agents_registered_endpoints [port ].keys ())
957+ }
958+
959+ # Normalize path to ensure it starts with /
960+ if not path .startswith ('/' ):
961+ path = f'/{ path } '
962+
963+ # Check if path is already registered for this port
964+ if path in _agents_registered_endpoints [port ]:
965+ logging .warning (f"Path '{ path } ' is already registered on port { port } . Please use a different path." )
966+ print (f"⚠️ Warning: Path '{ path } ' is already registered on port { port } ." )
967+ # Use a modified path to avoid conflicts
968+ original_path = path
969+ instance_id = str (uuid .uuid4 ())[:6 ]
970+ path = f"{ path } _{ instance_id } "
971+ logging .warning (f"Using '{ path } ' instead of '{ original_path } '" )
972+ print (f"🔄 Using '{ path } ' instead" )
973+
974+ # Generate a unique ID for this agent group's endpoint
975+ endpoint_id = str (uuid .uuid4 ())
976+ _agents_registered_endpoints [port ][path ] = endpoint_id
977+
978+ # Define the endpoint handler
979+ @_agents_shared_apps [port ].post (path )
980+ async def handle_query (request : Request , query_data : Optional [AgentQuery ] = None ):
981+ # Handle both direct JSON with query field and form data
982+ if query_data is None :
983+ try :
984+ request_data = await request .json ()
985+ if "query" not in request_data :
986+ raise HTTPException (status_code = 400 , detail = "Missing 'query' field in request" )
987+ query = request_data ["query" ]
988+ except :
989+ # Fallback to form data or query params
990+ form_data = await request .form ()
991+ if "query" in form_data :
992+ query = form_data ["query" ]
993+ else :
994+ raise HTTPException (status_code = 400 , detail = "Missing 'query' field in request" )
995+ else :
996+ query = query_data .query
997+
998+ try :
999+ # Process the query sequentially through all agents
1000+ current_input = query
1001+ results = []
1002+
1003+ for agent in self .agents :
1004+ try :
1005+ # Use async version if available, otherwise use sync version
1006+ if asyncio .iscoroutinefunction (agent .chat ):
1007+ response = await agent .achat (current_input )
1008+ else :
1009+ # Run sync function in a thread to avoid blocking
1010+ loop = asyncio .get_event_loop ()
1011+ response = await loop .run_in_executor (None , lambda : agent .chat (current_input ))
1012+
1013+ # Store this agent's result
1014+ results .append ({
1015+ "agent" : agent .name ,
1016+ "response" : response
1017+ })
1018+
1019+ # Use this response as input to the next agent
1020+ current_input = response
1021+ except Exception as e :
1022+ logging .error (f"Error with agent { agent .name } : { str (e )} " , exc_info = True )
1023+ results .append ({
1024+ "agent" : agent .name ,
1025+ "error" : str (e )
1026+ })
1027+ # Continue with original input if there's an error
1028+
1029+ # Return all results and the final output
1030+ return {
1031+ "query" : query ,
1032+ "results" : results ,
1033+ "final_response" : current_input
1034+ }
1035+ except Exception as e :
1036+ logging .error (f"Error processing query: { str (e )} " , exc_info = True )
1037+ return JSONResponse (
1038+ status_code = 500 ,
1039+ content = {"error" : f"Error processing query: { str (e )} " }
1040+ )
1041+
1042+ print (f"🚀 Multi-Agent API available at http://{ host } :{ port } { path } " )
1043+ agent_names = ", " .join ([agent .name for agent in self .agents ])
1044+ print (f"📊 Available agents ({ len (self .agents )} ): { agent_names } " )
1045+
1046+ # Start the server if it's not already running for this port
1047+ if not _agents_server_started .get (port , False ):
1048+ # Mark the server as started first to prevent duplicate starts
1049+ _agents_server_started [port ] = True
1050+
1051+ # Start the server in a separate thread
1052+ def run_server ():
1053+ try :
1054+ print (f"✅ FastAPI server started at http://{ host } :{ port } " )
1055+ print (f"📚 API documentation available at http://{ host } :{ port } /docs" )
1056+ print (f"🔌 Available endpoints: { ', ' .join (list (_agents_registered_endpoints [port ].keys ()))} " )
1057+ uvicorn .run (_agents_shared_apps [port ], host = host , port = port , log_level = "debug" if debug else "info" )
1058+ except Exception as e :
1059+ logging .error (f"Error starting server: { str (e )} " , exc_info = True )
1060+ print (f"❌ Error starting server: { str (e )} " )
1061+
1062+ # Run server in a background thread
1063+ server_thread = threading .Thread (target = run_server , daemon = True )
1064+ server_thread .start ()
1065+
1066+ # Wait for a moment to allow the server to start and register endpoints
1067+ time .sleep (0.5 )
1068+ else :
1069+ # If server is already running, wait a moment to make sure the endpoint is registered
1070+ time .sleep (0.1 )
1071+ print (f"🔌 Available endpoints on port { port } : { ', ' .join (list (_agents_registered_endpoints [port ].keys ()))} " )
1072+
1073+ # Get the stack frame to check if this is the last launch() call in the script
1074+ import inspect
1075+ stack = inspect .stack ()
1076+
1077+ # If this is called from a Python script (not interactive), try to detect if it's the last launch call
1078+ if len (stack ) > 1 and stack [1 ].filename .endswith ('.py' ):
1079+ caller_frame = stack [1 ]
1080+ caller_line = caller_frame .lineno
1081+
1082+ try :
1083+ # Read the file to check if there are more launch calls after this one
1084+ with open (caller_frame .filename , 'r' ) as f :
1085+ lines = f .readlines ()
1086+
1087+ # Check if there are more launch() calls after the current line
1088+ has_more_launches = False
1089+ for line in lines [caller_line :]:
1090+ if '.launch(' in line and not line .strip ().startswith ('#' ):
1091+ has_more_launches = True
1092+ break
1093+
1094+ # If this is the last launch call, block the main thread
1095+ if not has_more_launches :
1096+ try :
1097+ print ("\n All agents registered. Press Ctrl+C to stop the servers." )
1098+ while True :
1099+ time .sleep (1 )
1100+ except KeyboardInterrupt :
1101+ print ("\n Servers stopped" )
1102+ except Exception as e :
1103+ # If something goes wrong with detection, block anyway to be safe
1104+ logging .error (f"Error in launch detection: { e } " )
1105+ try :
1106+ print ("\n Keeping servers alive. Press Ctrl+C to stop." )
1107+ while True :
1108+ time .sleep (1 )
1109+ except KeyboardInterrupt :
1110+ print ("\n Servers stopped" )
1111+
1112+ return None
0 commit comments