Skip to content

Commit dd967fc

Browse files
author
yinsu.zs
committed
优化状态轮询性能和可观测性
Change-Id: Ia021faa5fdde460c3df814ca9f7a6c7ce4e29066
1 parent 28553c8 commit dd967fc

8 files changed

Lines changed: 91 additions & 382 deletions

File tree

src/code/agent/services/gateway/__init__.py

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,5 @@
11
from .gateways import CpuGatewayService, HistoryGatewayService
2-
from .status import (
3-
StatusStorageService,
4-
StatusPoller,
5-
get_status_storage_service
6-
)
2+
from .status import StatusPoller
73
from .queue import (
84
TaskStatus,
95
TaskRequest,
@@ -14,9 +10,7 @@
1410
__all__ = [
1511
'CpuGatewayService',
1612
'HistoryGatewayService',
17-
'StatusStorageService',
1813
'StatusPoller',
19-
'get_status_storage_service',
2014
'TaskStatus',
2115
'TaskQueue',
2216
'get_task_queue'

src/code/agent/services/gateway/gateways/history_gateway.py

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -286,6 +286,7 @@ def _convert_status_to_history_item(self, status_data, task_id, file_mtime=None)
286286
data = final_result.get("data", {})
287287
prompt_id = data.get("prompt_id", task_id)
288288
results = data.get("results", [])
289+
execution_time = data.get("execution_time") # 获取执行时间
289290

290291
# 构造 ComfyUI 兼容的输出格式
291292
outputs = {}
@@ -325,15 +326,22 @@ def _convert_status_to_history_item(self, status_data, task_id, file_mtime=None)
325326
[] # outputs_to_execute - 要执行的输出节点
326327
]
327328

329+
# 构造 status 对象,包含执行时间(如果有)
330+
status_obj = {
331+
"status_str": "success",
332+
"completed": True,
333+
"messages": []
334+
}
335+
336+
# 添加执行时间到 status 中(与原生 ComfyUI 兼容)
337+
if execution_time is not None:
338+
status_obj["execution_time"] = execution_time
339+
328340
return {
329341
prompt_id: {
330342
"prompt": prompt_structure,
331343
"outputs": outputs,
332-
"status": {
333-
"status_str": "success",
334-
"completed": True,
335-
"messages": []
336-
}
344+
"status": status_obj
337345
}
338346
}
339347

src/code/agent/services/gateway/queue/task_queue.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -217,18 +217,15 @@ def _start_status_polling(self, task_id: str):
217217
为任务启动状态轮询
218218
"""
219219
try:
220-
from services.gateway.status import StatusPoller, get_status_storage_service
220+
from services.gateway.status import StatusPoller
221221

222222
with self._poller_lock:
223223
if task_id in self._status_pollers:
224224
log("DEBUG", f"Task {task_id} already has status poller")
225225
return
226226

227-
storage_service = get_status_storage_service()
228-
229227
poller = StatusPoller(
230228
task_id=task_id,
231-
storage_service=storage_service,
232229
poll_interval=0.5
233230
)
234231

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,6 @@
1-
from .storage import StatusStorageService, get_status_storage_service
21
from .poller import StatusPoller
32

43
__all__ = [
5-
'StatusStorageService',
64
'StatusPoller',
7-
'get_status_storage_service'
85
]
96

src/code/agent/services/gateway/status/poller.py

Lines changed: 27 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@
77
import threading
88
from typing import Optional, Callable, Any
99

10-
from .storage import StatusStorageService
1110
from utils.logger import log
1211

1312

@@ -17,21 +16,20 @@ class StatusPoller:
1716
用于定期从存储中轮询工作流执行状态
1817
"""
1918

20-
def __init__(self, task_id: str, storage_service: StatusStorageService, poll_interval: float = 1.0):
19+
def __init__(self, task_id: str, poll_interval: float = 1.0):
2120
"""
2221
初始化状态轮询器
2322
2423
Args:
2524
task_id: 要轮询的任务ID
26-
storage_service: 状态存储服务实例
2725
poll_interval: 轮询间隔,单位秒,默认1.0秒
2826
"""
2927
self.task_id = task_id
30-
self.storage_service = storage_service
3128
self.poll_interval = poll_interval
3229
self.is_running = False
3330
self.thread: Optional[threading.Thread] = None
3431
self.on_status_update: Optional[Callable[[str, Any], None]] = None
32+
self.last_status_count = 0 # 记录已处理的状态数量
3533

3634
def set_status_callback(self, callback: Callable[[str, Any], None]):
3735
"""
@@ -64,7 +62,7 @@ def stop(self):
6462
log("INFO", f"Stopped polling for task {self.task_id}")
6563

6664
def _poll_loop(self):
67-
"""轮询循环逻辑(使用增量文件读取)"""
65+
"""轮询循环逻辑(通过serverless_api接口查询状态)"""
6866
# 是否已找到任务完成状态
6967
task_completed = False
7068

@@ -73,13 +71,16 @@ def _poll_loop(self):
7371
# 时间监控:轮询周期开始
7472
poll_start = time.time()
7573

76-
# 使用增量文件读取,仅获取新增状态
74+
# 通过serverless_api接口读取状态
7775
read_start = time.time()
78-
new_statuses = self.storage_service.get_status_incremental(self.task_id)
76+
all_statuses = self._get_status_from_api()
7977
read_cost = (time.time() - read_start) * 1000
8078

79+
# 只处理新增的状态
80+
new_statuses = all_statuses[self.last_status_count:]
81+
8182
if new_statuses:
82-
log("INFO", f"[Perf][{self.task_id}] File read cost: {read_cost:.1f}ms, got {len(new_statuses)} statuses")
83+
log("INFO", f"[Perf][{self.task_id}] File read cost: {read_cost:.1f}ms, got {len(new_statuses)} new statuses (total: {len(all_statuses)})")
8384

8485
# 调用回调函数处理新状态
8586
callback_start = time.time()
@@ -90,6 +91,9 @@ def _poll_loop(self):
9091
# 默认输出到控制台
9192
log("DEBUG", f"Task {self.task_id} status update: {json.dumps(status, ensure_ascii=False)}")
9293

94+
# 更新已处理计数
95+
self.last_status_count += 1
96+
9397
# 检查是否为任务完成状态
9498
if self._is_status_completed(status):
9599
task_completed = True
@@ -110,11 +114,23 @@ def _poll_loop(self):
110114

111115
self.is_running = False
112116

113-
# 清理状态读取缓存
117+
# 不再需要清理缓存(因为不使用增量读取缓存)
118+
log("DEBUG", f"Polling stopped for task {self.task_id}, processed {self.last_status_count} statuses")
119+
120+
def _get_status_from_api(self):
121+
"""
122+
通过调用serverless_api接口获取状态
123+
124+
Returns:
125+
list: 状态列表
126+
"""
114127
try:
115-
self.storage_service.cleanup_cache(self.task_id)
128+
from services.serverlessapi.serverless_api_service import ServerlessApiService
129+
service = ServerlessApiService()
130+
return service.get_status_from_store(self.task_id)
116131
except Exception as e:
117-
log("WARNING", f"Error cleaning up cache for task {self.task_id}: {e}")
132+
log("ERROR", f"Failed to get status from serverless_api for task {self.task_id}: {e}")
133+
return []
118134

119135
def _is_status_completed(self, status: dict) -> bool:
120136
"""

0 commit comments

Comments
 (0)