@@ -450,16 +450,23 @@ async def send(self, message_chain: MessageChain):
450450 return await super ().send (message_chain )
451451
452452 async def send_streaming (self , generator , use_fallback : bool = False ):
453- """Matrix 流式发送 - 直接消费上游流式输出,累积后整块发送
453+ """Matrix 流式发送 - 使用消息编辑实现实时流式更新
454454
455- 注意:Matrix 平台支持真正的流式发送,因此忽略 use_fallback 参数,
456- 始终使用累积后一次性发送的方式,避免消息被不必要的分割 。
455+ 通过先发送初始消息,然后不断编辑该消息来实现流式输出效果。
456+ 类似于 Telegram/Discord 机器人的实时打字效果 。
457457 """
458- logger .info (f"Matrix send_streaming 开始,use_fallback={ use_fallback } " )
458+ import time
459+
460+ logger .info (f"Matrix send_streaming 开始 (编辑模式),use_fallback={ use_fallback } " )
459461 room_id = self .session_id
460462 accumulated_text = "" # 累积的文本内容
461463 non_text_components = [] # 非文本组件列表
462- typing_timeout = 30000 # 输入指示超时时间 (毫秒)
464+
465+ # 流式编辑控制参数
466+ edit_interval = 0.5 # 编辑间隔(秒),避免过于频繁的编辑导致 rate limit
467+ last_edit_time = 0.0
468+ message_event_id = None # 已发送消息的 event_id,用于后续编辑
469+ initial_message_sent = False
463470
464471 # 嘟文串相关变量
465472 reply_to = None
@@ -470,17 +477,60 @@ async def send_streaming(self, generator, use_fallback: bool = False):
470477 # 检查第一个消息链是否包含回复信息
471478 first_chain_processed = False
472479
473- # 开启输入指示
474- try :
475- await self .client .set_typing (room_id , typing = True , timeout = typing_timeout )
476- except Exception as e :
477- logger .debug (f"发送输入指示失败:{ e } " )
480+ async def build_content (text : str , is_streaming : bool = True ) -> dict [str , Any ]:
481+ """构建消息内容"""
482+ # 生成 formatted_body
483+ try :
484+ display_text = text + ("..." if is_streaming else "" )
485+ formatted_body = markdown_to_html (display_text )
486+ except Exception as e :
487+ logger .warning (f"Failed to render markdown: { e } " )
488+ display_text = text + ("..." if is_streaming else "" )
489+ formatted_body = display_text .replace ("\n " , "<br>" )
490+
491+ content : dict [str , Any ] = {
492+ "msgtype" : "m.text" ,
493+ "body" : display_text ,
494+ "format" : "org.matrix.custom.html" ,
495+ "formatted_body" : formatted_body ,
496+ }
497+
498+ # 如果有回复引用信息,添加 fallback(仅初始消息需要)
499+ if not initial_message_sent and original_message_info and reply_to :
500+ orig_sender = original_message_info .get ("sender" , "" )
501+ orig_body = original_message_info .get ("body" , "" )
502+ if len (orig_body ) > TEXT_TRUNCATE_LENGTH_50 :
503+ orig_body = orig_body [:TEXT_TRUNCATE_LENGTH_50 ] + "..."
504+ fallback_text = f"> <{ orig_sender } > { orig_body } \n \n "
505+ content ["body" ] = fallback_text + content ["body" ]
506+
507+ from .utils .utils import MatrixUtils
508+
509+ fallback_html = MatrixUtils .create_reply_fallback (
510+ original_body = original_message_info .get ("body" , "" ),
511+ original_sender = original_message_info .get ("sender" , "" ),
512+ original_event_id = reply_to ,
513+ room_id = room_id ,
514+ )
515+ content ["formatted_body" ] = fallback_html + content ["formatted_body" ]
516+
517+ # 添加嘟文串支持(仅初始消息需要)
518+ if not initial_message_sent :
519+ if use_thread and thread_root :
520+ content ["m.relates_to" ] = {
521+ "rel_type" : "m.thread" ,
522+ "event_id" : thread_root ,
523+ "m.in_reply_to" : {"event_id" : reply_to } if reply_to else None ,
524+ }
525+ elif reply_to :
526+ content ["m.relates_to" ] = {"m.in_reply_to" : {"event_id" : reply_to }}
527+
528+ return content
478529
479530 chain_count = 0
480531 try :
481532 async for chain in generator :
482533 chain_count += 1
483- logger .info (f"处理第 { chain_count } 个消息链" )
484534 if isinstance (chain , MessageChain ):
485535 # 只在第一个消息链中检查回复信息
486536 if not first_chain_processed :
@@ -494,8 +544,7 @@ async def send_streaming(self, generator, use_fallback: bool = False):
494544 except Exception :
495545 pass
496546
497- # 如果没有找到回复对象,但消息链中包含 Reply 组件(表示开启了回复模式)
498- # 则尝试获取自己最近发送的消息作为回复对象
547+ # 如果没有找到回复对象,但消息链中包含 Reply 组件
499548 if not reply_to :
500549 try :
501550 from astrbot .api .message_components import (
@@ -507,52 +556,37 @@ async def send_streaming(self, generator, use_fallback: bool = False):
507556 )
508557
509558 if has_reply_component :
510- # 获取房间当前状态以找到自己的用户 ID
511559 try :
512- # 尝试通过客户端获取自己的用户 ID
513560 whoami = await self .client .whoami ()
514561 my_user_id = whoami .get ("user_id" )
515562
516563 if my_user_id :
517- # 获取房间最近的消息
518564 messages_resp = await self .client .room_messages (
519565 room_id = room_id ,
520- direction = "b" , # 向后获取(最新的消息)
521- limit = 50 , # 获取最近 50 条消息
566+ direction = "b" ,
567+ limit = 50 ,
522568 )
523569
524- # 查找自己最近发送的消息
525570 chunk = messages_resp .get ("chunk" , [])
526571 for event in chunk :
527572 if (
528- event .get ("type" )
529- == "m.room.message"
530- and event .get ("sender" )
531- == my_user_id
532- and event .get ("content" , {}).get (
533- "msgtype"
534- )
535- == "m.text"
573+ event .get ("type" ) == "m.room.message"
574+ and event .get ("sender" ) == my_user_id
575+ and event .get ("content" , {}).get ("msgtype" ) == "m.text"
536576 ):
537577 reply_to = event .get ("event_id" )
538- logger .debug (
539- f"找到自己最近的消息作为回复对象:{ reply_to } "
540- )
578+ logger .debug (f"找到自己最近的消息作为回复对象:{ reply_to } " )
541579 break
542580 except Exception as e :
543581 logger .debug (f"获取自己最近消息失败:{ e } " )
544582 except Exception as e :
545583 logger .debug (f"处理回复模式时出错:{ e } " )
546584
547- # 如果 message chain 中没有 Reply,则使用原始消息 ID 作为回复目标
548- if (
549- not reply_to
550- and self .message_obj
551- and self .message_obj .message_id
552- ):
585+ # 使用原始消息 ID 作为回复目标
586+ if not reply_to and self .message_obj and self .message_obj .message_id :
553587 reply_to = str (self .message_obj .message_id )
554588
555- # 如果有回复, 检查是否需要使用嘟文串模式
589+ # 检查是否需要使用嘟文串模式
556590 if reply_to :
557591 try :
558592 resp = await self .client .get_event (room_id , reply_to )
@@ -562,9 +596,7 @@ async def send_streaming(self, generator, use_fallback: bool = False):
562596 "body" : resp .get ("content" , {}).get ("body" , "" ),
563597 }
564598 if resp and "content" in resp :
565- relates_to = resp ["content" ].get (
566- "m.relates_to" , {}
567- )
599+ relates_to = resp ["content" ].get ("m.relates_to" , {})
568600 if relates_to .get ("rel_type" ) == "m.thread" :
569601 thread_root = relates_to .get ("event_id" )
570602 use_thread = True
@@ -575,9 +607,7 @@ async def send_streaming(self, generator, use_fallback: bool = False):
575607 use_thread = False
576608 thread_root = None
577609 except Exception as e :
578- logger .warning (
579- f"Failed to get event for threading: { e } "
580- )
610+ logger .warning (f"Failed to get event for threading: { e } " )
581611
582612 first_chain_processed = True
583613
@@ -586,78 +616,90 @@ async def send_streaming(self, generator, use_fallback: bool = False):
586616 if isinstance (component , Plain ):
587617 accumulated_text += component .text
588618 elif not isinstance (component , Reply ):
589- # 非文本、非 Reply 组件收集起来
590619 non_text_components .append (component )
591620
621+ # 流式编辑逻辑
622+ current_time = time .time ()
623+ if accumulated_text :
624+ if not initial_message_sent :
625+ # 发送初始消息
626+ try :
627+ content = await build_content (accumulated_text , is_streaming = True )
628+ result = await self .client .send_message (
629+ room_id = room_id ,
630+ msg_type = "m.room.message" ,
631+ content = content ,
632+ )
633+ message_event_id = result .get ("event_id" )
634+ initial_message_sent = True
635+ last_edit_time = current_time
636+ logger .debug (f"流式消息初始发送成功:{ message_event_id } " )
637+ except Exception as e :
638+ logger .error (f"发送初始流式消息失败:{ e } " )
639+ elif message_event_id and (current_time - last_edit_time ) >= edit_interval :
640+ # 编辑已发送的消息
641+ try :
642+ new_content = {
643+ "body" : accumulated_text + "..." ,
644+ "format" : "org.matrix.custom.html" ,
645+ "formatted_body" : markdown_to_html (accumulated_text + "..." ),
646+ }
647+ await self .client .edit_message (
648+ room_id = room_id ,
649+ original_event_id = message_event_id ,
650+ new_content = new_content ,
651+ )
652+ last_edit_time = current_time
653+ logger .debug (f"流式消息编辑成功,当前长度:{ len (accumulated_text )} " )
654+ except Exception as e :
655+ logger .debug (f"编辑流式消息失败(将继续累积):{ e } " )
656+
657+ except Exception as e :
658+ logger .error (f"流式处理过程中出错:{ e } " )
659+
592660 finally :
593- # 关闭输入指示
594- try :
595- await self .client .set_typing (room_id , typing = False )
596- except Exception as e :
597- logger .debug (f"停止输入指示失败:{ e } " )
598661 logger .info (
599662 f"流式处理完成,共处理 { chain_count } 个消息链,累积文本长度:{ len (accumulated_text )} "
600663 )
601664
602- # 发送累积的文本内容
665+ # 发送或编辑最终的完整文本内容
603666 if accumulated_text :
604667 try :
605- # 生成 formatted_body
668+ # 生成最终的 formatted_body(不带省略号)
606669 try :
607670 formatted_body = markdown_to_html (accumulated_text )
608671 except Exception as e :
609672 logger .warning (f"Failed to render markdown: { e } " )
610673 formatted_body = accumulated_text .replace ("\n " , "<br>" )
611674
612- content : dict [str , Any ] = {
613- "msgtype" : "m.text" ,
675+ final_content = {
614676 "body" : accumulated_text ,
615677 "format" : "org.matrix.custom.html" ,
616678 "formatted_body" : formatted_body ,
617679 }
618680
619- # 如果有回复引用信息,添加 fallback
620- if original_message_info and reply_to :
621- orig_sender = original_message_info . get ( "sender" , "" )
622- orig_body = original_message_info . get ( "body" , "" )
623- if len ( orig_body ) > TEXT_TRUNCATE_LENGTH_50 :
624- orig_body = orig_body [: TEXT_TRUNCATE_LENGTH_50 ] + "..."
625- fallback_text = f"> < { orig_sender } > { orig_body } \n \n "
626- content [ "body" ] = fallback_text + content [ "body" ]
627-
628- from . utils . utils import MatrixUtils
629-
630- fallback_html = MatrixUtils . create_reply_fallback (
631- original_body = original_message_info . get ( "body" , "" ),
632- original_sender = original_message_info . get ( "sender" , "" ),
633- original_event_id = reply_to ,
681+ if initial_message_sent and message_event_id :
682+ # 最终编辑,去掉省略号
683+ try :
684+ await self . client . edit_message (
685+ room_id = room_id ,
686+ original_event_id = message_event_id ,
687+ new_content = final_content ,
688+ )
689+ logger . info ( "流式消息最终编辑完成" )
690+ except Exception as e :
691+ logger . error ( f"最终编辑失败: { e } " )
692+ else :
693+ # 如果从未发送过消息(可能累积太快),直接发送完整内容
694+ content = await build_content ( accumulated_text , is_streaming = False )
695+ await self . client . send_message (
634696 room_id = room_id ,
697+ msg_type = "m.room.message" ,
698+ content = content ,
635699 )
636- content ["formatted_body" ] = (
637- fallback_html + content ["formatted_body" ]
638- )
639-
640- # 添加嘟文串支持
641- if use_thread and thread_root :
642- content ["m.relates_to" ] = {
643- "rel_type" : "m.thread" ,
644- "event_id" : thread_root ,
645- "m.in_reply_to" : {"event_id" : reply_to } if reply_to else None ,
646- }
647- elif reply_to :
648- content ["m.relates_to" ] = {"m.in_reply_to" : {"event_id" : reply_to }}
649-
650- logger .info (
651- f"发送流式消息,长度:{ len (accumulated_text )} ,回复对象:{ reply_to } "
652- )
653- await self .client .send_message (
654- room_id = room_id ,
655- msg_type = "m.room.message" ,
656- content = content ,
657- )
658- logger .info ("流式消息发送成功" )
700+ logger .info ("流式消息一次性发送成功" )
659701 except Exception as e :
660- logger .error (f"发送消息失败 (streaming): { e } " )
702+ logger .error (f"发送最终消息失败 (streaming): { e } " )
661703
662704 # 发送非文本组件(图片、文件等)
663705 for component in non_text_components :
0 commit comments