Skip to content

Commit b848ddf

Browse files
authored
fix: channel reply stream cold start (#366)
## Summary - Warm channel conversations before subscribing to the agent stream so cold-start external replies can relay back to Telegram, Lark, DingTalk, and Weixin. - Add regression coverage for the runtime channel cold-task path. - Keep Telegram credential checks from starting long-running polling during plugin tests. ## Verification - `just push -u origin aio-9-channel-reply-stream` - `cargo nextest run --workspace` via `just push`: 5706 passed, 18 skipped Co-authored-by: zynx <>
1 parent b24570f commit b848ddf

6 files changed

Lines changed: 316 additions & 23 deletions

File tree

crates/aionui-channel/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,6 @@ rustls = { workspace = true, optional = true }
3636
rustls-native-certs = { workspace = true, optional = true }
3737
base64 = { workspace = true, optional = true }
3838
uuid = { workspace = true, optional = true }
39+
40+
[dev-dependencies]
41+
aionui-ai-agent = { workspace = true, features = ["test-support"] }

crates/aionui-channel/src/message_service.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ impl ChannelMessageService {
4646
/// Sends a text message from a channel user to the AI agent.
4747
///
4848
/// 1. Ensures the session has a backing conversation (creates one if needed)
49-
/// 2. Sends the message via ConversationService
50-
/// 3. Returns the conversation_id for stream subscription
49+
/// 2. Warms up the backing agent task so stream subscription is available
50+
/// 3. Sends the message via ConversationService
51+
/// 4. Returns the conversation_id and stream receiver for relay
5152
///
5253
/// The caller is responsible for subscribing to stream events and
5354
/// relaying them to the IM platform.
@@ -75,30 +76,40 @@ impl ChannelMessageService {
7576
};
7677

7778
let user_id = &self.owner_user_id;
79+
// Channel relays need a stream subscription before the agent starts
80+
// emitting. `ConversationService::send_message` returns immediately
81+
// and builds cold agents in the background, so warm the conversation
82+
// explicitly for channel traffic.
7883
self.conversation_svc
79-
.send_message(user_id, &conversation_id, req, &self.task_manager)
84+
.warmup(user_id, &conversation_id, &self.task_manager)
8085
.await
8186
.map_err(|e| ChannelError::MessageSendFailed(e.to_string()))?;
8287

83-
// Subscribe to the agent's broadcast channel for the ChannelStreamRelay.
84-
// The agent task exists because send_message just called get_or_build_task.
85-
// ConversationService spawns agent.send_message in a background task,
86-
// so the first events have not been emitted yet — no race condition.
8788
let stream_rx = self
8889
.task_manager
8990
.get_task(&conversation_id)
90-
.map(|handle| handle.subscribe());
91+
.map(|handle| handle.subscribe())
92+
.ok_or_else(|| {
93+
ChannelError::MessageSendFailed(format!(
94+
"Agent task missing after warmup for conversation {conversation_id}"
95+
))
96+
})?;
97+
98+
self.conversation_svc
99+
.send_message(user_id, &conversation_id, req, &self.task_manager)
100+
.await
101+
.map_err(|e| ChannelError::MessageSendFailed(e.to_string()))?;
91102

92103
info!(
93104
conversation_id = %conversation_id,
94105
session_id = %session.id,
95-
has_stream = stream_rx.is_some(),
106+
has_stream = true,
96107
"message sent to agent"
97108
);
98109

99110
Ok(SendResult {
100111
conversation_id,
101-
stream_rx,
112+
stream_rx: Some(stream_rx),
102113
})
103114
}
104115

crates/aionui-channel/src/plugin.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::types::{BotInfo, PluginConfig, PluginStatus, PluginType, UnifiedIncom
99
///
1010
/// This addresses M-63 — the API Spec `BasePlugin.onMessage/onConfirm`
1111
/// callbacks are mapped to channel-based injection.
12+
#[derive(Clone)]
1213
pub struct PluginCallbacks {
1314
/// Sender for incoming messages from the platform.
1415
pub message_tx: tokio::sync::mpsc::Sender<UnifiedIncomingMessage>,

crates/aionui-channel/src/plugins/telegram/plugin.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct TelegramPlugin {
3232
bot_info: Option<BotInfo>,
3333
last_error: Option<String>,
3434
api: Option<Arc<TelegramApi>>,
35+
callbacks: Option<PluginCallbacks>,
3536
poll_handle: Option<JoinHandle<()>>,
3637
shutdown_tx: Option<watch::Sender<bool>>,
3738
}
@@ -43,6 +44,7 @@ impl Default for TelegramPlugin {
4344
bot_info: None,
4445
last_error: None,
4546
api: None,
47+
callbacks: None,
4648
poll_handle: None,
4749
shutdown_tx: None,
4850
}
@@ -102,27 +104,38 @@ impl ChannelPlugin for TelegramPlugin {
102104
);
103105

104106
self.api = Some(api);
105-
// Store callbacks in a shared container for the polling task
107+
self.callbacks = Some(callbacks);
108+
self.status = PluginStatus::Ready;
109+
Ok(())
110+
}
111+
112+
async fn start(&mut self) -> Result<(), ChannelError> {
113+
self.status = PluginStatus::Starting;
114+
115+
if self.poll_handle.is_some() {
116+
self.status = PluginStatus::Running;
117+
return Ok(());
118+
}
119+
120+
let api = self
121+
.api
122+
.as_ref()
123+
.cloned()
124+
.ok_or_else(|| ChannelError::PlatformApi("Telegram plugin not initialized".into()))?;
125+
let callbacks = self
126+
.callbacks
127+
.clone()
128+
.ok_or_else(|| ChannelError::PlatformApi("Telegram callbacks not initialized".into()))?;
129+
106130
let (shutdown_tx, shutdown_rx) = watch::channel(false);
107131
self.shutdown_tx = Some(shutdown_tx);
108-
109-
// Spawn the long-polling task
110-
let api_clone = Arc::clone(self.api.as_ref().expect("api just set"));
111132
self.poll_handle = Some(tokio::spawn(poll_loop(
112-
api_clone,
133+
api,
113134
callbacks.message_tx,
114135
callbacks.confirm_tx,
115136
shutdown_rx,
116137
)));
117138

118-
self.status = PluginStatus::Ready;
119-
Ok(())
120-
}
121-
122-
async fn start(&mut self) -> Result<(), ChannelError> {
123-
self.status = PluginStatus::Starting;
124-
// The polling task was already spawned in initialize;
125-
// `start` just transitions the status.
126139
self.status = PluginStatus::Running;
127140
info!("Telegram plugin started");
128141
Ok(())
@@ -143,6 +156,7 @@ impl ChannelPlugin for TelegramPlugin {
143156
}
144157

145158
self.api = None;
159+
self.callbacks = None;
146160
self.status = PluginStatus::Stopped;
147161
info!("Telegram plugin stopped");
148162
Ok(())

crates/aionui-channel/tests/manager_integration.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
//! TP-1..TP-5, CS-1..CS-2, WS-2.
66
77
use std::collections::HashMap;
8+
use std::sync::atomic::{AtomicUsize, Ordering};
89
use std::sync::{Arc, Mutex};
910

1011
use aionui_api_types::WebSocketMessage;
@@ -51,6 +52,7 @@ struct MockPlugin {
5152
bot_info: Option<BotInfo>,
5253
last_error: Option<String>,
5354
should_fail_init: bool,
55+
start_calls: Arc<AtomicUsize>,
5456
}
5557

5658
impl MockPlugin {
@@ -61,6 +63,7 @@ impl MockPlugin {
6163
bot_info: None,
6264
last_error: None,
6365
should_fail_init: false,
66+
start_calls: Arc::new(AtomicUsize::new(0)),
6467
}
6568
}
6669

@@ -91,6 +94,7 @@ impl ChannelPlugin for MockPlugin {
9194
}
9295

9396
async fn start(&mut self) -> Result<(), ChannelError> {
97+
self.start_calls.fetch_add(1, Ordering::SeqCst);
9498
self.status = PluginStatus::Starting;
9599
self.status = PluginStatus::Running;
96100
Ok(())
@@ -164,6 +168,17 @@ fn make_no_impl_factory() -> PluginFactory {
164168
Box::new(|_pt| None)
165169
}
166170

171+
fn make_counting_factory() -> (PluginFactory, Arc<AtomicUsize>) {
172+
let start_calls = Arc::new(AtomicUsize::new(0));
173+
let captured = Arc::clone(&start_calls);
174+
let factory = Box::new(move |pt| {
175+
let mut plugin = MockPlugin::new(pt);
176+
plugin.start_calls = Arc::clone(&captured);
177+
Some(Box::new(plugin) as Box<dyn ChannelPlugin>)
178+
});
179+
(factory, start_calls)
180+
}
181+
167182
fn make_telegram_config() -> serde_json::Value {
168183
serde_json::json!({
169184
"credentials": { "token": "bot:valid123" },
@@ -372,6 +387,24 @@ async fn tp1_test_valid_credentials() {
372387
assert_eq!(result.as_deref(), Some("mock_bot_user"));
373388
}
374389

390+
#[tokio::test]
391+
async fn test_plugin_initializes_without_starting_runtime() {
392+
let (mgr, _repo, _bc) = setup().await;
393+
let (factory, start_calls) = make_counting_factory();
394+
395+
let username = mgr
396+
.test_plugin("telegram", make_plugin_config(), &factory)
397+
.await
398+
.unwrap();
399+
400+
assert_eq!(username.as_deref(), Some("mock_bot_user"));
401+
assert_eq!(
402+
start_calls.load(Ordering::SeqCst),
403+
0,
404+
"credential tests must not start long-running plugin runtime"
405+
);
406+
}
407+
375408
// ── TP-2: Test invalid credentials propagates error ───────────────
376409

377410
#[tokio::test]

0 commit comments

Comments
 (0)