Skip to content

Commit 61f18a4

Browse files
committed
test: migrate from astream_events to astream for reliable SSE streaming in CI
Migrate test implementations to use astream(stream_mode="updates") instead of astream_events to resolve async generator cancellation issues in CI environments. Also switch from httpx.ASGITransport to real HTTP servers with uvicorn for proper SSE stream handling in tests, addressing flaky streaming behavior in Linux CI environments. 将测试实现从 astream_events 迁移到 astream(stream_mode="updates") 以解决 CI 环境中的异步生成器取消问题。同时从 httpx.ASGITransport 切换到使用 uvicorn 的真实 HTTP 服务器,以正确处理测试中的 SSE 流式传输,解决 Linux CI 环境中的不稳定流式行为。 Change-Id: I4e9f694a80e952a94e240f479bb40fef59c0d649 Signed-off-by: OhYee <oyohyee@oyohyee.com>
1 parent 1f900cb commit 61f18a4

File tree

2 files changed

+157
-54
lines changed

2 files changed

+157
-54
lines changed

tests/unittests/integration/langchain/test_agent_invoke_methods.py

Lines changed: 108 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,37 @@ def _sse(data: Dict[str, Any]) -> str:
8181
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
8282

8383

84+
def _start_server(app: FastAPI) -> tuple:
85+
"""启动 FastAPI 服务器并返回 (base_url, server, thread)
86+
87+
使用真实的 HTTP 服务器而不是 httpx.ASGITransport,
88+
因为 ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应,
89+
会导致流式响应被提前取消 (CancelledError)。
90+
"""
91+
port = _find_free_port()
92+
config = uvicorn.Config(
93+
app, host="127.0.0.1", port=port, log_level="warning"
94+
)
95+
server = uvicorn.Server(config)
96+
97+
thread = threading.Thread(target=server.run, daemon=True)
98+
thread.start()
99+
100+
base_url = f"http://127.0.0.1:{port}"
101+
for i in range(50):
102+
try:
103+
httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2)
104+
break
105+
except Exception:
106+
if i == 49:
107+
raise RuntimeError(
108+
f"Server failed to start within {50 * 0.1}s: {base_url}"
109+
)
110+
time.sleep(0.1)
111+
112+
return base_url, server, thread
113+
114+
84115
def _build_mock_openai_app() -> FastAPI:
85116
"""构建本地 OpenAI 协议兼容的简单服务"""
86117
app = FastAPI()
@@ -297,20 +328,34 @@ def parse_sse_events(content: str) -> List[Dict[str, Any]]:
297328

298329

299330
async def request_agui_events(
300-
server_app,
331+
server_url_or_app: Union[str, FastAPI],
301332
messages: List[Dict[str, str]],
302333
stream: bool = True,
303334
) -> List[Dict[str, Any]]:
304-
"""发送 AG-UI 请求并返回事件列表"""
305-
async with httpx.AsyncClient(
306-
transport=httpx.ASGITransport(app=server_app),
307-
base_url="http://test",
308-
) as client:
309-
response = await client.post(
310-
"/ag-ui/agent",
311-
json={"messages": messages, "stream": stream},
312-
timeout=60.0,
313-
)
335+
"""发送 AG-UI 请求并返回事件列表
336+
337+
Args:
338+
server_url_or_app: 服务器 URL 或 FastAPI app 对象
339+
messages: 消息列表
340+
stream: 是否流式响应
341+
"""
342+
if isinstance(server_url_or_app, str):
343+
async with httpx.AsyncClient(base_url=server_url_or_app) as client:
344+
response = await client.post(
345+
"/ag-ui/agent",
346+
json={"messages": messages, "stream": stream},
347+
timeout=60.0,
348+
)
349+
else:
350+
async with httpx.AsyncClient(
351+
transport=httpx.ASGITransport(app=server_url_or_app),
352+
base_url="http://test",
353+
) as client:
354+
response = await client.post(
355+
"/ag-ui/agent",
356+
json={"messages": messages, "stream": stream},
357+
timeout=60.0,
358+
)
314359

