|
| 1 | +"""真实 MCP 端到端兼容性测试。 |
| 2 | +
|
| 3 | +通过 stdio 启动 MCP Server 进程,进行完整的协议握手和工具调用验证。 |
| 4 | +""" |
| 5 | + |
| 6 | +import asyncio |
| 7 | +import json |
| 8 | +import os |
| 9 | +import sys |
| 10 | +import tempfile |
| 11 | + |
| 12 | +import pytest |
| 13 | + |
| 14 | + |
| 15 | +class MCPClient: |
| 16 | + """简单的 MCP 客户端,用于端到端测试。""" |
| 17 | + |
| 18 | + def __init__(self, process: asyncio.subprocess.Process): |
| 19 | + self.process = process |
| 20 | + self.request_id = 0 |
| 21 | + |
| 22 | + async def send_request(self, method: str, params: dict | None = None) -> dict: |
| 23 | + """发送 JSON-RPC 请求并等待响应。""" |
| 24 | + self.request_id += 1 |
| 25 | + request = { |
| 26 | + "jsonrpc": "2.0", |
| 27 | + "id": self.request_id, |
| 28 | + "method": method, |
| 29 | + "params": params or {}, |
| 30 | + } |
| 31 | + |
| 32 | + # 发送请求 |
| 33 | + message = json.dumps(request) + "\n" |
| 34 | + self.process.stdin.write(message.encode()) |
| 35 | + await self.process.stdin.drain() |
| 36 | + |
| 37 | + # 读取响应 |
| 38 | + response_line = await self.process.stdout.readline() |
| 39 | + if not response_line: |
| 40 | + raise RuntimeError("MCP server closed connection") |
| 41 | + |
| 42 | + response = json.loads(response_line.decode()) |
| 43 | + |
| 44 | + if "error" in response: |
| 45 | + raise RuntimeError(f"MCP error: {response['error']}") |
| 46 | + |
| 47 | + return response.get("result", {}) |
| 48 | + |
| 49 | + async def initialize(self) -> dict: |
| 50 | + """执行 MCP 初始化握手。""" |
| 51 | + # 发送 initialize 请求 |
| 52 | + result = await self.send_request( |
| 53 | + "initialize", |
| 54 | + { |
| 55 | + "protocolVersion": "2024-11-05", |
| 56 | + "capabilities": {}, |
| 57 | + "clientInfo": {"name": "test-client", "version": "1.0.0"}, |
| 58 | + }, |
| 59 | + ) |
| 60 | + |
| 61 | + # 发送 initialized 通知 |
| 62 | + notification = {"jsonrpc": "2.0", "method": "notifications/initialized"} |
| 63 | + message = json.dumps(notification) + "\n" |
| 64 | + self.process.stdin.write(message.encode()) |
| 65 | + await self.process.stdin.drain() |
| 66 | + |
| 67 | + return result |
| 68 | + |
| 69 | + async def list_tools(self) -> list[dict]: |
| 70 | + """获取工具列表。""" |
| 71 | + result = await self.send_request("tools/list") |
| 72 | + return result.get("tools", []) |
| 73 | + |
| 74 | + async def call_tool(self, name: str, arguments: dict) -> dict: |
| 75 | + """调用工具。""" |
| 76 | + return await self.send_request("tools/call", {"name": name, "arguments": arguments}) |
| 77 | + |
| 78 | + async def close(self) -> None: |
| 79 | + """关闭连接。""" |
| 80 | + if self.process.stdin: |
| 81 | + self.process.stdin.close() |
| 82 | + try: |
| 83 | + self.process.kill() |
| 84 | + except ProcessLookupError: |
| 85 | + pass |
| 86 | + |
| 87 | + |
| 88 | +@pytest.fixture |
| 89 | +async def mcp_client(): |
| 90 | + """启动 MCP Server 并返回客户端实例。""" |
| 91 | + # 使用 uv run 启动 autocode-mcp |
| 92 | + process = await asyncio.create_subprocess_exec( |
| 93 | + sys.executable, |
| 94 | + "-m", |
| 95 | + "autocode_mcp.server", |
| 96 | + stdin=asyncio.subprocess.PIPE, |
| 97 | + stdout=asyncio.subprocess.PIPE, |
| 98 | + stderr=asyncio.subprocess.PIPE, |
| 99 | + env={**os.environ, "PYTHONIOENCODING": "utf-8"}, |
| 100 | + ) |
| 101 | + |
| 102 | + client = MCPClient(process) |
| 103 | + |
| 104 | + try: |
| 105 | + yield client |
| 106 | + finally: |
| 107 | + await client.close() |
| 108 | + |
| 109 | + |
| 110 | +@pytest.mark.asyncio |
| 111 | +async def test_mcp_handshake(mcp_client: MCPClient): |
| 112 | + """测试 MCP 协议握手。""" |
| 113 | + result = await mcp_client.initialize() |
| 114 | + |
| 115 | + assert "protocolVersion" in result |
| 116 | + assert "serverInfo" in result |
| 117 | + assert result["serverInfo"]["name"] == "autocode-mcp" |
| 118 | + |
| 119 | + |
| 120 | +@pytest.mark.asyncio |
| 121 | +async def test_mcp_list_tools(mcp_client: MCPClient): |
| 122 | + """测试获取工具列表。""" |
| 123 | + await mcp_client.initialize() |
| 124 | + |
| 125 | + tools = await mcp_client.list_tools() |
| 126 | + |
| 127 | + # 验证有 15 个工具 |
| 128 | + assert len(tools) == 15 |
| 129 | + |
| 130 | + # 验证关键工具存在 |
| 131 | + tool_names = {t["name"] for t in tools} |
| 132 | + expected_tools = { |
| 133 | + "file_read", |
| 134 | + "file_save", |
| 135 | + "solution_build", |
| 136 | + "solution_run", |
| 137 | + "validator_build", |
| 138 | + "generator_build", |
| 139 | + "checker_build", |
| 140 | + "stress_test_run", |
| 141 | + "problem_create", |
| 142 | + "problem_generate_tests", |
| 143 | + } |
| 144 | + assert expected_tools.issubset(tool_names) |
| 145 | + |
| 146 | + |
| 147 | +@pytest.mark.asyncio |
| 148 | +async def test_mcp_call_file_read(mcp_client: MCPClient): |
| 149 | + """测试通过 MCP 调用 file_read 工具。""" |
| 150 | + await mcp_client.initialize() |
| 151 | + |
| 152 | + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f: |
| 153 | + f.write("hello world") |
| 154 | + temp_path = f.name |
| 155 | + |
| 156 | + try: |
| 157 | + result = await mcp_client.call_tool("file_read", {"path": temp_path}) |
| 158 | + |
| 159 | + # 验证返回结构 |
| 160 | + assert "content" in result |
| 161 | + assert not result.get("isError", True) |
| 162 | + |
| 163 | + # 验证 content 是列表且包含 TextContent |
| 164 | + content = result["content"] |
| 165 | + assert isinstance(content, list) |
| 166 | + assert len(content) == 1 |
| 167 | + assert content[0]["type"] == "text" |
| 168 | + |
| 169 | + # 验证文本内容是有效 JSON |
| 170 | + text = content[0]["text"] |
| 171 | + parsed = json.loads(text) |
| 172 | + assert parsed["success"] is True |
| 173 | + assert "data" in parsed |
| 174 | + assert parsed["data"]["content"] == "hello world" |
| 175 | + |
| 176 | + # 验证 structuredContent 存在 |
| 177 | + assert "structuredContent" in result |
| 178 | + assert result["structuredContent"]["success"] is True |
| 179 | + finally: |
| 180 | + os.unlink(temp_path) |
| 181 | + |
| 182 | + |
| 183 | +@pytest.mark.asyncio |
| 184 | +async def test_mcp_call_unknown_tool(mcp_client: MCPClient): |
| 185 | + """测试调用不存在的工具返回错误。""" |
| 186 | + await mcp_client.initialize() |
| 187 | + |
| 188 | + result = await mcp_client.call_tool("nonexistent_tool", {}) |
| 189 | + |
| 190 | + assert result.get("isError") is True |
| 191 | + assert "Unknown tool" in result["content"][0]["text"] |
| 192 | + |
| 193 | + |
| 194 | +@pytest.mark.asyncio |
| 195 | +async def test_mcp_text_content_is_valid_json(mcp_client: MCPClient): |
| 196 | + """测试 TextContent 的文本是有效 JSON(不是 Python repr)。""" |
| 197 | + await mcp_client.initialize() |
| 198 | + |
| 199 | + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f: |
| 200 | + f.write("test") |
| 201 | + temp_path = f.name |
| 202 | + |
| 203 | + try: |
| 204 | + result = await mcp_client.call_tool("file_read", {"path": temp_path}) |
| 205 | + |
| 206 | + text = result["content"][0]["text"] |
| 207 | + |
| 208 | + # 必须是有效 JSON |
| 209 | + parsed = json.loads(text) |
| 210 | + |
| 211 | + # 不能是 Python repr 格式(如 {'success': True}) |
| 212 | + # Python repr 使用单引号,JSON 使用双引号 |
| 213 | + assert "'" not in text # JSON 不使用单引号 |
| 214 | + assert parsed["success"] is True |
| 215 | + finally: |
| 216 | + os.unlink(temp_path) |
| 217 | + |
| 218 | + |
| 219 | +@pytest.mark.asyncio |
| 220 | +async def test_mcp_chinese_text_encoding(mcp_client: MCPClient): |
| 221 | + """测试中文文本编码正确处理。""" |
| 222 | + await mcp_client.initialize() |
| 223 | + |
| 224 | + chinese_content = "你好世界" |
| 225 | + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f: |
| 226 | + f.write(chinese_content) |
| 227 | + temp_path = f.name |
| 228 | + |
| 229 | + try: |
| 230 | + result = await mcp_client.call_tool("file_read", {"path": temp_path}) |
| 231 | + |
| 232 | + text = result["content"][0]["text"] |
| 233 | + parsed = json.loads(text) |
| 234 | + |
| 235 | + # 验证中文正确编码(ensure_ascii=False) |
| 236 | + assert parsed["data"]["content"] == chinese_content |
| 237 | + # 原始文本应该包含中文字符,不是 \uXXXX 转义 |
| 238 | + assert chinese_content in text |
| 239 | + finally: |
| 240 | + os.unlink(temp_path) |
0 commit comments