Skip to content

Commit eda73b9

Browse files
OhYeeCopilot
andcommitted
feat(auth): implement RAM signature authentication for agentrun-data endpoints
adds comprehensive RAM signature authentication support for both OpenAPI and MCP tool invocations. introduces automatic URL rewriting to -ram endpoints when targeting agentrun-data domains, dynamic signature generation per request, and proper header injection. enhances error handling with detailed logging for HTTP status failures and removes deprecated debug functions. The implementation includes: - httpx Auth handler for dynamic RAM signature calculation per request - automatic URL rewriting from agentrun-data to -ram endpoints - enhanced MCP session with RAM authentication support - improved OpenAPI tool invocation with RAM signature injection - better error logging for failed requests - conditional intranet URL usage based on FC_REGION environment variable - unified MCP URL resolution with fallback mechanisms BREAKING CHANGE: replaces assertion-based error handling with ValueError exceptions in MCP URL resolution feat(auth): 为 agentrun-data 端点实现 RAM 签名认证 为 OpenAPI 和 MCP 工具调用添加了全面的 RAM 签名认证支持。引入了自动 URL 重写为 -ram 端点、动态签名生成和适当的头信息注入。增强了错误处理并提供详细的 HTTP 状态失败日志,移除了已弃用的调试函数。 实现包括: - 用于每个请求动态计算 RAM 签名的 httpx Auth 处理器 - 自动将 agentrun-data 重写为 -ram 端点的 URL - 支持 RAM 认证的增强 MCP 会话 - 具有 RAM 签名注入的改进 OpenAPI 工具调用 - 针对失败请求的改进错误日志 - 基于 FC_REGION 环境变量的条件内网 URL 使用 - 具有回退机制的统一 MCP URL 解析 重大变更:将 MCP URL 解析中的断言错误处理替换为 ValueError 异常 BREAKING CHANGE: replaces assertion-based error handling with ValueError exceptions in MCP URL resolution 重大变更: 在 MCP URL 解析中将基于断言的错误处理替换为 ValueError 异常 Change-Id: I415cb816f44f6eca78b7165b96b1fce8fd7d3404 Co-authored-by: Copilot <copilot@github.com> Signed-off-by: OhYee <oyohyee@oyohyee.com>
1 parent f1ae3a4 commit eda73b9

File tree

8 files changed

+425
-217
lines changed

8 files changed

+425
-217
lines changed

