2222from workers .utils .text_utils import clean_llm_output
2323import re
2424from workers .tasks import refine_and_plan_ai_task
25+ from main .voice .utils import translate_text
2526
2627logger = logging .getLogger (__name__ )
2728
@@ -462,41 +463,60 @@ def worker():
462463async def process_voice_command (
463464 user_id : str ,
464465 transcribed_text : str ,
466+ detected_language : Optional [str ],
465467 send_status_update : Callable [[Dict [str , Any ]], Coroutine [Any , Any , None ]],
466468 db_manager : MongoManager
467469) -> Tuple [str , str ]:
468470 """
469- Processes a transcribed voice command. It first classifies the intent as simple or complex.
470- Complex tasks are offloaded to the background task system. Simple requests are handled
471- synchronously by a specialized voice agent for a fast response.
471+ Processes a transcribed voice command, including multilingual translation.
472472 """
473473 assistant_message_id = str (uuid .uuid4 ())
474- logger .info (f"Processing voice command for user { user_id } : '{ transcribed_text } '" )
474+ logger .info (f"Processing voice command for user { user_id } : '{ transcribed_text } ' (Language: { detected_language } )" )
475+
476+ original_language = detected_language .split ('-' )[0 ] if detected_language else 'en'
477+ text_for_llm = transcribed_text
475478
476479 try :
477- # 1. Save user message and create a placeholder for the assistant's response.
480+ # 1. Translate to English if necessary
481+ if original_language != 'en' and text_for_llm :
482+ await send_status_update ({"type" : "status" , "message" : "translating" })
483+ logger .info (f"Translating from '{ original_language } ' to 'en'." )
484+ text_for_llm = await translate_text (text_for_llm , target_language = 'en' , source_language = original_language )
485+ logger .info (f"Translated text for LLM: '{ text_for_llm } '" )
486+
487+ if not text_for_llm :
488+ logger .warning ("Text for LLM is empty after translation or initially. Aborting." )
489+ return "I'm sorry, I didn't catch that." , assistant_message_id
490+
491+ # 2. Save user message (original transcription) and create placeholder for assistant's response.
478492 await db_manager .add_message (user_id = user_id , role = "user" , content = transcribed_text )
479493 await db_manager .add_message (user_id = user_id , role = "assistant" , content = "[Thinking...]" , message_id = assistant_message_id )
480494
481- # 2 . Get conversation history for the Stage 1 intent classification.
495+ # 3 . Get conversation history for Stage 1 intent classification.
482496 history_from_db = await db_manager .get_message_history (user_id , limit = 10 )
483497 messages_for_stage1 = list (reversed (history_from_db ))
498+ # Replace the last user message content with the translated version for the LLM
499+ if messages_for_stage1 and messages_for_stage1 [- 1 ]['role' ] == 'user' :
500+ messages_for_stage1 [- 1 ]['content' ] = text_for_llm
484501
485- # 3 . Run Voice Stage 1 to classify intent.
502+ # 4 . Run Voice Stage 1 to classify intent.
486503 await send_status_update ({"type" : "status" , "message" : "thinking" })
487504 stage1_result = await _get_voice_stage1_response (messages_for_stage1 , user_id )
488505 intent_type = stage1_result .get ("intent_type" , "simple_request" )
489506
490- # 4. Handle Complex Tasks: Offload to Celery and provide immediate feedback.
507+ llm_response_text = ""
508+ final_turn_steps = []
509+
510+ # 5. Handle Complex Tasks or Simple Requests
491511 if intent_type == "complex_task" :
492512 logger .info (f"Voice command for user { user_id } classified as COMPLEX. Offloading to task system." )
493- task_summary = stage1_result .get ("summary_for_task" , transcribed_text )
513+ task_summary = stage1_result .get ("summary_for_task" , text_for_llm )
494514
495515 task_data = {
496516 "name" : task_summary ,
497517 "description" : f"Task created from voice command: { transcribed_text } " ,
498518 "task_type" : "single" ,
499- "original_context" : {"source" : "voice_command" , "prompt" : transcribed_text }
519+ "original_context" : {"source" : "voice_command" , "prompt" : text_for_llm }
500520 }
501521 new_task_id = await db_manager .add_task (user_id , task_data )
502522
@@ -505,85 +525,88 @@ async def process_voice_command(
505525
506526 refine_and_plan_ai_task .delay (new_task_id , user_id )
507527
508- final_text_for_tts = "I've added a task for that. I'll let you know when it's complete."
528+ llm_response_text = "I've added a task for that. I'll let you know when it's complete."
529+ else :
530+ # Handle Simple Requests
531+ logger .info (f"Voice command for user { user_id } classified as SIMPLE. Processing synchronously." )
509532
510- await db_manager .messages_collection .update_one (
511- {"message_id" : assistant_message_id },
512- {"$set" : {"content" : final_text_for_tts }}
513- )
514- return final_text_for_tts , assistant_message_id
515-
516- # 5. Handle Simple Requests: Execute synchronously with a specialized voice agent.
517- logger .info (f"Voice command for user { user_id } classified as SIMPLE. Processing synchronously." )
518-
519- user_profile = await db_manager .get_user_profile (user_id )
520- user_data = user_profile .get ("userData" , {}) if user_profile else {}
521- personal_info = user_data .get ("personalInfo" , {})
522- username = personal_info .get ("name" , "User" )
523- timezone_str = personal_info .get ("timezone" , "UTC" )
524- location_raw = personal_info .get ("location" )
525- location = str (location_raw ) if location_raw else "Not specified"
526- try :
527- user_timezone = ZoneInfo (timezone_str )
528- except ZoneInfoNotFoundError :
529- user_timezone = ZoneInfo ("UTC" )
530- current_user_time = datetime .datetime .now (user_timezone ).strftime ('%Y-%m-%d %H:%M:%S %Z' )
531-
532- relevant_tool_names = stage1_result .get ("tools" , [])
533- mandatory_tools = {"memory" , "history" } # Tasks tool is excluded for simple requests
534- final_tool_names = set (relevant_tool_names ) | mandatory_tools
533+ user_profile = await db_manager .get_user_profile (user_id )
534+ user_data = user_profile .get ("userData" , {}) if user_profile else {}
535+ personal_info = user_data .get ("personalInfo" , {})
536+ username = personal_info .get ("name" , "User" )
537+ timezone_str = personal_info .get ("timezone" , "UTC" )
538+ location_raw = personal_info .get ("location" )
539+ location = str (location_raw ) if location_raw else "Not specified"
540+ try :
541+ user_timezone = ZoneInfo (timezone_str )
542+ except ZoneInfoNotFoundError :
543+ user_timezone = ZoneInfo ("UTC" )
544+ current_user_time = datetime .datetime .now (user_timezone ).strftime ('%Y-%m-%d %H:%M:%S %Z' )
545+
546+ relevant_tool_names = stage1_result .get ("tools" , [])
547+ mandatory_tools = {"memory" , "history" }
548+ final_tool_names = set (relevant_tool_names ) | mandatory_tools
549+
550+ filtered_mcp_servers = {}
551+ for tool_name in final_tool_names :
552+ config = INTEGRATIONS_CONFIG .get (tool_name , {})
553+ if config :
554+ mcp_config = config .get ("mcp_server_config" , {})
555+ if mcp_config and mcp_config .get ("url" ) and mcp_config .get ("name" ):
556+ server_name = mcp_config ["name" ]
557+ filtered_mcp_servers [server_name ] = {"url" : mcp_config ["url" ], "headers" : {"X-User-ID" : user_id }, "transport" : "sse" }
558+
559+ tools = [{"mcpServers" : filtered_mcp_servers }]
560+ logger .info (f"Voice Stage 2 Tools: { list (filtered_mcp_servers .keys ())} " )
561+
562+ system_prompt = VOICE_STAGE_2_SYSTEM_PROMPT .format (username = username , location = location , current_user_time = current_user_time )
563+
564+ loop = asyncio .get_running_loop ()
565+ def agent_worker ():
566+ final_run_response = None
567+ try :
568+ for response in run_agent (system_message = system_prompt , function_list = tools , messages = messages_for_stage1 ):
569+ final_run_response = response
570+ if isinstance (response , list ) and response :
571+ last_message = response [- 1 ]
572+ if last_message .get ('role' ) == 'assistant' and last_message .get ('function_call' ):
573+ tool_name = last_message ['function_call' ]['name' ]
574+ asyncio .run_coroutine_threadsafe (send_status_update ({"type" : "status" , "message" : f"using_tool_{ tool_name } " }), loop )
575+ return final_run_response
576+ except Exception as e :
577+ logger .error (f"Error in voice agent_worker thread: { e } " , exc_info = True )
578+ return None
579+
580+ final_run_response = await asyncio .to_thread (agent_worker )
581+
582+ if not final_run_response or not isinstance (final_run_response , list ):
583+ final_run_response = []
584+
585+ assistant_turn_start_index = next ((i + 1 for i in range (len (final_run_response ) - 1 , - 1 , - 1 ) if final_run_response [i ].get ('role' ) == 'user' ), 0 )
586+ assistant_messages = final_run_response [assistant_turn_start_index :]
587+
588+ parsed_response = parse_assistant_response (assistant_messages )
589+ llm_response_text = parsed_response .get ("final_content" , "I'm sorry, I couldn't process that." )
590+ final_turn_steps = parsed_response .get ("turn_steps" , [])
591+
592+ if not llm_response_text and assistant_messages :
593+ last_message = assistant_messages [- 1 ]
594+ if last_message .get ('role' ) == 'function' :
595+ llm_response_text = "The action has been completed."
535596
536- filtered_mcp_servers = {}
537- for tool_name in final_tool_names :
538- config = INTEGRATIONS_CONFIG .get (tool_name , {})
539- if config :
540- mcp_config = config .get ("mcp_server_config" , {})
541- if mcp_config and mcp_config .get ("url" ) and mcp_config .get ("name" ):
542- server_name = mcp_config ["name" ]
543- filtered_mcp_servers [server_name ] = {"url" : mcp_config ["url" ], "headers" : {"X-User-ID" : user_id }, "transport" : "sse" }
597+ # 6. Translate response back to original language if necessary
598+ final_text_for_tts = llm_response_text
599+ if original_language != 'en' and llm_response_text :
600+ logger .info (f"Translating LLM response back to '{ original_language } '." )
601+ final_text_for_tts = await translate_text (llm_response_text , target_language = original_language , source_language = 'en' )
602+ logger .info (f"Final translated text for TTS: '{ final_text_for_tts } '" )
544603
545- tools = [{"mcpServers" : filtered_mcp_servers }]
546- logger .info (f"Voice Stage 2 Tools: { list (filtered_mcp_servers .keys ())} " )
547-
548- system_prompt = VOICE_STAGE_2_SYSTEM_PROMPT .format (username = username , location = location , current_user_time = current_user_time )
549-
550- loop = asyncio .get_running_loop ()
551- def agent_worker ():
552- final_run_response = None
553- try :
554- for response in run_agent (system_message = system_prompt , function_list = tools , messages = messages_for_stage1 ):
555- final_run_response = response
556- if isinstance (response , list ) and response :
557- last_message = response [- 1 ]
558- if last_message .get ('role' ) == 'assistant' and last_message .get ('function_call' ):
559- tool_name = last_message ['function_call' ]['name' ]
560- asyncio .run_coroutine_threadsafe (send_status_update ({"type" : "status" , "message" : f"using_tool_{ tool_name } " }), loop )
561- return final_run_response
562- except Exception as e :
563- logger .error (f"Error in voice agent_worker thread: { e } " , exc_info = True )
564- return None
565-
566- final_run_response = await asyncio .to_thread (agent_worker )
567-
568- if not final_run_response or not isinstance (final_run_response , list ):
569- final_run_response = []
570-
571- assistant_turn_start_index = next ((i + 1 for i in range (len (final_run_response ) - 1 , - 1 , - 1 ) if final_run_response [i ].get ('role' ) == 'user' ), 0 )
572- assistant_messages = final_run_response [assistant_turn_start_index :]
573-
574- parsed_response = parse_assistant_response (assistant_messages )
575- final_text_for_tts = parsed_response .get ("final_content" , "I'm sorry, I couldn't process that." )
576-
577- if not final_text_for_tts and assistant_messages :
578- last_message = assistant_messages [- 1 ]
579- if last_message .get ('role' ) == 'function' :
580- final_text_for_tts = "The action has been completed."
581-
604+ # 7. Update assistant message in DB with the final text for TTS
582605 await db_manager .messages_collection .update_one (
583606 {"message_id" : assistant_message_id , "user_id" : user_id },
584607 {"$set" : {
585608 "content" : final_text_for_tts ,
586- "turn_steps" : parsed_response . get ( "turn_steps" , [])
609+ "turn_steps" : final_turn_steps
587610 }}
588611 )
589612
@@ -592,8 +615,12 @@ def agent_worker():
592615 except Exception as e :
593616 logger .error (f"Error processing voice command for { user_id } : { e } " , exc_info = True )
594617 error_msg = "I encountered an error while processing your request."
618+ final_error_msg = error_msg
619+ if original_language != 'en' :
620+ final_error_msg = await translate_text (error_msg , target_language = original_language )
621+
595622 await db_manager .messages_collection .update_one (
596623 {"message_id" : assistant_message_id },
597- {"$set" : {"content" : error_msg }}
624+ {"$set" : {"content" : final_error_msg }}
598625 )
599- return error_msg , assistant_message_id
626+ return final_error_msg , assistant_message_id
0 commit comments