@@ -47,6 +47,7 @@ use std::sync::Arc;
4747
4848use ag_ui_core:: { Event , JsonValue , RunId , ThreadId } ;
4949use axum:: { routing:: { get, post} , Router } ;
50+ use tower_http:: cors:: { Any , CorsLayer } ;
5051use tokio:: sync:: { broadcast, mpsc, RwLock } ;
5152
5253pub use bridge:: EventBridge ;
@@ -255,12 +256,20 @@ impl AgUiServer {
255256 }
256257 }
257258
259+ // Configure CORS to allow requests from any origin (for development)
260+ let cors = CorsLayer :: new ( )
261+ . allow_origin ( Any )
262+ . allow_methods ( Any )
263+ . allow_headers ( Any ) ;
264+
258265 let app = Router :: new ( )
259- . route ( "/" , get ( routes:: health) )
266+ . route ( "/" , get ( routes:: health) . post ( routes:: post_message) )
267+ . route ( "/info" , get ( routes:: info) )
260268 . route ( "/sse" , get ( routes:: sse_handler) )
261269 . route ( "/ws" , get ( routes:: ws_handler) )
262270 . route ( "/message" , post ( routes:: post_message) )
263271 . route ( "/health" , get ( routes:: health) )
272+ . layer ( cors)
264273 . with_state ( self . state ) ;
265274
266275 println ! ( "AG-UI server listening on http://{}" , addr) ;
@@ -439,4 +448,212 @@ mod tests {
439448 handle
440449 ) . await ;
441450 }
451+
452+ // =============================================================================
453+ // E2E Integration Tests (Phase 25)
454+ // =============================================================================
455+
456+ /// Helper to collect events until RunFinished or RunError
457+ async fn collect_until_finished ( rx : & mut tokio:: sync:: broadcast:: Receiver < ag_ui_core:: Event > ) -> Vec < ag_ui_core:: Event > {
458+ use ag_ui_core:: Event ;
459+ let mut events = Vec :: new ( ) ;
460+ loop {
461+ match tokio:: time:: timeout ( std:: time:: Duration :: from_secs ( 5 ) , rx. recv ( ) ) . await {
462+ Ok ( Ok ( event) ) => {
463+ let is_finished = matches ! ( & event, Event :: RunFinished ( _) | Event :: RunError ( _) ) ;
464+ events. push ( event) ;
465+ if is_finished { break ; }
466+ }
467+ _ => break ,
468+ }
469+ }
470+ events
471+ }
472+
473+ /// Helper to drain events until run is finished
474+ async fn drain_events_until_run_finished ( rx : & mut tokio:: sync:: broadcast:: Receiver < ag_ui_core:: Event > ) {
475+ use ag_ui_core:: Event ;
476+ loop {
477+ match tokio:: time:: timeout ( std:: time:: Duration :: from_secs ( 30 ) , rx. recv ( ) ) . await {
478+ Ok ( Ok ( Event :: RunFinished ( _) ) ) => break ,
479+ Ok ( Ok ( Event :: RunError ( _) ) ) => break ,
480+ Ok ( Ok ( _) ) => continue ,
481+ _ => panic ! ( "Timeout or error waiting for RunFinished" ) ,
482+ }
483+ }
484+ }
485+
486+ #[ tokio:: test]
487+ async fn test_multi_turn_conversation ( ) {
488+ use ag_ui_core:: types:: { Message , RunAgentInput } ;
489+
490+ // Create state and components
491+ let state = ServerState :: new ( ) ;
492+ let msg_tx = state. message_sender ( ) ;
493+ let mut event_rx = state. subscribe ( ) ;
494+ let msg_rx = state. take_message_receiver ( ) . await . expect ( "Should get receiver" ) ;
495+
496+ // Create processor
497+ let event_bridge = state. event_sender ( ) ;
498+ let mut processor = AgentProcessor :: with_defaults ( msg_rx, event_bridge) ;
499+
500+ let handle = tokio:: spawn ( async move {
501+ processor. run ( ) . await ;
502+ } ) ;
503+
504+ let thread_id = ThreadId :: random ( ) ;
505+
506+ // Send first message
507+ let input1 = RunAgentInput :: new ( thread_id. clone ( ) , RunId :: random ( ) )
508+ . with_messages ( vec ! [ Message :: new_user( "Hello" ) ] ) ;
509+ msg_tx. send ( AgentMessage :: new ( input1) ) . await . expect ( "Should send" ) ;
510+
511+ // Wait for first response
512+ drain_events_until_run_finished ( & mut event_rx) . await ;
513+
514+ // Send follow-up message (same thread)
515+ let input2 = RunAgentInput :: new ( thread_id. clone ( ) , RunId :: random ( ) )
516+ . with_messages ( vec ! [ Message :: new_user( "Follow up message" ) ] ) ;
517+ msg_tx. send ( AgentMessage :: new ( input2) ) . await . expect ( "Should send" ) ;
518+
519+ // Verify second run completes
520+ drain_events_until_run_finished ( & mut event_rx) . await ;
521+
522+ drop ( msg_tx) ;
523+ let _ = tokio:: time:: timeout ( std:: time:: Duration :: from_millis ( 200 ) , handle) . await ;
524+ }
525+
526+ #[ tokio:: test]
527+ async fn test_event_sequence ( ) {
528+ use ag_ui_core:: types:: { Message , RunAgentInput } ;
529+ use ag_ui_core:: Event ;
530+
531+ // Setup server state
532+ let state = ServerState :: new ( ) ;
533+ let msg_tx = state. message_sender ( ) ;
534+ let mut event_rx = state. subscribe ( ) ;
535+ let msg_rx = state. take_message_receiver ( ) . await . expect ( "receiver" ) ;
536+ let event_bridge = state. event_sender ( ) ;
537+ let mut processor = AgentProcessor :: with_defaults ( msg_rx, event_bridge) ;
538+
539+ tokio:: spawn ( async move { processor. run ( ) . await ; } ) ;
540+
541+ // Send message
542+ let thread_id = ThreadId :: random ( ) ;
543+ let input = RunAgentInput :: new ( thread_id, RunId :: random ( ) )
544+ . with_messages ( vec ! [ Message :: new_user( "Test event sequence" ) ] ) ;
545+ msg_tx. send ( AgentMessage :: new ( input) ) . await . unwrap ( ) ;
546+
547+ // Collect events
548+ let events = collect_until_finished ( & mut event_rx) . await ;
549+
550+ // Verify sequence
551+ assert ! ( !events. is_empty( ) , "Should receive at least one event" ) ;
552+ assert ! ( matches!( events[ 0 ] , Event :: RunStarted ( _) ) , "First event should be RunStarted" ) ;
553+
554+ // Should end with RunFinished or RunError
555+ assert ! (
556+ matches!( events. last( ) , Some ( Event :: RunFinished ( _) | Event :: RunError ( _) ) ) ,
557+ "Last event should be RunFinished or RunError"
558+ ) ;
559+
560+ // When successful (API key available), we expect at least:
561+ // RunStarted -> StepStarted -> StepFinished -> TextMessageStart -> TextMessageContent* -> TextMessageEnd -> RunFinished
562+ // Without API key, we get: RunStarted -> StepStarted -> StepFinished -> RunError
563+ // Either way, verify we have multiple events
564+ assert ! ( events. len( ) >= 2 , "Should have at least RunStarted and terminal event" ) ;
565+
566+ drop ( msg_tx) ;
567+ }
568+
569+ #[ tokio:: test]
570+ async fn test_empty_message_error ( ) {
571+ use ag_ui_core:: types:: RunAgentInput ;
572+ use ag_ui_core:: Event ;
573+
574+ let state = ServerState :: new ( ) ;
575+ let msg_tx = state. message_sender ( ) ;
576+ let mut event_rx = state. subscribe ( ) ;
577+ let msg_rx = state. take_message_receiver ( ) . await . expect ( "receiver" ) ;
578+ let event_bridge = state. event_sender ( ) ;
579+ let mut processor = AgentProcessor :: with_defaults ( msg_rx, event_bridge) ;
580+
581+ tokio:: spawn ( async move { processor. run ( ) . await ; } ) ;
582+
583+ // Send message with no user content
584+ let input = RunAgentInput :: new ( ThreadId :: random ( ) , RunId :: random ( ) ) ;
585+ msg_tx. send ( AgentMessage :: new ( input) ) . await . unwrap ( ) ;
586+
587+ // Collect events
588+ let events = collect_until_finished ( & mut event_rx) . await ;
589+
590+ // Should get RunStarted then RunError
591+ assert ! ( matches!( events[ 0 ] , Event :: RunStarted ( _) ) , "First should be RunStarted" ) ;
592+ assert ! (
593+ matches!( events. last( ) , Some ( Event :: RunError ( _) ) ) ,
594+ "Should end with RunError for empty message"
595+ ) ;
596+
597+ drop ( msg_tx) ;
598+ }
599+
600+ #[ tokio:: test]
601+ async fn test_invalid_provider_error ( ) {
602+ use ag_ui_core:: types:: { Message , RunAgentInput } ;
603+ use ag_ui_core:: Event ;
604+
605+ let state = ServerState :: new ( ) ;
606+ let msg_tx = state. message_sender ( ) ;
607+ let mut event_rx = state. subscribe ( ) ;
608+ let msg_rx = state. take_message_receiver ( ) . await . expect ( "receiver" ) ;
609+ let event_bridge = state. event_sender ( ) ;
610+
611+ // Configure with invalid provider
612+ let config = ProcessorConfig :: new ( ) . with_provider ( "invalid_provider_xyz" ) ;
613+ let mut processor = AgentProcessor :: new ( msg_rx, event_bridge, config) ;
614+
615+ tokio:: spawn ( async move { processor. run ( ) . await ; } ) ;
616+
617+ let input = RunAgentInput :: new ( ThreadId :: random ( ) , RunId :: random ( ) )
618+ . with_messages ( vec ! [ Message :: new_user( "Test invalid provider" ) ] ) ;
619+ msg_tx. send ( AgentMessage :: new ( input) ) . await . unwrap ( ) ;
620+
621+ // Collect events
622+ let events = collect_until_finished ( & mut event_rx) . await ;
623+
624+ // Should error due to unsupported provider
625+ assert ! (
626+ matches!( events. last( ) , Some ( Event :: RunError ( _) ) ) ,
627+ "Should end with RunError for invalid provider"
628+ ) ;
629+
630+ drop ( msg_tx) ;
631+ }
632+
633+ #[ tokio:: test]
634+ async fn test_custom_system_prompt ( ) {
635+ use ag_ui_core:: types:: { Message , RunAgentInput } ;
636+
637+ let state = ServerState :: new ( ) ;
638+ let msg_tx = state. message_sender ( ) ;
639+ let mut event_rx = state. subscribe ( ) ;
640+ let msg_rx = state. take_message_receiver ( ) . await . expect ( "receiver" ) ;
641+ let event_bridge = state. event_sender ( ) ;
642+
643+ // Configure with custom system prompt
644+ let config = ProcessorConfig :: new ( )
645+ . with_system_prompt ( "You are a DevOps assistant. Always respond with deployment advice." ) ;
646+ let mut processor = AgentProcessor :: new ( msg_rx, event_bridge, config) ;
647+
648+ tokio:: spawn ( async move { processor. run ( ) . await ; } ) ;
649+
650+ let input = RunAgentInput :: new ( ThreadId :: random ( ) , RunId :: random ( ) )
651+ . with_messages ( vec ! [ Message :: new_user( "Hello" ) ] ) ;
652+ msg_tx. send ( AgentMessage :: new ( input) ) . await . unwrap ( ) ;
653+
654+ // Should complete (may error without API key, but should not panic)
655+ drain_events_until_run_finished ( & mut event_rx) . await ;
656+
657+ drop ( msg_tx) ;
658+ }
442659}
0 commit comments