Skip to content

Commit c3e175b

Browse files
committed
fix(wecombot): preserve websocket compatibility with pull enhancements
1 parent 8f97d93 commit c3e175b

3 files changed

Lines changed: 117 additions & 96 deletions

File tree

src/langbot/pkg/platform/sources/wecombot.py

Lines changed: 88 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from ..logger import EventLogger
1212
from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent
1313
from langbot.libs.wecom_ai_bot_api.api import WecomBotClient
14+
from langbot.libs.wecom_ai_bot_api.ws_client import WecomBotWsClient
1415

1516

1617
DEFAULT_PULL_PENDING_PLACEHOLDER = 'AI 正在思考中,请稍候'
@@ -194,12 +195,13 @@ async def target2yiri(event: WecomBotEvent):
194195

195196

196197
class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
197-
bot: WecomBotClient
198+
bot: typing.Union[WecomBotClient, WecomBotWsClient]
198199
bot_account_id: str
199200
message_converter: WecomBotMessageConverter = WecomBotMessageConverter()
200201
event_converter: WecomBotEventConverter = WecomBotEventConverter()
201202
config: dict
202203
bot_uuid: str = None
204+
_ws_mode: bool = False
203205

204206
@staticmethod
205207
def _get_int_config(config: dict, key: str, default: int, min_value: int, max_value: int) -> int:
@@ -211,52 +213,60 @@ def _get_int_config(config: dict, key: str, default: int, min_value: int, max_va
211213
return max(min_value, min(max_value, value))
212214

213215
def __init__(self, config: dict, logger: EventLogger):
214-
enable_webhook = config.get('enable-webhook', True)
215-
if not enable_webhook:
216-
raise Exception('WecomBot websocket mode is not supported in this branch yet. Please enable webhook mode.')
217-
218-
required_keys = ['Token', 'EncodingAESKey', 'Corpid', 'BotId']
219-
missing_keys = [key for key in required_keys if not config.get(key)]
220-
if missing_keys:
221-
raise Exception(f'WecomBot webhook mode missing config: {missing_keys}')
216+
enable_webhook = config.get('enable-webhook', False)
217+
normalized_config = dict(config)
222218

223-
pull_poll_timeout_ms = self._get_int_config(config, 'PullPollTimeoutMs', 500, 50, 2000)
224-
pull_stream_max_lifetime_ms = self._get_int_config(config, 'PullStreamMaxLifetimeMs', 300000, 1000, 600000)
225-
pending_placeholder_enabled = config.get('PullPendingPlaceholderEnabled', False)
226-
pending_placeholder_delay_ms = self._get_int_config(config, 'PullPendingPlaceholderDelayMs', 3000, 0, 10000)
227-
pending_placeholder = config.get('PullPendingPlaceholder', DEFAULT_PULL_PENDING_PLACEHOLDER)
219+
if not enable_webhook:
220+
bot = WecomBotWsClient(
221+
bot_id=config['BotId'],
222+
secret=config['Secret'],
223+
logger=logger,
224+
encoding_aes_key=config.get('EncodingAESKey', ''),
225+
)
226+
ws_mode = True
227+
else:
228+
required_keys = ['Token', 'EncodingAESKey', 'Corpid']
229+
missing_keys = [key for key in required_keys if key not in config or not config[key]]
230+
if missing_keys:
231+
raise Exception(f'WecomBot webhook mode missing config: {missing_keys}')
232+
233+
pull_poll_timeout_ms = self._get_int_config(config, 'PullPollTimeoutMs', 500, 50, 2000)
234+
pull_stream_max_lifetime_ms = self._get_int_config(config, 'PullStreamMaxLifetimeMs', 300000, 1000, 600000)
235+
pending_placeholder_enabled = config.get('PullPendingPlaceholderEnabled', False)
236+
pending_placeholder_delay_ms = self._get_int_config(config, 'PullPendingPlaceholderDelayMs', 3000, 0, 10000)
237+
pending_placeholder = config.get('PullPendingPlaceholder', DEFAULT_PULL_PENDING_PLACEHOLDER)
238+
239+
normalized_config['PullPollTimeoutMs'] = pull_poll_timeout_ms
240+
normalized_config['PullStreamMaxLifetimeMs'] = pull_stream_max_lifetime_ms
241+
normalized_config['PullPendingPlaceholderEnabled'] = pending_placeholder_enabled
242+
normalized_config['PullPendingPlaceholderDelayMs'] = pending_placeholder_delay_ms
243+
normalized_config['PullPendingPlaceholder'] = pending_placeholder
244+
245+
effective_placeholder_delay = pending_placeholder_delay_ms / 1000 if pending_placeholder_enabled else 0
246+
effective_placeholder = pending_placeholder if pending_placeholder_enabled else ''
247+
248+
bot = WecomBotClient(
249+
Token=config['Token'],
250+
EnCodingAESKey=config['EncodingAESKey'],
251+
Corpid=config['Corpid'],
252+
logger=logger,
253+
unified_mode=True,
254+
stream_poll_timeout=pull_poll_timeout_ms / 1000,
255+
stream_max_lifetime=pull_stream_max_lifetime_ms / 1000,
256+
pending_placeholder=effective_placeholder,
257+
pending_placeholder_delay=effective_placeholder_delay,
258+
)
259+
ws_mode = False
228260

229-
normalized_config = dict(config)
230-
normalized_config['enable-webhook'] = True
231-
normalized_config['PullPollTimeoutMs'] = pull_poll_timeout_ms
232-
normalized_config['PullStreamMaxLifetimeMs'] = pull_stream_max_lifetime_ms
233-
normalized_config['PullPendingPlaceholderEnabled'] = pending_placeholder_enabled
234-
normalized_config['PullPendingPlaceholderDelayMs'] = pending_placeholder_delay_ms
235-
normalized_config['PullPendingPlaceholder'] = pending_placeholder
236-
237-
# 如果未开启首字等待占位,则将延迟设为0且占位文案设为空
238-
effective_placeholder_delay = pending_placeholder_delay_ms / 1000 if pending_placeholder_enabled else 0
239-
effective_placeholder = pending_placeholder if pending_placeholder_enabled else ''
240-
241-
bot = WecomBotClient(
242-
Token=config['Token'],
243-
EnCodingAESKey=config['EncodingAESKey'],
244-
Corpid=config['Corpid'],
245-
logger=logger,
246-
unified_mode=True,
247-
stream_poll_timeout=pull_poll_timeout_ms / 1000,
248-
stream_max_lifetime=pull_stream_max_lifetime_ms / 1000,
249-
pending_placeholder=effective_placeholder,
250-
pending_placeholder_delay=effective_placeholder_delay,
251-
)
252-
bot_account_id = config['BotId']
261+
bot_account_id = config.get('BotId', '')
253262

254263
super().__init__(
255264
config=normalized_config,
256265
logger=logger,
257266
bot=bot,
258267
bot_account_id=bot_account_id,
259268
)
269+
self._ws_mode = ws_mode
260270

261271
async def reply_message(
262272
self,
@@ -265,7 +275,15 @@ async def reply_message(
265275
quote_origin: bool = False,
266276
):
267277
content = await self.message_converter.yiri2target(message)
268-
await self.bot.set_message(message_source.source_platform_object.message_id, content)
278+
if self._ws_mode:
279+
event = message_source.source_platform_object
280+
req_id = event.get('req_id', '')
281+
if req_id:
282+
await self.bot.reply_text(req_id, content)
283+
else:
284+
await self.bot.set_message(event.message_id, content)
285+
else:
286+
await self.bot.set_message(message_source.source_platform_object.message_id, content)
269287

270288
async def reply_message_chunk(
271289
self,
@@ -275,30 +293,22 @@ async def reply_message_chunk(
275293
quote_origin: bool = False,
276294
is_final: bool = False,
277295
):
278-
"""将流水线增量输出写入企业微信 stream 会话。
279-
280-
Args:
281-
message_source: 流水线提供的原始消息事件。
282-
bot_message: 当前片段对应的模型元信息(未使用)。
283-
message: 需要回复的消息链。
284-
quote_origin: 是否引用原消息(企业微信暂不支持)。
285-
is_final: 标记当前片段是否为最终回复。
286-
287-
Returns:
288-
dict: 包含 `stream` 键,标识写入是否成功。
289-
290-
Example:
291-
在流水线 `reply_message_chunk` 调用中自动触发,无需手动调用。
292-
"""
293-
# 转换为纯文本(智能机器人当前协议仅支持文本流)
294296
content = await self.message_converter.yiri2target(message)
295297
msg_id = message_source.source_platform_object.message_id
296-
# 将片段推送到 WecomBotClient 中的队列,返回值用于判断是否走降级逻辑
297-
success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final)
298-
if not success and is_final:
299-
# 未命中流式队列时使用旧有 set_message 兜底
300-
await self.bot.set_message(msg_id, content)
301-
return {'stream': success}
298+
299+
if self._ws_mode:
300+
success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final)
301+
if not success and is_final:
302+
event = message_source.source_platform_object
303+
req_id = event.get('req_id', '')
304+
if req_id:
305+
await self.bot.reply_text(req_id, content)
306+
return {'stream': success}
307+
else:
308+
success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final)
309+
if not success and is_final:
310+
await self.bot.set_message(msg_id, content)
311+
return {'stream': success}
302312

