Skip to content

Commit c876b17

Browse files
authored
Merge pull request #60 from Serverless-Devs/fix-sandbox-ttl-seconds-ut
test: improve SSE streaming test reliability in CI environment
2 parents 1f900cb + 61f18a4 commit c876b17

File tree

2 files changed

+157
-54
lines changed

2 files changed

+157
-54
lines changed

tests/unittests/integration/langchain/test_agent_invoke_methods.py

Lines changed: 108 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,37 @@ def _sse(data: Dict[str, Any]) -> str:
8181
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
8282

8383

84+
def _start_server(app: FastAPI) -> tuple:
85+
"""启动 FastAPI 服务器并返回 (base_url, server, thread)
86+
87+
使用真实的 HTTP 服务器而不是 httpx.ASGITransport,
88+
因为 ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应,
89+
会导致流式响应被提前取消 (CancelledError)。
90+
"""
91+
port = _find_free_port()
92+
config = uvicorn.Config(
93+
app, host="127.0.0.1", port=port, log_level="warning"
94+
)
95+
server = uvicorn.Server(config)
96+
97+
thread = threading.Thread(target=server.run, daemon=True)
98+
thread.start()
99+
100+
base_url = f"http://127.0.0.1:{port}"
101+
for i in range(50):
102+
try:
103+
httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2)
104+
break
105+
except Exception:
106+
if i == 49:
107+
raise RuntimeError(
108+
f"Server failed to start within {50 * 0.1}s: {base_url}"
109+
)
110+
time.sleep(0.1)
111+
112+
return base_url, server, thread
113+
114+
84115
def _build_mock_openai_app() -> FastAPI:
85116
"""构建本地 OpenAI 协议兼容的简单服务"""
86117
app = FastAPI()
@@ -297,20 +328,34 @@ def parse_sse_events(content: str) -> List[Dict[str, Any]]:
297328

298329

299330
async def request_agui_events(
    server_url_or_app: Union[str, FastAPI],
    messages: List[Dict[str, str]],
    stream: bool = True,
) -> List[Dict[str, Any]]:
    """Send an AG-UI request and return the parsed SSE event list.

    Args:
        server_url_or_app: Base URL of a running server (a real HTTP
            connection, required for reliable SSE streaming in CI) or a
            FastAPI app object (in-process via httpx.ASGITransport).
        messages: Chat messages to send.
        stream: Whether to request a streaming response.
    """
    # Build the client configuration once so the POST request is not
    # duplicated across the two transport modes.
    if isinstance(server_url_or_app, str):
        client_kwargs: Dict[str, Any] = {"base_url": server_url_or_app}
    else:
        client_kwargs = {
            "transport": httpx.ASGITransport(app=server_url_or_app),
            "base_url": "http://test",
        }

    async with httpx.AsyncClient(**client_kwargs) as client:
        response = await client.post(
            "/ag-ui/agent",
            json={"messages": messages, "stream": stream},
            timeout=60.0,
        )

    assert response.status_code == 200
    return parse_sse_events(response.text)
