Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit f2d8ca5

Browse files
authored
feat(orchestrator): customize volume mounts via task (#458)
* allow to set custom volume mounts per task
1 parent 76c2ee3 commit f2d8ca5

5 files changed

Lines changed: 647 additions & 92 deletions

File tree

crates/orchestrator/src/plugins/node_groups/scheduler_impl.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,25 @@ impl SchedulerPlugin for NodeGroupsPlugin {
164164
})
165165
.collect::<Vec<String>>()
166166
});
167+
// Replace group variables in volume mounts if they exist
168+
if let Some(volume_mounts) = task_clone.volume_mounts {
169+
task_clone.volume_mounts = Some(
170+
volume_mounts
171+
.into_iter()
172+
.map(|mut volume_mount| {
173+
volume_mount.host_path =
174+
volume_mount.host_path.replace("${GROUP_ID}", &group.id);
175+
176+
volume_mount.container_path = volume_mount
177+
.container_path
178+
.replace("${GROUP_ID}", &group.id);
179+
180+
volume_mount
181+
})
182+
.collect(),
183+
);
184+
}
185+
167186
return Ok(vec![task_clone]);
168187
}
169188
}

crates/orchestrator/src/scheduler/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,20 @@ impl Scheduler {
5353
}
5454
}
5555

56+
// Replace variables in volume mounts
57+
if let Some(volume_mounts) = &mut task.volume_mounts {
58+
// Extract group_id from metadata labels if available
59+
60+
for volume_mount in volume_mounts.iter_mut() {
61+
// Use the replace_labels method with all variables
62+
let processed = volume_mount
63+
.replace_labels(&task.id.to_string(), Some(&node_address.to_string()));
64+
65+
// Replace the mount with the processed version
66+
*volume_mount = processed;
67+
}
68+
}
69+
5670
return Ok(Some(task));
5771
}
5872

crates/shared/src/models/task.rs

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,89 @@ pub struct SchedulingConfig {
5858
pub plugins: Option<HashMap<String, HashMap<String, Vec<String>>>>,
5959
}
6060

