Skip to content

Commit deba6cb

Browse files
committed
Fix streaming output updates
1 parent 0653d37 commit deba6cb

6 files changed

Lines changed: 499 additions & 16 deletions

File tree

channels/feishu_channel.py

Lines changed: 214 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import base64
1616
import json
1717
import threading
18+
import time
1819
from datetime import datetime, timedelta, timezone
1920
from pathlib import Path
2021
from typing import TYPE_CHECKING, Any, Optional
@@ -34,6 +35,8 @@
3435
CreateMessageRequestBody,
3536
Emoji,
3637
GetMessageResourceRequest,
38+
PatchMessageRequest,
39+
PatchMessageRequestBody,
3740
ReplyMessageRequest,
3841
ReplyMessageRequestBody,
3942
)
@@ -67,6 +70,106 @@
6770

6871
FEISHU_CARD_MARKDOWN_CHUNK = 7000
6972
FEISHU_FALLBACK_MARKDOWN_LIMIT = 8000
73+
FEISHU_STREAMING_MAX_OUTPUT = 4000 # max chars of output to show in streaming card
74+
75+
76+
class _FeishuStreamWriter:
77+
"""Rate-limited, event-driven Feishu card updater for a single running task.
78+
79+
Registered as an output listener on TaskScheduler. Each time the agent emits
80+
an assistant event the writer appends the chunk and schedules a card patch.
81+
Patches are rate-limited to at most one every MIN_INTERVAL seconds so we don't
82+
hammer the Feishu API. Patch requests are serialized so older requests cannot
83+
race and overwrite newer content.
84+
85+
run_id is latched from the first event received, so the writer can be registered
86+
before the run row is created in the DB.
87+
"""
88+
89+
MIN_INTERVAL = 0.25 # seconds between patches
90+
91+
def __init__(self, task_id: int, msg_id: str, channel: "FeishuChannel", task_title: str):
92+
self.task_id = task_id
93+
self.msg_id = msg_id
94+
self._channel = channel
95+
self._task_title = task_title
96+
97+
self._run_id: Optional[int] = None # latched on first event
98+
self._parts: list[str] = []
99+
self._parts_lock = threading.Lock()
100+
self._last_patch = 0.0
101+
self._timer: Optional[threading.Timer] = None
102+
self._state_lock = threading.Lock()
103+
self._stopped = False
104+
self._patch_in_flight = False
105+
self._dirty = False
106+
107+
# called from the executor thread — must not block
108+
def on_event(self, task_id: int, run_id: int, event_type: str, content: str) -> None:
109+
if self._stopped or task_id != self.task_id:
110+
return
111+
if event_type != "assistant" or content == "":
112+
return
113+
with self._parts_lock:
114+
# Latch the run_id on the first event; reset parts if run_id changes (resume)
115+
if self._run_id is None:
116+
self._run_id = run_id
117+
elif self._run_id != run_id:
118+
self._run_id = run_id
119+
self._parts.clear()
120+
self._parts.append(content)
121+
self._schedule()
122+
123+
def _schedule(self) -> None:
124+
with self._state_lock:
125+
if self._stopped:
126+
return
127+
self._dirty = True
128+
self._schedule_dirty_locked()
129+
130+
def _schedule_dirty_locked(self) -> None:
131+
if self._stopped or not self._dirty or self._patch_in_flight or self._timer:
132+
return
133+
134+
delay = max(0.0, self.MIN_INTERVAL - (time.time() - self._last_patch))
135+
if delay <= 0:
136+
self._start_patch_locked()
137+
return
138+
139+
self._timer = threading.Timer(delay, self._timer_fired)
140+
self._timer.daemon = True
141+
self._timer.start()
142+
143+
def _start_patch_locked(self) -> None:
144+
self._patch_in_flight = True
145+
self._dirty = False
146+
threading.Thread(target=self._do_patch, daemon=True).start()
147+
148+
def _timer_fired(self) -> None:
149+
with self._state_lock:
150+
self._timer = None
151+
self._schedule_dirty_locked()
152+
153+
def _do_patch(self) -> None:
154+
with self._parts_lock:
155+
text = "".join(self._parts)
156+
if len(text) > FEISHU_STREAMING_MAX_OUTPUT:
157+
text = "…" + text[-FEISHU_STREAMING_MAX_OUTPUT:]
158+
card = self._channel._build_streaming_card(self.task_id, self._task_title, text)
159+
try:
160+
self._channel._patch_message(self.msg_id, card)
161+
finally:
162+
with self._state_lock:
163+
self._last_patch = time.time()
164+
self._patch_in_flight = False
165+
self._schedule_dirty_locked()
166+
167+
def stop(self) -> None:
168+
with self._state_lock:
169+
self._stopped = True
170+
if self._timer:
171+
self._timer.cancel()
172+
self._timer = None
70173

71174