303313
async def is_stream_output_supported(self) -> bool:
304314
"""智能机器人侧默认开启流式能力。
@@ -315,7 +325,11 @@ async def create_message_card(self, message_id: str, event) -> bool:
315325
return False
316326

317327
async def send_message(self, target_type, target_id, message):
318-
pass
328+
if self._ws_mode:
329+
content = await self.message_converter.yiri2target(message)
330+
await self.bot.send_message(target_id, content)
331+
else:
332+
pass
319333

320334
def register_listener(
321335
self,
@@ -344,29 +358,25 @@ def set_bot_uuid(self, bot_uuid: str):
344358
self.bot_uuid = bot_uuid
345359

346360
async def handle_unified_webhook(self, bot_uuid: str, path: str, request):
347-
"""处理统一 webhook 请求。
348-
349-
Args:
350-
bot_uuid: Bot 的 UUID
351-
path: 子路径(如果有的话)
352-
request: Quart Request 对象
353-
354-
Returns:
355-
响应数据
356-
"""
361+
if self._ws_mode:
362+
return None
357363
return await self.bot.handle_unified_webhook(request)
358364

359365
async def run_async(self):
360-
# 统一 webhook 模式下,不启动独立的 Quart 应用
361-
# 保持运行但不启动独立端口
366+
if self._ws_mode:
367+
await self.bot.connect()
368+
else:
362369

363-
async def keep_alive():
364-
while True:
365-
await asyncio.sleep(1)
370+
async def keep_alive():
371+
while True:
372+
await asyncio.sleep(1)
366373

367-
await keep_alive()
374+
await keep_alive()
368375

369376
async def kill(self) -> bool:
377+
if self._ws_mode:
378+
await self.bot.disconnect()
379+
return True
370380
return False
371381

372382
async def unregister_listener(

src/langbot/pkg/platform/sources/wecombot.yaml

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@ spec:
2121
- name: enable-webhook
2222
label:
2323
en_US: Enable Webhook Mode
24-
zh_Hans: 启用 Webhook 模式
24+
zh_Hans: 启用Webhook模式
2525
description:
26-
en_US: Switch between Webhook mode and WebSocket mode config. This branch currently supports Webhook mode only.
27-
zh_Hans: Webhook / WebSocket 配置之间切换展示。当前分支仅支持 Webhook 模式。
26+
en_US: If enabled, the bot will use webhook mode to receive messages. Otherwise, it will use WS long connection mode
27+
zh_Hans: 如果启用,机器人将使用 Webhook 模式接收消息。否则,将使用 WS 长连接模式
2828
type: boolean
2929
required: true
30-
default: true
30+
default: false
3131
- name: Secret
3232
label:
3333
en_US: Secret
3434
zh_Hans: 机器人密钥 (Secret)
3535
description:
36-
en_US: Reserved for WebSocket mode.
37-
zh_Hans: WebSocket 模式预留配置项。
36+
en_US: Required for WebSocket long connection mode
37+
zh_Hans: 使用 WS 长连接模式时必填
3838
type: string
3939
required: false
4040
default: ""
@@ -43,6 +43,9 @@ spec:
4343
label:
4444
en_US: Corpid
4545
zh_Hans: 企业ID
46+
description:
47+
en_US: Required for Webhook mode
48+
zh_Hans: 使用 Webhook 模式时必填
4649
type: string
4750
required: false
4851
default: ""
@@ -51,6 +54,9 @@ spec:
5154
label:
5255
en_US: Token
5356
zh_Hans: 令牌 (Token)
57+
description:
58+
en_US: Required for Webhook mode
59+
zh_Hans: 使用 Webhook 模式时必填
5460
type: string
5561
required: false
5662
default: ""
@@ -59,6 +65,9 @@ spec:
5965
label:
6066
en_US: EncodingAESKey
6167
zh_Hans: 消息加解密密钥 (EncodingAESKey)
68+
description:
69+
en_US: Required for Webhook mode. Optional for WebSocket mode (used for file decryption)
70+
zh_Hans: 使用 Webhook 模式时必填。WebSocket 模式下可选(用于文件解密)
6271
type: string
6372
required: false
6473
default: ""

tests/unit_tests/pipeline/test_wecombot_dify_minfix.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,6 @@ async def test_wecom_dispatch_exception_forces_finish():
409409
assert chunk.content == client.stream_error_final_text
410410

411411

412-
413-
414412
def test_wecom_client_defaults_match_master_polling_behavior():
415413
WecomBotClient = get_wecom_client()
416414
client = WecomBotClient(
@@ -441,18 +439,22 @@ def get_wecom_adapter():
441439
return import_module('langbot.pkg.platform.sources.wecombot').WecomBotAdapter
442440

443441

444-
def test_wecombot_adapter_rejects_websocket_mode_in_current_branch():
442+
def test_wecombot_adapter_supports_websocket_mode_from_master():
445443
WecomBotAdapter = get_wecom_adapter()
444+
ws_module = import_module('langbot.libs.wecom_ai_bot_api.ws_client')
446445

447-
with pytest.raises(Exception, match='websocket mode is not supported'):
448-
WecomBotAdapter(
449-
{
450-
'BotId': 'bot-id',
451-
'enable-webhook': False,
452-
'Secret': 'secret',
453-
},
454-
make_async_logger(),
455-
)
446+
adapter = WecomBotAdapter(
447+
{
448+
'BotId': 'bot-id',
449+
'enable-webhook': False,
450+
'Secret': 'secret',
451+
},
452+
make_valid_event_logger(),
453+
)
454+
455+
assert adapter._ws_mode is True
456+
assert isinstance(adapter.bot, ws_module.WecomBotWsClient)
457+
assert adapter.config['enable-webhook'] is False
456458

457459

458460
def test_wecombot_adapter_webhook_mode_normalizes_pull_config():

0 commit comments

Comments
 (0)