@@ -3,7 +3,10 @@ use std::fs;
33use std:: io;
44use std:: process:: Command ;
55use std:: path:: Path ;
6- use crate :: llm:: { self , Message } ;
6+ use std:: sync:: Arc ;
7+ use futures_util:: StreamExt ;
8+ use tokio:: sync:: Mutex ;
9+ use crate :: llm:: Message ;
710use crate :: config:: LlmConfig ;
811
912#[ derive( Serialize , Deserialize , Debug ) ]
@@ -683,6 +686,7 @@ edition = "2021"
683686 }
684687}
685688
689+ #[ derive( Clone ) ]
686690pub struct Agent {
687691 messages : Vec < Message > ,
688692}
@@ -1066,7 +1070,11 @@ TOOL: {"name": "RUN_COMMAND", "parameters": {"command": "cargo build --release"}
10661070 None
10671071 }
10681072
1069- pub async fn run ( & mut self , config : & LlmConfig , user_prompt : String , app : & mut crate :: app:: App ) -> Result < ( String , Vec < String > ) , Box < dyn std:: error:: Error > > {
1073+ pub async fn run ( & mut self , config : & LlmConfig , user_prompt : String , app : Arc < Mutex < crate :: app:: App > > ) -> Result < ( String , Vec < String > ) , Box < dyn std:: error:: Error + Send + Sync > > {
1074+ self . run_with_streaming ( config, user_prompt, app) . await
1075+ }
1076+
1077+ pub async fn run_with_streaming ( & mut self , config : & LlmConfig , user_prompt : String , app : Arc < Mutex < crate :: app:: App > > ) -> Result < ( String , Vec < String > ) , Box < dyn std:: error:: Error + Send + Sync > > {
10701078 // Add system message if this is the first interaction
10711079 if self . messages . is_empty ( ) {
10721080 self . messages . push ( Message {
@@ -1110,20 +1118,62 @@ TOOL: {"name": "RUN_COMMAND", "parameters": {"command": "cargo build --release"}
11101118
11111119 let mut all_tool_logs = Vec :: new ( ) ;
11121120 let mut attempts = 0 ;
1113- const MAX_ATTEMPTS : usize = 8 ; // Increased for better error recovery
1121+ const MAX_ATTEMPTS : usize = 8 ;
11141122
11151123 loop {
11161124 attempts += 1 ;
1117-
1118- // Get response from LLM
1119- let ( response, tokens_used) = llm:: ask_llm_with_messages ( config, & self . messages ) . await ?;
1120- app. increment_tokens ( tokens_used) ;
1121- app. increment_requests ( ) ;
1125+
1126+ // Start streaming for this response
1127+ {
1128+ let mut app_guard = app. lock ( ) . await ;
1129+ app_guard. start_streaming ( ) ;
1130+ }
1131+
1132+ // Create a string to collect the full response
1133+ let mut full_response = String :: new ( ) ;
1134+
1135+ // Get streaming response from LLM
1136+ let mut stream = match crate :: llm:: stream_llm_response ( config, & self . messages ) . await {
1137+ Ok ( stream) => stream,
1138+ Err ( e) => {
1139+ let mut app_guard = app. lock ( ) . await ;
1140+ app_guard. finish_streaming ( "Error: Failed to start streaming response" . to_string ( ) ) ;
1141+ return Err ( Box :: new ( e) ) ;
1142+ }
1143+ } ;
1144+
1145+ // Collect tokens from the stream
1146+ let mut token_stream = String :: new ( ) ;
1147+ while let Some ( chunk_result) = stream. next ( ) . await {
1148+ match chunk_result {
1149+ Ok ( chunk) => {
1150+ if !chunk. is_empty ( ) {
1151+ token_stream. push_str ( & chunk) ;
1152+ full_response. push_str ( & chunk) ;
1153+ // Update streaming message with brief lock
1154+ {
1155+ let mut app_guard = app. lock ( ) . await ;
1156+ app_guard. update_streaming_message ( & chunk) ;
1157+ }
1158+ }
1159+ }
1160+ Err ( e) => {
1161+ let mut app_guard = app. lock ( ) . await ;
1162+ app_guard. finish_streaming ( format ! ( "Error in streaming: {}" , e) ) ;
1163+ return Err ( Box :: new ( e) ) ;
1164+ }
1165+ }
1166+ }
1167+
1168+ {
1169+ let mut app_guard = app. lock ( ) . await ;
1170+ app_guard. increment_requests ( ) ;
1171+ }
11221172
11231173 // Check if response contains a tool call
1124- if let Some ( tool) = self . parse_tool_call ( & response ) {
1174+ if let Some ( tool) = self . parse_tool_call ( & full_response ) {
11251175 let mut tool_logs = Vec :: new ( ) ;
1126-
1176+
11271177 // Log the tool execution
11281178 let tool_name = match & tool {
11291179 Tool :: ReadFile { path } => format ! ( "READ_FILE {}" , path) ,
@@ -1150,40 +1200,45 @@ TOOL: {"name": "RUN_COMMAND", "parameters": {"command": "cargo build --release"}
11501200 Tool :: ClearPlan => "CLEAR_PLAN" . to_string ( ) ,
11511201 } ;
11521202 tool_logs. push ( format ! ( "🔧 Attempt {}: Executing {}" , attempts, tool_name) ) ;
1153-
1203+
11541204 // Execute the tool
11551205 let tool_result = match tool. execute ( ) {
11561206 Ok ( result) => {
1157- app. increment_tools_executed ( ) ;
1207+ {
1208+ let mut app_guard = app. lock ( ) . await ;
1209+ app_guard. increment_tools_executed ( ) ;
1210+ }
11581211 tool_logs. push ( format ! ( "✅ Success: {}" , result) ) ;
11591212 result
11601213 }
11611214 Err ( e) => {
11621215 let error_msg = format ! ( "❌ Error: {}" , e) ;
11631216 tool_logs. push ( error_msg. clone ( ) ) ;
1164- // Add error context to help the model understand what went wrong
11651217 format ! ( "Tool failed: {}. Please try a different approach or check if the path/command is correct." , e)
11661218 }
11671219 } ;
11681220 all_tool_logs. extend ( tool_logs) ;
1169-
1221+
11701222 // Add assistant message and tool result to conversation
11711223 self . messages . push ( Message {
11721224 role : "assistant" . to_string ( ) ,
1173- content : response ,
1225+ content : full_response . clone ( ) ,
11741226 } ) ;
1175-
1227+
11761228 self . messages . push ( Message {
11771229 role : "user" . to_string ( ) ,
11781230 content : format ! ( "Tool result: {}" , tool_result) ,
11791231 } ) ;
11801232
11811233 // Check if we should continue or if the task is complete
11821234 if attempts >= MAX_ATTEMPTS {
1183- // Get final response after max attempts
1184- let ( final_response, final_tokens) = llm:: ask_llm_with_messages ( config, & self . messages ) . await ?;
1185- app. increment_tokens ( final_tokens) ;
1186- app. increment_requests ( ) ;
1235+ // Get final response after max attempts (non-streaming for final response)
1236+ let ( final_response, final_tokens) = crate :: llm:: ask_llm_with_messages ( config, & self . messages ) . await ?;
1237+ {
1238+ let mut app_guard = app. lock ( ) . await ;
1239+ app_guard. increment_tokens ( final_tokens) ;
1240+ app_guard. finish_streaming ( final_response. clone ( ) ) ;
1241+ }
11871242
11881243 self . messages . push ( Message {
11891244 role : "assistant" . to_string ( ) ,
@@ -1200,14 +1255,18 @@ TOOL: {"name": "RUN_COMMAND", "parameters": {"command": "cargo build --release"}
12001255 // No tool call, task appears to be complete
12011256 self . messages . push ( Message {
12021257 role : "assistant" . to_string ( ) ,
1203- content : response . clone ( ) ,
1258+ content : full_response . clone ( ) ,
12041259 } ) ;
1205-
1260+
12061261 if attempts > 1 {
12071262 all_tool_logs. push ( format ! ( "✅ Task completed after {} attempts" , attempts) ) ;
12081263 }
1209-
1210- return Ok ( ( response, all_tool_logs) ) ;
1264+
1265+ {
1266+ let mut app_guard = app. lock ( ) . await ;
1267+ app_guard. finish_streaming ( full_response. clone ( ) ) ;
1268+ }
1269+ return Ok ( ( full_response, all_tool_logs) ) ;
12111270 }
12121271 }
12131272 }
0 commit comments