61+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
62+
pub struct VolumeMount {
63+
/// Name/path of the volume on the host (supports label replacements)
64+
pub host_path: String,
65+
/// Path where the volume should be mounted in the container
66+
pub container_path: String,
67+
}
68+
69+
impl VolumeMount {
70+
/// Replace labels in the host_path with actual values
71+
/// Note: GROUP_ID replacement is handled by the node groups plugin
72+
/// (temporary until we have an expander trait)
73+
pub fn replace_labels(&self, task_id: &str, node_address: Option<&str>) -> Self {
74+
let mut host_path = self.host_path.clone();
75+
let mut container_path = self.container_path.clone();
76+
77+
// Replace ${TASK_ID} with actual task ID
78+
host_path = host_path.replace("${TASK_ID}", task_id);
79+
container_path = container_path.replace("${TASK_ID}", task_id);
80+
81+
// Replace ${NODE_ADDRESS} with actual node address if provided
82+
if let Some(addr) = node_address {
83+
host_path = host_path.replace("${NODE_ADDRESS}", addr);
84+
container_path = container_path.replace("${NODE_ADDRESS}", addr);
85+
}
86+
87+
// Get current timestamp for ${TIMESTAMP}
88+
let timestamp = chrono::Utc::now().timestamp().to_string();
89+
host_path = host_path.replace("${TIMESTAMP}", &timestamp);
90+
container_path = container_path.replace("${TIMESTAMP}", &timestamp);
91+
92+
Self {
93+
host_path,
94+
container_path,
95+
}
96+
}
97+
98+
/// Validate the volume mount configuration
99+
pub fn validate(&self) -> Result<(), String> {
100+
if self.host_path.is_empty() {
101+
return Err("Host path cannot be empty".to_string());
102+
}
103+
104+
if self.container_path.is_empty() {
105+
return Err("Container path cannot be empty".to_string());
106+
}
107+
108+
// Check for supported variables
109+
let supported_vars = [
110+
"${TASK_ID}",
111+
"${GROUP_ID}",
112+
"${TIMESTAMP}",
113+
"${NODE_ADDRESS}",
114+
];
115+
116+
let re = regex::Regex::new(r"\$\{[^}]+\}").unwrap();
117+
118+
// Check host_path
119+
for cap in re.find_iter(&self.host_path) {
120+
let var = cap.as_str();
121+
if !supported_vars.contains(&var) {
122+
return Err(format!(
123+
"Volume mount host_path contains unsupported variable: {}. Supported variables: {:?}",
124+
var, supported_vars
125+
));
126+
}
127+
}
128+
129+
// Check container_path
130+
for cap in re.find_iter(&self.container_path) {
131+
let var = cap.as_str();
132+
if !supported_vars.contains(&var) {
133+
return Err(format!(
134+
"Volume mount container_path contains unsupported variable: {}. Supported variables: {:?}",
135+
var, supported_vars
136+
));
137+
}
138+
}
139+
140+
Ok(())
141+
}
142+
}
143+
61144
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
62145
pub struct TaskRequest {
63146
pub image: String,
@@ -68,6 +151,7 @@ pub struct TaskRequest {
68151
pub scheduling_config: Option<SchedulingConfig>,
69152
pub storage_config: Option<StorageConfig>,
70153
pub metadata: Option<TaskMetadata>,
154+
pub volume_mounts: Option<Vec<VolumeMount>>,
71155
}
72156

73157
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -95,6 +179,8 @@ pub struct Task {
95179
pub storage_config: Option<StorageConfig>,
96180
#[serde(default)]
97181
pub metadata: Option<TaskMetadata>,
182+
#[serde(default)]
183+
pub volume_mounts: Option<Vec<VolumeMount>>,
98184
}
99185

100186
impl Default for Task {
@@ -112,6 +198,7 @@ impl Default for Task {
112198
scheduling_config: None,
113199
storage_config: None,
114200
metadata: None,
201+
volume_mounts: None,
115202
}
116203
}
117204
}
@@ -156,6 +243,12 @@ impl TryFrom<TaskRequest> for Task {
156243
storage_config.validate()?;
157244
}
158245

