22import base64
33import json
44import time
5- import hashlib
65import traceback
76import uuid
87import xml .etree .ElementTree as ET
1918from langbot .pkg .platform .logger import EventLogger
2019
2120
22- def _summarize_stream_text (content : str , tail_length : int = 32 ) -> dict [str , str | int ]:
23- text = content or ""
24- encoded = text .encode ('utf-8' )
25- return {
26- "chars" : len (text ),
27- "bytes" : len (encoded ),
28- "tail_repr" : repr (text [- tail_length :]),
29- "md5" : hashlib .md5 (encoded ).hexdigest ()[:12 ] if encoded else "0" * 12 ,
30- }
31-
32-
3321@dataclass
3422class StreamChunk :
3523 """描述单次推送给企业微信的流式片段。"""
@@ -75,9 +63,6 @@ class StreamSession:
7563 # 缓存最近一次片段,处理重试或超时兜底
7664 last_chunk : Optional [StreamChunk ] = None
7765
78- # 发布到企业微信 stream 的快照序号,便于串联日志
79- publish_seq : int = 0
80-
8166
8267class StreamSessionManager :
8368 """管理 stream 会话的生命周期,并负责队列的生产消费。"""
@@ -148,16 +133,12 @@ async def publish(self, stream_id: str, chunk: StreamChunk) -> bool:
148133 return False
149134
150135 session .last_access = time .time ()
151- session .publish_seq += 1
152- chunk .meta .setdefault ('seq' , session .publish_seq )
153136 session .last_chunk = chunk
154137
155138 # 企业微信消费的是当前完整快照,保留最新片段即可,避免旧片段堆积导致显示延迟。
156- cleared_count = 0
157139 while not session .queue .empty ():
158140 try :
159141 session .queue .get_nowait ()
160- cleared_count += 1
161142 except asyncio .QueueEmpty :
162143 break
163144
@@ -170,22 +151,6 @@ async def publish(self, stream_id: str, chunk: StreamChunk) -> bool:
170151 if chunk .is_final :
171152 session .finished = True
172153
173- summary = _summarize_stream_text (chunk .content )
174- await self .logger .debug (
175- '[wecom-stream] '
176- f'action=publish '
177- f'stream_id={ stream_id or "-" } '
178- f'msg_id={ session .msg_id or "-" } '
179- f'seq={ chunk .meta .get ("seq" , - 1 )} '
180- f'finish={ str (chunk .is_final ).lower ()} '
181- f'cleared={ cleared_count } '
182- f'queue_size={ session .queue .qsize ()} '
183- f'content_chars={ summary ["chars" ]} '
184- f'content_bytes={ summary ["bytes" ]} '
185- f'content_tail={ summary ["tail_repr" ]} '
186- f'content_md5={ summary ["md5" ]} '
187- )
188-
189154 return True
190155
191156 async def consume (self , stream_id : str , timeout : float = 0.5 ) -> Optional [StreamChunk ]:
@@ -213,46 +178,11 @@ async def consume(self, stream_id: str, timeout: float = 0.5) -> Optional[Stream
213178 if chunk .is_final :
214179 session .finished = True
215180
216- summary = _summarize_stream_text (chunk .content )
217- await self .logger .debug (
218- '[wecom-stream] '
219- f'action=consume '
220- f'stream_id={ stream_id or "-" } '
221- f'msg_id={ session .msg_id or "-" } '
222- f'seq={ chunk .meta .get ("seq" , - 1 )} '
223- f'finish={ str (chunk .is_final ).lower ()} '
224- f'queue_size={ session .queue .qsize ()} '
225- f'content_chars={ summary ["chars" ]} '
226- f'content_bytes={ summary ["bytes" ]} '
227- f'content_tail={ summary ["tail_repr" ]} '
228- f'content_md5={ summary ["md5" ]} '
229- )
230181 return chunk
231182 except asyncio .TimeoutError :
232183 if session .finished and session .last_chunk :
233- summary = _summarize_stream_text (session .last_chunk .content )
234- await self .logger .debug (
235- '[wecom-stream] '
236- f'action=consume_timeout_last_chunk '
237- f'stream_id={ stream_id or "-" } '
238- f'msg_id={ session .msg_id or "-" } '
239- f'seq={ session .last_chunk .meta .get ("seq" , - 1 )} '
240- f'finish={ str (session .last_chunk .is_final ).lower ()} '
241- f'queue_size={ session .queue .qsize ()} '
242- f'content_chars={ summary ["chars" ]} '
243- f'content_bytes={ summary ["bytes" ]} '
244- f'content_tail={ summary ["tail_repr" ]} '
245- f'content_md5={ summary ["md5" ]} '
246- )
247184 return session .last_chunk
248185
249- await self .logger .debug (
250- '[wecom-stream] '
251- f'action=consume_timeout_empty '
252- f'stream_id={ stream_id or "-" } '
253- f'msg_id={ session .msg_id or "-" } '
254- f'queue_size={ session .queue .qsize ()} '
255- )
256186 return None
257187
258188 def mark_finished (self , stream_id : str ) -> None :
@@ -331,47 +261,6 @@ def __init__(
331261 self .stream_timeout_final_text = '抱歉,处理超时,请稍后重试。'
332262 self .stream_error_final_text = '抱歉,处理失败,请稍后重试。'
333263
334- async def _log_stream_debug (
335- self ,
336- action : str ,
337- stream_id : str ,
338- session : Optional [StreamSession ] = None ,
339- source : str = '' ,
340- chunk : Optional [StreamChunk ] = None ,
341- ) -> None :
342- """记录流式会话关键路径日志,便于在消息记录中排查轮询与收口问题。"""
343- age_ms = - 1
344- msg_id = ''
345- if session :
346- msg_id = session .msg_id
347- age_ms = int ((time .time () - session .created_at ) * 1000 )
348-
349- finish = False
350- seq = - 1
351- summary = _summarize_stream_text ('' )
352- if chunk :
353- finish = chunk .is_final
354- seq = int (chunk .meta .get ('seq' , - 1 ))
355- summary = _summarize_stream_text (chunk .content )
356-
357- queue_size = session .queue .qsize () if session else - 1
358-
359- await self .logger .debug (
360- '[wecom-stream] '
361- f'action={ action } '
362- f'stream_id={ stream_id or "-" } '
363- f'msg_id={ msg_id or "-" } '
364- f'source={ source or "-" } '
365- f'seq={ seq } '
366- f'finish={ str (finish ).lower ()} '
367- f'age_ms={ age_ms } '
368- f'queue_size={ queue_size } '
369- f'content_chars={ summary ["chars" ]} '
370- f'content_bytes={ summary ["bytes" ]} '
371- f'content_tail={ summary ["tail_repr" ]} '
372- f'content_md5={ summary ["md5" ]} '
373- )
374-
375264 def _is_stream_lifetime_exceeded (self , session : StreamSession ) -> bool :
376265 """判断当前 stream 是否已经超过最大生命周期。"""
377266 if session .finished :
@@ -398,13 +287,6 @@ async def _force_finish_stream(
398287 session .finished = True
399288 session .last_access = time .time ()
400289
401- await self ._log_stream_debug (
402- action = 'force_finish' ,
403- stream_id = stream_id ,
404- session = session ,
405- source = reason if published else f'{ reason } _no_queue' ,
406- chunk = chunk ,
407- )
408290 return chunk
409291
410292 def _resolve_followup_chunk (
@@ -535,12 +417,6 @@ async def _handle_post_initial_response(self, msg_json: dict[str, Any], nonce: s
535417 if is_new :
536418 asyncio .create_task (self ._dispatch_event (event ))
537419
538- await self ._log_stream_debug (
539- action = 'initial_response' ,
540- stream_id = session .stream_id ,
541- session = session ,
542- source = 'new' if is_new else 'reuse' ,
543- )
544420
545421 payload = self ._build_stream_payload (session .stream_id , '' , False )
546422 return await self ._encrypt_and_reply (payload , nonce )
@@ -567,12 +443,6 @@ async def _handle_post_followup_response(self, msg_json: dict[str, Any], nonce:
567443 session = self .stream_sessions .get_session (stream_id )
568444 if not session :
569445 chunk = StreamChunk (content = self .stream_error_final_text , is_final = True , meta = {'reason' : 'missing_session' })
570- await self ._log_stream_debug (
571- action = 'followup_response' ,
572- stream_id = stream_id ,
573- source = 'missing_session' ,
574- chunk = chunk ,
575- )
576446 payload = self ._build_stream_payload (stream_id , chunk .content , chunk .is_final )
577447 return await self ._encrypt_and_reply (payload , nonce )
578448
@@ -601,22 +471,9 @@ async def _handle_post_followup_response(self, msg_json: dict[str, Any], nonce:
601471 if not chunk :
602472 chunk = self ._resolve_followup_chunk (session , cached_content )
603473 if not chunk :
604- await self ._log_stream_debug (
605- action = 'followup_response' ,
606- stream_id = stream_id ,
607- session = session ,
608- source = 'empty_response' ,
609- )
610474 payload = self ._build_stream_payload (stream_id , '' , False )
611475 return await self ._encrypt_and_reply (payload , nonce )
612476
613- await self ._log_stream_debug (
614- action = 'followup_response' ,
615- stream_id = stream_id ,
616- session = session ,
617- source = chunk .meta .get ('reason' , 'queue' ),
618- chunk = chunk ,
619- )
620477 payload = self ._build_stream_payload (stream_id , chunk .content , chunk .is_final )
621478 if chunk .is_final :
622479 self .stream_sessions .mark_finished (stream_id )
@@ -944,19 +801,6 @@ async def push_stream_chunk(self, msg_id: str, content: str, is_final: bool = Fa
944801 # 根据 msg_id 找到对应 stream 会话,如果不存在说明当前消息非流式
945802 stream_id = self .stream_sessions .get_stream_id_by_msg (msg_id )
946803 if not stream_id :
947- summary = _summarize_stream_text (content )
948- await self .logger .debug (
949- '[wecom-stream] '
950- f'action=push_stream_chunk_missing_session '
951- f'stream_id=- '
952- f'msg_id={ msg_id or "-" } '
953- f'seq=-1 '
954- f'finish={ str (is_final ).lower ()} '
955- f'content_chars={ summary ["chars" ]} '
956- f'content_bytes={ summary ["bytes" ]} '
957- f'content_tail={ summary ["tail_repr" ]} '
958- f'content_md5={ summary ["md5" ]} '
959- )
960804 return False
961805
962806 chunk = StreamChunk (content = content , is_final = is_final )
0 commit comments