-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathmcp_server.py
More file actions
119 lines (95 loc) · 3.6 KB
/
mcp_server.py
File metadata and controls
119 lines (95 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""ZeroToken MCP Server - 入口点"""
import asyncio
import logging
import os
import time

from mcp.server import Server
from mcp.server.stdio import stdio_server

from zerotoken.repository.sqlite import (
    new_connection,
    SQLiteScriptRepo,
    SQLiteTrajectoryRepo,
    SQLiteSessionRepo,
    SQLiteRuntimeRepo,
    SQLiteFingerprintRepo,
    SQLiteBindingRepo,
)
from zerotoken.services import BrowserService, TrajectoryService, ScriptService
from zerotoken.benchmark import BenchmarkRecorder
from handlers.browser_handlers import browser_tools, handle_browser_tool
from handlers.trajectory_handlers import trajectory_tools, handle_trajectory_tool
from handlers.script_handlers import script_tools, handle_script_tool
# Server instance that the list_tools / call_tool handlers register on.
server = Server("zerotoken")

# Shared connection and service singletons. They start out unset and are
# filled in by init_services() the first time a tool is called.
_db_conn = _browser_svc = _trajectory_svc = _script_svc = None

# Benchmark output is written to a "benchmarks" directory next to this file.
_benchmark_dir = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "benchmarks",
)
_recorder = BenchmarkRecorder(output_dir=_benchmark_dir)
def init_services():
    """Create the shared database connection and service singletons once.

    The database path comes from the ``ZEROTOKEN_DB`` environment variable;
    when that is unset or empty, ``zerotoken.db`` next to this file is used.
    Once initialization has succeeded, further calls return immediately.

    Everything is built into local variables first and only published to the
    module globals after every constructor has succeeded, with ``_db_conn``
    (the "already initialized" guard) assigned last. If any step raises,
    no global is touched, so the next call retries instead of finding the
    guard set while the services are still ``None``.

    Raises:
        Whatever ``new_connection`` or any repository/service constructor
        raises; the exception propagates unchanged.
    """
    global _db_conn, _browser_svc, _trajectory_svc, _script_svc
    if _db_conn is not None:
        return

    db_path = os.environ.get("ZEROTOKEN_DB") or os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "zerotoken.db"
    )
    conn = new_connection(db_path)

    # The trajectory repo is shared by the trajectory and script services.
    fp_repo = SQLiteFingerprintRepo(conn)
    traj_repo = SQLiteTrajectoryRepo(conn)

    browser_svc = BrowserService(fingerprint_repo=fp_repo)
    trajectory_svc = TrajectoryService(trajectory_repo=traj_repo)
    script_svc = ScriptService(
        script_repo=SQLiteScriptRepo(conn),
        trajectory_repo=traj_repo,
        session_repo=SQLiteSessionRepo(conn),
        runtime_repo=SQLiteRuntimeRepo(conn),
        binding_repo=SQLiteBindingRepo(conn),
    )

    # Publish only after every step above succeeded; the guard goes last so
    # a set _db_conn always implies fully constructed services.
    _browser_svc = browser_svc
    _trajectory_svc = trajectory_svc
    _script_svc = script_svc
    _db_conn = conn
def get_recorder() -> BenchmarkRecorder:
    """Return the module-level BenchmarkRecorder instance."""
    return _recorder
@server.list_tools()
async def list_tools():
    """Return the combined tool listing exposed by this server.

    Concatenates the results of ``browser_tools()``, ``trajectory_tools()``
    and ``script_tools()``, in that order.
    """
    catalog = browser_tools()
    # Plain ``+`` (not ``+=``) so the sequences returned by the helper
    # functions are never mutated in place.
    catalog = catalog + trajectory_tools()
    return catalog + script_tools()
async def dispatch(name: str, arguments: dict):
    """Route a tool call to the handler that matches its name prefix.

    Names starting with ``browser_`` go to the browser handler, names
    starting with ``trajectory_`` go to the trajectory handler, and every
    other name is passed to the script handler. The handler's return value
    is returned as-is. Per the original author's note, each call is meant to
    travel the full handler -> service -> pipeline path.
    """
    if name.startswith("browser_"):
        return await handle_browser_tool(name, arguments, _browser_svc, _trajectory_svc)

    if name.startswith("trajectory_"):
        return await handle_trajectory_tool(name, arguments, _trajectory_svc)

    # Anything without a recognised prefix is treated as a script tool.
    return await handle_script_tool(name, arguments, _script_svc, browser_svc=_browser_svc)
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Handle a tool call: dispatch it and record a benchmark sample.

    Services are initialized lazily via ``init_services()``. Whether or not
    dispatch succeeds, the tool name, arguments, result (``None`` on
    failure), elapsed time in milliseconds (measured with
    ``time.monotonic()``) and the raised exception (``None`` on success) are
    handed to the module-level recorder.

    Returns:
        Whatever ``dispatch`` returns for this tool.

    Raises:
        Any exception raised by ``init_services`` or ``dispatch``,
        unchanged. A failure inside the recorder itself is logged and
        suppressed, so it can neither replace the tool's real exception nor
        discard a result the tool already produced.
    """
    init_services()
    start = time.monotonic()
    result = None
    error = None
    try:
        result = await dispatch(name, arguments)
        return result
    except Exception as e:
        # NOTE: only Exception subclasses are captured here; cancellation
        # and other BaseExceptions still reach the recorder with error=None.
        error = e
        raise
    finally:
        duration_ms = (time.monotonic() - start) * 1000
        try:
            _recorder.record(name, arguments, result, duration_ms, error)
        except Exception:
            # Benchmark recording is auxiliary. An exception escaping a
            # ``finally`` block would override the in-flight exception or
            # the pending return value, so report it and carry on.
            logging.getLogger(__name__).exception(
                "benchmark recording failed for tool %r", name
            )
async def main():
    """Run the MCP server over the stdio transport until ``server.run`` returns."""
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)
def run(transport: str = "stdio"):
    """Entry point supporting the stdio and streamable-http transports.

    ``"streamable-http"`` hands off to ``mcp_server_http.run``; any other
    value runs the stdio server via ``asyncio.run(main())``.
    """
    if transport != "streamable-http":
        asyncio.run(main())
        return

    # Imported lazily so the HTTP module is only loaded when it is requested.
    from mcp_server_http import run as serve_http

    serve_http()
if __name__ == "__main__":
    import argparse

    # The environment variable supplies the default; --transport overrides it.
    # NOTE(review): argparse does not check a default against ``choices``, so
    # an unrecognised ZEROTOKEN_MCP_TRANSPORT value is passed to run()
    # unvalidated and ends up on the stdio path.
    default_transport = os.environ.get("ZEROTOKEN_MCP_TRANSPORT", "stdio")

    cli = argparse.ArgumentParser(description="ZeroToken MCP Server")
    cli.add_argument(
        "--transport",
        choices=["stdio", "streamable-http"],
        default=default_transport,
    )
    options = cli.parse_args()
    run(transport=options.transport)