|
| 1 | +import asyncio |
1 | 2 | import base64 |
2 | 3 | import json |
3 | 4 | import os |
4 | 5 | import uuid |
5 | 6 | from io import BytesIO |
6 | 7 |
|
7 | 8 | import lark_oapi as lark |
| 9 | +from lark_oapi.api.cardkit.v1 import ( |
| 10 | + ContentCardElementRequest, |
| 11 | + ContentCardElementRequestBody, |
| 12 | + CreateCardRequest, |
| 13 | + CreateCardRequestBody, |
| 14 | + SettingsCardRequest, |
| 15 | + SettingsCardRequestBody, |
| 16 | +) |
8 | 17 | from lark_oapi.api.im.v1 import ( |
9 | 18 | CreateFileRequest, |
10 | 19 | CreateFileRequestBody, |
|
28 | 37 | convert_video_format, |
29 | 38 | get_media_duration, |
30 | 39 | ) |
| 40 | +from astrbot.core.utils.metrics import Metric |
31 | 41 |
|
32 | 42 |
|
33 | 43 | class LarkMessageEvent(AstrMessageEvent): |
@@ -555,15 +565,257 @@ async def react(self, emoji: str) -> None: |
555 | 565 | logger.error(f"发送飞书表情回应失败({response.code}): {response.msg}") |
556 | 566 | return |
557 | 567 |
|
558 | | - async def send_streaming(self, generator, use_fallback: bool = False): |
| 568 | + async def _create_streaming_card(self) -> str | None: |
| 569 | + """创建一个开启流式更新模式的卡片实体,返回 card_id。""" |
| 570 | + if self.bot.cardkit is None: |
| 571 | + logger.error("[Lark] API Client cardkit 模块未初始化") |
| 572 | + return None |
| 573 | + |
| 574 | + card_json = { |
| 575 | + "schema": "2.0", |
| 576 | + "header": { |
| 577 | + "title": {"content": "", "tag": "plain_text"}, |
| 578 | + }, |
| 579 | + "config": { |
| 580 | + "streaming_mode": True, |
| 581 | + "summary": {"content": ""}, |
| 582 | + "streaming_config": { |
| 583 | + "print_frequency_ms": {"default": 50}, |
| 584 | + "print_step": {"default": 2}, |
| 585 | + "print_strategy": "fast", |
| 586 | + }, |
| 587 | + }, |
| 588 | + "body": { |
| 589 | + "elements": [ |
| 590 | + { |
| 591 | + "tag": "markdown", |
| 592 | + "content": "", |
| 593 | + "element_id": "markdown_1", |
| 594 | + } |
| 595 | + ] |
| 596 | + }, |
| 597 | + } |
| 598 | + |
| 599 | + request = ( |
| 600 | + CreateCardRequest.builder() |
| 601 | + .request_body( |
| 602 | + CreateCardRequestBody.builder() |
| 603 | + .type("card_json") |
| 604 | + .data(json.dumps(card_json, ensure_ascii=False)) |
| 605 | + .build() |
| 606 | + ) |
| 607 | + .build() |
| 608 | + ) |
| 609 | + |
| 610 | + try: |
| 611 | + response = await self.bot.cardkit.v1.card.acreate(request) |
| 612 | + except Exception as e: |
| 613 | + logger.error(f"[Lark] 创建流式卡片实体失败: {e}") |
| 614 | + return None |
| 615 | + |
| 616 | + if not response.success(): |
| 617 | + logger.error( |
| 618 | + f"[Lark] 创建流式卡片实体失败({response.code}): {response.msg}" |
| 619 | + ) |
| 620 | + return None |
| 621 | + |
| 622 | + if response.data is None or not response.data.card_id: |
| 623 | + logger.error("[Lark] 创建流式卡片实体成功但未返回 card_id") |
| 624 | + return None |
| 625 | + |
| 626 | + card_id = response.data.card_id |
| 627 | + logger.debug(f"[Lark] 创建流式卡片实体成功: {card_id}") |
| 628 | + return card_id |
| 629 | + |
| 630 | + async def _send_card_message( |
| 631 | + self, |
| 632 | + card_id: str, |
| 633 | + reply_message_id: str | None = None, |
| 634 | + receive_id: str | None = None, |
| 635 | + receive_id_type: str | None = None, |
| 636 | + ) -> bool: |
| 637 | + """将卡片实体作为 interactive 消息发送。""" |
| 638 | + content = json.dumps( |
| 639 | + {"type": "card", "data": {"card_id": card_id}}, |
| 640 | + ensure_ascii=False, |
| 641 | + ) |
| 642 | + return await self._send_im_message( |
| 643 | + self.bot, |
| 644 | + content=content, |
| 645 | + msg_type="interactive", |
| 646 | + reply_message_id=reply_message_id, |
| 647 | + receive_id=receive_id, |
| 648 | + receive_id_type=receive_id_type, |
| 649 | + ) |
| 650 | + |
| 651 | + async def _update_streaming_text( |
| 652 | + self, |
| 653 | + card_id: str, |
| 654 | + content: str, |
| 655 | + sequence: int, |
| 656 | + ) -> bool: |
| 657 | + """调用 CardKit 流式更新文本接口,向 markdown_1 组件推送全量文本。""" |
| 658 | + if self.bot.cardkit is None: |
| 659 | + logger.error("[Lark] API Client cardkit 模块未初始化") |
| 660 | + return False |
| 661 | + |
| 662 | + request = ( |
| 663 | + ContentCardElementRequest.builder() |
| 664 | + .card_id(card_id) |
| 665 | + .element_id("markdown_1") |
| 666 | + .request_body( |
| 667 | + ContentCardElementRequestBody.builder() |
| 668 | + .content(content) |
| 669 | + .sequence(sequence) |
| 670 | + .uuid(str(uuid.uuid4())) |
| 671 | + .build() |
| 672 | + ) |
| 673 | + .build() |
| 674 | + ) |
| 675 | + |
| 676 | + try: |
| 677 | + response = await self.bot.cardkit.v1.card_element.acontent(request) |
| 678 | + except Exception as e: |
| 679 | + logger.debug(f"[Lark] 流式更新文本失败 (ignored): {e}") |
| 680 | + return False |
| 681 | + |
| 682 | + if not response.success(): |
| 683 | + logger.debug(f"[Lark] 流式更新文本失败({response.code}): {response.msg}") |
| 684 | + return False |
| 685 | + |
| 686 | + return True |
| 687 | + |
| 688 | + async def _close_streaming_mode( |
| 689 | + self, |
| 690 | + card_id: str, |
| 691 | + sequence: int, |
| 692 | + ) -> None: |
| 693 | + """关闭卡片的流式更新模式,使其可正常转发、摘要恢复。""" |
| 694 | + if self.bot.cardkit is None: |
| 695 | + logger.error("[Lark] API Client cardkit 模块未初始化") |
| 696 | + return |
| 697 | + |
| 698 | + settings_json = json.dumps( |
| 699 | + {"config": {"streaming_mode": False}}, |
| 700 | + ensure_ascii=False, |
| 701 | + ) |
| 702 | + |
| 703 | + request = ( |
| 704 | + SettingsCardRequest.builder() |
| 705 | + .card_id(card_id) |
| 706 | + .request_body( |
| 707 | + SettingsCardRequestBody.builder() |
| 708 | + .settings(settings_json) |
| 709 | + .sequence(sequence) |
| 710 | + .uuid(str(uuid.uuid4())) |
| 711 | + .build() |
| 712 | + ) |
| 713 | + .build() |
| 714 | + ) |
| 715 | + |
| 716 | + try: |
| 717 | + response = await self.bot.cardkit.v1.card.asettings(request) |
| 718 | + except Exception as e: |
| 719 | + logger.error(f"[Lark] 关闭流式模式失败: {e}") |
| 720 | + return |
| 721 | + |
| 722 | + if not response.success(): |
| 723 | + logger.error(f"[Lark] 关闭流式模式失败({response.code}): {response.msg}") |
| 724 | + else: |
| 725 | + logger.debug(f"[Lark] 流式模式已关闭: {card_id}") |
| 726 | + |
| 727 | + async def _fallback_send_streaming(self, generator, use_fallback: bool = False): |
| 728 | + """回退到非流式发送:缓冲全部文本后一次性发送,并保留父类副作用。""" |
559 | 729 | buffer = None |
560 | 730 | async for chain in generator: |
561 | 731 | if not buffer: |
562 | 732 | buffer = chain |
563 | 733 | else: |
564 | 734 | buffer.chain.extend(chain.chain) |
565 | | - if not buffer: |
566 | | - return None |
567 | | - buffer.squash_plain() |
568 | | - await self.send(buffer) |
569 | | - return await super().send_streaming(generator, use_fallback) |
| 735 | + |
| 736 | + if buffer: |
| 737 | + buffer.squash_plain() |
| 738 | + await self.send(buffer) |
| 739 | + |
| 740 | + await Metric.upload(msg_event_tick=1, adapter_name=self.platform_meta.name) |
| 741 | + self._has_send_oper = True |
| 742 | + |
| 743 | + async def send_streaming(self, generator, use_fallback: bool = False): |
| 744 | + """使用 CardKit 流式卡片实现打字机效果。 |
| 745 | +
|
| 746 | + 流程:创建卡片实体 → 发送消息 → 流式更新文本 → 关闭流式模式。 |
| 747 | + 使用解耦发送循环,LLM token 到达时只更新 buffer 并唤醒发送协程, |
| 748 | + 发送频率由网络 RTT 自然限流。 |
| 749 | + """ |
| 750 | + # Step 1: 创建流式卡片实体 |
| 751 | + card_id = await self._create_streaming_card() |
| 752 | + if not card_id: |
| 753 | + logger.warning("[Lark] 无法创建流式卡片,回退到非流式发送") |
| 754 | + await self._fallback_send_streaming(generator, use_fallback) |
| 755 | + return |
| 756 | + |
| 757 | + # Step 2: 发送卡片消息 |
| 758 | + sent = await self._send_card_message( |
| 759 | + card_id, |
| 760 | + reply_message_id=self.message_obj.message_id, |
| 761 | + ) |
| 762 | + if not sent: |
| 763 | + logger.error("[Lark] 发送流式卡片消息失败,回退到非流式发送") |
| 764 | + await self._fallback_send_streaming(generator, use_fallback) |
| 765 | + return |
| 766 | + |
| 767 | + logger.info("[Lark] 流式输出: 使用 CardKit 流式卡片") |
| 768 | + |
| 769 | + # Step 3: 解耦发送循环 (Event-driven, 参考 Telegram Draft 路径) |
| 770 | + sequence = 0 |
| 771 | + delta = "" |
| 772 | + last_sent = "" |
| 773 | + done = False |
| 774 | + text_changed = asyncio.Event() |
| 775 | + |
| 776 | + async def _sender_loop() -> None: |
| 777 | + """信号驱动的文本发送循环,有新内容就发,RTT 自然限流。""" |
| 778 | + nonlocal sequence, last_sent |
| 779 | + while not done: |
| 780 | + await text_changed.wait() |
| 781 | + text_changed.clear() |
| 782 | + snapshot = delta |
| 783 | + if snapshot and snapshot != last_sent: |
| 784 | + sequence += 1 |
| 785 | + ok = await self._update_streaming_text(card_id, snapshot, sequence) |
| 786 | + if ok: |
| 787 | + last_sent = snapshot |
| 788 | + if delta != snapshot: |
| 789 | + text_changed.set() |
| 790 | + |
| 791 | + sender_task = asyncio.create_task(_sender_loop()) |
| 792 | + |
| 793 | + try: |
| 794 | + async for chain in generator: |
| 795 | + if not isinstance(chain, MessageChain): |
| 796 | + continue |
| 797 | + |
| 798 | + if chain.type == "break": |
| 799 | + # 飞书卡片不支持分段,忽略 break |
| 800 | + continue |
| 801 | + |
| 802 | + for comp in chain.chain: |
| 803 | + if isinstance(comp, Plain): |
| 804 | + delta += comp.text |
| 805 | + text_changed.set() |
| 806 | + finally: |
| 807 | + done = True |
| 808 | + text_changed.set() |
| 809 | + await sender_task |
| 810 | + |
| 811 | + # Step 4: 必要时补发最终文本 + 关闭流式模式 |
| 812 | + if delta and delta != last_sent: |
| 813 | + sequence += 1 |
| 814 | + await self._update_streaming_text(card_id, delta, sequence) |
| 815 | + |
| 816 | + sequence += 1 |
| 817 | + await self._close_streaming_mode(card_id, sequence) |
| 818 | + |
| 819 | + # Step 5: 内联父类 send_streaming 的副作用 |
| 820 | + await Metric.upload(msg_event_tick=1, adapter_name=self.platform_meta.name) |
| 821 | + self._has_send_oper = True |
0 commit comments