72175
class FeishuChannel(Channel):
@@ -93,6 +196,14 @@ def __init__(self, bus: MessageBus, db: "TaskDB", scheduler: "TaskScheduler"):
93196
self._root_msg_map: dict[str, int] = {}
94197
self._root_msg_lock = threading.Lock()
95198

199+
# task_id -> _FeishuStreamWriter for live card updates
200+
self._writers: dict[int, "_FeishuStreamWriter"] = {}
201+
self._writers_lock = threading.Lock()
202+
203+
# task_id -> running card message_id (used by send() to patch instead of reply)
204+
self._streaming_msg: dict[int, str] = {}
205+
self._streaming_lock = threading.Lock()
206+
96207
# Subscribe to outbound bus messages for task notifications
97208
bus.subscribe_outbound(self._on_outbound)
98209

@@ -222,13 +333,26 @@ def send(self, msg: OutboundMessage) -> None:
222333
with self._origin_lock:
223334
origin = self._task_origin.get(task_id)
224335

336+
# Stop streaming thread and get the running card message_id if any
337+
streaming_msg_id = None
338+
with self._streaming_lock:
339+
streaming_msg_id = self._streaming_msg.pop(task_id, None)
340+
self._stop_streaming(task_id)
341+
225342
sent_id = None
226343
if origin:
227344
reply_to_chat, root_msg_id, reaction_msg_id = origin
228345
# Add emoji reaction to the message that triggered the task (or resume)
229346
emoji = "DONE" if is_completed else "Cry"
230347
self._add_reaction(reaction_msg_id, emoji)
231-
sent_id = self._reply_message(root_msg_id, content, card=card)
348+
349+
# If we have a streaming card, patch it with the final result
350+
if streaming_msg_id:
351+
patched = self._patch_message(streaming_msg_id, card)
352+
if patched:
353+
sent_id = streaming_msg_id
354+
if not sent_id:
355+
sent_id = self._reply_message(root_msg_id, content, card=card)
232356

233357
# Fallback: send to default chat if no origin or reply failed
234358
if not sent_id:
@@ -387,6 +511,75 @@ def _create_reply(self, parent_message_id: str, card: dict[str, Any]) -> Optiona
387511
print(f"[Feishu] Reply failed: {response.code} {response.msg}")
388512
return None
389513

