@@ -85,10 +85,31 @@ async def execute_code(
8585 state_archival_service = state_archival_service ,
8686 )
8787
88- # Streaming responses send headers before iterating the body. Validate
89- # synchronously first so handled request errors still return proper HTTP codes.
90- if not request .code or not request .code .strip ():
91- raise ValidationError (message = "Code cannot be empty" )
88+ async def _execute () -> ExecResponse :
89+ return await orchestrator .execute (
90+ request ,
91+ request_id ,
92+ api_key_hash = api_key_hash ,
93+ is_env_key = is_env_key ,
94+ )
95+
96+ execution_task = asyncio .create_task (_execute ())
97+
98+ try :
99+ response = await asyncio .wait_for (
100+ asyncio .shield (execution_task ), timeout = _KEEPALIVE_INTERVAL
101+ )
102+ logger .info (
103+ "Code execution completed" ,
104+ request_id = request_id ,
105+ session_id = response .session_id ,
106+ )
107+ return response
108+ except asyncio .TimeoutError :
109+ # Fall through to streamed keepalives for genuinely long-running work.
110+ pass
111+ except (ValidationError , ServiceUnavailableError ):
112+ raise
92113
93114 async def _stream_response ():
94115 """Execute code and stream the response with keepalive whitespace.
@@ -98,45 +119,22 @@ async def _stream_response():
98119 whitespace is ignored by JSON parsers, so this is transparent
99120 to clients.
100121 """
101- result_holder = {}
102- error_holder = {}
103-
104- async def _run ():
105- try :
106- result_holder ["response" ] = await orchestrator .execute (
107- request ,
108- request_id ,
109- api_key_hash = api_key_hash ,
110- is_env_key = is_env_key ,
111- )
112- except Exception as e :
113- error_holder ["error" ] = e
114-
115- task = asyncio .create_task (_run ())
116-
117122 # Send keepalive spaces while execution is running
118- while not task .done ():
123+ while not execution_task .done ():
119124 try :
120125 await asyncio .wait_for (
121- asyncio .shield (task ), timeout = _KEEPALIVE_INTERVAL
126+ asyncio .shield (execution_task ), timeout = _KEEPALIVE_INTERVAL
122127 )
123128 except asyncio .TimeoutError :
124129 # Execution still running — send keepalive space
125130 yield b" "
126- except Exception :
127- # Task raised an exception — will be handled below
128- break
129131
130132 # Ensure the task is complete
131- if not task .done ():
132- await task
133-
134- # Re-raise validation/service errors so FastAPI exception handlers
135- # can return proper HTTP status codes (400, 503, etc.)
136- if "error" in error_holder :
137- err = error_holder ["error" ]
138- if isinstance (err , (ValidationError , ServiceUnavailableError )):
139- raise err
133+ try :
134+ response = await execution_task
135+ except Exception as err :
136+ # Once the streaming response has started, surface failures as a JSON
137+ # error payload instead of raising after headers have been sent.
140138 error_resp = ErrorResponse (
141139 error = str (err ),
142140 error_type = "execution" ,
@@ -145,7 +143,6 @@ async def _run():
145143 return
146144
147145 # Send the JSON response
148- response = result_holder ["response" ]
149146 logger .info (
150147 "Code execution completed" ,
151148 request_id = request_id ,
0 commit comments