Skip to content

Commit 82c465d

Browse files
author
yinsu.zs
committed
refactor
Change-Id: Ibe8ce0c54f633092ff8dd63ee6044e4ea0f61fc6
1 parent 28553c8 commit 82c465d

41 files changed

Lines changed: 6105 additions & 2442 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/code/agent/.DS_Store

6 KB
Binary file not shown.

src/code/agent/constants.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -112,9 +112,19 @@
112112
# GPU 函数的 URL,当 COMFYUI_MODE="cpu" 时使用
113113
GPU_FUNCTION_URL = os.getenv("GPU_FUNCTION_URL", "")
114114

115+
# HTTP Header 常量
116+
HEADER_SNAPSHOT_NAME = "X-FunArt-Snapshot-Name"
117+
HEADER_FORWARDED_BY = "X-FunArt-Forwarded-By"
118+
HEADER_FC_INVOCATION_TYPE = "X-FC-Invocation-Type"
119+
115120
# 日志配置
116121
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
117122

123+
# 队列存储配置
124+
QUEUE_STORAGE_TYPE = os.getenv('QUEUE_STORAGE_TYPE', 'memory') # memory | nas
125+
QUEUE_NAS_DIR = f'{MNT_DIR}/task/queue' # 队列 NAS 存储目录
126+
QUEUE_LOCK_TIMEOUT = int(os.getenv('QUEUE_LOCK_TIMEOUT', '5')) # 队列锁超时(秒)
127+
118128
class ERROR_CODE(Enum):
119129
UNCLASSIFY = "UNCLASSIFY"
120130
INVALID_PARAMS = "INVALID_PARAMS"

src/code/agent/routes/cpu_routes.py

Lines changed: 22 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from flask import Blueprint, Flask, jsonify, request
77
from flask_sock import Sock
88