315360
assert response.status_code == 200
316361
return parse_sse_events(response.text)
@@ -670,7 +715,7 @@ def assert_openai_tool_call_response(
670715

671716

672717
async def request_openai_events(
673-
server_app,
718+
server_url_or_app: Union[str, FastAPI],
674719
messages: List[Dict[str, str]],
675720
stream: bool = True,
676721
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
@@ -681,15 +726,23 @@ async def request_openai_events(
681726
"stream": stream,
682727
}
683728

684-
async with httpx.AsyncClient(
685-
transport=httpx.ASGITransport(app=server_app),
686-
base_url="http://test",
687-
) as client:
688-
response = await client.post(
689-
"/openai/v1/chat/completions",
690-
json=payload,
691-
timeout=60.0,
692-
)
729+
if isinstance(server_url_or_app, str):
730+
async with httpx.AsyncClient(base_url=server_url_or_app) as client:
731+
response = await client.post(
732+
"/openai/v1/chat/completions",
733+
json=payload,
734+
timeout=60.0,
735+
)
736+
else:
737+
async with httpx.AsyncClient(
738+
transport=httpx.ASGITransport(app=server_url_or_app),
739+
base_url="http://test",
740+
) as client:
741+
response = await client.post(
742+
"/openai/v1/chat/completions",
743+
json=payload,
744+
timeout=60.0,
745+
)
693746

694747
assert response.status_code == 200
695748

@@ -749,7 +802,16 @@ def agent_model(mock_openai_server: str):
749802

750803
@pytest.fixture
751804
def server_app_astream_events(agent_model):
752-
"""创建使用 astream_events 的服务器(AG-UI/OpenAI 通用)"""
805+
"""创建使用 astream 的服务器(AG-UI/OpenAI 通用)
806+
807+
返回服务器 URL 而不是 app 对象,因为流式测试需要真实的 HTTP 连接。
808+
httpx.ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应。
809+
810+
注意: 这里使用 astream(stream_mode="updates") 而非 astream_events,
811+
因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中会出现
812+
async generator 被提前取消或事件丢失的问题。
813+
astream_events 的转换逻辑由 test_convert_python_3_10/3_12 单独覆盖。
814+
"""
753815
agent = build_agent(agent_model)
754816

755817
async def invoke_agent(request: AgentRequest):
@@ -770,16 +832,21 @@ async def invoke_agent(request: AgentRequest):
770832
converter = AgentRunConverter()
771833

772834
async def generator():
773-
async for event in agent.astream_events(
774-
cast(Any, input_data), version="v2"
835+
async for event in agent.astream(
836+
cast(Any, input_data), stream_mode="updates"
775837
):
776838
for item in converter.convert(event):
777839
yield item
778840

779841
return generator()
780842

781843
server = AgentRunServer(invoke_agent=invoke_agent)
782-
return server.app
844+
base_url, uvicorn_server, thread = _start_server(server.app)
845+
846+
yield base_url
847+
848+
uvicorn_server.should_exit = True
849+
thread.join(timeout=5)
783850

784851

785852
# =============================================================================
@@ -1753,16 +1820,21 @@ async def invoke_agent(request: AgentRequest):
17531820
converter = AgentRunConverter()
17541821

17551822
async def generator():
1756-
async for event in agent.astream_events(
1757-
cast(Any, input_data), version="v2"
1823+
async for event in agent.astream(
1824+
cast(Any, input_data), stream_mode="updates"
17581825
):
17591826
for item in converter.convert(event):
17601827
yield item
17611828

17621829
return generator()
17631830

17641831
server = AgentRunServer(invoke_agent=invoke_agent)
1765-
return server.app
1832+
base_url, uvicorn_server, thread = _start_server(server.app)
1833+
1834+
yield base_url
1835+
1836+
uvicorn_server.should_exit = True
1837+
thread.join(timeout=5)
17661838

17671839
@pytest.fixture
17681840
def server_app_async(self, agent_model):
@@ -1787,16 +1859,21 @@ async def invoke_agent(request: AgentRequest):
17871859
converter = AgentRunConverter()
17881860

17891861
async def generator():
1790-
async for event in agent.astream_events(
1791-
cast(Any, input_data), version="v2"
1862+
async for event in agent.astream(
1863+
cast(Any, input_data), stream_mode="updates"
17921864
):
17931865
for item in converter.convert(event):
17941866
yield item
17951867

17961868
return generator()
17971869

17981870
server = AgentRunServer(invoke_agent=invoke_agent)
1799-
return server.app
1871+
base_url, uvicorn_server, thread = _start_server(server.app)
1872+
1873+
yield base_url
1874+
1875+
uvicorn_server.should_exit = True
1876+
thread.join(timeout=5)
18001877

18011878
@pytest.mark.parametrize(
18021879
"case_key,prompt",

tests/unittests/integration/test_langchain_agui_integration.py

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -648,36 +648,62 @@ async def invoke_agent(request: AgentRequest):
648648
}]
649649
}
650650

651+
# 使用 astream(updates) 代替 astream_events,
652+
# 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中
653+
# 会出现 async generator 被提前取消或事件丢失的问题。
651654
converter = AgentRunConverter()
652-
async for event in agent.astream_events(input_data, version="v2"):
655+
async for event in agent.astream(input_data, stream_mode="updates"):
653656
for item in converter.convert(event):
654657
yield item
655658

656-
app = AgentRunServer(invoke_agent=invoke_agent).app
659+
server_app = AgentRunServer(invoke_agent=invoke_agent).app
657660

658-
async with httpx.AsyncClient(
659-
transport=httpx.ASGITransport(app=app),
660-
base_url="http://test",
661-
) as client:
662-
response = await client.post(
663-
"/ag-ui/agent",
664-
json={
665-
"messages": [{
666-
"role": "user",
667-
"content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息",
668-
}],
669-
"stream": True,
670-
},
671-
timeout=60.0,
672-
)
661+
# 使用真实的 HTTP 服务器而不是 httpx.ASGITransport,
662+
# 因为 ASGITransport 在 CI 中无法正确处理 SSE 流式响应。
663+
port = _find_free_port()
664+
config = uvicorn.Config(
665+
server_app, host="127.0.0.1", port=port, log_level="warning"
666+
)
667+
uvicorn_server = uvicorn.Server(config)
668+
server_thread = threading.Thread(target=uvicorn_server.run, daemon=True)
669+
server_thread.start()
670+
671+
base_url = f"http://127.0.0.1:{port}"
672+
for i in range(50):
673+
try:
674+
httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2)
675+
break
676+
except Exception:
677+
if i == 49:
678+
raise RuntimeError(
679+
f"Server failed to start within {50 * 0.1}s"
680+
)
681+
time.sleep(0.1)
673682

674-
assert response.status_code == 200
683+
try:
684+
async with httpx.AsyncClient(base_url=base_url) as client:
685+
response = await client.post(
686+
"/ag-ui/agent",
687+
json={
688+
"messages": [{
689+
"role": "user",
690+
"content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息",
691+
}],
692+
"stream": True,
693+
},
694+
timeout=60.0,
695+
)
675696

676-
events = [line for line in response.text.split("\n") if line]
677-
# Normalize empty delta for consistency with check_result expectations
678-
# astream_events yields "" for empty args, while astream yields "{}"
679-
events = [e.replace('"delta":""', '"delta":"{}"') for e in events]
680-
self.check_result(events)
697+
assert response.status_code == 200
698+
699+
events = [line for line in response.text.split("\n") if line]
700+
# Normalize empty delta for consistency with check_result expectations
701+
# astream_events yields "" for empty args, while astream yields "{}"
702+
events = [e.replace('"delta":""', '"delta":"{}"') for e in events]
703+
self.check_result(events)
704+
finally:
705+
uvicorn_server.should_exit = True
706+
server_thread.join(timeout=5)
681707

682708
async def test_astream(self, mock_mcp_server):
683709
"""测试多工具查询场景 (MCP + Local + MockLLM)"""

0 commit comments

Comments
 (0)