514+
def _patch_message(self, message_id: str, card: dict[str, Any]) -> bool:
515+
"""Patch an existing interactive card message with new content."""
516+
if not self._client or not FEISHU_AVAILABLE:
517+
return False
518+
try:
519+
request = (
520+
PatchMessageRequest.builder()
521+
.message_id(message_id)
522+
.request_body(
523+
PatchMessageRequestBody.builder()
524+
.content(json.dumps(card, ensure_ascii=False))
525+
.build()
526+
)
527+
.build()
528+
)
529+
response = self._client.im.v1.message.patch(request)
530+
if response.success():
531+
return True
532+
print(f"[Feishu] Patch failed: {response.code} {response.msg}")
533+
return False
534+
except Exception as e:
535+
print(f"[Feishu] Error patching message {message_id}: {e}")
536+
return False
537+
538+
def _build_streaming_card(
539+
self, task_id: int, task_title: str, output_text: str, done: bool = False
540+
) -> dict[str, Any]:
541+
"""Build a card showing live streaming output."""
542+
if done:
543+
display_text = output_text.strip() or "完成"
544+
else:
545+
body = output_text.strip() or "Thinking"
546+
display_text = body + " ▌"
547+
return {
548+
"schema": "2.0",
549+
"config": {"wide_screen_mode": True, "width_mode": "fill"},
550+
"body": {
551+
"elements": [
552+
{
553+
"tag": "markdown",
554+
"content": display_text,
555+
}
556+
]
557+
},
558+
}
559+
560+
def _start_streaming(self, task_id: int, running_msg_id: str, task_title: str) -> None:
561+
"""Register an event-driven writer that patches the running card on each assistant event."""
562+
# Stop any previous writer for this task
563+
self._stop_streaming(task_id)
564+
565+
writer = _FeishuStreamWriter(task_id, running_msg_id, self, task_title)
566+
with self._writers_lock:
567+
self._writers[task_id] = writer
568+
with self._streaming_lock:
569+
self._streaming_msg[task_id] = running_msg_id
570+
571+
self.scheduler.add_output_listener(writer.on_event)
572+
print(f"[Feishu] Streaming writer registered for task {task_id}, msg_id={running_msg_id}")
573+
574+
def _stop_streaming(self, task_id: int) -> None:
575+
"""Unregister the streaming writer for task_id."""
576+
with self._writers_lock:
577+
writer = self._writers.pop(task_id, None)
578+
if writer:
579+
self.scheduler.remove_output_listener(writer.on_event)
580+
writer.stop()
581+
print(f"[Feishu] Streaming writer stopped for task {task_id}")
582+
390583
def _build_notification_card(
391584
self,
392585
task_id: int,
@@ -941,10 +1134,14 @@ def _handle_inbound(self, data) -> None:
9411134
)
9421135
with self._origin_lock:
9431136
self._task_origin[tid] = (reply_to, message.message_id, message.message_id)
944-
self._reply_message(
945-
message.message_id,
946-
f"▶️ 收到!正在唤醒 Task #{tid},请稍候~",
1137+
task_obj = self.db.get_task(tid)
1138+
resume_title = (task_obj or {}).get("title", f"Task #{tid}")
1139+
running_card = self._build_streaming_card(tid, resume_title, "")
1140+
running_msg_id = self._create_reply(
1141+
parent_message_id=message.message_id, card=running_card
9471142
)
1143+
if running_msg_id:
1144+
self._start_streaming(tid, running_msg_id, resume_title)
9481145
print(f"[Feishu] Task {tid} resumed")
9491146
else:
9501147
self._send_message(
@@ -1050,10 +1247,14 @@ def _handle_inbound(self, data) -> None:
10501247
)
10511248
with self._origin_lock:
10521249
self._task_origin[task_id] = (reply_to, thread_root, message.message_id)
1053-
self._reply_message(
1054-
thread_root,
1055-
f"▶️ 收到!正在唤醒 Task #{task_id},请稍候~",
1250+
task_obj = self.db.get_task(task_id)
1251+
resume_title = (task_obj or {}).get("title", f"Task #{task_id}")
1252+
running_card = self._build_streaming_card(task_id, resume_title, "")
1253+
running_msg_id = self._create_reply(
1254+
parent_message_id=thread_root, card=running_card
10561255
)
1256+
if running_msg_id:
1257+
self._start_streaming(task_id, running_msg_id, resume_title)
10571258
print(f"[Feishu] Auto-resuming task {task_id} from thread reply")
10581259
return
10591260
else:
@@ -1133,12 +1334,16 @@ def _handle_inbound(self, data) -> None:
11331334
else ""
11341335
)
11351336

1136-
# Reply with a brief running hint and track origin for completion notification
1137-
self._reply_message(message.message_id, f"Task #{task_id} is running…")
1337+
# Send running card and start streaming
1338+
task_title = f"[Feishu] {title}"
1339+
running_card = self._build_streaming_card(task_id, task_title, "")
1340+
running_msg_id = self._create_reply(parent_message_id=message.message_id, card=running_card)
11381341
with self._origin_lock:
11391342
self._task_origin[task_id] = (reply_to, message.message_id, message.message_id)
11401343
with self._root_msg_lock:
11411344
self._root_msg_map[message.message_id] = task_id
1345+
if running_msg_id:
1346+
self._start_streaming(task_id, running_msg_id, task_title)
11421347
print(
11431348
f"[Feishu] Task {task_id} origin tracked: reply_to={reply_to}, root_msg={message.message_id}"
11441349
)

docs/todo.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@
22

33
## In Progress
44

5+
- [x] **修复流式输出链路问题** — 去掉飞书侧二次 token 拆分,串行化 Feishu PatchMessage 更新,并让任务完成后的 `/output` 接口回落到最新 run 的 `raw_output`
6+
- ✅ 已修复:飞书 streaming writer 现在按 agent 原始 assistant 事件追加内容,不再拆词做本地打字机效果
7+
- ✅ 已修复:Feishu PatchMessage 更新增加 in-flight/dirty 状态,避免并发 patch 乱序覆盖
8+
- ✅ 已修复:`/api/tasks/{id}/output` 在任务结束后回落读取最新 run 的 `raw_output`
9+
- ✅ 验证:`make check` 通过,`82 passed`
10+
11+
- [x] **系统 review 流式输出链路** — 梳理后端 CLI 读取、事件持久化、前端轮询展示和 Feishu 卡片更新,确认 Feishu 本地打字机拆分是主要问题,并记录并发 patch 与输出保留风险
12+
- ✅ 验证:`uv run pytest -q tests/test_feishu_message_rendering.py tests/test_codex_streaming_events.py` 通过,`14 passed`
13+
14+
- [x] **飞书 channel 流式返回消息** — 任务运行时每 5 秒向飞书推送一次当前输出,用 PatchMessage 更新同一条卡片;任务完成/失败后将该卡片更新为最终结果
15+
- ✅ 已实现:新建任务和 resume 任务时发送初始运行卡片,启动后台轮询线程每 5 秒 patch 一次
16+
- ✅ 已实现:任务结束时 patch 同一条卡片为最终结果,减少回复消息数
17+
- ✅ 验证:`uv run pytest -q` 通过,73 passed
18+
519
- [x] **修复 Feishu 卡片展开后仍显示 truncated 内容** — 排查折叠面板完整结果的组装逻辑,确保展开后显示完整正文而不是再次裁剪后的剩余片段
620
- ✅ 已修复:长结果卡片改为”折叠态仅显示 summary,展开态显示完整正文”,不再在面板外额外渲染截断预览
721
- ✅ 已修复:展开区按 chunk 承载完整正文,避免 `truncated` 出现在展开内容里,也避免预览与正文重复阅读

0 commit comments

Comments
 (0)