246+
if let Some(volume_mounts) = &request.volume_mounts {
247+
for volume_mount in volume_mounts {
248+
volume_mount.validate()?;
249+
}
250+
}
251+
159252
Ok(Task {
160253
id: Uuid::new_v4(),
161254
image: request.image,
@@ -169,6 +262,7 @@ impl TryFrom<TaskRequest> for Task {
169262
scheduling_config: request.scheduling_config,
170263
storage_config: request.storage_config,
171264
metadata: request.metadata,
265+
volume_mounts: request.volume_mounts,
172266
})
173267
}
174268
}
@@ -204,3 +298,133 @@ impl ToRedisArgs for Task {
204298
out.write_arg(task_json.as_bytes());
205299
}
206300
}
301+
302+
#[cfg(test)]
303+
mod tests {
304+
use super::*;
305+
306+
#[test]
307+
fn test_volume_mount_label_replacement() {
308+
let volume_mount = VolumeMount {
309+
host_path: "/host/data/${TASK_ID}".to_string(),
310+
container_path: "/container/data/${TASK_ID}".to_string(),
311+
};
312+
313+
let processed = volume_mount.replace_labels("task-123", Some("node-addr"));
314+
315+
assert_eq!(processed.host_path, "/host/data/task-123");
316+
assert_eq!(processed.container_path, "/container/data/task-123");
317+
}
318+
319+
#[test]
320+
fn test_volume_mount_label_replacement_without_group() {
321+
let volume_mount = VolumeMount {
322+
host_path: "/host/data/${TASK_ID}".to_string(),
323+
container_path: "/container/data/${TASK_ID}".to_string(),
324+
};
325+
326+
let processed = volume_mount.replace_labels("task-789", None);
327+
328+
assert_eq!(processed.host_path, "/host/data/task-789");
329+
assert_eq!(processed.container_path, "/container/data/task-789");
330+
}
331+
332+
#[test]
333+
fn test_volume_mount_with_timestamp() {
334+
let volume_mount = VolumeMount {
335+
host_path: "/host/logs/${TASK_ID}-${TIMESTAMP}".to_string(),
336+
container_path: "/container/logs".to_string(),
337+
};
338+
339+
let processed = volume_mount.replace_labels("task-123", None);
340+
341+
assert!(processed.host_path.starts_with("/host/logs/task-123-"));
342+
assert!(processed.host_path.len() > "/host/logs/task-123-".len());
343+
assert_eq!(processed.container_path, "/container/logs");
344+
}
345+
346+
#[test]
347+
fn test_volume_mount_validation_success() {
348+
let volume_mount = VolumeMount {
349+
host_path: "/host/data/${TASK_ID}".to_string(),
350+
container_path: "/container/data".to_string(),
351+
};
352+
353+
assert!(volume_mount.validate().is_ok());
354+
}
355+
356+
#[test]
357+
fn test_volume_mount_validation_with_node_address() {
358+
let volume_mount = VolumeMount {
359+
host_path: "/host/data/${NODE_ADDRESS}".to_string(),
360+
container_path: "/container/data/${TASK_ID}".to_string(),
361+
};
362+
363+
assert!(volume_mount.validate().is_ok());
364+
}
365+
366+
#[test]
367+
fn test_volume_mount_validation_empty_host_path() {
368+
let volume_mount = VolumeMount {
369+
host_path: "".to_string(),
370+
container_path: "/container/data".to_string(),
371+
};
372+
373+
assert!(volume_mount.validate().is_err());
374+
assert_eq!(
375+
volume_mount.validate().unwrap_err(),
376+
"Host path cannot be empty"
377+
);
378+
}
379+
380+
#[test]
381+
fn test_volume_mount_validation_empty_container_path() {
382+
let volume_mount = VolumeMount {
383+
host_path: "/host/data".to_string(),
384+
container_path: "".to_string(),
385+
};
386+
387+
assert!(volume_mount.validate().is_err());
388+
assert_eq!(
389+
volume_mount.validate().unwrap_err(),
390+
"Container path cannot be empty"
391+
);
392+
}
393+
394+
#[test]
395+
fn test_volume_mount_validation_unsupported_variable() {
396+
let volume_mount = VolumeMount {
397+
host_path: "/host/data/${UNSUPPORTED_VAR}".to_string(),
398+
container_path: "/container/data".to_string(),
399+
};
400+
401+
assert!(volume_mount.validate().is_err());
402+
assert!(volume_mount
403+
.validate()
404+
.unwrap_err()
405+
.contains("unsupported variable: ${UNSUPPORTED_VAR}"));
406+
}
407+
408+
#[test]
409+
fn test_task_with_volume_mounts() {
410+
let task_request = TaskRequest {
411+
image: "ubuntu:latest".to_string(),
412+
name: "test-task".to_string(),
413+
volume_mounts: Some(vec![
414+
VolumeMount {
415+
host_path: "/host/data/${TASK_ID}".to_string(),
416+
container_path: "/data".to_string(),
417+
},
418+
VolumeMount {
419+
host_path: "/host/logs/${TASK_ID}".to_string(),
420+
container_path: "/logs".to_string(),
421+
},
422+
]),
423+
..Default::default()
424+
};
425+
426+
let task = Task::try_from(task_request).unwrap();
427+
assert!(task.volume_mounts.is_some());
428+
assert_eq!(task.volume_mounts.as_ref().unwrap().len(), 2);
429+
}
430+
}

0 commit comments

Comments
 (0)