@@ -3,7 +3,7 @@ use std::sync::Mutex;
33
44use futures_util:: StreamExt ;
55use serde:: { Deserialize , Serialize } ;
6- use tauri:: { ipc:: Channel , State } ;
6+ use tauri:: { ipc:: Channel , Emitter , State } ;
77use tokio_util:: sync:: CancellationToken ;
88
99use crate :: config:: defaults:: STRIP_PATTERNS ;
@@ -393,6 +393,16 @@ pub fn engine_start_error(detail: &str) -> EngineError {
393393/// [`apply_capability_filter`] path and stderr notice the cache-driven filter
394394/// uses, instead of letting the whole request fail.
395395///
396+ /// This request's own first content chunk (`Token`/`ThinkingToken`) is
397+ /// authoritative proof the model is warm, independent of the proactive
398+ /// warm-up prime (`crate::warmup::warm_builtin`), which can still be queued
399+ /// behind this same request at the engine's single execution slot. On that
400+ /// first chunk, `warm_state.mark_warmed_by_real_request` is consulted and
401+ /// `on_warmed` fires at most once, so a caller wired to emit
402+ /// `warmup:builtin-warmed` from it never leaves the Settings status stuck on
403+ /// "warming" for the duration of a response that raced ahead of its own
404+ /// prime.
405+ ///
396406/// Returns the accumulated assistant content (empty on the error paths) so
397407/// the caller's persistence tail treats every route identically.
398408#[ allow( clippy:: too_many_arguments) ]
@@ -404,6 +414,8 @@ pub(crate) async fn stream_builtin_chat(
404414 mut messages : Vec < ChatMessage > ,
405415 client : & reqwest:: Client ,
406416 cancel_token : CancellationToken ,
417+ warm_state : & crate :: warmup:: BuiltinWarmState ,
418+ on_warmed : impl Fn ( ) ,
407419 on_chunk : impl Fn ( StreamChunk ) ,
408420) -> String {
409421 engine. touch ( ) ;
@@ -436,6 +448,18 @@ pub(crate) async fn stream_builtin_chat(
436448 ) ;
437449 }
438450 }
451+ let warmed_announced = std:: sync:: atomic:: AtomicBool :: new ( false ) ;
452+ let on_chunk = |chunk : StreamChunk | {
453+ if !warmed_announced. load ( std:: sync:: atomic:: Ordering :: Relaxed )
454+ && matches ! ( chunk, StreamChunk :: Token ( _) | StreamChunk :: ThinkingToken ( _) )
455+ {
456+ warmed_announced. store ( true , std:: sync:: atomic:: Ordering :: Relaxed ) ;
457+ if warm_state. mark_warmed_by_real_request ( port) {
458+ on_warmed ( ) ;
459+ }
460+ }
461+ on_chunk ( chunk) ;
462+ } ;
439463 crate :: openai:: stream_openai_chat (
440464 crate :: openai:: OpenAiChatParams {
441465 base_url,
@@ -1152,6 +1176,8 @@ pub async fn ask_model(
11521176 model_store : State < ' _ , crate :: models:: storage:: ModelStore > ,
11531177 engine : State < ' _ , crate :: engine:: runner:: EngineHandle > ,
11541178 secrets : State < ' _ , crate :: keychain:: Secrets > ,
1179+ app : tauri:: AppHandle ,
1180+ warm_state : State < ' _ , crate :: warmup:: BuiltinWarmState > ,
11551181) -> Result < ( ) , String > {
11561182 // Snapshot the config once so all downstream reads (endpoint, prompt, model)
11571183 // see a consistent view even if the user edits Settings mid-stream.
@@ -1367,6 +1393,10 @@ pub async fn ask_model(
13671393 messages,
13681394 & client,
13691395 cancel_token. clone ( ) ,
1396+ & warm_state,
1397+ || {
1398+ let _ = app. emit ( "warmup:builtin-warmed" , ( ) ) ;
1399+ } ,
13701400 builtin_pump,
13711401 )
13721402 . await ;
@@ -1521,6 +1551,28 @@ mod tests {
15211551 ( chunks, callback)
15221552 }
15231553
1554+ /// Shared `stream_builtin_chat` `on_warmed` no-op for tests that never
1555+ /// reach a real streamed token (ensure fails/cancels, or the mocked
1556+ /// response has no content chunk). One source location shared across
1557+ /// every such call site, so `stream_builtin_chat_announces_warmed_*`
1558+ /// invoking the equivalent counting closure below is enough to prove
1559+ /// this shape is reachable - none of these individual call sites need to
1560+ /// invoke it themselves for coverage.
1561+ fn noop_on_warmed ( ) -> impl Fn ( ) {
1562+ || { }
1563+ }
1564+
1565+ /// Builds an `on_warmed` counter for tests: the returned closure
1566+ /// increments a shared count so a test can assert exactly how many times
1567+ /// `stream_builtin_chat` announced a warm-up.
1568+ fn warmed_counter ( ) -> ( Arc < AtomicU64 > , impl Fn ( ) ) {
1569+ let count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
1570+ let count_cb = Arc :: clone ( & count) ;
1571+ ( count, move || {
1572+ count_cb. fetch_add ( 1 , Ordering :: Relaxed ) ;
1573+ } )
1574+ }
1575+
15241576 /// Helper: builds a `/api/chat` response line from content + done flag.
15251577 fn chat_line ( content : & str , done : bool ) -> String {
15261578 format ! (
@@ -3750,6 +3802,8 @@ mod tests {
37503802 vec ! [ ] ,
37513803 & client,
37523804 CancellationToken :: new ( ) ,
3805+ & crate :: warmup:: BuiltinWarmState :: default ( ) ,
3806+ noop_on_warmed ( ) ,
37533807 callback,
37543808 )
37553809 . await ;
@@ -3765,6 +3819,115 @@ mod tests {
37653819 engine. shutdown ( ) . await ;
37663820 }
37673821
3822+ #[ tokio:: test]
3823+ async fn stream_builtin_chat_announces_warmed_exactly_once_on_first_token ( ) {
3824+ let mut server = mockito:: Server :: new_async ( ) . await ;
3825+ let port: u16 = server
3826+ . url ( )
3827+ . rsplit ( ':' )
3828+ . next ( )
3829+ . unwrap ( )
3830+ . parse ( )
3831+ . expect ( "mockito url ends in a port" ) ;
3832+ let mock = server
3833+ . mock ( "POST" , "/v1/chat/completions" )
3834+ . with_header ( "content-type" , "text/event-stream" )
3835+ . with_body (
3836+ "data: {\" choices\" :[{\" delta\" :{\" content\" :\" Hi\" }}]}\n \n \
3837+ data: {\" choices\" :[{\" delta\" :{\" content\" :\" there\" }}]}\n \n \
3838+ data: [DONE]\n ",
3839+ )
3840+ . create_async ( )
3841+ . await ;
3842+
3843+ let engine = spawn_engine ( ScriptedEngineProcess {
3844+ port,
3845+ spawn_error : None ,
3846+ healthy : true ,
3847+ } ) ;
3848+ let client = reqwest:: Client :: new ( ) ;
3849+ let ( _chunks, callback) = collect_chunks ( ) ;
3850+ let warm_state = crate :: warmup:: BuiltinWarmState :: default ( ) ;
3851+ let ( warmed_count, on_warmed) = warmed_counter ( ) ;
3852+ stream_builtin_chat (
3853+ & engine,
3854+ engine_target ( ) ,
3855+ "org/repo:m.gguf" . to_string ( ) ,
3856+ false ,
3857+ vec ! [ ] ,
3858+ & client,
3859+ CancellationToken :: new ( ) ,
3860+ & warm_state,
3861+ on_warmed,
3862+ callback,
3863+ )
3864+ . await ;
3865+
3866+ mock. assert_async ( ) . await ;
3867+ assert_eq ! (
3868+ warmed_count. load( Ordering :: Relaxed ) ,
3869+ 1 ,
3870+ "two tokens stream but on_warmed fires only for the first"
3871+ ) ;
3872+ assert ! (
3873+ !warm_state. try_begin( port) ,
3874+ "the real request's first token marked this port as warmed"
3875+ ) ;
3876+ engine. shutdown ( ) . await ;
3877+ }
3878+
3879+ #[ tokio:: test]
3880+ async fn stream_builtin_chat_skips_on_warmed_when_the_port_is_already_marked ( ) {
3881+ let mut server = mockito:: Server :: new_async ( ) . await ;
3882+ let port: u16 = server
3883+ . url ( )
3884+ . rsplit ( ':' )
3885+ . next ( )
3886+ . unwrap ( )
3887+ . parse ( )
3888+ . expect ( "mockito url ends in a port" ) ;
3889+ let mock = server
3890+ . mock ( "POST" , "/v1/chat/completions" )
3891+ . with_header ( "content-type" , "text/event-stream" )
3892+ . with_body ( "data: {\" choices\" :[{\" delta\" :{\" content\" :\" Hi\" }}]}\n \n data: [DONE]\n " )
3893+ . create_async ( )
3894+ . await ;
3895+
3896+ let engine = spawn_engine ( ScriptedEngineProcess {
3897+ port,
3898+ spawn_error : None ,
3899+ healthy : true ,
3900+ } ) ;
3901+ let client = reqwest:: Client :: new ( ) ;
3902+ let ( _chunks, callback) = collect_chunks ( ) ;
3903+ let warm_state = crate :: warmup:: BuiltinWarmState :: default ( ) ;
3904+ // A proactive prime already announced this port as warmed before the
3905+ // real request's first token arrives.
3906+ assert ! ( warm_state. mark_warmed_by_real_request( port) ) ;
3907+ let ( warmed_count, on_warmed) = warmed_counter ( ) ;
3908+ stream_builtin_chat (
3909+ & engine,
3910+ engine_target ( ) ,
3911+ "org/repo:m.gguf" . to_string ( ) ,
3912+ false ,
3913+ vec ! [ ] ,
3914+ & client,
3915+ CancellationToken :: new ( ) ,
3916+ & warm_state,
3917+ on_warmed,
3918+ callback,
3919+ )
3920+ . await ;
3921+
3922+ mock. assert_async ( ) . await ;
3923+ assert_eq ! (
3924+ warmed_count. load( Ordering :: Relaxed ) ,
3925+ 0 ,
3926+ "the port was already announced warmed; no redundant emit"
3927+ ) ;
3928+ engine. shutdown ( ) . await ;
3929+ }
3930+
37683931 #[ tokio:: test]
37693932 async fn superseded_ensure_emits_cancelled ( ) {
37703933 // Health probes hang, so the ensure stays in flight until the
@@ -3788,6 +3951,8 @@ mod tests {
37883951 vec ! [ ] ,
37893952 & client,
37903953 CancellationToken :: new ( ) ,
3954+ & crate :: warmup:: BuiltinWarmState :: default ( ) ,
3955+ noop_on_warmed ( ) ,
37913956 callback,
37923957 )
37933958 . await
@@ -3842,6 +4007,8 @@ mod tests {
38424007 vec ! [ ] ,
38434008 & client,
38444009 cancel_token,
4010+ & crate :: warmup:: BuiltinWarmState :: default ( ) ,
4011+ noop_on_warmed ( ) ,
38454012 callback,
38464013 )
38474014 . await
@@ -3887,6 +4054,8 @@ mod tests {
38874054 vec ! [ ] ,
38884055 & client,
38894056 CancellationToken :: new ( ) ,
4057+ & crate :: warmup:: BuiltinWarmState :: default ( ) ,
4058+ noop_on_warmed ( ) ,
38904059 callback,
38914060 )
38924061 . await ;
@@ -4028,6 +4197,8 @@ mod tests {
40284197 image_message ( ) ,
40294198 & client,
40304199 CancellationToken :: new ( ) ,
4200+ & crate :: warmup:: BuiltinWarmState :: default ( ) ,
4201+ noop_on_warmed ( ) ,
40314202 callback,
40324203 )
40334204 . await ;
0 commit comments