Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit b674d3f

Browse files
authored
replace default variables node_address, task_id on orchestrator (#457)
1 parent 05fa484 commit b674d3f

1 file changed

Lines changed: 71 additions & 2 deletions

File tree

  • crates/orchestrator/src/scheduler

crates/orchestrator/src/scheduler/mod.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ pub struct Scheduler {
1010
store_context: Arc<StoreContext>,
1111
plugins: Vec<Box<dyn SchedulerPlugin>>,
1212
}
13-
1413
impl Scheduler {
1514
pub fn new(store_context: Arc<StoreContext>, plugins: Vec<Box<dyn SchedulerPlugin>>) -> Self {
1615
let mut plugins = plugins;
@@ -33,7 +32,28 @@ impl Scheduler {
3332
}
3433

3534
if !all_tasks.is_empty() {
36-
return Ok(Some(all_tasks[0].clone()));
35+
let mut task = all_tasks[0].clone();
36+
37+
// Replace variables in env_vars
38+
if let Some(env_vars) = &mut task.env_vars {
39+
for (_, value) in env_vars.iter_mut() {
40+
let new_value = value
41+
.replace("${TASK_ID}", &task.id.to_string())
42+
.replace("${NODE_ADDRESS}", &node_address.to_string());
43+
*value = new_value;
44+
}
45+
}
46+
47+
// Replace variables in args
48+
if let Some(args) = &mut task.args {
49+
for arg in args.iter_mut() {
50+
*arg = arg
51+
.replace("${TASK_ID}", &task.id.to_string())
52+
.replace("${NODE_ADDRESS}", &node_address.to_string());
53+
}
54+
}
55+
56+
return Ok(Some(task));
3757
}
3858

3959
Ok(None)
@@ -43,6 +63,7 @@ impl Scheduler {
4363
#[cfg(test)]
4464
mod tests {
4565
use shared::models::task::TaskState;
66+
use std::collections::HashMap;
4667
use uuid::Uuid;
4768

4869
use crate::api::tests::helper::create_test_app_state;
@@ -68,4 +89,52 @@ mod tests {
6889
let task_for_node = scheduler.get_task_for_node(Address::ZERO).await.unwrap();
6990
assert_eq!(task_for_node, Some(task));
7091
}
92+
93+
#[tokio::test]
94+
async fn test_variable_replacement() {
95+
let state = create_test_app_state().await;
96+
let scheduler = Scheduler::new(state.store_context.clone(), vec![]);
97+
let node_address = Address::from([1u8; 20]);
98+
99+
let mut env_vars = HashMap::new();
100+
env_vars.insert("TASK_ID_VAR".to_string(), "task-${TASK_ID}".to_string());
101+
env_vars.insert("NODE_VAR".to_string(), "node-${NODE_ADDRESS}".to_string());
102+
103+
let task = Task {
104+
id: Uuid::new_v4(),
105+
image: "image".to_string(),
106+
name: "name".to_string(),
107+
state: TaskState::PENDING,
108+
created_at: 1,
109+
env_vars: Some(env_vars),
110+
args: Some(vec![
111+
"--task=${TASK_ID}".to_string(),
112+
"--node=${NODE_ADDRESS}".to_string(),
113+
]),
114+
..Default::default()
115+
};
116+
117+
let _ = state.store_context.task_store.add_task(task.clone()).await;
118+
119+
let result = scheduler.get_task_for_node(node_address).await.unwrap();
120+
assert!(result.is_some());
121+
122+
let returned_task = result.unwrap();
123+
124+
// Check env vars replacement
125+
let env_vars = returned_task.env_vars.unwrap();
126+
assert_eq!(
127+
env_vars.get("TASK_ID_VAR").unwrap(),
128+
&format!("task-{}", task.id)
129+
);
130+
assert_eq!(
131+
env_vars.get("NODE_VAR").unwrap(),
132+
&format!("node-{}", node_address)
133+
);
134+
135+
// Check args replacement
136+
let args = returned_task.args.unwrap();
137+
assert_eq!(args[0], format!("--task={}", task.id));
138+
assert_eq!(args[1], format!("--node={}", node_address));
139+
}
71140
}

0 commit comments

Comments
 (0)