agentrun/toolset/__toolset_async_template.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,15 @@ def _get_openapi_auth_defaults(
9494
return headers, query
9595

9696
def _get_openapi_base_url(self) -> Optional[str]:
97-
intranet_url: Optional[str] = pydash.get(
98-
self, "status.outputs.urls.intranet_url", None
99-
)
100-
if intranet_url:
101-
return intranet_url
97+
import os
98+
99+
fc_region = os.getenv("FC_REGION")
100+
if fc_region:
101+
intranet_url: Optional[str] = pydash.get(
102+
self, "status.outputs.urls.intranet_url", None
103+
)
104+
if intranet_url:
105+
return intranet_url
102106

103107
return pydash.get(self, "status.outputs.urls.internet_url", None)
104108

@@ -156,6 +160,25 @@ async def call_tool_async(
156160
logger.debug("invoke tool %s got result %s", name, result)
157161
return result
158162

163+
def _get_mcp_url(self) -> str:
164+
"""获取 MCP 工具的最佳 URL / Get the best URL for MCP tool
165+
166+
优先使用 agentrun-data 代理入口(支持 RAM 签名认证),
167+
回退到 mcp_server_config.url(直连)。
168+
Priority: agentrun-data proxy endpoint (with RAM auth) > mcp_server_config.url (direct).
169+
"""
170+
proxy_url = self._get_openapi_base_url()
171+
if proxy_url:
172+
return proxy_url
173+
174+
mcp_server_config: MCPServerConfig = pydash.get(
175+
self, "status.outputs.mcp_server_config", None
176+
)
177+
if mcp_server_config and mcp_server_config.url:
178+
return mcp_server_config.url
179+
180+
raise ValueError("MCP server URL is missing.")
181+
159182
def to_apiset(self, config: Optional[Config] = None):
160183
"""将 ToolSet 转换为统一的 ApiSet 对象
161184
@@ -170,16 +193,16 @@ def to_apiset(self, config: Optional[Config] = None):
170193
mcp_server_config: MCPServerConfig = pydash.get(
171194
self, "status.outputs.mcp_server_config", None
172195
)
173-
assert (
174-
mcp_server_config.url is not None
175-
), "MCP server URL is missing."
176196

177-
cfg = Config.with_configs(
178-
config, Config(headers=mcp_server_config.headers)
197+
mcp_url = self._get_mcp_url()
198+
199+
mcp_headers = (
200+
mcp_server_config.headers if mcp_server_config else None
179201
)
202+
cfg = Config.with_configs(config, Config(headers=mcp_headers))
180203

181204
mcp_client = MCPToolSet(
182-
url=mcp_server_config.url,
205+
url=mcp_url,
183206
config=cfg,
184207
)
185208

agentrun/toolset/api/mcp.py

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,88 @@
44
Handles tool invocations for MCP (Model Context Protocol).
55
"""
66

7-
from typing import Any, Dict, Optional
7+
from typing import Any, Dict, Generator, Optional
8+
from urllib.parse import urlparse, urlunparse
9+
10+
import httpx
811

912
from agentrun.utils.config import Config
1013
from agentrun.utils.log import logger
14+
from agentrun.utils.ram_signature import get_agentrun_signed_headers
15+
16+
17+
class _AgentrunRamAuth(httpx.Auth):
18+
"""httpx Auth handler:为每次请求动态生成 RAM 签名。
19+
20+
SSE 场景下同一个 httpx.AsyncClient 会发出 GET(SSE 连接)和
21+
POST(消息发送)请求,URL / method / body 各不相同,因此必须
22+
per-request 计算签名,不能在 client 初始化时一次性设置 headers。
23+
"""
24+
25+
def __init__(
26+
self,
27+
access_key_id: str,
28+
access_key_secret: str,
29+
region: str,
30+
security_token: Optional[str] = None,
31+
):
32+
self._ak = access_key_id
33+
self._sk = access_key_secret
34+
self._region = region
35+
self._security_token = security_token
36+
37+
def auth_flow(
38+
self, request: httpx.Request
39+
) -> Generator[httpx.Request, httpx.Response, None]:
40+
url = str(request.url)
41+
method = request.method
42+
43+
body: Optional[bytes] = None
44+
if request.content:
45+
body = request.content
46+
47+
content_type: Optional[str] = request.headers.get("content-type")
48+
49+
try:
50+
signed = get_agentrun_signed_headers(
51+
url=url,
52+
method=method,
53+
access_key_id=self._ak,
54+
access_key_secret=self._sk,
55+
security_token=self._security_token,
56+
region=self._region,
57+
product="agentrun",
58+
body=body,
59+
content_type=content_type,
60+
)
61+
for k, v in signed.items():
62+
request.headers[k] = v
63+
logger.debug(
64+
"applied RAM signature for MCP %s request to %s",
65+
method,
66+
url[:80] + ("..." if len(url) > 80 else ""),
67+
)
68+
except ValueError as e:
69+
logger.warning("RAM signing skipped for MCP request: %s", e)
70+
71+
yield request
72+
73+
74+
def _rewrite_to_ram_url(url: str) -> str:
75+
"""将 agentrun-data 域名改写为 -ram 端点。"""
76+
parsed = urlparse(url)
77+
parts = parsed.netloc.split(".", 1)
78+
if len(parts) == 2:
79+
ram_netloc = parts[0] + "-ram." + parts[1]
80+
return urlunparse((
81+
parsed.scheme,
82+
ram_netloc,
83+
parsed.path or "",
84+
parsed.params,
85+
parsed.query,
86+
parsed.fragment,
87+
))
88+
return url
1189

1290

1391
class MCPSession:
@@ -16,14 +94,46 @@ def __init__(self, url: str, config: Optional[Config] = None):
1694
self.url = url
1795
self.config = Config.with_configs(config)
1896

97+
def _build_ram_auth(self, url: str) -> tuple:
98+
"""当目标是 agentrun-data 域名时,改写 URL 并返回 httpx Auth handler。
99+
100+
Returns:
101+
(rewritten_url, auth_or_none)
102+
"""
103+
parsed = urlparse(url)
104+
if ".agentrun-data." not in (parsed.netloc or ""):
105+
return url, None
106+
107+
cfg = self.config
108+
ak = cfg.get_access_key_id()
109+
sk = cfg.get_access_key_secret()
110+
if not ak or not sk:
111+
return url, None
112+
113+
url = _rewrite_to_ram_url(url)
114+
115+
auth = _AgentrunRamAuth(
116+
access_key_id=ak,
117+
access_key_secret=sk,
118+
region=cfg.get_region_id(),
119+
security_token=cfg.get_security_token() or None,
120+
)
121+
return url, auth
122+
19123
async def __aenter__(self):
20124
from mcp import ClientSession
21125
from mcp.client.sse import sse_client
22126

23127
timeout = self.config.get_timeout()
128+
headers = self.config.get_headers()
129+
url = self.url
130+
131+
url, auth = self._build_ram_auth(url)
132+
24133
self.client = sse_client(
25-
url=self.url,
26-
headers=self.config.get_headers(),
134+
url=url,
135+
headers=headers,
136+
auth=auth,
27137
timeout=timeout if timeout else 60,
28138
)
29139
read, write = await self.client.__aenter__()

agentrun/toolset/api/openapi.py

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from copy import deepcopy
88
import json
99
from typing import Any, Dict, List, Optional, Tuple, Union
10+
from urllib.parse import urlparse, urlunparse
1011

1112
import httpx
1213
from pydash import get as pg
@@ -15,6 +16,7 @@
1516
from agentrun.utils.config import Config
1617
from agentrun.utils.log import logger
1718
from agentrun.utils.model import BaseModel
19+
from agentrun.utils.ram_signature import get_agentrun_signed_headers
1820

1921
from ..model import ToolInfo, ToolSchema
2022

@@ -830,11 +832,21 @@ def invoke_tool(
830832
name, arguments, config
831833
)
832834

833-
print(request_kwargs)
834835
with httpx.Client(timeout=timeout) as client:
835836
response = client.request(**request_kwargs)
836837
if raise_for_status:
837-
response.raise_for_status()
838+
try:
839+
response.raise_for_status()
840+
except httpx.HTTPStatusError:
841+
logger.error(
842+
"OpenAPI tool request failed: status=%s url=%s "
843+
"response_headers=%s response_body=%s",
844+
response.status_code,
845+
response.request.url,
846+
dict(response.headers),
847+
response.text[:2000],
848+
)
849+
raise
838850
return self._format_response(response)
839851

840852
async def invoke_tool_async(
@@ -850,7 +862,18 @@ async def invoke_tool_async(
850862
async with httpx.AsyncClient(timeout=timeout) as client:
851863
response = await client.request(**request_kwargs)
852864
if raise_for_status:
853-
response.raise_for_status()
865+
try:
866+
response.raise_for_status()
867+
except httpx.HTTPStatusError:
868+
logger.error(
869+
"OpenAPI tool request failed: status=%s url=%s "
870+
"response_headers=%s response_body=%s",
871+
response.status_code,
872+
response.request.url,
873+
dict(response.headers),
874+
response.text[:2000],
875+
)
876+
raise
854877
return self._format_response(response)
855878

856879
def _load_schema(self, schema: Any) -> Dict[str, Any]:
@@ -945,7 +968,6 @@ def _walk(node: Any):
945968

946969
def _extract_base_url(self, schema: Dict[str, Any]) -> Optional[str]:
947970
servers = schema.get("servers") or []
948-
print("======", servers)
949971
return self._pick_server_url(servers)
950972

951973
def _convert_to_native(self, value: Any) -> Any:
@@ -1177,8 +1199,66 @@ def _prepare_request(
11771199
args,
11781200
)
11791201

1202+
self._apply_ram_auth(request_kwargs, combined_config)
1203+
11801204
return request_kwargs, timeout, raise_for_status
11811205

1206+
def _apply_ram_auth(
1207+
self,
1208+
request_kwargs: Dict[str, Any],
1209+
config: Optional[Config],
1210+
) -> None:
1211+
"""当目标是 agentrun-data 域名时,自动注入 RAM 签名鉴权 headers 并改写为 -ram 端点。"""
1212+
url = request_kwargs.get("url", "")
1213+
parsed = urlparse(url)
1214+
if ".agentrun-data." not in (parsed.netloc or ""):
1215+
return
1216+
1217+
cfg = Config.with_configs(config)
1218+
ak = cfg.get_access_key_id()
1219+
sk = cfg.get_access_key_secret()
1220+
if not ak or not sk:
1221+
return
1222+
1223+
parts = parsed.netloc.split(".", 1)
1224+
if len(parts) == 2:
1225+
ram_netloc = parts[0] + "-ram." + parts[1]
1226+
url = urlunparse((
1227+
parsed.scheme,
1228+
ram_netloc,
1229+
parsed.path or "",
1230+
parsed.params,
1231+
parsed.query,
1232+
parsed.fragment,
1233+
))
1234+
request_kwargs["url"] = url
1235+
1236+
method = request_kwargs.get("method", "GET")
1237+
body_bytes: Optional[bytes] = None
1238+
json_body = request_kwargs.get("json")
1239+
if json_body is not None:
1240+
body_bytes = json.dumps(json_body).encode("utf-8")
1241+
1242+
try:
1243+
signed = get_agentrun_signed_headers(
1244+
url=url,
1245+
method=method,
1246+
access_key_id=ak,
1247+
access_key_secret=sk,
1248+
security_token=cfg.get_security_token() or None,
1249+
region=cfg.get_region_id(),
1250+
product="agentrun",
1251+
body=body_bytes,
1252+
)
1253+
existing_headers: Dict[str, str] = request_kwargs.get("headers", {})
1254+
request_kwargs["headers"] = {**signed, **existing_headers}
1255+
logger.debug(
1256+
"applied RAM signature for OpenAPI tool request to %s",
1257+
url[:80] + "..." if len(url) > 80 else url,
1258+
)
1259+
except ValueError as e:
1260+
logger.warning("RAM signing skipped for OpenAPI tool: %s", e)
1261+
11821262
def _format_response(self, response: httpx.Response) -> Dict[str, Any]:
11831263
try:
11841264
body = response.json()

0 commit comments

Comments
 (0)