9+
import constants
910
from services.management_service import ManagementService, BackendStatus
1011
from utils.logger import log
1112
from services.gateway import (
@@ -77,36 +78,33 @@ def comfyui_compatible_ws(ws):
7778
"""
7879
try:
7980
# 从查询参数获取 clientId(ComfyUI 前端重连时会传递)
80-
from flask import request as flask_request
81-
client_id = flask_request.args.get('clientId', '')
81+
client_id = request.args.get('clientId', '')
8282

8383
if client_id:
8484
# 复用已有的 client_id(重连场景)
8585
log("INFO", f"WebSocket reconnecting with existing client_id: {client_id}")
8686
else:
8787
# 生成新的 client_id(首次连接)
88-
client_id = f"cpu_client_{int(time.time() * 1000)}"
88+
client_id = f"funart_client_{int(time.time() * 1000)}"
8989
log("INFO", f"New ComfyUI WebSocket connection with client_id: {client_id}")
9090

9191
# 添加连接到管理器
9292
ws_manager.add_connection(ws)
9393

94-
# 发送初始状态消息(模拟ComfyUI原生行为)
95-
try:
96-
ws.send(json.dumps({
97-
"type": "status",
98-
"data": {
99-
"sid": client_id,
100-
"status": {
101-
"exec_info": {
102-
"queue_remaining": get_task_queue()._get_pending_task_count()
103-
}
94+
# 通过消息队列发送初始状态消息,保证线程安全
95+
initial_status = {
96+
"type": "status",
97+
"data": {
98+
"sid": client_id,
99+
"status": {
100+
"exec_info": {
101+
"queue_remaining": get_task_queue().get_running_task_count()
104102
}
105103
}
106-
}))
107-
except Exception as e:
108-
log("ERROR", f"Failed to send initial status: {e}")
109-
return
104+
}
105+
}
106+
# 使用 _send_sync 发送初始状态
107+
ws_manager._send_sync(ws, initial_status)
110108

111109
# 设置客户端ID,用于后续关联任务
112110
setattr(ws, '_comfyui_client_id', client_id)
@@ -116,21 +114,19 @@ def comfyui_compatible_ws(ws):
116114

117115
# 如果是重连,重新订阅该客户端的所有进行中的任务
118116
ws_manager.resubscribe_client_tasks(ws, client_id)
119-
120-
# TODO 可能是多余的
117+
121118
while True:
122119
try:
123120
message = ws.receive()
124121
log("DEBUG", f"Received message from ComfyUI frontend: {message[:100]}...")
125-
122+
126123
except Exception as e:
127124
error_str = str(e)
128125
if "Connection closed" in error_str or "closed" in error_str.lower():
129126
log("INFO", f"Connection closed by client")
130127
break
131128
log("ERROR", f"Error receiving message: {e}\n{traceback.format_exc()}")
132129
break
133-
134130
except Exception as e:
135131
log("ERROR", f"Connection error: {e}\n{traceback.format_exc()}")
136132
finally:
@@ -202,7 +198,7 @@ def handle_serverless_run():
202198
203199
调用方式:
204200
- 默认: 异步调用(与 /api/prompt 处理一致)
205-
- Header X-Art-Invocation-Type: Sync 时: 同步调用,等待GPU返回结果
201+
- Header X-FC-Invocation-Type: Sync 时: 同步调用,等待GPU返回结果
206202
207203
异步模式:
208204
- 将请求转发到GPU函数(异步调用)
@@ -221,12 +217,12 @@ def handle_serverless_run():
221217
try:
222218
gateway_service = CpuGatewayService()
223219

224-
# 检查调用类型:Header X-Art-Invocation-Type: Sync 表示同步调用
225-
invocation_type = request.headers.get("X-Art-Invocation-Type", "").strip()
220+
# TODO: 支持X-FC-Invocation-Type透传到Runtime,当前默认使用异步
221+
invocation_type = request.headers.get(constants.HEADER_FC_INVOCATION_TYPE, "Async").strip()
226222
is_sync = invocation_type.lower() == "sync"
227223

228224
if is_sync:
229-
log("DEBUG", f"Processing /serverless/run in SYNC mode (X-Art-Invocation-Type: Sync)")
225+
log("DEBUG", f"Processing /serverless/run in SYNC mode (X-FC-Invocation-Type: Sync)")
230226
return gateway_service.handle_serverless_run_sync()
231227
else:
232228
log("DEBUG", f"Processing /serverless/run in ASYNC mode (default)")
@@ -267,10 +263,9 @@ def handle_history(subpath=""):
267263
}), 500
268264

269265
def _register_userdata_handler(self):
270-
"""在 prod 模式下,阻止保存 userdata 文件"""
271266
@self.bp.route("/userdata/<path:file>", methods=["POST"])
272267
def block_userdata_save(file):
273-
log("WARN", f"Attempt to save userdata blocked in prod mode: {file}")
268+
log("INFO", f"Disable Saving Userdata in prod mode: {file}")
274269
return jsonify({
275270
"error": {
276271
"type": "forbidden",

src/code/agent/routes/routes.py

Lines changed: 129 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
11
import json
2+
import logging
23
import os
4+
import threading
35
import traceback
46

57
from flask import Flask, jsonify, request, Response
@@ -9,6 +11,7 @@
911
import constants
1012
from exceptions.exceptions import CustomError
1113
from services.management_service import ManagementService, Action, BackendStatus
14+
from utils.logger import log
1215
from .management_routes import ManagementRoutes
1316
from .serverless_api_routes import ServerlessApiRoutes
1417
from .cpu_routes import CpuRoutes
@@ -19,16 +22,17 @@ class Routes:
1922
def __init__(self):
2023
self.app = Flask(__name__)
2124
self._sock = Sock(self.app)
25+
# 重启锁,防止并发重启
26+
self._reboot_lock = threading.Lock()
2227
self.setup_routes()
23-
import logging
24-
log = logging.getLogger('werkzeug')
25-
log.setLevel(logging.ERROR)
28+
# 设置 Werkzeug 日志级别为 ERROR,只显示错误日志,不输出每个请求
29+
logging.getLogger('werkzeug').setLevel(logging.ERROR)
2630

2731

2832
def setup_routes(self):
2933
# 管控API
30-
management = ManagementRoutes()
31-
management.register(self.app)
34+
self.management = ManagementRoutes()
35+
self.management.register(self.app)
3236

3337
# ServerlessAPI
3438
if constants.BACKEND_TYPE == constants.TYPE_COMFYUI:
@@ -39,11 +43,118 @@ def setup_routes(self):
3943
cpu_router = CpuRoutes()
4044
cpu_router.register(self.app)
4145

46+
@self.app.route("/api/manager/reboot", methods=["GET", "POST"])
def manager_reboot():
    """
    Intercept ComfyUI-Manager's reboot request and restart via the management service.

    Request headers (sent when the CPU function forwards the reboot):
    - X-FunArt-Snapshot-Name: snapshot to load on restart; when present the
      workspace is NOT saved before restarting.
    - X-FunArt-Forwarded-By: identifies the request origin (defaults to 'Direct').

    The reboot itself runs on a background thread; this handler returns
    immediately. Poll /management/status to follow the restart progress.

    Returns:
        (json, 202) when the reboot was accepted and started in the background.
        (json, 409) when another reboot already holds the reboot lock.

    Raises:
        Any exception raised while preparing the reboot (before the worker
        thread is started) is re-raised after the reboot lock is released.
    """
    # Reject concurrent reboots: only one may hold the lock at a time.
    if not self._reboot_lock.acquire(blocking=False):
        log("WARNING", "Reboot request rejected: reboot already in progress")
        return jsonify({
            "status": "failed",
            "message": "Reboot already in progress, please wait"
        }), 409

    # From here on the lock is held. It is normally released by the worker
    # thread's `finally`; if anything fails before the worker starts we must
    # release it here, otherwise every later reboot request would get 409.
    try:
        # Prefer the snapshot name passed in the header (set by the CPU function).
        snapshot_from_header = request.headers.get(constants.HEADER_SNAPSHOT_NAME)
        forwarded_by = request.headers.get(constants.HEADER_FORWARDED_BY, 'Direct')

        service = ManagementService()
        current_snapshot = service.cur_snapshot_name or 'latest-dev'

        log("INFO", f"Intercepted /api/manager/reboot request from {forwarded_by}, current snapshot: {current_snapshot}")

        def do_reboot():
            """Save (if needed), stop, optionally trigger the GPU reboot, then start again."""
            try:
                if snapshot_from_header:
                    # Triggered with an explicit snapshot: use it as-is, no save.
                    log("INFO", f"Using snapshot from CPU function: {snapshot_from_header}")
                    snapshot_to_load = snapshot_from_header
                    log("INFO", "Skip saving workspace (snapshot specified via request header)")
                else:
                    snapshot_to_load = current_snapshot
                    # Save the workspace first only when the latest management
                    # action was Start and no snapshot was specified.
                    # TODO: could this interfere with the pre-stop logic?
                    if service.latest_action and service.latest_action == Action.START:
                        try:
                            from services.workspace.snapshot_manager import SnapshotManager
                            log("INFO", "Saving workspace before reboot...")
                            result_map = service.save(SnapshotManager.TYPE_DEV)
                            log("INFO", f"Save result before reboot: {json.dumps(result_map, indent=2)}")

                            # Restart from the snapshot that was just saved.
                            if 'snapshot' in result_map:
                                snapshot_to_load = result_map['snapshot']
                                log("INFO", f"Will restart with newly saved snapshot: {snapshot_to_load}")
                        except Exception as e:
                            # Best effort: a failed save must not block the reboot.
                            log("WARNING", f"Failed to save workspace before reboot: {str(e)}")
                    else:
                        log("INFO", "Skip saving workspace (latest_action is not START)")

                # Stop the backend; a failure here is logged and the restart continues.
                try:
                    service.stop()
                except Exception as e:
                    log("WARNING", f"Error during stop: {e}")

                # In CPU mode, also ask the GPU function to reboot (fire-and-forget).
                if constants.COMFYUI_MODE == 'cpu' and constants.GPU_FUNCTION_URL:
                    def trigger_gpu_reboot():
                        try:
                            gpu_reboot_url = f"{constants.GPU_FUNCTION_URL.rstrip('/')}/api/manager/reboot"
                            log("INFO", f"Triggering GPU function reboot: {gpu_reboot_url}")

                            gpu_headers = {
                                constants.HEADER_SNAPSHOT_NAME: snapshot_to_load,
                                constants.HEADER_FORWARDED_BY: 'CPU-Reboot-Trigger',
                                constants.HEADER_FC_INVOCATION_TYPE: 'Async'
                            }

                            gpu_resp = requests.post(gpu_reboot_url, headers=gpu_headers, timeout=10)
                            log("INFO", f"GPU function reboot triggered: status={gpu_resp.status_code}")
                        except requests.exceptions.Timeout:
                            log("WARNING", "GPU function reboot request timed out (expected for async call)")
                        except Exception as e:
                            log("WARNING", f"Failed to trigger GPU function reboot: {str(e)}")

                    threading.Thread(target=trigger_gpu_reboot, daemon=True).start()

                # Restart the local service without reinstalling dependencies.
                log("INFO", f"Restarting local service with snapshot: {snapshot_to_load}")
                service.start(snapshot_to_load, nodes_map=service.SKIP_INSTALL_SENTINEL)
                log("INFO", "Reboot completed successfully")

            except Exception as e:
                error_msg = f"Failed to restart ComfyUI: {str(e)}"
                log("ERROR", f"{error_msg}\nStacktrace:\n{traceback.format_exc()}")
            finally:
                # The worker owns the lock once started; always release it.
                self._reboot_lock.release()

        # Run the reboot in the background so the HTTP response returns at once.
        threading.Thread(target=do_reboot, daemon=True).start()
    except Exception:
        # The worker never started, so its `finally` will not run: release here.
        self._reboot_lock.release()
        raise

    # Respond immediately; the reboot continues in the background.
    return jsonify({
        "status": "success",
        "message": "Reboot request accepted, restarting in background"
    }), 202
152+
42153
@self.app.route("/initialize", methods=["POST"])
43154
def initialize():
44155
# See FC docs for all the HTTP headers: https://www.alibabacloud.com/help/doc-detail/132044.htm#common-headers
45156
request_id = request.headers.get("x-fc-request-id", "")
46-
print("FC Initialize Start RequestId: " + request_id)
157+
log("INFO", f"FC Initialize Start RequestId: {request_id}")
47158

48159
# Use the following code to get temporary credentials
49160
# access_key_id = request.headers['x-fc-access-key-id']
@@ -56,7 +167,7 @@ def initialize():
56167

57168
# 使用环境变量指定的snapshot,默认为latest-dev
58169
snapshot_name = os.environ.get('AUTO_LAUNCH_SNAPSHOT_NAME', 'latest-dev')
59-
print(f"Initializing function with ComfyUI mode: {constants.COMFYUI_MODE}, snapshot: {snapshot_name}")
170+
log("INFO", f"Initializing function with ComfyUI mode: {constants.COMFYUI_MODE}, snapshot: {snapshot_name}")
60171
service.start(snapshot_name, nodes_map={})
61172

62173
if (
@@ -65,22 +176,22 @@ def initialize():
65176
and constants.COMFYUI_MODE == "gpu"
66177
):
67178
try:
68-
print("prewarm models")
179+
log("INFO", "prewarm models")
69180
prompt = json.loads(constants.PREWARM_PROMPT)
70181
api = ServerlessApiService()
71182
api.run(prompt)
72183
api.api_clear_history()
73-
print("prewarm models done")
184+
log("INFO", "prewarm models done")
74185
except Exception as e:
75-
print(f"prewarm models got exception:\n{e}")
186+
log("ERROR", f"prewarm models got exception:\n{e}")
76187

77-
print("FC Initialize End RequestId: " + request_id)
188+
log("INFO", f"FC Initialize End RequestId: {request_id}")
78189
return "Function is initialized, request_id: " + request_id + "\n"
79190

80191
@self.app.route("/pre-stop", methods=["GET"])
81192
def pre_stop():
82193
request_id = request.headers.get("x-fc-request-id", "")
83-
print("FC PreStop Start RequestId: " + request_id)
194+
log("INFO", f"FC PreStop Start RequestId: {request_id}")
84195

85196
service = ManagementService() # singleton
86197
# 若最近一次管控操作为Start,且实例非预期销毁时,需要在pre-stop中保存工作空间从而兜底;
@@ -89,18 +200,18 @@ def pre_stop():
89200
try:
90201
from services.workspace.snapshot_manager import SnapshotManager
91202
result_map = service.save(SnapshotManager.TYPE_DEV)
92-
print(f"save resp when preStop: {json.dumps(result_map, indent=2)}")
203+
log("INFO", f"save resp when preStop: {json.dumps(result_map, indent=2)}")
93204
except Exception as e:
94-
print(f"error occur when preStop: {str(e)}")
205+
log("ERROR", f"error occur when preStop: {str(e)}")
95206
else:
96-
print("Do nothing in pre-stop")
97-
print("FC PreStop End RequestId: " + request_id)
207+
log("INFO", "Do nothing in pre-stop")
208+
log("INFO", f"FC PreStop End RequestId: {request_id}")
98209
return "OK"
99210

100211
@self.app.route("/<path:path>", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
101212
@self.app.route("/", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
102213
def proxy(path=""):
103-
backend_status = management.service.status
214+
backend_status = self.management.service.status
104215
if backend_status not in (BackendStatus.RUNNING, BackendStatus.SAVING):
105216
return jsonify({
106217
"status": "failed",
@@ -146,7 +257,7 @@ def handle_base_error(error):
146257

147258
def _handle_exception(e):
148259
err_msg = traceback.format_exc()
149-
print(f"{str(e)}\nStacktrace:\n{err_msg}")
260+
log("ERROR", f"{str(e)}\nStacktrace:\n{err_msg}")
150261

151262
if isinstance(e, CustomError):
152263
# 处理自定义异常

0 commit comments

Comments
 (0)