@@ -670,7 +715,7 @@ def assert_openai_tool_call_response(
670715

671716

672717
async def request_openai_events(
673-
server_app,
718+
server_url_or_app: Union[str, FastAPI],
674719
messages: List[Dict[str, str]],
675720
stream: bool = True,
676721
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
@@ -681,15 +726,23 @@ async def request_openai_events(
681726
"stream": stream,
682727
}
683728

684-
async with httpx.AsyncClient(
685-
transport=httpx.ASGITransport(app=server_app),
686-
base_url="http://test",
687-
) as client:
688-
response = await client.post(
689-
"/openai/v1/chat/completions",
690-
json=payload,
691-
timeout=60.0,
692-
)
729+
if isinstance(server_url_or_app, str):
730+
async with httpx.AsyncClient(base_url=server_url_or_app) as client:
731+
response = await client.post(
732+
"/openai/v1/chat/completions",
733+
json=payload,
734+
timeout=60.0,
735+
)
736+
else:
737+
async with httpx.AsyncClient(
738+
transport=httpx.ASGITransport(app=server_url_or_app),
739+
base_url="http://test",
740+
) as client:
741+
response = await client.post(
742+
"/openai/v1/chat/completions",
743+
json=payload,
744+
timeout=60.0,
745+
)
693746

694747
assert response.status_code == 200
695748

@@ -749,7 +802,16 @@ def agent_model(mock_openai_server: str):
749802

750803
@pytest.fixture
751804
def server_app_astream_events(agent_model):
752-
"""创建使用 astream_events 的服务器(AG-UI/OpenAI 通用)"""
805+
"""创建使用 astream 的服务器(AG-UI/OpenAI 通用)
806+
807+
返回服务器 URL 而不是 app 对象,因为流式测试需要真实的 HTTP 连接。
808+
httpx.ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应。
809+
810+
注意: 这里使用 astream(stream_mode="updates") 而非 astream_events,
811+
因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中会出现
812+
async generator 被提前取消或事件丢失的问题。
813+
astream_events 的转换逻辑由 test_convert_python_3_10/3_12 单独覆盖。
814+
"""
753815
agent = build_agent(agent_model)
754816

755817
async def invoke_agent(request: AgentRequest):
@@ -770,16 +832,21 @@ async def invoke_agent(request: AgentRequest):
770832
converter = AgentRunConverter()
771833

772834
async def generator():
773-
async for event in agent.astream_events(
774-
cast(Any, input_data), version="v2"
835+
async for event in agent.astream(
836+
cast(Any, input_data), stream_mode="updates"
775837
):
776838
for item in converter.convert(event):
777839
yield item
778840

779841
return generator()
780842

781843
server = AgentRunServer(invoke_agent=invoke_agent)
782-
return server.app
844+
base_url, uvicorn_server, thread = _start_server(server.app)
845+
846+
yield base_url
847+
848+
uvicorn_server.should_exit = True
849+
thread.join(timeout=5)
783850

784851

785852
# =============================================================================
@@ -1753,16 +1820,21 @@ async def invoke_agent(request: AgentRequest):
17531820
converter = AgentRunConverter()
17541821

17551822
async def generator():
1756-
async for event in agent.astream_events(
1757-
cast(Any, input_data), version="v2"
1823+
async for event in agent.astream(
1824+
cast(Any, input_data), stream_mode="updates"
17581825
):
17591826
for item in converter.convert(event):
17601827
yield item
17611828

17621829
return generator()
17631830

17641831
server = AgentRunServer(invoke_agent=invoke_agent)
1765-
return server.app
1832+
base_url, uvicorn_server, thread = _start_server(server.app)
1833+
1834+
yield base_url
1835+
1836+
uvicorn_server.should_exit = True
1837+
thread.join(timeout=5)
17661838

17671839
@pytest.fixture
17681840
def server_app_async(self, agent_model):
@@ -1787,16 +1859,21 @@ async def invoke_agent(request: AgentRequest):
17871859
converter = AgentRunConverter()
17881860

17891861
async def generator():
1790-
async for event in agent.astream_events(
1791-
cast(Any, input_data), version="v2"
1862+
async for event in agent.astream(
1863+
cast(Any, input_data), stream_mode="updates"
17921864
):
17931865
for item in converter.convert(event):
17941866
yield item
17951867

17961868
return generator()
17971869

17981870
server = AgentRunServer(invoke_agent=invoke_agent)
1799-
return server.app
1871+
base_url, uvicorn_server, thread = _start_server(server.app)
1872+
1873+
yield base_url
1874+
1875+
uvicorn_server.should_exit = True
1876+
thread.join(timeout=5)
18001877

18011878
@pytest.mark.parametrize(
18021879
"case_key,prompt",

tests/unittests/integration/test_langchain_agui_integration.py

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -648,36 +648,62 @@ async def invoke_agent(request: AgentRequest):
648648
}]
649649
}
650650

651+
# 使用 astream(updates) 代替 astream_events,
652+
# 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中
653+
# 会出现 async generator 被提前取消或事件丢失的问题。
651654
converter = AgentRunConverter()
652-
async for event in agent.astream_events(input_data, version="v2"):
655+
async for event in agent.astream(input_data, stream_mode="updates"):
653656
for item in converter.convert(event):
654657
yield item
655658

656-
app = AgentRunServer(invoke_agent=invoke_agent).app
659+
server_app = AgentRunServer(invoke_agent=invoke_agent).app
657660

658-
async with httpx.AsyncClient(
659-
transport=httpx.ASGITransport(app=app),
660-
base_url="http://test",
661-
) as client:
662-
response = await client.post(
663-
"/ag-ui/agent",
664-
json={
665-
"messages": [{
666-
"role": "user",
667-
"content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息",
668-
}],
669-
"stream": True,
670-
},
671-
timeout=60.0,
672-
)
661+
# 使用真实的 HTTP 服务器而不是 httpx.ASGITransport,
662+
# 因为 ASGITransport 在 CI 中无法正确处理 SSE 流式响应。
663+
port = _find_free_port()
664+
config = uvicorn.Config(
665+
server_app, host="127.0.0.1", port=port, log_level="warning"
666+
)
667+
uvicorn_server = uvicorn.Server(config)
668+
server_thread = threading.Thread(target=uvicorn_server.run, daemon=True)
669+
server_thread.start()
670+
671+
base_url = f"http://127.0.0.1:{port}"
672+
for i in range(50):
673+
try:
674+
httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2)
675+
break
676+
except Exception:
677+
if i == 49:
678+
raise RuntimeError(
679+
f"Server failed to start within {50 * 0.1}s"
680+
)
681+
time.sleep(0.1)
673682

674-
assert response.status_code == 200
683+
try:
684+
async with httpx.AsyncClient(base_url=base_url) as client:
685+
response = await client.post(
686+
"/ag-ui/agent",
687+
json={
688+
"messages": [{
689+
"role": "user",
690+
"content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息",
691+
}],
692+
"stream": True,
693+
},
694+
timeout=60.0,
695+
)
675696

676-
events = [line for line in response.text.split("\n") if line]
677-
# Normalize empty delta for consistency with check_result expectations
678-
# astream_events yields "" for empty args, while astream yields "{}"
679-
events = [e.replace('"delta":""', '"delta":"{}"') for e in events]
680-
self.check_result(events)
697+
assert response.status_code == 200
698+
699+
events = [line for line in response.text.split("\n") if line]
700+
# Normalize empty delta for consistency with check_result expectations
701+
# astream_events yields "" for empty args, while astream yields "{}"
702+
events = [e.replace('"delta":""', '"delta":"{}"') for e in events]
703+
self.check_result(events)
704+
finally:
705+
uvicorn_server.should_exit = True
706+
server_thread.join(timeout=5)
681707

682708
async def test_astream(self, mock_mcp_server):
683709
"""测试多工具查询场景 (MCP + Local + MockLLM)"""

0 commit comments

Comments
 (0)