-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path__toolset_async_template.py
More file actions
229 lines (186 loc) · 7.65 KB
/
__toolset_async_template.py
File metadata and controls
229 lines (186 loc) · 7.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""ToolSet 资源类 / ToolSet Resource Class
提供工具集资源的面向对象封装和完整生命周期管理。
Provides object-oriented wrapper and complete lifecycle management for toolset resources.
"""
from typing import Any, Dict, Optional, Tuple
import pydash
from agentrun.utils.config import Config
from agentrun.utils.log import logger
from agentrun.utils.model import BaseModel
from .api.openapi import OpenAPI
from .model import (
MCPServerConfig,
SchemaType,
ToolInfo,
ToolSetSpec,
ToolSetStatus,
)
class ToolSet(BaseModel):
"""工具集资源 / ToolSet Resource
提供工具集的查询、调用等功能。
Provides query, invocation and other functionality for toolsets.
Attributes:
created_time: 创建时间 / Creation time
description: 描述 / Description
generation: 版本号 / Generation number
kind: 资源类型 / Resource kind
labels: 标签 / Labels
name: 工具集名称 / ToolSet name
spec: 规格配置 / Specification
status: 状态 / Status
uid: 唯一标识符 / Unique identifier
"""
created_time: Optional[str] = None
description: Optional[str] = None
generation: Optional[int] = None
kind: Optional[str] = None
labels: Optional[Dict[str, str]] = None
name: Optional[str] = None
spec: Optional[ToolSetSpec] = None
status: Optional[ToolSetStatus] = None
uid: Optional[str] = None
@classmethod
def __get_client(cls, config: Optional[Config] = None):
from .client import ToolSetClient
return ToolSetClient(config)
@classmethod
async def get_by_name_async(
cls, name: str, config: Optional[Config] = None
):
cli = cls.__get_client(config)
return await cli.get_async(name=name)
def type(self):
return SchemaType(pydash.get(self, "spec.tool_schema.type", ""))
def _get_openapi_auth_defaults(
self,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
headers: Dict[str, Any] = {}
query: Dict[str, Any] = {}
auth_config = pydash.get(self, "spec.auth_config", None)
auth_type = getattr(auth_config, "type", None) if auth_config else None
if auth_type == "APIKey":
api_key_param = pydash.get(
auth_config,
"parameters.api_key_parameter",
None,
)
if api_key_param:
key = getattr(api_key_param, "key", None)
value = getattr(api_key_param, "value", None)
location = getattr(api_key_param, "in_", None)
if key and value is not None:
if location == "header":
headers[key] = value
elif location == "query":
query[key] = value
return headers, query
def _get_openapi_base_url(self) -> Optional[str]:
import os
fc_region = os.getenv("FC_REGION")
if fc_region:
intranet_url: Optional[str] = pydash.get(
self, "status.outputs.urls.intranet_url", None
)
if intranet_url:
return intranet_url
return pydash.get(self, "status.outputs.urls.internet_url", None)
async def get_async(self, config: Optional[Config] = None):
if self.name is None:
raise ValueError("ToolSet name is required to get the ToolSet.")
result = await self.get_by_name_async(name=self.name, config=config)
return self.update_self(result)
async def list_tools_async(self, config: Optional[Config] = None):
"""异步获取工具列表,返回统一的 ToolInfo 列表"""
if self.type() == SchemaType.MCP:
mcp_tools = pydash.get(self, "status.outputs.tools", [])
return [ToolInfo.from_mcp_tool(tool) for tool in mcp_tools]
elif self.type() == SchemaType.OpenAPI:
# 直接使用 to_apiset 转换
apiset = self.to_apiset(config=config)
return apiset.tools()
return []
async def call_tool_async(
self,
name: str,
arguments: Optional[Dict[str, str]] = None,
config: Optional[Config] = None,
):
"""异步调用工具,统一使用 ApiSet 实现"""
apiset = self.to_apiset(config=config)
# 对于 OpenAPI,可能需要解析 operation name
if self.type() == SchemaType.OpenAPI:
# 尝试查找实际的 operation name
tool = apiset.get_tool(name)
if tool is None:
# 尝试通过 tool_id 映射查找
openapi_tools = (
pydash.get(self, "status.outputs.open_api_tools", []) or []
)
for tool_meta in openapi_tools:
if tool_meta is None:
continue
if hasattr(tool_meta, "model_dump"):
tool_meta = tool_meta.model_dump()
if not isinstance(tool_meta, dict):
continue
if tool_meta.get("tool_id") == name:
name = tool_meta.get("tool_name") or name
break
logger.debug("invoke tool %s with arguments %s", name, arguments)
result = await apiset.invoke_async(
name=name, arguments=arguments, config=config
)
logger.debug("invoke tool %s got result %s", name, result)
return result
def _get_mcp_url(self) -> str:
"""获取 MCP 工具的最佳 URL / Get the best URL for MCP tool
优先使用 agentrun-data 代理入口(支持 RAM 签名认证),
回退到 mcp_server_config.url(直连)。
Priority: agentrun-data proxy endpoint (with RAM auth) > mcp_server_config.url (direct).
"""
proxy_url = self._get_openapi_base_url()
if proxy_url:
return proxy_url
mcp_server_config: MCPServerConfig = pydash.get(
self, "status.outputs.mcp_server_config", None
)
if mcp_server_config and mcp_server_config.url:
return mcp_server_config.url
raise ValueError("MCP server URL is missing.")
def to_apiset(self, config: Optional[Config] = None):
"""将 ToolSet 转换为统一的 ApiSet 对象
Returns:
ApiSet: 统一的工具集接口
"""
from .api.openapi import ApiSet
if self.type() == SchemaType.MCP:
from .api.mcp import MCPToolSet
mcp_server_config: MCPServerConfig = pydash.get(
self, "status.outputs.mcp_server_config", None
)
mcp_url = self._get_mcp_url()
mcp_headers = (
mcp_server_config.headers if mcp_server_config else None
)
cfg = Config.with_configs(config, Config(headers=mcp_headers))
mcp_client = MCPToolSet(
url=mcp_url,
config=cfg,
)
# 获取 MCP tools
mcp_tools = pydash.get(self, "status.outputs.tools", [])
return ApiSet.from_mcp_tools(
tools=mcp_tools,
mcp_client=mcp_client,
config=cfg,
)
elif self.type() == SchemaType.OpenAPI:
headers, query = self._get_openapi_auth_defaults()
return ApiSet.from_openapi_schema(
schema=pydash.get(self, "spec.tool_schema.detail", None),
base_url=self._get_openapi_base_url(),
headers=headers,
query_params=query,
config=config,
)
raise ValueError(f"Unsupported ToolSet type: {self.type()}")