-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver.py
More file actions
272 lines (219 loc) · 8.41 KB
/
server.py
File metadata and controls
272 lines (219 loc) · 8.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""AgentRun HTTP Server / AgentRun HTTP 服务器
基于 Router 的设计:
- 每个协议提供自己的 Router
- Server 负责挂载 Router 并管理路由前缀
- 支持多协议同时运行(OpenAI + AG-UI)
"""
from typing import Any, List, Optional, Sequence
from fastapi import FastAPI
import uvicorn
from agentrun.utils.log import logger
from .agui_protocol import AGUIProtocolHandler
from .invoker import AgentInvoker
from .model import ServerConfig
from .openai_protocol import OpenAIProtocolHandler
from .protocol import InvokeAgentHandler, ProtocolHandler
class AgentRunServer:
"""AgentRun HTTP Server / AgentRun HTTP 服务器
基于 Router 的架构:
- 每个协议提供完整的 FastAPI Router
- Server 只负责组装和前缀管理
- 易于扩展新协议
Example (最简单用法):
>>> def invoke_agent(request: AgentRequest):
... return "Hello, world!"
>>>
>>> server = AgentRunServer(invoke_agent=invoke_agent)
>>> server.start(port=8000)
# 可访问:
# POST http://localhost:8000/openai/v1/chat/completions (OpenAI)
# POST http://localhost:8000/agui/v1/run (AG-UI)
Example (流式输出):
>>> async def invoke_agent(request: AgentRequest):
... yield "Hello, "
... yield "world!"
>>>
>>> server = AgentRunServer(invoke_agent=invoke_agent)
>>> server.start(port=8000)
Example (使用事件):
>>> from agentrun.server import AgentResult, EventType
>>>
>>> async def invoke_agent(request: AgentRequest):
... yield AgentResult(
... event=EventType.STEP_STARTED,
... data={"step_name": "thinking"}
... )
... yield "I'm thinking..."
... yield AgentResult(
... event=EventType.STEP_FINISHED,
... data={"step_name": "thinking"}
... )
>>>
>>> server = AgentRunServer(invoke_agent=invoke_agent)
>>> server.start(port=8000)
Example (仅 OpenAI 协议):
>>> server = AgentRunServer(
... invoke_agent=invoke_agent,
... protocols=[OpenAIProtocolHandler()]
... )
>>> server.start(port=8000)
Example (集成到现有 FastAPI 应用):
>>> from fastapi import FastAPI
>>>
>>> app = FastAPI()
>>> agent_server = AgentRunServer(invoke_agent=invoke_agent)
>>> app.mount("/agent", agent_server.as_fastapi_app())
# 可访问: POST http://localhost:8000/agent/openai/v1/chat/completions
Example (配置 CORS):
>>> server = AgentRunServer(
... invoke_agent=invoke_agent,
... config=ServerConfig(cors_origins=["http://localhost:3000"])
... )
Example (启用会话历史记录):
>>> server = AgentRunServer(
... invoke_agent=invoke_agent,
... memory_collection_name="my-memory-collection"
... )
>>> server.start(port=8000)
# 会话历史将自动保存到 TableStore
"""
def __init__(
self,
invoke_agent: InvokeAgentHandler,
protocols: Optional[List[ProtocolHandler]] = None,
config: Optional[ServerConfig] = None,
memory_collection_name: Optional[str] = None,
):
"""初始化 AgentRun Server
Args:
invoke_agent: Agent 调用回调函数
- 可以是同步或异步函数
- 支持返回字符串或 AgentResult
- 支持使用 yield 进行流式输出
protocols: 协议处理器列表
- 默认使用 OpenAI + AG-UI 协议
- 可以添加自定义协议
config: 服务器配置
- cors_origins: CORS 允许的源列表
- openai: OpenAI 协议配置
- agui: AG-UI 协议配置
memory_collection_name: MemoryCollection 名称(可选)
- 如果提供,将自动启用会话历史记录功能
- 会话历史将保存到指定的 MemoryCollection 中
"""
self.app = FastAPI(title="AgentRun Server")
# 如果启用了 memory,包装 invoke_agent
if memory_collection_name:
invoke_agent = self._wrap_with_memory(
invoke_agent,
memory_collection_name,
)
self.agent_invoker = AgentInvoker(invoke_agent)
# 注册 health check 路由
self._register_health_check()
# 配置 CORS
self._setup_cors(config.cors_origins if config else None)
# 默认使用 OpenAI 和 AG-UI 协议
if protocols is None:
protocols = [
OpenAIProtocolHandler(config),
AGUIProtocolHandler(config),
]
# 挂载所有协议的 Router
self._mount_protocols(protocols)
def _register_health_check(self):
"""注册 /health 健康检查路由 / Register /health health check route"""
@self.app.get("/health")
async def health_check():
return {"status": "ok"}
def _wrap_with_memory(
self,
invoke_agent: InvokeAgentHandler,
memory_collection_name: str,
) -> InvokeAgentHandler:
"""使用 MemoryConversation 包装 invoke_agent
Args:
invoke_agent: 原始的 invoke_agent 函数
memory_collection_name: MemoryCollection 名称
Returns:
包装后的 invoke_agent 函数
"""
from agentrun.memory_collection import MemoryConversation
# 创建 MemoryConversation 实例
memory = MemoryConversation(
memory_collection_name=memory_collection_name,
)
logger.info(
"Memory integration enabled for collection:"
f" {memory_collection_name}"
)
# 包装 invoke_agent
async def wrapped_invoke_agent(request: Any):
async for event in memory.wrap_invoke_agent(request, invoke_agent):
yield event
return wrapped_invoke_agent
def _setup_cors(self, cors_origins: Optional[Sequence[str]] = None):
"""配置 CORS 中间件
Args:
cors_origins: 允许的源列表,默认为 ["*"] 允许所有源
"""
if not cors_origins:
return
from fastapi.middleware.cors import CORSMiddleware
origins = list(cors_origins) if cors_origins else ["*"]
self.app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"],
)
logger.debug(f"CORS 已启用,允许的源: {origins}")
def _mount_protocols(self, protocols: List[ProtocolHandler]):
"""挂载所有协议的路由
Args:
protocols: 协议处理器列表
"""
for protocol in protocols:
# 获取协议的 Router
router = protocol.as_fastapi_router(self.agent_invoker)
# 使用协议定义的前缀
prefix = protocol.get_prefix()
# 挂载到主应用
self.app.include_router(router, prefix=prefix)
logger.debug(
f"已挂载协议: {protocol.__class__.__name__} ->"
f" {prefix or '(无前缀)'}"
)
def start(
self,
host: str = "0.0.0.0",
port: int = 9000,
log_level: str = "info",
**kwargs: Any,
):
"""启动 HTTP 服务器
Args:
host: 监听地址,默认 0.0.0.0
port: 监听端口,默认 9000
log_level: 日志级别,默认 info
**kwargs: 传递给 uvicorn.run 的其他参数
"""
logger.info(f"启动 AgentRun Server: http://{host}:{port}")
uvicorn.run(
self.app, host=host, port=port, log_level=log_level, **kwargs
)
def as_fastapi_app(self) -> FastAPI:
"""导出 FastAPI 应用
用于集成到现有的 FastAPI 项目中。
Returns:
FastAPI: FastAPI 应用实例
Example:
>>> from fastapi import FastAPI
>>>
>>> app = FastAPI()
>>> agent_server = AgentRunServer(invoke_agent=invoke_agent)
>>> app.mount("/agent", agent_server.as_fastapi_app())
"""
return self.app