1414
1515import base64
1616import json
17+ import re
1718import threading
1819import time
1920from datetime import datetime , timedelta , timezone
7071
7172FEISHU_CARD_MARKDOWN_CHUNK = 7000
7273FEISHU_FALLBACK_MARKDOWN_LIMIT = 8000
73- FEISHU_STREAMING_MAX_OUTPUT = 4000 # max chars of output to show in streaming card
74+ FEISHU_CARD_MAX_ELEMENTS = 200
75+ FEISHU_PANEL_MAX_LINE_ELEMENTS = FEISHU_CARD_MAX_ELEMENTS - 20
76+ FEISHU_PANEL_PLAIN_TEXT_CHUNK = 1800
77+ FEISHU_THINKING_PREFIX = "[thinking] "
7478
7579
7680class _FeishuStreamWriter :
@@ -110,16 +114,22 @@ def on_event(self, task_id: int, run_id: int, event_type: str, content: str) ->
110114 return
111115 if event_type != "assistant" or content == "" :
112116 return
117+ display_content = self ._display_content (content )
113118 with self ._parts_lock :
114119 # Latch the run_id on the first event; reset parts if run_id changes (resume)
115120 if self ._run_id is None :
116121 self ._run_id = run_id
117122 elif self ._run_id != run_id :
118123 self ._run_id = run_id
119124 self ._parts .clear ()
120- self ._parts .append (content )
125+ self ._parts .append (display_content )
121126 self ._schedule ()
122127
128+ def _display_content (self , content : str ) -> str :
129+ if content .startswith (FEISHU_THINKING_PREFIX ):
130+ return content [len (FEISHU_THINKING_PREFIX ) :]
131+ return content
132+
123133 def _schedule (self ) -> None :
124134 with self ._state_lock :
125135 if self ._stopped :
@@ -153,8 +163,6 @@ def _timer_fired(self) -> None:
153163 def _do_patch (self ) -> None :
154164 with self ._parts_lock :
155165 text = "" .join (self ._parts )
156- if len (text ) > FEISHU_STREAMING_MAX_OUTPUT :
157- text = "…" + text [- FEISHU_STREAMING_MAX_OUTPUT :]
158166 card = self ._channel ._build_streaming_card (self .task_id , self ._task_title , text )
159167 try :
160168 self ._channel ._patch_message (self .msg_id , card )
@@ -164,6 +172,10 @@ def _do_patch(self) -> None:
164172 self ._patch_in_flight = False
165173 self ._schedule_dirty_locked ()
166174
175+ def snapshot_text (self ) -> str :
176+ with self ._parts_lock :
177+ return "" .join (self ._parts )
178+
167179 def stop (self ) -> None :
168180 with self ._state_lock :
169181 self ._stopped = True
@@ -322,22 +334,23 @@ def send(self, msg: OutboundMessage) -> None:
322334 ]
323335 content = error_text
324336
325- card = self ._build_notification_card (
326- task_id = task_id ,
327- task = task ,
328- is_completed = is_completed ,
329- body_text = content ,
330- )
331-
332337 # Try to reply in thread if we have an origin message
333338 with self ._origin_lock :
334339 origin = self ._task_origin .get (task_id )
335340
336- # Stop streaming thread and get the running card message_id if any
341+ # Stop streaming thread and get the running card message_id/history if any
337342 streaming_msg_id = None
338343 with self ._streaming_lock :
339344 streaming_msg_id = self ._streaming_msg .pop (task_id , None )
340- self ._stop_streaming (task_id )
345+ streaming_history = self ._stop_streaming (task_id )
346+
347+ card = self ._build_notification_card (
348+ task_id = task_id ,
349+ task = task ,
350+ is_completed = is_completed ,
351+ body_text = content ,
352+ streaming_history = streaming_history ,
353+ )
341354
342355 sent_id = None
343356 if origin :
@@ -539,24 +552,101 @@ def _build_streaming_card(
539552 self , task_id : int , task_title : str , output_text : str , done : bool = False
540553 ) -> dict [str , Any ]:
541554 """Build a card showing live streaming output."""
555+ elements : list [dict [str , Any ]]
542556 if done :
543557 display_text = output_text .strip () or "完成"
558+ elements = [
559+ {
560+ "tag" : "markdown" ,
561+ "content" : self ._preserve_feishu_markdown_linebreaks (display_text ),
562+ }
563+ ]
564+ elif not output_text .strip ():
565+ elements = [{"tag" : "markdown" , "content" : "Thinking ▌" }]
544566 else :
545- body = output_text .strip () or "Thinking"
546- display_text = body + " ▌"
567+ elements = [self ._build_streaming_history_panel (output_text , expanded = True )]
547568 return {
548569 "schema" : "2.0" ,
549570 "config" : {"wide_screen_mode" : True , "width_mode" : "fill" },
550571 "body" : {
551- "elements" : [
552- {
553- "tag" : "markdown" ,
554- "content" : display_text ,
555- }
556- ]
572+ "elements" : elements ,
557573 },
558574 }
559575
576+ def _build_streaming_history_panel (
577+ self , output_text : str , expanded : bool = False
578+ ) -> dict [str , Any ]:
579+ elements = self ._build_streaming_history_elements (output_text )
580+ return {
581+ "tag" : "collapsible_panel" ,
582+ "expanded" : expanded ,
583+ "header" : {
584+ "title" : {
585+ "tag" : "plain_text" ,
586+ "content" : "思考过程" ,
587+ },
588+ "vertical_align" : "center" ,
589+ "icon" : {
590+ "tag" : "standard_icon" ,
591+ "token" : "down-small-ccm_outlined" ,
592+ "color" : "" ,
593+ "size" : "16px 16px" ,
594+ },
595+ "icon_position" : "right" ,
596+ "icon_expanded_angle" : - 180 ,
597+ },
598+ "border" : {
599+ "color" : "grey" ,
600+ "corner_radius" : "5px" ,
601+ },
602+ "vertical_spacing" : "8px" ,
603+ "padding" : "8px 8px 8px 8px" ,
604+ "elements" : elements ,
605+ }
606+
607+ def _build_streaming_history_elements (self , output_text : str ) -> list [dict [str , Any ]]:
608+ normalized = output_text .replace ("\r \n " , "\n " ).replace ("\r " , "\n " ).rstrip ("\n " )
609+ if not normalized :
610+ return []
611+
612+ line_elements = self ._build_streaming_history_line_elements (normalized )
613+ if len (line_elements ) <= FEISHU_PANEL_MAX_LINE_ELEMENTS :
614+ return line_elements
615+
616+ return [
617+ {
618+ "tag" : "markdown" ,
619+ "content" : self ._wrap_feishu_code_block (normalized ),
620+ }
621+ ]
622+
623+ def _build_streaming_history_line_elements (self , text : str ) -> list [dict [str , Any ]]:
624+ elements : list [dict [str , Any ]] = []
625+ for line in text .split ("\n " ):
626+ chunks = self ._chunk_text (line , FEISHU_PANEL_PLAIN_TEXT_CHUNK ) if line else [" " ]
627+ for chunk in chunks :
628+ elements .append (self ._build_streaming_history_line (chunk ))
629+ return elements
630+
631+ def _build_streaming_history_line (self , content : str ) -> dict [str , Any ]:
632+ return {
633+ "tag" : "div" ,
634+ "text" : {
635+ "tag" : "plain_text" ,
636+ "text_color" : "grey" ,
637+ "text_size" : "notation" ,
638+ "content" : content ,
639+ },
640+ }
641+
642+ def _wrap_feishu_code_block (self , text : str ) -> str :
643+ longest_backtick_run = max ((len (run ) for run in re .findall (r"`+" , text )), default = 0 )
644+ fence = "`" * max (3 , longest_backtick_run + 1 )
645+ return f"{ fence } \n { text } \n { fence } "
646+
647+ def _preserve_feishu_markdown_linebreaks (self , text : str ) -> str :
648+ return text .replace ("\r \n " , "\n " ).replace ("\r " , "\n " )
649+
560650 def _start_streaming (self , task_id : int , running_msg_id : str , task_title : str ) -> None :
561651 """Register an event-driven writer that patches the running card on each assistant event."""
562652 # Stop any previous writer for this task
@@ -571,25 +661,37 @@ def _start_streaming(self, task_id: int, running_msg_id: str, task_title: str) -
571661 self .scheduler .add_output_listener (writer .on_event )
572662 print (f"[Feishu] Streaming writer registered for task { task_id } , msg_id={ running_msg_id } " )
573663
574- def _stop_streaming (self , task_id : int ) -> None :
664+ def _stop_streaming (self , task_id : int ) -> Optional [ str ] :
575665 """Unregister the streaming writer for task_id."""
576666 with self ._writers_lock :
577667 writer = self ._writers .pop (task_id , None )
578668 if writer :
579669 self .scheduler .remove_output_listener (writer .on_event )
670+ history = writer .snapshot_text ()
580671 writer .stop ()
581672 print (f"[Feishu] Streaming writer stopped for task { task_id } " )
673+ return history
674+ return None
582675
583676 def _build_notification_card (
584677 self ,
585678 task_id : int ,
586679 task : dict [str , Any ],
587680 is_completed : bool ,
588681 body_text : str ,
682+ streaming_history : Optional [str ] = None ,
589683 ) -> dict [str , Any ]:
590684 clean_body = (body_text or "" ).strip () or ("Done." if is_completed else "Unknown error" )
591685 summary = self ._truncate_text (clean_body .splitlines ()[0 ], 120 ) if clean_body else ""
592686 elements = self ._build_result_elements (body_text = clean_body )
687+ if streaming_history and streaming_history .strip ():
688+ panel_text = (
689+ self ._strip_final_result_from_history (streaming_history , clean_body )
690+ if is_completed
691+ else streaming_history
692+ )
693+ if panel_text .strip ():
694+ elements = [self ._build_streaming_history_panel (panel_text )] + elements
593695
594696 if not is_completed :
595697 elements .append (
@@ -612,6 +714,16 @@ def _build_notification_card(
612714 },
613715 }
614716
717+ def _strip_final_result_from_history (self , history : str , final_text : str ) -> str :
718+ final_body = (final_text or "" ).strip ()
719+ if not final_body :
720+ return history
721+
722+ trimmed_history = history .rstrip ()
723+ if trimmed_history .endswith (final_body ):
724+ return trimmed_history [: - len (final_body )].rstrip ()
725+ return history
726+
615727 def _build_result_elements (self , body_text : str ) -> list [dict [str , Any ]]:
616728 clean_body = (body_text or "" ).strip () or "Done."
617729 return [
0 commit comments