@@ -44,6 +44,10 @@ class GatewaySession:
4444 _messages : List [GatewayMessage ] = field (default_factory = list )
4545 _max_messages : int = 1000
4646
47+ # Stepper & Concurrency logic
48+ _inbox : asyncio .Queue = field (default_factory = asyncio .Queue )
49+ _is_executing : bool = False
50+
4751 @property
4852 def session_id (self ) -> str :
4953 return self ._session_id
@@ -89,6 +93,20 @@ def get_messages(self, limit: Optional[int] = None) -> List[GatewayMessage]:
8993 def close (self ) -> None :
9094 self ._is_active = False
9195
96+ async def queue_message (self , message : str ) -> None :
97+ """Queue a user message for execution after the current operation."""
98+ await self ._inbox .put (message )
99+
100+ def get_next_message (self ) -> Optional [str ]:
101+ """Fetch the next queued message if available without blocking."""
102+ if self ._inbox .empty ():
103+ return None
104+ return self ._inbox .get_nowait ()
105+
106+ def mark_executing (self , status : bool ) -> None :
107+ """Mark the session as currently executing an agent workflow."""
108+ self ._is_executing = status
109+
92110
93111class WebSocketGateway :
94112 """WebSocket gateway server for multi-agent coordination.
@@ -564,40 +582,72 @@ async def _process_agent_message(
564582 return "Agent not available"
565583
566584 client_id = session .client_id
585+ content = message .content if isinstance (message .content , str ) else str (message .content )
567586
568- try :
569- content = message .content if isinstance (message .content , str ) else str (message .content )
570-
571- # Wire streaming relay if agent has a stream_emitter
572- relay_callback = None
573- emitter = getattr (agent , 'stream_emitter' , None )
574- if emitter is not None and client_id :
575- relay_callback = self ._make_stream_relay (client_id , session )
576- emitter .add_callback (relay_callback )
577-
578- try :
579- loop = asyncio .get_event_loop ()
580- response = await loop .run_in_executor (None , agent .chat , content )
581- finally :
582- # Always clean up the relay callback
583- if relay_callback and emitter is not None :
584- try :
585- emitter .remove_callback (relay_callback )
586- except (ValueError , AttributeError ):
587- pass
588-
589- response_message = GatewayMessage (
590- content = response ,
591- sender_id = session .agent_id ,
592- session_id = session .session_id ,
593- reply_to = message .message_id ,
587+ # Inbox & Stepper logic
588+ if session ._is_executing :
589+ # Send an ephemeral status event
590+ await self ._send_to_client (
591+ client_id ,
592+ {
593+ "type" : "status" ,
594+ "source" : session .agent_id ,
595+ "message" : "Thinking... (I've added your new message to the queue to process next)."
596+ }
594597 )
595- session .add_message (response_message )
598+ await session .queue_message (content )
599+ return "Message queued."
596600
597- return response
598- except Exception as e :
599- logger .error (f"Agent error: { e } " )
600- return f"Error: { str (e )} "
601+ session .mark_executing (True )
602+ await session .queue_message (content )
603+
604+ # Start background task to process the queue
605+ asyncio .create_task (self ._run_session_queue (session , agent , client_id ))
606+ return "Started processing."
607+
608+ async def _run_session_queue (self , session : GatewaySession , agent : Any , client_id : str ) -> None :
609+ """Background task loop that constantly pulls from `_inbox` and executes the agent task."""
610+ try :
611+ while True :
612+ content = session .get_next_message ()
613+ if not content :
614+ break # Queue is empty, exit loop
615+
616+ # Wire streaming relay if agent has a stream_emitter
617+ relay_callback = None
618+ emitter = getattr (agent , 'stream_emitter' , None )
619+ if emitter is not None and client_id :
620+ relay_callback = self ._make_stream_relay (client_id , session )
621+ emitter .add_callback (relay_callback )
622+
623+ try :
624+ loop = asyncio .get_event_loop ()
625+ response = await loop .run_in_executor (None , agent .chat , content )
626+ except Exception as e :
627+ logger .error (f"Agent error in queue processor: { e } " )
628+ response = f"Error: { str (e )} "
629+ finally :
630+ # Always clean up the relay callback
631+ if relay_callback and emitter is not None :
632+ try :
633+ emitter .remove_callback (relay_callback )
634+ except (ValueError , AttributeError ):
635+ pass
636+
637+ response_message = GatewayMessage (
638+ content = response ,
639+ sender_id = session .agent_id ,
640+ session_id = session .session_id ,
641+ )
642+ session .add_message (response_message )
643+
644+ await self ._send_to_client (client_id , {
645+ "type" : "response" ,
646+ "content" : response ,
647+ "session_id" : session .session_id ,
648+ })
649+ finally :
650+ session .mark_executing (False )
601651
602652 def _make_stream_relay (
603653 self , client_id : str , session : "GatewaySession"
0 commit comments