Skip to content

Commit a6beab6

Browse files
committed
fix: handle id-less values messages and redact stream payload logs
1 parent 3ae43a2 commit a6beab6

2 files changed

Lines changed: 50 additions & 6 deletions

File tree

astrbot/core/agent/runners/deerflow/deerflow_agent_runner.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import hashlib
3+
import json
24
import sys
35
import typing as T
46
from collections import deque
@@ -49,6 +51,8 @@ class _StreamState:
4951
task_failures: list[str] = field(default_factory=list)
5052
seen_message_ids: set[str] = field(default_factory=set)
5153
seen_message_order: deque[str] = field(default_factory=deque)
54+
# Fallback tracking for backends that omit message ids in values events.
55+
no_id_message_fingerprints: dict[int, str] = field(default_factory=dict)
5256
baseline_initialized: bool = False
5357
run_values_messages: list[dict[str, T.Any]] = field(default_factory=list)
5458
timed_out: bool = False
@@ -267,16 +271,38 @@ def _extract_new_messages_from_values(
267271
state: _StreamState,
268272
) -> list[dict[str, T.Any]]:
269273
new_messages: list[dict[str, T.Any]] = []
270-
for msg in values_messages:
274+
no_id_indexes_seen: set[int] = set()
275+
for idx, msg in enumerate(values_messages):
271276
if not isinstance(msg, dict):
272277
continue
273278
msg_id = get_message_id(msg)
274-
if not msg_id or msg_id in state.seen_message_ids:
279+
if msg_id:
280+
if msg_id in state.seen_message_ids:
281+
continue
282+
self._remember_seen_message_id(state, msg_id)
283+
new_messages.append(msg)
275284
continue
276-
self._remember_seen_message_id(state, msg_id)
285+
286+
no_id_indexes_seen.add(idx)
287+
msg_fingerprint = self._fingerprint_message(msg)
288+
if state.no_id_message_fingerprints.get(idx) == msg_fingerprint:
289+
continue
290+
state.no_id_message_fingerprints[idx] = msg_fingerprint
277291
new_messages.append(msg)
292+
293+
# Keep no-id index state aligned with latest values payload shape.
294+
for idx in list(state.no_id_message_fingerprints.keys()):
295+
if idx not in no_id_indexes_seen:
296+
state.no_id_message_fingerprints.pop(idx, None)
278297
return new_messages
279298

299+
def _fingerprint_message(self, message: dict[str, T.Any]) -> str:
300+
try:
301+
raw = json.dumps(message, sort_keys=True, ensure_ascii=False, default=str)
302+
except (TypeError, ValueError):
303+
raw = repr(message)
304+
return hashlib.sha1(raw.encode("utf-8", errors="ignore")).hexdigest()
305+
280306
def _remember_seen_message_id(self, state: _StreamState, msg_id: str) -> None:
281307
if not msg_id or msg_id in state.seen_message_ids:
282308
return
@@ -422,9 +448,14 @@ def _handle_values_event(
422448

423449
if not state.baseline_initialized:
424450
state.baseline_initialized = True
425-
for msg in values_messages:
451+
for idx, msg in enumerate(values_messages):
452+
if not isinstance(msg, dict):
453+
continue
426454
msg_id = get_message_id(msg)
427-
self._remember_seen_message_id(state, msg_id)
455+
if msg_id:
456+
self._remember_seen_message_id(state, msg_id)
457+
continue
458+
state.no_id_message_fingerprints[idx] = self._fingerprint_message(msg)
428459
return responses
429460

430461
new_messages = self._extract_new_messages_from_values(

astrbot/core/agent/runners/deerflow/deerflow_api_client.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,20 @@ async def stream_run(
149149
) -> AsyncGenerator[dict[str, Any], None]:
150150
session = self._get_session()
151151
url = f"{self.api_base}/api/langgraph/threads/{thread_id}/runs/stream"
152-
logger.debug(f"deerflow stream_run payload: {payload}")
152+
input_payload = payload.get("input")
153+
message_count = 0
154+
if isinstance(input_payload, dict) and isinstance(
155+
input_payload.get("messages"), list
156+
):
157+
message_count = len(input_payload["messages"])
158+
# Log only a minimal summary to avoid exposing sensitive user content.
159+
logger.debug(
160+
"deerflow stream_run payload summary: thread_id=%s, keys=%s, message_count=%d, stream_mode=%s",
161+
thread_id,
162+
list(payload.keys()),
163+
message_count,
164+
payload.get("stream_mode"),
165+
)
153166
# For long-running SSE streams, avoid aiohttp total timeout.
154167
# Use socket read timeout so active heartbeats/chunks can keep the stream alive.
155168
stream_timeout = ClientTimeout(

0 commit comments

Comments
 (0)