diff --git a/astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py b/astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py index abb4c95999..7d4da0c50a 100644 --- a/astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py +++ b/astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py @@ -6,6 +6,7 @@ import io import time import uuid +from collections import deque from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any, cast @@ -15,7 +16,7 @@ from astrbot import logger from astrbot.api.event import MessageChain -from astrbot.api.message_components import File, Image, Plain, Record, Video +from astrbot.api.message_components import File, Image, Plain, Record, Reply, Video from astrbot.api.platform import ( AstrBotMessage, MessageMember, @@ -60,6 +61,34 @@ class TypingSessionState: lock: asyncio.Lock = field(default_factory=asyncio.Lock) +@dataclass +class WeixinOCRecentMessage: + message_id: str + sender_id: str + sender_nickname: str + timestamp: int + timestamp_ms: int + components: list[Any] + message_str: str + message_kind: str + + +@dataclass +class WeixinOCRecentSessionCache: + messages: deque[WeixinOCRecentMessage] + updated_at: float + + +@dataclass +class WeixinOCReplyMeta: + is_reply: bool = False + ref_msg: dict[str, Any] | None = None + reply_kind: str | None = None + quoted_item_type: int | None = None + quoted_text: str | None = None + reply_to: dict[str, Any] = field(default_factory=lambda: {"matched": False}) + + @register_platform_adapter( "weixin_oc", "个人微信", @@ -73,6 +102,10 @@ class WeixinOCAdapter(Platform): IMAGE_UPLOAD_TYPE = 1 VIDEO_UPLOAD_TYPE = 2 FILE_UPLOAD_TYPE = 3 + RECENT_MESSAGE_CACHE_SIZE = 100 + REPLY_MATCH_WINDOW_MS = 60_000 + RECENT_SESSION_CACHE_TTL_S = 1_800 + MAX_RECENT_MESSAGE_SESSIONS = 500 def __init__( self, @@ -118,6 +151,22 @@ def __init__( self._context_tokens: dict[str, str] = {} self._typing_states: dict[str, TypingSessionState] = {} self._last_inbound_error = "" + self._recent_message_cache_size = self._get_int_config( + "weixin_oc_recent_message_cache_size", + self.RECENT_MESSAGE_CACHE_SIZE, + 1, + ) + self._recent_session_cache_ttl_s = self._get_int_config( + "weixin_oc_recent_session_cache_ttl_s", + self.RECENT_SESSION_CACHE_TTL_S, + 60, + ) + self._max_recent_message_sessions = self._get_int_config( + "weixin_oc_max_recent_message_sessions", + self.MAX_RECENT_MESSAGE_SESSIONS, + 1, + ) + self._recent_messages: dict[str, WeixinOCRecentSessionCache] = {} self._typing_keepalive_interval_s = max( 1, int(platform_config.get("weixin_oc_typing_keepalive_interval", 5)), @@ -152,6 +201,18 @@ def _sync_client_state(self) -> None: self.client.api_timeout_ms = self.api_timeout_ms self.client.token = self.token + def _get_int_config( + self, + key: str, + default: int, + minimum: int, + ) -> int: + try: + value = int(self.config.get(key, default)) + except (TypeError, ValueError): + value = default + return max(minimum, value) + def _get_typing_state(self, user_id: str) -> TypingSessionState: state = self._typing_states.get(user_id) if state is None: @@ -201,12 +262,12 @@ async def _ensure_typing_ticket( return state.ticket payload = await self.client.get_typing_config(user_id, context_token) - if int(payload.get("ret") or 0) != 0: + if not self._is_successful_api_payload(payload): logger.warning( "weixin_oc(%s): getconfig failed for %s: %s", self.meta().id, user_id, - payload.get("errmsg", ""), + self._format_api_error(payload), ) return None @@ -227,9 +288,9 @@ async def _send_typing_state( cancel: bool, ) -> None: payload = await self.client.send_typing_state(user_id, ticket, cancel=cancel) - if int(payload.get("ret") or 0) != 0: + if not self._is_successful_api_payload(payload): raise RuntimeError( - f"sendtyping failed for {user_id}: {payload.get('errmsg', '')}" + f"sendtyping failed for {user_id}: {self._format_api_error(payload)}" ) async def _run_typing_keepalive(self, user_id: str) -> None: @@ -769,6 +830,9 @@ async def _send_items_to_session( self, user_id: str, item_list: list[dict[str, Any]], + *, + cache_components: list[Any] | None = None, + cache_message_str: str | None = None, ) -> bool: if not self.token: logger.warning("weixin_oc(%s): missing token, skip send", self.meta().id) @@ -787,7 +851,7 @@ async def _send_items_to_session( user_id, ) return False - await self.client.request_json( + payload = await self.client.request_json( "POST", "ilink/bot/sendmessage", payload={ @@ -807,8 +871,65 @@ async def _send_items_to_session( token_required=True, headers={}, ) + if not self._is_successful_api_payload(payload): + logger.warning( + "weixin_oc(%s): sendmessage failed for %s: %s", + self.meta().id, + user_id, + self._format_api_error(payload), + ) + return False + resolved_cache_components = ( + list(cache_components) + if cache_components is not None + else self._build_cache_components_from_items(item_list) + ) + sender_id = str(self.account_id or self.meta().id) + sent_at_ms = time.time_ns() // 1_000_000 + self._cache_recent_message( + user_id, + message_id=uuid.uuid4().hex, + sender_id=sender_id, + sender_nickname=sender_id, + timestamp=sent_at_ms // 1000, + timestamp_ms=sent_at_ms, + components=resolved_cache_components, + message_str=cache_message_str + if cache_message_str is not None + else self._message_text_from_item_list( + item_list, + include_ref_text=False, + ), + ) return True + def _build_cache_components_from_items( + self, + item_list: list[dict[str, Any]], + ) -> list[Any]: + components: list[Any] = [] + for item in item_list: + item_type = int(item.get("type") or 0) + if item_type != 1: + continue + text = str(item.get("text_item", {}).get("text", "")).strip() + if text: + components.append(Plain(text)) + return components + + @staticmethod + def _is_successful_api_payload(payload: dict[str, Any]) -> bool: + ret = payload.get("ret", 0) + errcode = payload.get("errcode", 0) + return int(ret or 0) == 0 and int(errcode or 0) == 0 + + @staticmethod + def _format_api_error(payload: dict[str, Any]) -> str: + ret = int(payload.get("ret") or 0) + errcode = int(payload.get("errcode") or 0) + errmsg = str(payload.get("errmsg", "")) + return f"ret={ret}, errcode={errcode}, errmsg={errmsg}" + async def _send_media_segment( self, user_id: str, @@ -858,8 +979,18 @@ async def _send_media_segment( await self._send_items_to_session( user_id, [self._build_plain_text_item(text)], + cache_components=[Plain(text)], + cache_message_str=text, ) - return await self._send_items_to_session(user_id, [media_item]) + return await self._send_items_to_session( + user_id, + [media_item], + cache_components=[segment], + cache_message_str=self._message_text_from_item_list( + [media_item], + include_ref_text=False, + ), + ) async def _start_login_session(self) -> OpenClawLoginSession: endpoint = "ilink/bot/get_bot_qrcode" @@ -961,7 +1092,10 @@ async def _poll_qr_status(self, login_session: OpenClawLoginSession) -> None: await self._save_account_state() def _message_text_from_item_list( - self, item_list: list[dict[str, Any]] | None + self, + item_list: list[dict[str, Any]] | None, + *, + include_ref_text: bool = False, ) -> str: if not item_list: return "" @@ -985,15 +1119,298 @@ def _message_text_from_item_list( elif item_type == 5: text_parts.append("[视频]") else: - ref = item.get("ref_msg") - if isinstance(ref, dict): - ref_item = ref.get("message_item") - if isinstance(ref_item, dict): - ref_text = str(self._message_text_from_item_list([ref_item])) - if ref_text: - text_parts.append(f"[引用:{ref_text}]") + if include_ref_text: + ref = item.get("ref_msg") + if isinstance(ref, dict): + ref_item = ref.get("message_item") + if isinstance(ref_item, dict): + ref_text = str( + self._message_text_from_item_list( + [ref_item], + include_ref_text=True, + ) + ) + if ref_text: + text_parts.append(f"[引用:{ref_text}]") return "\n".join(text_parts).strip() + def _item_type_to_kind(self, item_type: int | None) -> str: + match int(item_type or 0): + case 1: + return "text" + case self.IMAGE_ITEM_TYPE: + return "image" + case self.VOICE_ITEM_TYPE: + return "voice" + case self.FILE_ITEM_TYPE: + return "file" + case self.VIDEO_ITEM_TYPE: + return "video" + case _: + return "unknown" + + def _get_recent_message_cache( + self, + session_id: str, + ) -> deque[WeixinOCRecentMessage]: + now = time.monotonic() + self._prune_recent_message_caches(now=now) + + cache_entry = self._recent_messages.get(session_id) + if cache_entry is None: + cache_entry = WeixinOCRecentSessionCache( + messages=deque(maxlen=self._recent_message_cache_size), + updated_at=now, + ) + self._recent_messages[session_id] = cache_entry + else: + cache_entry.updated_at = now + return cache_entry.messages + + def _prune_recent_message_caches(self, *, now: float | None = None) -> None: + if not self._recent_messages: + return + + current = now if now is not None else time.monotonic() + expired_session_ids = [ + session_id + for session_id, cache_entry in self._recent_messages.items() + if current - cache_entry.updated_at > self._recent_session_cache_ttl_s + ] + for session_id in expired_session_ids: + self._recent_messages.pop(session_id, None) + + overflow = len(self._recent_messages) - self._max_recent_message_sessions + if overflow <= 0: + return + + oldest_session_ids = sorted( + self._recent_messages, + key=lambda session_id: self._recent_messages[session_id].updated_at, + )[:overflow] + for session_id in oldest_session_ids: + self._recent_messages.pop(session_id, None) + + def _infer_message_kind_from_components(self, components: list[Any]) -> str: + if not components: + return "unknown" + for component in components: + if isinstance(component, Plain) and component.text.strip(): + return "text" + if isinstance(component, Image): + return "image" + if isinstance(component, Record): + return "voice" + if isinstance(component, File): + return "file" + if isinstance(component, Video): + return "video" + return "unknown" + + def _cache_recent_message( + self, + session_id: str, + *, + message_id: str, + sender_id: str, + sender_nickname: str, + timestamp: int, + timestamp_ms: int | None = None, + components: list[Any], + message_str: str, + message_kind: str | None = None, + ) -> None: + if not session_id or not message_id: + return + resolved_timestamp_ms = ( + timestamp_ms if timestamp_ms is not None else timestamp * 1000 + ) + cache = self._get_recent_message_cache(session_id) + cache.append( + WeixinOCRecentMessage( + message_id=message_id, + sender_id=sender_id, + sender_nickname=sender_nickname, + timestamp=timestamp, + timestamp_ms=resolved_timestamp_ms, + components=list(components), + message_str=message_str, + message_kind=message_kind + or self._infer_message_kind_from_components(components), + ) + ) + + def _match_recent_reply( + self, + session_id: str, + *, + ref_create_time_ms: int | None, + ) -> tuple[WeixinOCRecentMessage | None, dict[str, Any] | None]: + if not session_id or ref_create_time_ms is None: + return None, None + + best_match: WeixinOCRecentMessage | None = None + best_distance: int | None = None + self._prune_recent_message_caches() + cache_entry = self._recent_messages.get(session_id) + if cache_entry is None: + return None, None + + for candidate in cache_entry.messages: + distance = abs(candidate.timestamp_ms - ref_create_time_ms) + if distance > self.REPLY_MATCH_WINDOW_MS: + continue + if best_distance is None or distance < best_distance: + best_match = candidate + best_distance = distance + + if best_match is None or best_distance is None: + return None, None + + confidence = max( + 0.0, + min(1.0, 1.0 - (best_distance / self.REPLY_MATCH_WINDOW_MS)), + ) + return best_match, { + "matched": True, + "strategy": "nearest-message-by-timestamp", + "ref_create_time_ms": ref_create_time_ms, + "matched_message_id": best_match.message_id, + "matched_kind": best_match.message_kind, + "distance_ms": best_distance, + "confidence": round(confidence, 4), + } + + async def _build_reply_component_from_ref( + self, + *, + session_id: str, + ref_msg: dict[str, Any], + ) -> tuple[Reply | None, WeixinOCReplyMeta]: + metadata = WeixinOCReplyMeta(ref_msg=ref_msg) + message_item = ref_msg.get("message_item") + if not isinstance(message_item, dict): + return None, metadata + + quoted_item_type_raw = message_item.get("type") + try: + quoted_item_type = ( + int(quoted_item_type_raw) + if quoted_item_type_raw not in (None, "") + else None + ) + except (TypeError, ValueError): + quoted_item_type = None + metadata.quoted_item_type = quoted_item_type + metadata.reply_kind = self._item_type_to_kind(quoted_item_type) + + ref_create_time_ms_raw = message_item.get("create_time_ms") + try: + ref_create_time_ms = ( + int(ref_create_time_ms_raw) + if ref_create_time_ms_raw not in (None, "") + else None + ) + except (TypeError, ValueError): + ref_create_time_ms = None + + quoted_components: list[Any] = [] + quoted_text = "" + if quoted_item_type is not None: + quoted_components = await self._item_list_to_components([message_item]) + quoted_text = self._message_text_from_item_list( + [message_item], + include_ref_text=False, + ) + + if quoted_text: + metadata.quoted_text = quoted_text + metadata.reply_to = { + "matched": True, + "strategy": "direct-ref-msg", + "matched_kind": metadata.reply_kind, + "matched_text": quoted_text, + "confidence": 1.0, + } + + matched_message = None + matched_reply_to = None + if not quoted_text or not quoted_components: + matched_message, matched_reply_to = self._match_recent_reply( + session_id, + ref_create_time_ms=ref_create_time_ms, + ) + if matched_message is not None: + quoted_components = list(matched_message.components) + quoted_text = matched_message.message_str + metadata.quoted_text = quoted_text or None + metadata.reply_kind = matched_message.message_kind + metadata.reply_to = matched_reply_to or {"matched": True} + + if not quoted_text and not quoted_components: + return None, metadata + + metadata.is_reply = True + + reply_message_id = ( + matched_message.message_id + if matched_message is not None + else str( + message_item.get("message_id") + or message_item.get("msg_id") + or f"weixin_oc_ref_{ref_create_time_ms or uuid.uuid4().hex}" + ) + ) + quoted_sender_id_raw = str(message_item.get("from_user_id") or "unknown") + reply_sender_id_raw = ( + matched_message.sender_id + if matched_message is not None + else quoted_sender_id_raw + ) + normalized_reply_sender_id = self._normalize_reply_sender_id( + reply_sender_id_raw + ) + reply_sender_id = ( + normalized_reply_sender_id + if normalized_reply_sender_id + else reply_sender_id_raw + ) + reply_sender_nickname = ( + matched_message.sender_nickname + if matched_message is not None + else quoted_sender_id_raw + ) + reply_time = ( + matched_message.timestamp + if matched_message is not None + else ( + int(ref_create_time_ms / 1000) + if isinstance(ref_create_time_ms, int) + else int(time.time()) + ) + ) + + return ( + Reply( + id=reply_message_id, + chain=quoted_components, + sender_id=reply_sender_id, + sender_nickname=reply_sender_nickname, + time=reply_time, + message_str=quoted_text, + text=quoted_text, + ), + metadata, + ) + + def _normalize_reply_sender_id(self, sender_id: str) -> str: + normalized_sender_id = sender_id.strip() + if not normalized_sender_id: + return normalized_sender_id + if self.account_id and normalized_sender_id == str(self.account_id): + return self.meta().id + return normalized_sender_id + async def _item_list_to_components( self, item_list: list[dict[str, Any]] | None ) -> list[Any]: @@ -1031,16 +1448,36 @@ async def _handle_inbound_message(self, msg: dict[str, Any]) -> None: self._context_tokens[from_user_id] = context_token item_list = cast(list[dict[str, Any]], msg.get("item_list", [])) - components = await self._item_list_to_components(item_list) - text = self._message_text_from_item_list(item_list) + reply_component = None + reply_metadata = WeixinOCReplyMeta() + for item in item_list: + ref_msg = item.get("ref_msg") + if isinstance(ref_msg, dict): + ( + reply_component, + reply_metadata, + ) = await self._build_reply_component_from_ref( + session_id=from_user_id, + ref_msg=ref_msg, + ) + break + cached_components = await self._item_list_to_components(item_list) + components = list(cached_components) + if reply_component is not None: + components.insert(0, reply_component) + text = self._message_text_from_item_list(item_list, include_ref_text=False) message_id = str(msg.get("message_id") or msg.get("msg_id") or uuid.uuid4().hex) create_time = msg.get("create_time_ms") or msg.get("create_time") + create_time_ms: int | None = None if isinstance(create_time, (int, float)) and create_time > 1_000_000_000_000: + create_time_ms = int(create_time) ts = int(float(create_time) / 1000) elif isinstance(create_time, (int, float)): ts = int(create_time) + create_time_ms = ts * 1000 else: ts = int(time.time()) + create_time_ms = ts * 1000 abm = AstrBotMessage() abm.self_id = self.meta().id @@ -1052,6 +1489,23 @@ async def _handle_inbound_message(self, msg: dict[str, Any]) -> None: abm.message_str = text abm.timestamp = ts abm.raw_message = msg + abm.is_reply = reply_metadata.is_reply + abm.ref_msg = reply_metadata.ref_msg + abm.reply_kind = reply_metadata.reply_kind + abm.quoted_item_type = reply_metadata.quoted_item_type + abm.quoted_text = reply_metadata.quoted_text + abm.reply_to = reply_metadata.reply_to + + self._cache_recent_message( + from_user_id, + message_id=message_id, + sender_id=from_user_id, + sender_nickname=from_user_id, + timestamp=ts, + timestamp_ms=create_time_ms, + components=cached_components, + message_str=text, + ) self.commit_event( WeixinOCMessageEvent( @@ -1076,20 +1530,8 @@ async def _poll_inbound_updates(self) -> None: token_required=True, timeout_ms=self.long_poll_timeout_ms, ) - ret = int(data.get("ret") or 0) - errcode = data.get("errcode", 0) - if ret != 0 and ret is not None: - errmsg = str(data.get("errmsg", "")) - self._last_inbound_error = f"ret={ret}, errcode={errcode}, errmsg={errmsg}" - logger.warning( - "weixin_oc(%s): getupdates error: %s", - self.meta().id, - self._last_inbound_error, - ) - return - if errcode and int(errcode) != 0: - errmsg = str(data.get("errmsg", "")) - self._last_inbound_error = f"ret={ret}, errcode={errcode}, errmsg={errmsg}" + if not self._is_successful_api_payload(data): + self._last_inbound_error = self._format_api_error(data) logger.warning( "weixin_oc(%s): getupdates error: %s", self.meta().id, diff --git a/tests/test_weixin_oc_reply_parsing.py b/tests/test_weixin_oc_reply_parsing.py new file mode 100644 index 0000000000..8cbf7ec6c6 --- /dev/null +++ b/tests/test_weixin_oc_reply_parsing.py @@ -0,0 +1,581 @@ +import asyncio +import time +from typing import Any + +import pytest + +from astrbot.api.message_components import Image, Plain, Reply +from astrbot.core.platform.sources.weixin_oc.weixin_oc_adapter import WeixinOCAdapter + + +def _make_adapter() -> WeixinOCAdapter: + return WeixinOCAdapter( + platform_config={"id": "weixin_oc_test"}, + platform_settings={}, + event_queue=asyncio.Queue(), + ) + + +def test_weixin_oc_recent_message_cache_prunes_expired_sessions(): + adapter = _make_adapter() + adapter._recent_session_cache_ttl_s = 1 + adapter._max_recent_message_sessions = 10 + + adapter._get_recent_message_cache("expired_session") + adapter._recent_messages["expired_session"].updated_at = time.monotonic() - 5 + + active_cache = adapter._get_recent_message_cache("active_session") + assert active_cache.maxlen == adapter._recent_message_cache_size + + adapter._prune_recent_message_caches(now=time.monotonic()) + + assert "expired_session" not in adapter._recent_messages + assert "active_session" in adapter._recent_messages + + +def test_weixin_oc_recent_message_cache_prunes_oldest_sessions_on_overflow(): + adapter = _make_adapter() + adapter._recent_session_cache_ttl_s = 10**12 + adapter._max_recent_message_sessions = 2 + + adapter._get_recent_message_cache("session_1") + adapter._recent_messages["session_1"].updated_at = 1.0 + adapter._get_recent_message_cache("session_2") + adapter._recent_messages["session_2"].updated_at = 2.0 + adapter._get_recent_message_cache("session_3") + adapter._recent_messages["session_3"].updated_at = 3.0 + + adapter._prune_recent_message_caches(now=4.0) + + assert "session_1" not in adapter._recent_messages + assert "session_2" in adapter._recent_messages + assert "session_3" in adapter._recent_messages + + +@pytest.mark.asyncio +async def test_weixin_oc_builds_reply_component_from_direct_ref_text(): + adapter = _make_adapter() + captured_events: list[Any] = [] + adapter.commit_event = lambda event: captured_events.append(event) # type: ignore[method-assign] + + await adapter._handle_inbound_message( + { + "from_user_id": "user_1", + "message_id": "msg_1", + "create_time_ms": 1775408782000, + "item_list": [ + { + "type": 1, + "ref_msg": { + "message_item": { + "type": 1, + "from_user_id": "quoted_user", + "create_time_ms": 1775408781000, + "update_time_ms": 1775408781000, + "is_completed": True, + "text_item": {"text": "你好"}, + } + }, + "text_item": {"text": "引用了“你好”"}, + } + ], + } + ) + + assert len(captured_events) == 1 + event = captured_events[0] + reply = event.message_obj.message[0] + assert isinstance(reply, Reply) + assert reply.sender_id == "quoted_user" + assert reply.sender_nickname == "quoted_user" + assert reply.message_str == "你好" + assert reply.chain and isinstance(reply.chain[0], Plain) + assert reply.chain[0].text == "你好" + assert event.message_obj.message_str == "引用了“你好”" + assert event.message_obj.is_reply is True + assert event.message_obj.reply_kind == "text" + assert event.message_obj.quoted_item_type == 1 + assert event.message_obj.quoted_text == "你好" + assert event.message_obj.reply_to["strategy"] == "direct-ref-msg" + + +@pytest.mark.asyncio +async def test_weixin_oc_send_cache_uses_original_components_without_item_conversion(): + adapter = _make_adapter() + adapter.token = "token" + adapter.account_id = "bot_account" + adapter._context_tokens["user_3"] = "ctx" + + called = {"request": 0, "convert": 0} + + async def fake_request_json(*args, **kwargs): + called["request"] += 1 + return {} + + async def fail_if_convert(_item_list): + called["convert"] += 1 + raise AssertionError("send-path cache should not convert outbound item_list") + + adapter.client.request_json = fake_request_json # type: ignore[method-assign] + adapter._item_list_to_components = fail_if_convert # type: ignore[method-assign] + + media_component = Image(file="file:///tmp/fake-image.jpg") + + ok = await adapter._send_items_to_session( + "user_3", + [{"type": adapter.IMAGE_ITEM_TYPE, "image_item": {"media": {}}}], + cache_components=[media_component], + cache_message_str="[图片]", + ) + + assert ok is True + assert called["request"] == 1 + assert called["convert"] == 0 + cached = adapter._recent_messages["user_3"].messages[-1] + assert cached.components == [media_component] + assert cached.message_str == "[图片]" + + +@pytest.mark.asyncio +async def test_weixin_oc_text_send_cache_preserves_plain_components_by_default(): + adapter = _make_adapter() + adapter.token = "token" + adapter.account_id = "bot_account" + adapter._context_tokens["user_text"] = "ctx" + + async def fake_request_json(*args, **kwargs): + return {} + + adapter.client.request_json = fake_request_json # type: ignore[method-assign] + + ok = await adapter._send_items_to_session( + "user_text", + [adapter._build_plain_text_item("bot reply text")], + ) + + assert ok is True + cached = adapter._recent_messages["user_text"].messages[-1] + assert len(cached.components) == 1 + assert isinstance(cached.components[0], Plain) + assert cached.components[0].text == "bot reply text" + assert cached.message_kind == "text" + assert cached.message_str == "bot reply text" + + +@pytest.mark.asyncio +async def test_weixin_oc_send_cache_preserves_outbound_millisecond_precision( + monkeypatch: pytest.MonkeyPatch, +): + adapter = _make_adapter() + adapter.token = "token" + adapter.account_id = "bot_account" + adapter._context_tokens["user_precise"] = "ctx" + + async def fake_request_json(*args, **kwargs): + return {} + + adapter.client.request_json = fake_request_json # type: ignore[method-assign] + + time_points_ns = iter([1775408790123456789, 1775408790987654321]) + monkeypatch.setattr(time, "time_ns", lambda: next(time_points_ns)) + + ok_first = await adapter._send_items_to_session( + "user_precise", + [adapter._build_plain_text_item("first bot reply")], + ) + ok_second = await adapter._send_items_to_session( + "user_precise", + [adapter._build_plain_text_item("second bot reply")], + ) + + assert ok_first is True + assert ok_second is True + cached_messages = list(adapter._recent_messages["user_precise"].messages) + assert cached_messages[-2].timestamp_ms == 1775408790123 + assert cached_messages[-1].timestamp_ms == 1775408790987 + + +@pytest.mark.asyncio +async def test_weixin_oc_send_cache_skips_failed_business_response(): + adapter = _make_adapter() + adapter.token = "token" + adapter.account_id = "bot_account" + adapter._context_tokens["user_failed"] = "ctx" + + async def fake_request_json(*args, **kwargs): + return {"ret": 0, "errcode": 1001, "errmsg": "send failed"} + + adapter.client.request_json = fake_request_json # type: ignore[method-assign] + + ok = await adapter._send_items_to_session( + "user_failed", + [adapter._build_plain_text_item("bot reply text")], + ) + + assert ok is False + assert "user_failed" not in adapter._recent_messages + + +@pytest.mark.asyncio +async def test_weixin_oc_invalid_ref_msg_does_not_mark_message_as_reply(): + adapter = _make_adapter() + captured_events: list[Any] = [] + adapter.commit_event = lambda event: captured_events.append(event) # type: ignore[method-assign] + + await adapter._handle_inbound_message( + { + "from_user_id": "user_invalid", + "message_id": "msg_invalid", + "create_time_ms": 1775408782000, + "item_list": [ + { + "type": 1, + "ref_msg": {"message_item": "not-a-dict"}, + "text_item": {"text": "正文"}, + } + ], + } + ) + + assert len(captured_events) == 1 + event = captured_events[0] + assert not any(isinstance(comp, Reply) for comp in event.message_obj.message) + assert event.message_obj.is_reply is False + assert event.message_obj.ref_msg == {"message_item": "not-a-dict"} + assert event.message_obj.reply_to == {"matched": False} + + +@pytest.mark.asyncio +async def test_weixin_oc_non_numeric_quoted_type_does_not_crash(): + adapter = _make_adapter() + captured_events: list[Any] = [] + adapter.commit_event = lambda event: captured_events.append(event) # type: ignore[method-assign] + + await adapter._handle_inbound_message( + { + "from_user_id": "user_bad_type", + "message_id": "msg_bad_type", + "create_time_ms": 1775408782000, + "item_list": [ + { + "type": 1, + "ref_msg": { + "message_item": { + "type": "bad-type", + "from_user_id": "quoted_user", + "create_time_ms": 1775408781000, + } + }, + "text_item": {"text": "正文"}, + } + ], + } + ) + + assert len(captured_events) == 1 + event = captured_events[0] + assert event.message_obj.message_str == "正文" + assert event.message_obj.quoted_item_type is None + assert ( + event.message_obj.reply_kind is None + or event.message_obj.reply_kind == "unknown" + ) + + +@pytest.mark.asyncio +async def test_weixin_oc_cache_miss_does_not_emit_empty_reply_component(): + adapter = _make_adapter() + captured_events: list[Any] = [] + adapter.commit_event = lambda event: captured_events.append(event) # type: ignore[method-assign] + + await adapter._handle_inbound_message( + { + "from_user_id": "user_cache_miss", + "message_id": "msg_cache_miss", + "create_time_ms": 1775408782000, + "item_list": [ + { + "type": 1, + "ref_msg": { + "message_item": { + "create_time_ms": 1775408781000, + "is_completed": True, + } + }, + "text_item": {"text": "正文"}, + } + ], + } + ) + + assert len(captured_events) == 1 + event = captured_events[0] + assert not any(isinstance(comp, Reply) for comp in event.message_obj.message) + assert event.message_obj.is_reply is False + assert event.message_obj.quoted_text is None + assert event.message_obj.reply_to == {"matched": False} + assert event.message_obj.message_str == "正文" + + +@pytest.mark.asyncio +async def test_weixin_oc_matches_reply_from_recent_message_cache(): + adapter = _make_adapter() + captured_events: list[Any] = [] + adapter.commit_event = lambda event: captured_events.append(event) # type: ignore[method-assign] + adapter._cache_recent_message( + "user_2", + message_id="bot_msg_1", + sender_id="weixin_oc_test", + sender_nickname="weixin_oc_test", + timestamp=1775408790, + components=[Plain("上一条 Bot 消息")], + message_str="上一条 Bot 消息", + message_kind="text", + ) + + await adapter._handle_inbound_message( + { + "from_user_id": "user_2", + "message_id": "msg_2", + "create_time_ms": 1775408796000, + "item_list": [ + { + "type": 1, + "ref_msg": { + "message_item": { + "create_time_ms": 1775408793000, + "update_time_ms": 1775408793000, + "is_completed": True, + } + }, + "text_item": {"text": "这是对上一条的回复"}, + } + ], + } + ) + + assert len(captured_events) == 1 + event = captured_events[0] + reply = event.message_obj.message[0] + assert isinstance(reply, Reply) + assert reply.id == "bot_msg_1" + assert reply.sender_id == "weixin_oc_test" + assert reply.message_str == "上一条 Bot 消息" + assert event.message_obj.message_str == "这是对上一条的回复" + assert event.message_obj.reply_kind == "text" + assert event.message_obj.quoted_text == "上一条 Bot 消息" + assert event.message_obj.reply_to["strategy"] == "nearest-message-by-timestamp" + assert event.message_obj.reply_to["matched_message_id"] == "bot_msg_1" + + +@pytest.mark.asyncio +async def test_weixin_oc_normalizes_bot_reply_sender_id_from_cache(): + adapter = _make_adapter() + adapter.account_id = "wx_bot_real_id" + captured_events: list[Any] = [] + adapter.commit_event = lambda event: captured_events.append(event) # type: ignore[method-assign] + adapter._cache_recent_message( + "user_bot_reply", + message_id="bot_msg_real", + sender_id="wx_bot_real_id", + sender_nickname="wx_bot_real_id", + timestamp=1775408790, + timestamp_ms=1775408790123, + components=[Plain("上一条 Bot 消息")], + message_str="上一条 Bot 消息", + message_kind="text", + ) + + await adapter._handle_inbound_message( + { + "from_user_id": "user_bot_reply", + "message_id": "msg_bot_reply", + "create_time_ms": 1775408796000, + "item_list": [ + { + "type": 1, + "ref_msg": { + "message_item": { + "create_time_ms": 1775408790123, + "is_completed": True, + } + }, + "text_item": {"text": "回复 Bot"}, + } + ], + } + ) + + assert len(captured_events) == 1 + event = captured_events[0] + reply = event.message_obj.message[0] + assert isinstance(reply, Reply) + assert reply.sender_id == "weixin_oc_test" + + +@pytest.mark.asyncio +async def test_weixin_oc_inbound_cache_excludes_synthetic_reply_component(): + adapter = _make_adapter() + captured_events: list[Any] = [] + adapter.commit_event = lambda event: captured_events.append(event) # type: ignore[method-assign] + adapter._cache_recent_message( + "user_nested", + message_id="bot_msg_seed", + sender_id="weixin_oc_test", + sender_nickname="weixin_oc_test", + timestamp=1775408790, + timestamp_ms=1775408790000, + components=[Plain("最初的 Bot 消息")], + message_str="最初的 Bot 消息", + message_kind="text", + ) + + await adapter._handle_inbound_message( + { + "from_user_id": "user_nested", + "message_id": "msg_reply_1", + "create_time_ms": 1775408796000, + "item_list": [ + { + "type": 1, + "ref_msg": { + "message_item": { + "create_time_ms": 1775408790000, + "is_completed": True, + } + }, + "text_item": {"text": "第一层回复"}, + } + ], + } + ) + + cached = adapter._recent_messages["user_nested"].messages[-1] + assert len(cached.components) == 1 + assert isinstance(cached.components[0], Plain) + assert cached.components[0].text == "第一层回复" + + await adapter._handle_inbound_message( + { + "from_user_id": "user_nested", + "message_id": "msg_reply_2", + "create_time_ms": 1775408802000, + "item_list": [ + { + "type": 1, + "ref_msg": { + "message_item": { + "create_time_ms": 1775408796000, + "is_completed": True, + } + }, + "text_item": {"text": "第二层回复"}, + } + ], + } + ) + + assert len(captured_events) == 2 + second_event = captured_events[-1] + second_reply = second_event.message_obj.message[0] + assert isinstance(second_reply, Reply) + assert second_reply.message_str == "第一层回复" + assert len(second_reply.chain) == 1 + assert isinstance(second_reply.chain[0], Plain) + assert second_reply.chain[0].text == "第一层回复" + + +def test_weixin_oc_recent_message_cache_preserves_millisecond_precision(): + adapter = _make_adapter() + adapter._cache_recent_message( + "user_ms", + message_id="text_msg", + sender_id="bot", + sender_nickname="bot", + timestamp=1775408790, + timestamp_ms=1775408790001, + components=[Plain("caption")], + message_str="caption", + message_kind="text", + ) + adapter._cache_recent_message( + "user_ms", + message_id="image_msg", + sender_id="bot", + sender_nickname="bot", + timestamp=1775408790, + timestamp_ms=1775408790900, + components=[], + message_str="[图片]", + message_kind="image", + ) + + matched, metadata = adapter._match_recent_reply( + "user_ms", + ref_create_time_ms=1775408790910, + ) + + assert matched is not None + assert metadata is not None + assert matched.message_id == "image_msg" + assert metadata["distance_ms"] == 10 + + +def test_weixin_oc_recent_reply_match_accepts_small_future_clock_skew(): + adapter = _make_adapter() + adapter._cache_recent_message( + "user_future", + message_id="past_msg", + sender_id="bot", + sender_nickname="bot", + timestamp=1775408790, + timestamp_ms=1775408790000, + components=[Plain("过去的消息")], + message_str="过去的消息", + message_kind="text", + ) + adapter._cache_recent_message( + "user_future", + message_id="future_msg", + sender_id="bot", + sender_nickname="bot", + timestamp=1775408790, + timestamp_ms=1775408793050, + components=[Plain("稍晚的真实消息")], + message_str="稍晚的真实消息", + message_kind="text", + ) + + matched, metadata = adapter._match_recent_reply( + "user_future", + ref_create_time_ms=1775408793000, + ) + + assert matched is not None + assert metadata is not None + assert matched.message_id == "future_msg" + assert metadata["distance_ms"] == 50 + + +@pytest.mark.asyncio +async def test_weixin_oc_message_text_does_not_include_quoted_prefix(): + adapter = _make_adapter() + + text = adapter._message_text_from_item_list( + [ + { + "type": 1, + "ref_msg": { + "message_item": { + "type": 1, + "create_time_ms": 1775408781000, + "text_item": {"text": "你好"}, + } + }, + "text_item": {"text": "新的正文"}, + } + ], + include_ref_text=False, + ) + + assert text == "新的正文"