Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 728da09

Browse files
authored
introduce task container, cleanup task volume handling (#465)
* introduce task container, cleanup task volume handling * introduce task hash comparison on model level
1 parent 2e0a139 commit 728da09

7 files changed

Lines changed: 255 additions & 177 deletions

File tree

crates/shared/src/models/task.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use std::collections::HashMap;
33
use chrono::Utc;
44
use redis::{ErrorKind, FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value};
55
use serde::{Deserialize, Serialize};
6+
use std::collections::hash_map::DefaultHasher;
7+
use std::hash::{Hash, Hasher};
68
use uuid::Uuid;
79

810
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
@@ -183,6 +185,44 @@ pub struct Task {
183185
pub volume_mounts: Option<Vec<VolumeMount>>,
184186
}
185187

188+
impl Task {
189+
/// Generate a hash of the task configuration for comparison purposes
190+
pub fn generate_config_hash(&self) -> u64 {
191+
let mut hasher = DefaultHasher::new();
192+
193+
// Hash core configuration
194+
self.image.hash(&mut hasher);
195+
self.cmd.hash(&mut hasher);
196+
self.entrypoint.hash(&mut hasher);
197+
198+
// Hash environment variables in sorted order for consistency
199+
if let Some(env_vars) = &self.env_vars {
200+
let mut sorted_env: Vec<_> = env_vars.iter().collect();
201+
sorted_env.sort_by_key(|(k, _)| *k);
202+
for (key, value) in sorted_env {
203+
key.hash(&mut hasher);
204+
value.hash(&mut hasher);
205+
}
206+
}
207+
208+
// Hash volume mounts in sorted order for consistency
209+
if let Some(volume_mounts) = &self.volume_mounts {
210+
let mut sorted_volumes: Vec<_> = volume_mounts.iter().collect();
211+
sorted_volumes.sort_by(|a, b| {
212+
a.host_path
213+
.cmp(&b.host_path)
214+
.then_with(|| a.container_path.cmp(&b.container_path))
215+
});
216+
for volume_mount in sorted_volumes {
217+
volume_mount.host_path.hash(&mut hasher);
218+
volume_mount.container_path.hash(&mut hasher);
219+
}
220+
}
221+
222+
hasher.finish()
223+
}
224+
}
225+
186226
impl Default for Task {
187227
fn default() -> Self {
188228
Self {

crates/worker/src/docker/docker_manager.rs

Lines changed: 126 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::docker::task_container::TaskContainer;
12
use bollard::container::{
23
Config, CreateContainerOptions, ListContainersOptions, LogsOptions, StartContainerOptions,
34
};
@@ -14,6 +15,7 @@ use log::{debug, error, info};
1415
use shared::models::node::GpuSpecs;
1516
use std::collections::HashMap;
1617
use std::path::{Path, PathBuf};
18+
use std::str::FromStr;
1719
use std::time::Duration;
1820
use strip_ansi_escapes::strip;
1921

@@ -215,9 +217,17 @@ impl DockerManager {
215217

216218
let mut final_volumes = Vec::new();
217219
if self.storage_path.is_some() {
218-
// Create task-specific data volume
219220
let volume_name = format!("{}_data", name);
220-
let task_data_path = self.safe_storage_path(&[name.trim_start_matches('/'), "data"])?;
221+
222+
let data_dir_name = match TaskContainer::from_str(name) {
223+
Ok(task_container) => task_container.data_dir_name(),
224+
Err(_) => {
225+
// Fallback to using full container name if extraction fails
226+
name.trim_start_matches('/').to_string()
227+
}
228+
};
229+
230+
let task_data_path = self.safe_storage_path(&[&data_dir_name, "data"])?;
221231
Self::create_secure_directory(&task_data_path)?;
222232

223233
self.docker
@@ -300,16 +310,24 @@ impl DockerManager {
300310
let processed_volumes: Vec<(String, String, bool)> = if let Some(_storage_path) =
301311
&self.storage_path
302312
{
303-
// Create volume mount directories within storage path structure
304313
vols.into_iter()
305314
.map(|(host_path, container_path, read_only, task_volume)| {
306315
if task_volume {
307316
// Create volume mount directory within the task's storage area
308317
// Remove leading slash and sanitize the path
309318
let sanitized_host_path =
310319
host_path.trim_start_matches('/').replace('/', "_");
320+
321+
let mount_dir_name = match TaskContainer::from_str(name) {
322+
Ok(task_container) => task_container.data_dir_name(),
323+
Err(_) => {
324+
// Fallback to using full container name if extraction fails
325+
name.trim_start_matches('/').to_string()
326+
}
327+
};
328+
311329
match self.safe_storage_path(&[
312-
name.trim_start_matches('/'),
330+
&mount_dir_name,
313331
"mounts",
314332
&sanitized_host_path,
315333
]) {
@@ -559,72 +577,120 @@ impl DockerManager {
559577
}
560578
}
561579

562-
// --- Step 4: Remove directory with retries ---
580+
// --- Step 4: Check if other containers with same task ID exist before removing directory ---
563581
if self.storage_path.is_some() {
564-
match self.safe_storage_path(&[trimmed_name]) {
565-
Ok(dir_path) => {
566-
// Check if directory exists before attempting to remove it
567-
if dir_path.exists() {
568-
let mut success = false;
569-
570-
for attempt in 0..max_retries {
571-
match std::fs::remove_dir_all(&dir_path) {
572-
Ok(_) => {
573-
info!(
574-
"Directory {} removed successfully",
575-
dir_path.display()
576-
);
577-
success = true;
578-
break;
579-
}
580-
Err(e) => {
581-
debug!(
582-
"Attempt {}/{} failed to remove dir {}: {}",
583-
attempt + 1,
584-
max_retries,
585-
dir_path.display(),
586-
e
587-
);
588-
tokio::time::sleep(Duration::from_secs(1)).await;
582+
let should_remove_directory = if let Ok(task_container) =
583+
TaskContainer::from_str(trimmed_name)
584+
{
585+
// Check if there are other containers with the same task ID
586+
match self.list_containers(true).await {
587+
Ok(containers) => {
588+
let other_containers_with_same_task = containers.iter().any(|c| {
589+
c.names.iter().any(|name| {
590+
let clean_name = name.trim_start_matches('/');
591+
if let Ok(other_task_container) = TaskContainer::from_str(name)
592+
{
593+
// Same task ID but different container (not the one being removed)
594+
other_task_container.task_id == task_container.task_id
595+
&& clean_name != trimmed_name
596+
} else {
597+
false
589598
}
590-
}
599+
})
600+
});
601+
602+
if other_containers_with_same_task {
603+
info!("Other containers with task ID {} exist, keeping shared directory", task_container.task_id);
604+
false
605+
} else {
606+
info!("No other containers with task ID {} found, safe to remove directory", task_container.task_id);
607+
true
591608
}
609+
}
610+
Err(e) => {
611+
error!("Failed to list containers for cleanup check: {}", e);
612+
// Err on the side of caution - don't remove directory if we can't check
613+
false
614+
}
615+
}
616+
} else {
617+
// If we can't extract task ID, use original behavior
618+
true
619+
};
592620

593-
if !success {
594-
error!(
595-
"Failed to remove directory {} after {} attempts — trying fallback",
596-
dir_path.display(), max_retries
597-
);
621+
if should_remove_directory {
622+
let dir_name = if let Ok(task_container) = TaskContainer::from_str(trimmed_name)
623+
{
624+
task_container.data_dir_name()
625+
} else {
626+
trimmed_name.to_string()
627+
};
598628

599-
// Try `rm -rf` as fallback
600-
match std::process::Command::new("rm")
601-
.arg("-rf")
602-
.arg(&dir_path)
603-
.status()
604-
{
605-
Ok(status) if status.success() => {
606-
info!(
607-
"Fallback removal of {} succeeded",
608-
dir_path.display()
609-
);
610-
}
611-
Ok(status) => {
612-
error!("Fallback rm -rf failed with status {}", status);
629+
match self.safe_storage_path(&[&dir_name]) {
630+
Ok(dir_path) => {
631+
// Check if directory exists before attempting to remove it
632+
if dir_path.exists() {
633+
let mut success = false;
634+
635+
for attempt in 0..max_retries {
636+
match std::fs::remove_dir_all(&dir_path) {
637+
Ok(_) => {
638+
info!(
639+
"Directory {} removed successfully",
640+
dir_path.display()
641+
);
642+
success = true;
643+
break;
644+
}
645+
Err(e) => {
646+
debug!(
647+
"Attempt {}/{} failed to remove dir {}: {}",
648+
attempt + 1,
649+
max_retries,
650+
dir_path.display(),
651+
e
652+
);
653+
tokio::time::sleep(Duration::from_secs(1)).await;
654+
}
613655
}
614-
Err(e) => {
615-
error!("Failed to execute fallback rm -rf: {}", e);
656+
}
657+
658+
if !success {
659+
error!(
660+
"Failed to remove directory {} after {} attempts — trying fallback",
661+
dir_path.display(), max_retries
662+
);
663+
664+
// Try `rm -rf` as fallback
665+
match std::process::Command::new("rm")
666+
.arg("-rf")
667+
.arg(&dir_path)
668+
.status()
669+
{
670+
Ok(status) if status.success() => {
671+
info!(
672+
"Fallback removal of {} succeeded",
673+
dir_path.display()
674+
);
675+
}
676+
Ok(status) => {
677+
error!("Fallback rm -rf failed with status {}", status);
678+
}
679+
Err(e) => {
680+
error!("Failed to execute fallback rm -rf: {}", e);
681+
}
616682
}
617683
}
684+
} else {
685+
debug!(
686+
"Directory {} does not exist, skipping removal",
687+
dir_path.display()
688+
);
618689
}
619-
} else {
620-
debug!(
621-
"Directory {} does not exist, skipping removal",
622-
dir_path.display()
623-
);
624690
}
625-
}
626-
Err(e) => {
627-
error!("Failed to create secure path for directory removal: {}", e);
691+
Err(e) => {
692+
error!("Failed to create secure path for directory removal: {}", e);
693+
}
628694
}
629695
}
630696
}

crates/worker/src/docker/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod docker_manager;
22
pub mod service;
33
pub mod state;
4+
pub mod task_container;
45
pub mod taskbridge;
56

67
pub use docker_manager::DockerManager;

0 commit comments

Comments
 (0)