Skip to content

Commit e93e6df

Browse files
committed
feat(runtime): add channel_send outbox
1 parent a01eef4 commit e93e6df

14 files changed

Lines changed: 832 additions & 9 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/rexos-cli/src/main.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ enum Command {
2020
#[command(subcommand)]
2121
command: AgentCommand,
2222
},
23+
/// Outbound channels (outbox + dispatcher)
24+
Channel {
25+
#[command(subcommand)]
26+
command: ChannelCommand,
27+
},
2328
/// Long-running harness helpers (initializer + sessions)
2429
Harness {
2530
#[command(subcommand)]
@@ -81,6 +86,25 @@ enum AgentCommand {
8186
},
8287
}
8388

89+
#[derive(Debug, clap::Subcommand)]
90+
enum ChannelCommand {
91+
/// Drain queued outbox messages once
92+
Drain {
93+
/// Max messages to attempt in one run
94+
#[arg(long, default_value_t = 50)]
95+
limit: usize,
96+
},
97+
/// Run a long-lived worker that periodically drains the outbox
98+
Worker {
99+
/// Seconds between drain attempts
100+
#[arg(long, default_value_t = 5)]
101+
interval_secs: u64,
102+
/// Max messages to attempt per drain cycle
103+
#[arg(long, default_value_t = 50)]
104+
limit: usize,
105+
},
106+
}
107+
84108
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
85109
enum AgentKind {
86110
Planning,
@@ -152,8 +176,43 @@ async fn main() -> anyhow::Result<()> {
152176
eprintln!("[rexos] session_id={session_id}");
153177
}
154178
},
179+
Command::Channel { command } => match command {
180+
ChannelCommand::Drain { limit } => {
181+
let paths = RexosPaths::discover()?;
182+
paths.ensure_dirs()?;
183+
RexosConfig::ensure_default(&paths)?;
184+
MemoryStore::open_or_create(&paths)?;
185+
186+
let dispatcher =
187+
rexos::agent::OutboxDispatcher::new(MemoryStore::open_or_create(&paths)?)?;
188+
let summary = dispatcher.drain_once(limit).await?;
189+
println!("drain: sent={} failed={}", summary.sent, summary.failed);
190+
}
191+
ChannelCommand::Worker {
192+
interval_secs,
193+
limit,
194+
} => {
195+
let paths = RexosPaths::discover()?;
196+
paths.ensure_dirs()?;
197+
RexosConfig::ensure_default(&paths)?;
198+
MemoryStore::open_or_create(&paths)?;
199+
200+
let dispatcher =
201+
rexos::agent::OutboxDispatcher::new(MemoryStore::open_or_create(&paths)?)?;
202+
203+
loop {
204+
let summary = dispatcher.drain_once(limit).await?;
205+
println!("drain: sent={} failed={}", summary.sent, summary.failed);
206+
tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
207+
}
208+
}
209+
},
155210
Command::Harness { command } => match command {
156-
HarnessCommand::Init { dir, prompt, session } => {
211+
HarnessCommand::Init {
212+
dir,
213+
prompt,
214+
session,
215+
} => {
157216
if prompt.is_none() {
158217
rexos::harness::init_workspace(&dir)?;
159218
println!("Harness initialized in {}", dir.display());

crates/rexos-runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ serde_json.workspace = true
1212
tokio.workspace = true
1313
toml.workspace = true
1414
uuid.workspace = true
15+
reqwest.workspace = true
1516
rexos-kernel = { path = "../rexos-kernel" }
1617
rexos-llm = { path = "../rexos-llm" }
1718
rexos-memory = { path = "../rexos-memory" }

crates/rexos-runtime/src/lib.rs

Lines changed: 232 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,130 @@ pub struct AgentRuntime {
2323
router: ModelRouter,
2424
}
2525

26+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
27+
pub struct OutboxDrainSummary {
28+
pub sent: u32,
29+
pub failed: u32,
30+
}
31+
32+
#[derive(Debug)]
33+
pub struct OutboxDispatcher {
34+
memory: MemoryStore,
35+
http: reqwest::Client,
36+
}
37+
38+
impl OutboxDispatcher {
39+
pub fn new(memory: MemoryStore) -> anyhow::Result<Self> {
40+
let http = reqwest::Client::builder()
41+
.timeout(std::time::Duration::from_secs(15))
42+
.build()
43+
.context("build http client")?;
44+
Ok(Self { memory, http })
45+
}
46+
47+
pub async fn drain_once(&self, limit: usize) -> anyhow::Result<OutboxDrainSummary> {
48+
let mut msgs = self.outbox_messages_get()?;
49+
let mut summary = OutboxDrainSummary::default();
50+
51+
let mut processed = 0usize;
52+
for msg in msgs.iter_mut() {
53+
if processed >= limit.max(1) {
54+
break;
55+
}
56+
if msg.status != OutboxStatus::Queued {
57+
continue;
58+
}
59+
processed += 1;
60+
61+
let now = AgentRuntime::now_epoch_seconds();
62+
msg.attempts = msg.attempts.saturating_add(1);
63+
msg.updated_at = now;
64+
msg.last_error = None;
65+
66+
let result = match msg.channel.as_str() {
67+
"console" => {
68+
self.deliver_console(msg);
69+
Ok(())
70+
}
71+
"webhook" => self.deliver_webhook(msg).await,
72+
other => Err(anyhow::anyhow!("unknown channel: {other}")),
73+
};
74+
75+
match result {
76+
Ok(()) => {
77+
msg.status = OutboxStatus::Sent;
78+
msg.sent_at = Some(now);
79+
summary.sent = summary.sent.saturating_add(1);
80+
}
81+
Err(e) => {
82+
msg.status = OutboxStatus::Failed;
83+
msg.last_error = Some(e.to_string());
84+
summary.failed = summary.failed.saturating_add(1);
85+
}
86+
}
87+
}
88+
89+
if processed > 0 {
90+
self.outbox_messages_set(&msgs)?;
91+
}
92+
93+
Ok(summary)
94+
}
95+
96+
fn outbox_messages_get(&self) -> anyhow::Result<Vec<OutboxMessageRecord>> {
97+
let raw = self
98+
.memory
99+
.kv_get("rexos.outbox.messages")
100+
.context("kv_get rexos.outbox.messages")?
101+
.unwrap_or_else(|| "[]".to_string());
102+
Ok(serde_json::from_str(&raw).unwrap_or_default())
103+
}
104+
105+
fn outbox_messages_set(&self, msgs: &[OutboxMessageRecord]) -> anyhow::Result<()> {
106+
let raw = serde_json::to_string(msgs).context("serialize rexos.outbox.messages")?;
107+
self.memory
108+
.kv_set("rexos.outbox.messages", &raw)
109+
.context("kv_set rexos.outbox.messages")?;
110+
Ok(())
111+
}
112+
113+
fn deliver_console(&self, msg: &OutboxMessageRecord) {
114+
let subject = msg.subject.as_deref().unwrap_or("");
115+
println!(
116+
"[rexos][channel_send][console] to={} subject={} message={}",
117+
msg.recipient, subject, msg.message
118+
);
119+
}
120+
121+
async fn deliver_webhook(&self, msg: &OutboxMessageRecord) -> anyhow::Result<()> {
122+
let url = std::env::var("REXOS_WEBHOOK_URL")
123+
.ok()
124+
.filter(|v| !v.trim().is_empty())
125+
.ok_or_else(|| anyhow::anyhow!("REXOS_WEBHOOK_URL is not set"))?;
126+
127+
let payload = serde_json::json!({
128+
"message_id": msg.message_id,
129+
"recipient": msg.recipient,
130+
"subject": msg.subject,
131+
"message": msg.message,
132+
"created_at": msg.created_at,
133+
});
134+
135+
let resp = self
136+
.http
137+
.post(url)
138+
.json(&payload)
139+
.send()
140+
.await
141+
.context("send webhook request")?;
142+
143+
if !resp.status().is_success() {
144+
bail!("webhook returned http {}", resp.status());
145+
}
146+
Ok(())
147+
}
148+
}
149+
26150
impl AgentRuntime {
27151
pub fn new(memory: MemoryStore, llms: LlmRegistry, router: ModelRouter) -> Self {
28152
Self {
@@ -209,6 +333,11 @@ impl AgentRuntime {
209333
serde_json::from_str(&args_json).context("parse cron_cancel args")?;
210334
self.cron_cancel(&args.job_id).context("cron_cancel")?
211335
}
336+
"channel_send" => {
337+
let args: ChannelSendToolArgs =
338+
serde_json::from_str(&args_json).context("parse channel_send args")?;
339+
self.channel_send(args).context("channel_send")?
340+
}
212341
"knowledge_add_entity" => {
213342
let args: KnowledgeAddEntityToolArgs = serde_json::from_str(&args_json)
214343
.context("parse knowledge_add_entity args")?;
@@ -694,7 +823,9 @@ impl AgentRuntime {
694823
}
695824

696825
fn cron_create(&self, args: CronCreateToolArgs) -> anyhow::Result<String> {
697-
let job_id = args.job_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
826+
let job_id = args
827+
.job_id
828+
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
698829
let mut jobs = self.cron_jobs_get()?;
699830
if let Some(existing) = jobs.iter().find(|j| j.job_id == job_id) {
700831
return Ok(serde_json::to_string(existing).unwrap_or_else(|_| "ok".to_string()));
@@ -736,6 +867,71 @@ impl AgentRuntime {
736867
Ok("ok".to_string())
737868
}
738869

870+
fn outbox_messages_get(&self) -> anyhow::Result<Vec<OutboxMessageRecord>> {
871+
let key = "rexos.outbox.messages";
872+
let raw = self
873+
.memory
874+
.kv_get(key)
875+
.context("kv_get rexos.outbox.messages")?
876+
.unwrap_or_else(|| "[]".to_string());
877+
let msgs: Vec<OutboxMessageRecord> = serde_json::from_str(&raw).unwrap_or_default();
878+
Ok(msgs)
879+
}
880+
881+
fn outbox_messages_set(&self, msgs: &[OutboxMessageRecord]) -> anyhow::Result<()> {
882+
let key = "rexos.outbox.messages";
883+
let raw = serde_json::to_string(msgs).context("serialize rexos.outbox.messages")?;
884+
self.memory
885+
.kv_set(key, &raw)
886+
.context("kv_set rexos.outbox.messages")?;
887+
Ok(())
888+
}
889+
890+
fn channel_send(&self, args: ChannelSendToolArgs) -> anyhow::Result<String> {
891+
if args.channel.trim().is_empty() {
892+
return Ok("error: channel is empty".to_string());
893+
}
894+
if args.recipient.trim().is_empty() {
895+
return Ok("error: recipient is empty".to_string());
896+
}
897+
if args.message.trim().is_empty() {
898+
return Ok("error: message is empty".to_string());
899+
}
900+
901+
match args.channel.as_str() {
902+
"console" | "webhook" => {}
903+
other => return Ok(format!("error: unknown channel: {other}")),
904+
}
905+
906+
let now = Self::now_epoch_seconds();
907+
let record = OutboxMessageRecord {
908+
message_id: uuid::Uuid::new_v4().to_string(),
909+
channel: args.channel,
910+
recipient: args.recipient,
911+
subject: args.subject.filter(|s| !s.trim().is_empty()),
912+
message: args.message,
913+
status: OutboxStatus::Queued,
914+
attempts: 0,
915+
last_error: None,
916+
created_at: now,
917+
updated_at: now,
918+
sent_at: None,
919+
};
920+
921+
let mut msgs = self.outbox_messages_get()?;
922+
msgs.push(record.clone());
923+
if msgs.len() > 500 {
924+
msgs.drain(0..(msgs.len() - 500));
925+
}
926+
self.outbox_messages_set(&msgs)?;
927+
928+
Ok(serde_json::json!({
929+
"status": "queued",
930+
"message_id": record.message_id,
931+
})
932+
.to_string())
933+
}
934+
739935
fn knowledge_entities_get(&self) -> anyhow::Result<Vec<KnowledgeEntityRecord>> {
740936
let key = "rexos.knowledge.entities";
741937
let raw = self
@@ -1059,6 +1255,41 @@ struct CronCancelToolArgs {
10591255
job_id: String,
10601256
}
10611257

1258+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
1259+
#[serde(rename_all = "snake_case")]
1260+
enum OutboxStatus {
1261+
Queued,
1262+
Sent,
1263+
Failed,
1264+
}
1265+
1266+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1267+
struct OutboxMessageRecord {
1268+
message_id: String,
1269+
channel: String,
1270+
recipient: String,
1271+
#[serde(default)]
1272+
subject: Option<String>,
1273+
message: String,
1274+
status: OutboxStatus,
1275+
attempts: u32,
1276+
#[serde(default)]
1277+
last_error: Option<String>,
1278+
created_at: i64,
1279+
updated_at: i64,
1280+
#[serde(default)]
1281+
sent_at: Option<i64>,
1282+
}
1283+
1284+
#[derive(Debug, serde::Deserialize)]
1285+
struct ChannelSendToolArgs {
1286+
channel: String,
1287+
recipient: String,
1288+
#[serde(default)]
1289+
subject: Option<String>,
1290+
message: String,
1291+
}
1292+
10621293
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
10631294
struct KnowledgeEntityRecord {
10641295
id: String,

0 commit comments

Comments
 (0)