@@ -81,6 +81,37 @@ def _sse(data: Dict[str, Any]) -> str:
8181 return f"data: { json .dumps (data , ensure_ascii = False )} \n \n "
8282
8383
84+ def _start_server (app : FastAPI ) -> tuple :
85+ """启动 FastAPI 服务器并返回 (base_url, server, thread)
86+
87+ 使用真实的 HTTP 服务器而不是 httpx.ASGITransport,
88+ 因为 ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应,
89+ 会导致流式响应被提前取消 (CancelledError)。
90+ """
91+ port = _find_free_port ()
92+ config = uvicorn .Config (
93+ app , host = "127.0.0.1" , port = port , log_level = "warning"
94+ )
95+ server = uvicorn .Server (config )
96+
97+ thread = threading .Thread (target = server .run , daemon = True )
98+ thread .start ()
99+
100+ base_url = f"http://127.0.0.1:{ port } "
101+ for i in range (50 ):
102+ try :
103+ httpx .get (f"{ base_url } /ag-ui/agent/health" , timeout = 0.2 )
104+ break
105+ except Exception :
106+ if i == 49 :
107+ raise RuntimeError (
108+ f"Server failed to start within { 50 * 0.1 } s: { base_url } "
109+ )
110+ time .sleep (0.1 )
111+
112+ return base_url , server , thread
113+
114+
84115def _build_mock_openai_app () -> FastAPI :
85116 """构建本地 OpenAI 协议兼容的简单服务"""
86117 app = FastAPI ()
@@ -297,20 +328,34 @@ def parse_sse_events(content: str) -> List[Dict[str, Any]]:
297328
298329
299330async def request_agui_events (
300- server_app ,
331+ server_url_or_app : Union [ str , FastAPI ] ,
301332 messages : List [Dict [str , str ]],
302333 stream : bool = True ,
303334) -> List [Dict [str , Any ]]:
304- """发送 AG-UI 请求并返回事件列表"""
305- async with httpx .AsyncClient (
306- transport = httpx .ASGITransport (app = server_app ),
307- base_url = "http://test" ,
308- ) as client :
309- response = await client .post (
310- "/ag-ui/agent" ,
311- json = {"messages" : messages , "stream" : stream },
312- timeout = 60.0 ,
313- )
335+ """发送 AG-UI 请求并返回事件列表
336+
337+ Args:
338+ server_url_or_app: 服务器 URL 或 FastAPI app 对象
339+ messages: 消息列表
340+ stream: 是否流式响应
341+ """
342+ if isinstance (server_url_or_app , str ):
343+ async with httpx .AsyncClient (base_url = server_url_or_app ) as client :
344+ response = await client .post (
345+ "/ag-ui/agent" ,
346+ json = {"messages" : messages , "stream" : stream },
347+ timeout = 60.0 ,
348+ )
349+ else :
350+ async with httpx .AsyncClient (
351+ transport = httpx .ASGITransport (app = server_url_or_app ),
352+ base_url = "http://test" ,
353+ ) as client :
354+ response = await client .post (
355+ "/ag-ui/agent" ,
356+ json = {"messages" : messages , "stream" : stream },
357+ timeout = 60.0 ,
358+ )
314359
315360 assert response .status_code == 200
316361 return parse_sse_events (response .text )
@@ -670,7 +715,7 @@ def assert_openai_tool_call_response(
670715
671716
672717async def request_openai_events (
673- server_app ,
718+ server_url_or_app : Union [ str , FastAPI ] ,
674719 messages : List [Dict [str , str ]],
675720 stream : bool = True ,
676721) -> Union [List [Dict [str , Any ]], Dict [str , Any ]]:
@@ -681,15 +726,23 @@ async def request_openai_events(
681726 "stream" : stream ,
682727 }
683728
684- async with httpx .AsyncClient (
685- transport = httpx .ASGITransport (app = server_app ),
686- base_url = "http://test" ,
687- ) as client :
688- response = await client .post (
689- "/openai/v1/chat/completions" ,
690- json = payload ,
691- timeout = 60.0 ,
692- )
729+ if isinstance (server_url_or_app , str ):
730+ async with httpx .AsyncClient (base_url = server_url_or_app ) as client :
731+ response = await client .post (
732+ "/openai/v1/chat/completions" ,
733+ json = payload ,
734+ timeout = 60.0 ,
735+ )
736+ else :
737+ async with httpx .AsyncClient (
738+ transport = httpx .ASGITransport (app = server_url_or_app ),
739+ base_url = "http://test" ,
740+ ) as client :
741+ response = await client .post (
742+ "/openai/v1/chat/completions" ,
743+ json = payload ,
744+ timeout = 60.0 ,
745+ )
693746
694747 assert response .status_code == 200
695748
@@ -749,7 +802,16 @@ def agent_model(mock_openai_server: str):
749802
750803@pytest .fixture
751804def server_app_astream_events (agent_model ):
752- """创建使用 astream_events 的服务器(AG-UI/OpenAI 通用)"""
805+ """创建使用 astream 的服务器(AG-UI/OpenAI 通用)
806+
807+ 返回服务器 URL 而不是 app 对象,因为流式测试需要真实的 HTTP 连接。
808+ httpx.ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应。
809+
810+ 注意: 这里使用 astream(stream_mode="updates") 而非 astream_events,
811+ 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中会出现
812+ async generator 被提前取消或事件丢失的问题。
813+ astream_events 的转换逻辑由 test_convert_python_3_10/3_12 单独覆盖。
814+ """
753815 agent = build_agent (agent_model )
754816
755817 async def invoke_agent (request : AgentRequest ):
@@ -770,16 +832,21 @@ async def invoke_agent(request: AgentRequest):
770832 converter = AgentRunConverter ()
771833
772834 async def generator ():
773- async for event in agent .astream_events (
774- cast (Any , input_data ), version = "v2 "
835+ async for event in agent .astream (
836+ cast (Any , input_data ), stream_mode = "updates "
775837 ):
776838 for item in converter .convert (event ):
777839 yield item
778840
779841 return generator ()
780842
781843 server = AgentRunServer (invoke_agent = invoke_agent )
782- return server .app
844+ base_url , uvicorn_server , thread = _start_server (server .app )
845+
846+ yield base_url
847+
848+ uvicorn_server .should_exit = True
849+ thread .join (timeout = 5 )
783850
784851
785852# =============================================================================
@@ -1753,16 +1820,21 @@ async def invoke_agent(request: AgentRequest):
17531820 converter = AgentRunConverter ()
17541821
17551822 async def generator ():
1756- async for event in agent .astream_events (
1757- cast (Any , input_data ), version = "v2 "
1823+ async for event in agent .astream (
1824+ cast (Any , input_data ), stream_mode = "updates "
17581825 ):
17591826 for item in converter .convert (event ):
17601827 yield item
17611828
17621829 return generator ()
17631830
17641831 server = AgentRunServer (invoke_agent = invoke_agent )
1765- return server .app
1832+ base_url , uvicorn_server , thread = _start_server (server .app )
1833+
1834+ yield base_url
1835+
1836+ uvicorn_server .should_exit = True
1837+ thread .join (timeout = 5 )
17661838
17671839 @pytest .fixture
17681840 def server_app_async (self , agent_model ):
@@ -1787,16 +1859,21 @@ async def invoke_agent(request: AgentRequest):
17871859 converter = AgentRunConverter ()
17881860
17891861 async def generator ():
1790- async for event in agent .astream_events (
1791- cast (Any , input_data ), version = "v2 "
1862+ async for event in agent .astream (
1863+ cast (Any , input_data ), stream_mode = "updates "
17921864 ):
17931865 for item in converter .convert (event ):
17941866 yield item
17951867
17961868 return generator ()
17971869
17981870 server = AgentRunServer (invoke_agent = invoke_agent )
1799- return server .app
1871+ base_url , uvicorn_server , thread = _start_server (server .app )
1872+
1873+ yield base_url
1874+
1875+ uvicorn_server .should_exit = True
1876+ thread .join (timeout = 5 )
18001877
18011878 @pytest .mark .parametrize (
18021879 "case_key,prompt" ,
0 commit comments