Skip to content

Commit a01eef4

Browse files
committed
feat(runtime): add cron tools
1 parent e66ec9a commit a01eef4

5 files changed

Lines changed: 343 additions & 12 deletions

File tree

crates/rexos-runtime/src/lib.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,17 @@ impl AgentRuntime {
198198
.context("parse schedule_delete args")?;
199199
self.schedule_delete(&args.id).context("schedule_delete")?
200200
}
201+
"cron_create" => {
202+
let args: CronCreateToolArgs =
203+
serde_json::from_str(&args_json).context("parse cron_create args")?;
204+
self.cron_create(args).context("cron_create")?
205+
}
206+
"cron_list" => self.cron_list().context("cron_list")?,
207+
"cron_cancel" => {
208+
let args: CronCancelToolArgs =
209+
serde_json::from_str(&args_json).context("parse cron_cancel args")?;
210+
self.cron_cancel(&args.job_id).context("cron_cancel")?
211+
}
201212
"knowledge_add_entity" => {
202213
let args: KnowledgeAddEntityToolArgs = serde_json::from_str(&args_json)
203214
.context("parse knowledge_add_entity args")?;
@@ -662,6 +673,69 @@ impl AgentRuntime {
662673
Ok("ok".to_string())
663674
}
664675

676+
fn cron_jobs_get(&self) -> anyhow::Result<Vec<CronJobRecord>> {
677+
let key = "rexos.cron.jobs";
678+
let raw = self
679+
.memory
680+
.kv_get(key)
681+
.context("kv_get rexos.cron.jobs")?
682+
.unwrap_or_else(|| "[]".to_string());
683+
let jobs: Vec<CronJobRecord> = serde_json::from_str(&raw).unwrap_or_default();
684+
Ok(jobs)
685+
}
686+
687+
fn cron_jobs_set(&self, jobs: &[CronJobRecord]) -> anyhow::Result<()> {
688+
let key = "rexos.cron.jobs";
689+
let raw = serde_json::to_string(jobs).context("serialize rexos.cron.jobs")?;
690+
self.memory
691+
.kv_set(key, &raw)
692+
.context("kv_set rexos.cron.jobs")?;
693+
Ok(())
694+
}
695+
696+
fn cron_create(&self, args: CronCreateToolArgs) -> anyhow::Result<String> {
697+
let job_id = args.job_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
698+
let mut jobs = self.cron_jobs_get()?;
699+
if let Some(existing) = jobs.iter().find(|j| j.job_id == job_id) {
700+
return Ok(serde_json::to_string(existing).unwrap_or_else(|_| "ok".to_string()));
701+
}
702+
703+
let record = CronJobRecord {
704+
job_id: job_id.clone(),
705+
name: args.name,
706+
schedule: args.schedule,
707+
action: args.action,
708+
delivery: args.delivery,
709+
one_shot: args.one_shot.unwrap_or(false),
710+
created_at: Self::now_epoch_seconds(),
711+
enabled: args.enabled.unwrap_or(true),
712+
};
713+
714+
jobs.push(record.clone());
715+
if jobs.len() > 200 {
716+
jobs.drain(0..(jobs.len() - 200));
717+
}
718+
self.cron_jobs_set(&jobs)?;
719+
720+
Ok(serde_json::to_string(&record).unwrap_or_else(|_| "ok".to_string()))
721+
}
722+
723+
fn cron_list(&self) -> anyhow::Result<String> {
724+
let jobs = self.cron_jobs_get()?;
725+
Ok(serde_json::to_string(&jobs).context("serialize cron_list")?)
726+
}
727+
728+
fn cron_cancel(&self, job_id: &str) -> anyhow::Result<String> {
729+
let mut jobs = self.cron_jobs_get()?;
730+
let before = jobs.len();
731+
jobs.retain(|j| j.job_id != job_id);
732+
if jobs.len() == before {
733+
return Ok(format!("error: cron job not found: {job_id}"));
734+
}
735+
self.cron_jobs_set(&jobs)?;
736+
Ok("ok".to_string())
737+
}
738+
665739
fn knowledge_entities_get(&self) -> anyhow::Result<Vec<KnowledgeEntityRecord>> {
666740
let key = "rexos.knowledge.entities";
667741
let raw = self
@@ -950,6 +1024,41 @@ struct ScheduleDeleteToolArgs {
9501024
id: String,
9511025
}
9521026

1027+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1028+
struct CronJobRecord {
1029+
job_id: String,
1030+
name: String,
1031+
schedule: serde_json::Value,
1032+
action: serde_json::Value,
1033+
#[serde(default)]
1034+
delivery: Option<serde_json::Value>,
1035+
one_shot: bool,
1036+
created_at: i64,
1037+
enabled: bool,
1038+
}
1039+
1040+
#[derive(Debug, serde::Deserialize)]
1041+
struct CronCreateToolArgs {
1042+
#[serde(default)]
1043+
#[serde(alias = "id")]
1044+
job_id: Option<String>,
1045+
name: String,
1046+
schedule: serde_json::Value,
1047+
action: serde_json::Value,
1048+
#[serde(default)]
1049+
delivery: Option<serde_json::Value>,
1050+
#[serde(default)]
1051+
one_shot: Option<bool>,
1052+
#[serde(default)]
1053+
enabled: Option<bool>,
1054+
}
1055+
1056+
#[derive(Debug, serde::Deserialize)]
1057+
struct CronCancelToolArgs {
1058+
#[serde(alias = "id")]
1059+
job_id: String,
1060+
}
1061+
9531062
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
9541063
struct KnowledgeEntityRecord {
9551064
id: String,

crates/rexos-tools/src/lib.rs

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,16 @@ impl Toolset {
187187
| "schedule_delete"
188188
| "knowledge_add_entity"
189189
| "knowledge_add_relation"
190-
| "knowledge_query" => {
190+
| "knowledge_query"
191+
| "cron_create"
192+
| "cron_list"
193+
| "cron_cancel" => {
191194
bail!("tool '{name}' is implemented in the runtime, not Toolset")
192195
}
193-
"cron_create" | "cron_list" | "cron_cancel" | "channel_send" | "hand_list"
194-
| "hand_activate" | "hand_status" | "hand_deactivate" | "a2a_discover" | "a2a_send"
195-
| "text_to_speech" | "speech_to_text" | "docker_exec" | "process_start"
196-
| "process_poll" | "process_write" | "process_kill" | "process_list"
197-
| "canvas_present" => {
196+
"channel_send" | "hand_list" | "hand_activate" | "hand_status" | "hand_deactivate"
197+
| "a2a_discover" | "a2a_send" | "text_to_speech" | "speech_to_text" | "docker_exec"
198+
| "process_start" | "process_poll" | "process_write" | "process_kill"
199+
| "process_list" | "canvas_present" => {
198200
bail!("tool not implemented yet: {name}")
199201
}
200202
_ => bail!("unknown tool: {name}"),
@@ -1983,11 +1985,57 @@ fn compat_tool_defs() -> Vec<ToolDefinition> {
19831985
},
19841986
});
19851987

1988+
defs.push(ToolDefinition {
1989+
kind: "function".to_string(),
1990+
function: ToolFunctionDefinition {
1991+
name: "cron_create".to_string(),
1992+
description: "Create a cron/scheduled job record (persisted).".to_string(),
1993+
parameters: json!({
1994+
"type": "object",
1995+
"properties": {
1996+
"job_id": { "type": "string", "description": "Optional stable job id. If omitted, RexOS generates one." },
1997+
"name": { "type": "string", "description": "Job name." },
1998+
"schedule": { "type": "object", "description": "Schedule payload (stored as-is)." },
1999+
"action": { "type": "object", "description": "Action payload (stored as-is)." },
2000+
"delivery": { "type": "object", "description": "Optional delivery payload (stored as-is)." },
2001+
"one_shot": { "type": "boolean", "description": "If true, job should be considered one-shot (stored)." },
2002+
"enabled": { "type": "boolean", "description": "Whether this job is enabled (default: true)." }
2003+
},
2004+
"required": ["name", "schedule", "action"],
2005+
"additionalProperties": false
2006+
}),
2007+
},
2008+
});
2009+
defs.push(ToolDefinition {
2010+
kind: "function".to_string(),
2011+
function: ToolFunctionDefinition {
2012+
name: "cron_list".to_string(),
2013+
description: "List cron/scheduled job records.".to_string(),
2014+
parameters: json!({
2015+
"type": "object",
2016+
"properties": {},
2017+
"additionalProperties": false
2018+
}),
2019+
},
2020+
});
2021+
defs.push(ToolDefinition {
2022+
kind: "function".to_string(),
2023+
function: ToolFunctionDefinition {
2024+
name: "cron_cancel".to_string(),
2025+
description: "Cancel a cron/scheduled job record by id.".to_string(),
2026+
parameters: json!({
2027+
"type": "object",
2028+
"properties": {
2029+
"job_id": { "type": "string", "description": "Job id." }
2030+
},
2031+
"required": ["job_id"],
2032+
"additionalProperties": false
2033+
}),
2034+
},
2035+
});
2036+
19862037
// Reserved tool names (stubs in RexOS for now).
19872038
for name in [
1988-
"cron_create",
1989-
"cron_list",
1990-
"cron_cancel",
19912039
"channel_send",
19922040
"hand_list",
19932041
"hand_activate",
@@ -2464,7 +2512,7 @@ mod tests {
24642512
async fn reserved_stub_tools_return_not_implemented_error() {
24652513
let tmp = tempfile::tempdir().unwrap();
24662514
let tools = Toolset::new(tmp.path().to_path_buf()).unwrap();
2467-
let err = tools.call("cron_create", r#"{}"#).await.unwrap_err();
2515+
let err = tools.call("channel_send", r#"{}"#).await.unwrap_err();
24682516
let msg = err.to_string();
24692517
assert!(msg.contains("not implemented"), "{msg}");
24702518
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
use std::collections::BTreeMap;
2+
use std::sync::{Arc, Mutex};
3+
4+
use axum::extract::State;
5+
use axum::routing::post;
6+
use axum::{Json, Router};
7+
use serde_json::json;
8+
9+
#[derive(Clone, Default)]
10+
struct TestState {
11+
calls: Arc<Mutex<u32>>,
12+
}
13+
14+
#[tokio::test]
15+
async fn reserved_cron_tools_create_list_and_cancel() {
16+
async fn handler(
17+
State(state): State<TestState>,
18+
Json(_payload): Json<serde_json::Value>,
19+
) -> Json<serde_json::Value> {
20+
let mut calls = state.calls.lock().unwrap();
21+
*calls += 1;
22+
23+
if *calls == 1 {
24+
return Json(json!({
25+
"choices": [{
26+
"index": 0,
27+
"message": {
28+
"role": "assistant",
29+
"content": null,
30+
"tool_calls": [
31+
{
32+
"id": "call_1",
33+
"type": "function",
34+
"function": {
35+
"name": "cron_create",
36+
"arguments": serde_json::to_string(&json!({
37+
"job_id": "job1",
38+
"name": "Job One",
39+
"schedule": { "kind": "every", "every_secs": 60 },
40+
"action": { "kind": "system_event", "text": "ping" },
41+
"one_shot": false
42+
})).unwrap()
43+
}
44+
},
45+
{
46+
"id": "call_2",
47+
"type": "function",
48+
"function": { "name": "cron_list", "arguments": "{}" }
49+
},
50+
{
51+
"id": "call_3",
52+
"type": "function",
53+
"function": {
54+
"name": "cron_cancel",
55+
"arguments": serde_json::to_string(&json!({ "job_id": "job1" })).unwrap()
56+
}
57+
},
58+
{
59+
"id": "call_4",
60+
"type": "function",
61+
"function": { "name": "cron_list", "arguments": "{}" }
62+
}
63+
]
64+
},
65+
"finish_reason": "tool_calls"
66+
}]
67+
}));
68+
}
69+
70+
Json(json!({
71+
"choices": [{
72+
"index": 0,
73+
"message": { "role": "assistant", "content": "done" },
74+
"finish_reason": "stop"
75+
}]
76+
}))
77+
}
78+
79+
let state = TestState::default();
80+
let app = Router::new()
81+
.route("/v1/chat/completions", post(handler))
82+
.with_state(state);
83+
84+
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
85+
let addr = listener.local_addr().unwrap();
86+
let server = tokio::spawn(async move {
87+
axum::serve(listener, app).await.unwrap();
88+
});
89+
90+
let tmp = tempfile::tempdir().unwrap();
91+
let workspace = tmp.path().join("workspace");
92+
std::fs::create_dir_all(&workspace).unwrap();
93+
94+
let home = tmp.path().join("home");
95+
let paths = rexos::paths::RexosPaths {
96+
base_dir: home.join(".rexos"),
97+
};
98+
paths.ensure_dirs().unwrap();
99+
100+
let memory = rexos::memory::MemoryStore::open_or_create(&paths).unwrap();
101+
102+
let mut providers = BTreeMap::new();
103+
providers.insert(
104+
"mock".to_string(),
105+
rexos::config::ProviderConfig {
106+
kind: rexos::config::ProviderKind::OpenAiCompatible,
107+
base_url: format!("http://{addr}/v1"),
108+
api_key_env: "".to_string(),
109+
default_model: "x".to_string(),
110+
},
111+
);
112+
113+
let cfg = rexos::config::RexosConfig {
114+
llm: rexos::config::LlmConfig::default(),
115+
providers,
116+
router: rexos::config::RouterConfig::default(),
117+
};
118+
let llms = rexos::llm::registry::LlmRegistry::from_config(&cfg).unwrap();
119+
let router = rexos::router::ModelRouter::new(rexos::config::RouterConfig {
120+
planning: rexos::config::RouteConfig {
121+
provider: "mock".to_string(),
122+
model: "x".to_string(),
123+
},
124+
coding: rexos::config::RouteConfig {
125+
provider: "mock".to_string(),
126+
model: "x".to_string(),
127+
},
128+
summary: rexos::config::RouteConfig {
129+
provider: "mock".to_string(),
130+
model: "x".to_string(),
131+
},
132+
});
133+
134+
let agent = rexos::agent::AgentRuntime::new(memory, llms, router);
135+
136+
let out = agent
137+
.run_session(
138+
workspace,
139+
"s1",
140+
None,
141+
"exercise reserved cron tools",
142+
rexos::router::TaskKind::Coding,
143+
)
144+
.await
145+
.unwrap();
146+
assert_eq!(out, "done");
147+
148+
let memory2 = rexos::memory::MemoryStore::open_or_create(&paths).unwrap();
149+
let msgs = memory2.list_chat_messages("s1").unwrap();
150+
let cron_lists: Vec<String> = msgs
151+
.iter()
152+
.filter(|m| {
153+
m.role == rexos::llm::openai_compat::Role::Tool && m.name.as_deref() == Some("cron_list")
154+
})
155+
.filter_map(|m| m.content.clone())
156+
.collect();
157+
assert_eq!(cron_lists.len(), 2, "cron_list outputs: {cron_lists:?}");
158+
159+
let v1: serde_json::Value =
160+
serde_json::from_str(&cron_lists[0]).expect("cron_list[0] should be json");
161+
assert_eq!(v1.as_array().map(|a| a.len()), Some(1), "{v1}");
162+
assert_eq!(
163+
v1[0].get("job_id").and_then(|v| v.as_str()),
164+
Some("job1"),
165+
"{v1}"
166+
);
167+
168+
let v2: serde_json::Value =
169+
serde_json::from_str(&cron_lists[1]).expect("cron_list[1] should be json");
170+
assert_eq!(v2.as_array().map(|a| a.len()), Some(0), "{v2}");
171+
172+
server.abort();
173+
}
174+

0 commit comments

Comments
 (0)