Skip to content

Commit 29df3b8

Browse files
feat: support reverse mcp (#485)
1 parent 4e8462d commit 29df3b8

File tree

6 files changed

+386
-0
lines changed

6 files changed

+386
-0
lines changed

veadk/toolkits/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.

veadk/toolkits/apps/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import json
16+
17+
import httpx
18+
import websockets
19+
20+
from veadk.utils.logger import get_logger
21+
22+
logger = get_logger(__name__)
23+
24+
25+
class ClientWithReverseMCP:
26+
def __init__(self, ws_url: str, mcp_server_url: str, client_id: str):
27+
"""Start a client with reverse mcp,
28+
29+
Args:
30+
ws_url: The url of the websocket server (cloud). Like example.com:8000
31+
mcp_server_url: The url of the mcp server (local).
32+
"""
33+
self.ws_url = f"ws://{ws_url}/ws?id={client_id}"
34+
self.mcp_server_url = mcp_server_url
35+
36+
# set timeout for httpx client
37+
httpx.Timeout(
38+
connect=10.0,
39+
read=None,
40+
write=10.0,
41+
pool=10.0,
42+
)
43+
44+
async def start(self):
45+
async with httpx.AsyncClient(base_url=self.mcp_server_url) as http:
46+
async with websockets.connect(self.ws_url) as ws:
47+
logger.info(f"Connected to cloud {self.ws_url}")
48+
49+
async for raw in ws:
50+
msg = json.loads(raw)
51+
if msg["type"] != "http_request":
52+
continue
53+
54+
req = msg["payload"]
55+
56+
logger.info(f"--- Recv {req} ---")
57+
58+
if (
59+
req["method"] == "GET"
60+
and "text/event-stream" in req["headers"]["accept"]
61+
):
62+
logger.info("Use streamable request")
63+
# streamable request
64+
65+
async with http.stream(
66+
method=req["method"],
67+
url=req["path"],
68+
headers=req["headers"],
69+
content=req["body"],
70+
) as resp:
71+
reply = {
72+
"id": msg["id"],
73+
"type": "http_response",
74+
"payload": {
75+
"status": resp.status_code,
76+
"headers": dict(resp.headers),
77+
"body": "",
78+
},
79+
}
80+
await ws.send(json.dumps(reply))
81+
82+
if req["body"]:
83+
# if body is an empty string, it represents a subscription request, no need to iterate over chunks
84+
async for chunk in resp.aiter_bytes():
85+
if chunk:
86+
await ws.send(
87+
json.dumps(
88+
{
89+
"id": msg["id"],
90+
"type": "http_response_chunk",
91+
"payload": {
92+
"status": resp.status_code,
93+
"headers": dict(resp.headers),
94+
"body": chunk.decode(
95+
"utf-8",
96+
errors="ignore",
97+
),
98+
},
99+
}
100+
)
101+
)
102+
else:
103+
# non-streamable request
104+
logger.info("Use non-streamable request")
105+
resp = await http.request(
106+
method=req["method"],
107+
url=req["path"],
108+
headers=req["headers"],
109+
content=req["body"],
110+
)
111+
reply = {
112+
"id": msg["id"],
113+
"type": "http_response",
114+
"payload": {
115+
"status": resp.status_code,
116+
"headers": dict(resp.headers),
117+
"body": resp.text,
118+
},
119+
}
120+
await ws.send(json.dumps(reply))
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import json
17+
import uuid
18+
from typing import TYPE_CHECKING
19+
20+
from fastapi import FastAPI, Request, Response, WebSocket
21+
from google.adk.tools.mcp_tool.mcp_session_manager import (
22+
StreamableHTTPConnectionParams,
23+
)
24+
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
25+
from pydantic import BaseModel
26+
27+
from veadk import Runner
28+
from veadk.utils.logger import get_logger
29+
30+
if TYPE_CHECKING:
31+
from veadk import Agent
32+
33+
logger = get_logger(__name__)
34+
35+
REVERSE_MCP_HEADER_KEY = "X-Reverse-MCP-ID"
36+
37+
38+
class WebsocketSessionManager:
39+
def __init__(self):
40+
# ws id -> ws instance
41+
self.connections: dict[str, WebSocket] = {}
42+
43+
# ws id -> msg id -> ret
44+
self.pendings: dict[str, dict[str, asyncio.Future]] = {}
45+
46+
async def call_mcp_http(self, ws_id: str, request: dict):
47+
"""Forward MCP request to client."""
48+
try:
49+
ws = self.connections[ws_id]
50+
except KeyError:
51+
logger.error(f"Websocket {ws_id} not found")
52+
return b""
53+
54+
msg = {}
55+
56+
msg["id"] = str(uuid.uuid4())
57+
msg["type"] = "http_request"
58+
msg["payload"] = request
59+
60+
fut = asyncio.get_event_loop().create_future()
61+
62+
if ws_id not in self.pendings:
63+
self.pendings[ws_id] = {}
64+
65+
self.pendings[ws_id][msg["id"]] = fut
66+
67+
await ws.send_text(json.dumps(msg))
68+
return await fut
69+
70+
async def handle_ws_message(self, ws_id: str, raw: str):
71+
msg = json.loads(raw)
72+
if msg.get("type") != "http_response":
73+
return
74+
75+
req_id = msg["id"]
76+
fut = self.pendings[ws_id].pop(req_id, None)
77+
if fut:
78+
fut.set_result(msg)
79+
80+
81+
class ServerWithReverseMCP:
82+
"""Start a simplest agent server to support reverse mcp"""
83+
84+
def __init__(
85+
self,
86+
agent: "Agent",
87+
host: str = "0.0.0.0",
88+
port: int = 8000,
89+
):
90+
self.agent = agent
91+
92+
self.host = host
93+
self.port = port
94+
95+
self.app = FastAPI()
96+
# build routes for self.app
97+
self.build()
98+
99+
self.ws_session_mgr = WebsocketSessionManager()
100+
self.ws_agent_mgr: dict[str, "Agent"] = {}
101+
102+
def build(self):
103+
logger.info("Build routes for server with reverse mcp")
104+
105+
class InvokeRequest(BaseModel):
106+
"""Request model for /invoke endpoint"""
107+
108+
prompt: str
109+
app_name: str
110+
user_id: str
111+
session_id: str
112+
113+
websocket_id: str
114+
115+
class InvokeResponse(BaseModel):
116+
"""Response model for /invoke endpoint"""
117+
118+
response: str
119+
120+
# build agent invocation route
121+
@self.app.post("/invoke")
122+
async def invoke(payload: InvokeRequest) -> InvokeResponse:
123+
user_id = payload.user_id
124+
session_id = payload.session_id
125+
prompt = payload.prompt
126+
127+
agent = self.ws_agent_mgr[payload.websocket_id]
128+
129+
if not agent.tools:
130+
logger.debug("Mount fake MCPToolset to agent")
131+
132+
# we hard code the mcp url with `/mcp` to obey the mcp protocol
133+
agent.tools.append(
134+
MCPToolset(
135+
connection_params=StreamableHTTPConnectionParams(
136+
url=f"http://127.0.0.1:{self.port}/mcp",
137+
headers={REVERSE_MCP_HEADER_KEY: payload.websocket_id},
138+
),
139+
)
140+
)
141+
142+
runner = Runner(app_name=payload.app_name, agent=agent)
143+
response = await runner.run(
144+
messages=[prompt],
145+
user_id=user_id,
146+
session_id=session_id,
147+
)
148+
149+
return InvokeResponse(response=response)
150+
151+
# build websocket endpoint
152+
@self.app.websocket("/ws")
153+
async def ws_endpoint(ws: WebSocket):
154+
client_id = ws.query_params.get("id")
155+
if not client_id:
156+
await ws.close(
157+
code=400,
158+
reason="WebSocket `id` is required like `/ws?id=my_id`",
159+
)
160+
return
161+
162+
logger.info(f"Register websocket {client_id} to session manager.")
163+
self.ws_session_mgr.connections[client_id] = ws
164+
165+
logger.info(f"Fork agent for websocket {client_id}")
166+
self.ws_agent_mgr[client_id] = self.agent.clone()
167+
168+
await ws.accept()
169+
logger.info(f"Websocket {client_id} connected")
170+
171+
while True:
172+
raw = await ws.receive_text()
173+
await self.ws_session_mgr.handle_ws_message(client_id, raw)
174+
175+
# build the fake MPC server,
176+
# and intercept all requests to the client websocket client.
177+
@self.app.api_route("/{path:path}", methods=["GET", "POST"])
178+
async def mcp_proxy(path: str, request: Request):
179+
client_id = request.headers.get(REVERSE_MCP_HEADER_KEY)
180+
if not client_id:
181+
return Response("client id not found", status_code=400)
182+
183+
ws = self.ws_session_mgr.connections.get(client_id)
184+
if not ws:
185+
return Response("websocket `client_id` not connected", status_code=503)
186+
187+
body = await request.body()
188+
headers = dict(request.headers)
189+
method = request.method
190+
path = f"/{path}"
191+
192+
payload = {
193+
"method": method,
194+
"path": path,
195+
"headers": headers,
196+
"body": body.decode(),
197+
}
198+
199+
logger.debug(f"[Reverse mcp proxy] Request from agent: {payload}")
200+
201+
resp = await self.ws_session_mgr.call_mcp_http(client_id, payload)
202+
203+
logger.debug(f"[Reverse mcp proxy] Response from local: {resp}")
204+
205+
return Response(
206+
content=resp["payload"]["body"], # type: ignore
207+
status_code=resp["payload"]["status"], # type: ignore
208+
headers=resp["payload"]["headers"], # type: ignore
209+
)
210+
211+
def run(self):
212+
import uvicorn
213+
214+
uvicorn.run(self.app, host=self.host, port=self.port)

veadk/toolkits/audio/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.

0 commit comments

Comments
 (0)