3838from v4 .models .messages import WebsocketMessageType
3939from v4 .orchestration .human_approval_manager import HumanApprovalMagenticManager
4040from v4 .magentic_agents .magentic_agent_factory import MagenticAgentFactory
41+ from common .database .database_factory import DatabaseFactory
42+ from v4 .models .models import PlanStatus
4143
4244
4345class OrchestrationManager :
@@ -47,6 +49,7 @@ class OrchestrationManager:
4749
4850 def __init__ (self ):
4951 self .user_id : Optional [str ] = None
52+ self ._plan_id : Optional [str ] = None
5053 self .logger = self .__class__ .logger
5154
5255 def _extract_response_text (self , data ) -> str :
@@ -293,10 +296,11 @@ async def get_current_or_new_orchestration(
293296 # ---------------------------
294297 # Execution
295298 # ---------------------------
296- async def run_orchestration (self , user_id : str , input_task ) -> None :
299+ async def run_orchestration (self , user_id : str , input_task , plan_id : str = None ) -> None :
297300 """
298301 Execute the Magentic workflow for the provided user and task description.
299302 """
303+ self ._plan_id = plan_id
300304 job_id = str (uuid .uuid4 ())
301305 orchestration_config .set_approval_pending (job_id )
302306 self .logger .info (
@@ -545,19 +549,50 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
545549 self .logger .error ("Error attributes: %s" , e .__dict__ )
546550 self .logger .info ("=" * 50 )
547551
548- # Send error status to user
552+ # Build a user-friendly error message
553+ error_str = str (e )
554+ if "Too Many Requests" in error_str or "429" in error_str :
555+ user_error_message = (
556+ "The service is currently experiencing high demand (rate limit exceeded). "
557+ "Please wait a moment and try again."
558+ )
559+ elif "timeout" in error_str .lower ():
560+ user_error_message = (
561+ "The request timed out while processing. Please try again."
562+ )
563+ elif "conflict" in error_str .lower () or "modified concurrently" in error_str .lower ():
564+ user_error_message = (
565+ "A conflict occurred while processing your request. "
566+ "The resource was modified by another operation. Please start a new task and try again."
567+ )
568+ else :
569+ user_error_message = "An error occurred while processing your request. Please start a new task and try again."
570+
571+ # Update plan status to failed in the database
572+ try :
573+ if self ._plan_id :
574+ memory_store = await DatabaseFactory .get_database (user_id = user_id )
575+ plan = await memory_store .get_plan_by_plan_id (plan_id = self ._plan_id )
576+ if plan :
577+ plan .overall_status = PlanStatus .FAILED
578+ await memory_store .update_plan (plan )
579+ self .logger .info ("Plan '%s' status updated to FAILED" , self ._plan_id )
580+ except Exception as db_error :
581+ self .logger .error ("Failed to update plan status to FAILED: %s" , db_error )
582+
583+ # Send error status to user via ERROR_MESSAGE type
549584 try :
550585 await connection_config .send_status_update_async (
551586 {
552- "type" : WebsocketMessageType .FINAL_RESULT_MESSAGE ,
587+ "type" : WebsocketMessageType .ERROR_MESSAGE ,
553588 "data" : {
554- "content" : f"Error during orchestration: { str ( e ) } " ,
589+ "content" : user_error_message ,
555590 "status" : "error" ,
556591 "timestamp" : asyncio .get_event_loop ().time (),
557592 },
558593 },
559594 user_id ,
560- message_type = WebsocketMessageType .FINAL_RESULT_MESSAGE ,
595+ message_type = WebsocketMessageType .ERROR_MESSAGE ,
561596 )
562597 except Exception as send_error :
563598 self .logger .error ("Failed to send error status: %s" , send_error )
0 commit comments