Skip to content

Commit 0f0e4c5

Browse files
ziyanli33ziyan.li
andauthored
feat(feishu-channel): add streaming response support for feishu channel (#561)
* feat(feishu-channel): add streaming response support for feishu channel 1. add streaming config parameter to extension init and env fallback 2. add fake stream test utilities and new streaming test case 3. refactor message handling logic to support both sync and stream modes 4. update docs to include streaming configuration option * style: run pre-commit ruff-format --------- Co-authored-by: ziyan.li <ziyan.li@bytedance.com>
1 parent fa1ddc9 commit 0f0e4c5

3 files changed

Lines changed: 160 additions & 15 deletions

File tree

docs/docs/tools/feishu-channel.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ tool:
3838
app_id: cli_xxx
3939
app_secret: xxx
4040
transport: ws
41+
streaming: true
4142
```
4243
4344
## 最小示例

tests/test_feishu_channel_extension.py

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
from veadk.extensions.feishu_channel import FeishuChannelExtension
2020

2121

22+
@pytest.fixture
23+
def anyio_backend():
24+
return "asyncio"
25+
26+
2227
class FakeChannel:
2328
def __init__(self):
2429
self.handlers = {}
@@ -31,6 +36,25 @@ async def send(self, chat_id, body, options=None):
3136
self.sent_messages.append((chat_id, body, options))
3237

3338

39+
class FakeStreamController:
40+
def __init__(self):
41+
self.chunks = []
42+
43+
async def append(self, chunk):
44+
self.chunks.append(chunk)
45+
46+
47+
class FakeStreamChannel(FakeChannel):
48+
def __init__(self):
49+
super().__init__()
50+
self.stream_calls = []
51+
52+
async def stream(self, chat_id, spec, options=None):
53+
controller = FakeStreamController()
54+
await spec["markdown"](controller)
55+
self.stream_calls.append((chat_id, controller.chunks, options))
56+
57+
3458
class FakeRunner:
3559
def __init__(self):
3660
self.calls = []
@@ -46,6 +70,46 @@ async def run(self, messages, user_id="", session_id="", **kwargs):
4670
return f"echo:{messages}"
4771

4872

73+
class FakeStreamingMemory:
74+
def __init__(self):
75+
self.sessions = []
76+
self.session_service = object()
77+
78+
async def create_session(self, app_name, user_id, session_id):
79+
self.sessions.append(
80+
{"app_name": app_name, "user_id": user_id, "session_id": session_id}
81+
)
82+
return True
83+
84+
85+
class FakeStreamingRunner:
86+
def __init__(self):
87+
self.app_name = "stream_app"
88+
self.short_term_memory = FakeStreamingMemory()
89+
self.run_async_calls = []
90+
91+
async def run_async(self, user_id, session_id, new_message, run_config=None):
92+
self.run_async_calls.append(
93+
{
94+
"user_id": user_id,
95+
"session_id": session_id,
96+
"new_message": new_message,
97+
"run_config": run_config,
98+
}
99+
)
100+
yield SimpleNamespace(
101+
content=SimpleNamespace(
102+
parts=[
103+
SimpleNamespace(text="hel", thought=False),
104+
SimpleNamespace(text="thinking", thought=True),
105+
]
106+
)
107+
)
108+
yield SimpleNamespace(
109+
content=SimpleNamespace(parts=[SimpleNamespace(text="lo", thought=False)])
110+
)
111+
112+
49113
def build_message(**overrides):
50114
message = SimpleNamespace(
51115
id="om_001",
@@ -73,7 +137,7 @@ def build_message(**overrides):
73137
return message
74138

75139

76-
@pytest.mark.asyncio
140+
@pytest.mark.anyio
77141
async def test_extension_uses_union_id_and_thread_id():
78142
runner = FakeRunner()
79143
channel = FakeChannel()
@@ -102,7 +166,7 @@ async def test_extension_uses_union_id_and_thread_id():
102166
]
103167

104168

105-
@pytest.mark.asyncio
169+
@pytest.mark.anyio
106170
async def test_extension_falls_back_to_chat_id_when_thread_missing():
107171
runner = FakeRunner()
108172
channel = FakeChannel()
@@ -118,7 +182,7 @@ async def test_extension_falls_back_to_chat_id_when_thread_missing():
118182
assert runner.calls[0]["session_id"] == "oc_chat"
119183

120184

121-
@pytest.mark.asyncio
185+
@pytest.mark.anyio
122186
async def test_extension_ignores_empty_message_by_default():
123187
runner = FakeRunner()
124188
channel = FakeChannel()
@@ -130,3 +194,26 @@ async def test_extension_ignores_empty_message_by_default():
130194

131195
assert runner.calls == []
132196
assert channel.sent_messages == []
197+
198+
199+
@pytest.mark.anyio
200+
async def test_extension_streaming_uses_markdown_producer_controller():
201+
runner = FakeStreamingRunner()
202+
channel = FakeStreamChannel()
203+
extension = FeishuChannelExtension(
204+
runner=runner,
205+
channel=channel,
206+
streaming=True,
207+
)
208+
209+
await extension._on_message(build_message())
210+
211+
assert runner.short_term_memory.sessions == [
212+
{
213+
"app_name": "stream_app",
214+
"user_id": "on_union",
215+
"session_id": "oc_chat",
216+
}
217+
]
218+
assert len(runner.run_async_calls) == 1
219+
assert channel.stream_calls == [("oc_chat", ["hel", "lo"], {"reply_to": "om_001"})]

veadk/extensions/feishu_channel.py

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def __init__(
8484
reply_in_thread: bool = True,
8585
ignore_empty_messages: bool = True,
8686
channel_kwargs: dict[str, Any] | None = None,
87+
streaming: bool = False,
8788
) -> None:
8889
self.runner = runner
8990
self.session_id_factory = session_id_factory or self.default_session_id_factory
@@ -92,6 +93,10 @@ def __init__(
9293
self.response_formatter = response_formatter or self.default_response_formatter
9394
self.reply_in_thread = reply_in_thread
9495
self.ignore_empty_messages = ignore_empty_messages
96+
self.streaming = (
97+
streaming
98+
or str(os.getenv("TOOL_FEISHU_CHANNEL_STREAMING", "")).lower() == "true"
99+
)
95100

96101
if channel is not None:
97102
self.channel = channel
@@ -167,29 +172,81 @@ async def _on_message(self, message: Any) -> None:
167172

168173
context = self.build_message_context(message=message, text=text)
169174

175+
send_options = {}
176+
if self.reply_in_thread and context.message_id:
177+
send_options["reply_to"] = context.message_id
178+
170179
if self.message_handler is not None:
171180
response_text = await self._maybe_await(self.message_handler(context))
181+
if not response_text:
182+
return
183+
184+
await self._maybe_await(
185+
self.channel.send(
186+
context.chat_id,
187+
self.response_formatter(str(response_text)),
188+
send_options,
189+
)
190+
)
191+
elif getattr(self, "streaming", False) and hasattr(self.channel, "stream"):
192+
from google.adk.agents import RunConfig
193+
from google.adk.agents.run_config import StreamingMode
194+
from veadk.config import getenv
195+
from veadk.runner import _convert_messages
196+
197+
if self.runner.short_term_memory:
198+
await self.runner.short_term_memory.create_session(
199+
app_name=self.runner.app_name,
200+
user_id=context.user_id,
201+
session_id=context.session_id,
202+
)
203+
204+
converted_messages = _convert_messages(
205+
context.text, self.runner.app_name, context.user_id, context.session_id
206+
)
207+
208+
run_config = RunConfig(
209+
streaming_mode=StreamingMode.SSE,
210+
max_llm_calls=int(getenv("MODEL_AGENT_MAX_LLM_CALLS", 100)),
211+
)
212+
213+
async def stream_to_feishu(stream):
214+
for converted_message in converted_messages:
215+
async for event in self.runner.run_async(
216+
user_id=context.user_id,
217+
session_id=context.session_id,
218+
new_message=converted_message,
219+
run_config=run_config,
220+
):
221+
if event.content and event.content.parts:
222+
for part in event.content.parts:
223+
if not getattr(part, "thought", False) and part.text:
224+
await stream.append(part.text)
225+
226+
await self._maybe_await(
227+
self.channel.stream(
228+
context.chat_id,
229+
{"markdown": stream_to_feishu},
230+
send_options,
231+
)
232+
)
172233
else:
173234
response_text = await self.runner.run(
174235
messages=context.text,
175236
user_id=context.user_id,
176237
session_id=context.session_id,
177238
)
178239

179-
if not response_text:
180-
return
181-
182-
send_options = {}
183-
if self.reply_in_thread and context.message_id:
184-
send_options["reply_to"] = context.message_id
240+
if not response_text:
241+
return
185242

186-
await self._maybe_await(
187-
self.channel.send(
188-
context.chat_id,
189-
self.response_formatter(str(response_text)),
190-
send_options,
243+
await self._maybe_await(
244+
self.channel.send(
245+
context.chat_id,
246+
self.response_formatter(str(response_text)),
247+
send_options,
248+
)
191249
)
192-
)
193250

194251
def build_message_context(
195252
self, message: Any, text: str | None = None

0 commit comments

Comments
 (0)