Skip to content

Commit 52e6954

Browse files
committed
Unify message pathways for subagents and main agent
1 parent 27ba70c commit 52e6954

18 files changed

Lines changed: 66 additions & 35 deletions

src/agent.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Agent loop: context builder, session load/save/summarize, LLM + tool_calls loop, subagent runner.
22
33
use std::path::Path;
4+
use std::sync::atomic::Ordering;
45
use std::sync::Arc;
56

67
use tokio::sync::mpsc;
@@ -133,6 +134,7 @@ pub async fn run_agent_loop(
133134
.clone()
134135
.unwrap_or_else(|| "telegram".to_string()),
135136
});
137+
tool_ctx.delivered.store(true, Ordering::Relaxed);
136138
}
137139
}
138140
}
@@ -302,6 +304,7 @@ pub(crate) async fn run_subagent(
302304
chat_id: Some(chat_id),
303305
channel: Some(channel),
304306
outbound_tx: Some(outbound_tx),
307+
delivered: Default::default(),
305308
};
306309

307310
match run_agent_loop(

src/main.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
55
use std::path::PathBuf;
66
use std::sync::Arc;
7-
use std::sync::atomic::{AtomicI64, Ordering};
7+
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
88

99
use tokio::sync::mpsc;
1010

@@ -177,12 +177,14 @@ async fn main() {
177177
last_chat_id.store(msg.chat_id, Ordering::Relaxed);
178178
}
179179

180+
let delivered = Arc::new(AtomicBool::new(false));
180181
let tool_ctx = tools::ToolCtx {
181182
workspace: workspace.clone(),
182183
restrict_to_workspace: restrict,
183184
chat_id: Some(msg.chat_id),
184185
channel: Some(msg.channel.clone()),
185186
outbound_tx: Some(Arc::new(outbound_tx.clone())),
187+
delivered: Arc::clone(&delivered),
186188
};
187189
let chat_id_str = msg.chat_id.to_string();
188190

@@ -240,12 +242,16 @@ async fn main() {
240242
continue;
241243
}
242244

243-
let _ = outbound_tx
244-
.send(OutboundMsg {
245-
chat_id: msg.chat_id,
246-
text: reply,
247-
channel: msg.channel,
248-
})
249-
.await;
245+
// Skip if a tool (message tool or for_user result) already sent content to the user
246+
// during the agent loop, to avoid delivering the same response twice.
247+
if !delivered.load(Ordering::Relaxed) {
248+
let _ = outbound_tx
249+
.send(OutboundMsg {
250+
chat_id: msg.chat_id,
251+
text: reply,
252+
channel: msg.channel,
253+
})
254+
.await;
255+
}
250256
}
251257
}

src/tools/context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Execution context for tools: workspace, chat, outbound channel.
22
33
use std::path::PathBuf;
4+
use std::sync::atomic::AtomicBool;
45
use std::sync::Arc;
56

67
use tokio::sync::mpsc;
@@ -20,4 +21,8 @@ pub struct ToolCtx {
2021
pub channel: Option<String>,
2122
/// Send outbound messages (e.g. to Telegram). Used by message tool.
2223
pub outbound_tx: Option<Arc<mpsc::Sender<OutboundMsg>>>,
24+
/// Set to true when any user-visible message has been sent during this request.
25+
/// Shared via Arc so clones (e.g. sub-ctx) observe the same flag.
26+
/// main.rs reads this after the agent loop to skip redundant delivery.
27+
pub delivered: Arc<AtomicBool>,
2328
}

src/tools/cron.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,7 @@ mod tool_tests {
10111011
chat_id,
10121012
channel: None,
10131013
outbound_tx: None,
1014+
delivered: Default::default(),
10141015
}
10151016
}
10161017

src/tools/file.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ mod tests {
347347
chat_id: None,
348348
channel: None,
349349
outbound_tx: None,
350+
delivered: Default::default(),
350351
};
351352
let rel = f.strip_prefix(&dir).unwrap().to_str().unwrap();
352353
let args = serde_json::json!({ "path": rel });

src/tools/git.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ mod tests {
178178
chat_id: None,
179179
channel: None,
180180
outbound_tx: None,
181+
delivered: Default::default(),
181182
}
182183
}
183184

src/tools/grep_dir.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ mod tests {
234234
chat_id: None,
235235
channel: None,
236236
outbound_tx: None,
237+
delivered: Default::default(),
237238
}
238239
}
239240

src/tools/message.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Send text to user via outbound queue (Telegram); used by main agent and subagent.
22
3+
use std::sync::atomic::Ordering;
4+
35
use serde_json::Value;
46

57
use crate::telegram::OutboundMsg;
@@ -61,7 +63,10 @@ impl Tool for MessageTool {
6163
channel,
6264
};
6365
match tx.try_send(msg) {
64-
Ok(()) => ToolResult::silent("sent"),
66+
Ok(()) => {
67+
ctx.delivered.store(true, Ordering::Relaxed);
68+
ToolResult::silent("sent")
69+
}
6570
Err(e) => ToolResult::error(e.to_string()),
6671
}
6772
})

src/tools/registry.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ mod tests {
175175
chat_id: None,
176176
channel: None,
177177
outbound_tx: None,
178+
delivered: Default::default(),
178179
};
179180
let args = serde_json::json!({ "path": "." });
180181
let res = reg.execute(&ctx, "read_file", &args).await;

src/tools/search.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ mod tests {
205205
chat_id: None,
206206
channel: None,
207207
outbound_tx: None,
208+
delivered: Default::default(),
208209
}
209210
}
210211

0 commit comments

Comments
 (0)