Skip to content

Commit ca4cf96

Browse files
author
root
committed
Make training scheduler launch subprocesses asynchronously
1 parent 831ce16 commit ca4cf96

2 files changed

Lines changed: 94 additions & 18 deletions

File tree

src/scheduler.rs

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,18 @@ pub fn cooldown_allows_training(
9595
Ok(elapsed_secs >= cooldown_secs)
9696
}
9797

98+
/// Decides whether a new training run may be launched for `profile`.
///
/// Returns `Ok(false)` only when the profile's training status carries a
/// string `"status"` field equal to `"running"`. A profile with no training
/// status, or a status without a string `"status"` field, is treated as
/// launchable and yields `Ok(true)`.
///
/// The visible body never produces an `Err`; the `Result` return type lets
/// callers chain this check with `?` like the other scheduler predicates.
///
/// NOTE(review): nothing here checks how old a "running" status is —
/// presumably whoever writes the status also clears it when a run ends or
/// crashes; confirm, otherwise a stale marker would block launches.
pub fn training_launch_allowed(profile: &ModerationProfile) -> Result<bool> {
    let already_running = match profile.training_status() {
        Some(status) => matches!(
            status.get("status").and_then(|value| value.as_str()),
            Some("running")
        ),
        None => false,
    };

    Ok(!already_running)
}
109+
98110
pub fn build_training_subprocess_command(
99111
root_dir: &str,
100112
profile_name: &str,
@@ -127,6 +139,10 @@ pub async fn plan_training_round(
127139
let mut planned = Vec::new();
128140

129141
for scanned in scan_profiles(root_dir)? {
142+
if !training_launch_allowed(&scanned.profile)? {
143+
continue;
144+
}
145+
130146
if !cooldown_allows_training(
131147
&scanned.profile,
132148
settings.training_scheduler_failure_cooldown_minutes,
@@ -163,23 +179,51 @@ pub async fn plan_training_round(
163179
Ok(planned)
164180
}
165181

182+
pub fn spawn_detached_command(
183+
program: &str,
184+
args: &[String],
185+
profile_name: &str,
186+
) -> Result<u32> {
187+
let mut child = std::process::Command::new(program)
188+
.args(args)
189+
.stdin(std::process::Stdio::null())
190+
.stdout(std::process::Stdio::inherit())
191+
.stderr(std::process::Stdio::inherit())
192+
.spawn()
193+
.with_context(|| format!("failed to spawn detached command for {}", profile_name))?;
194+
let pid = child.id();
195+
let profile_name = profile_name.to_string();
196+
std::thread::spawn(move || match child.wait() {
197+
Ok(status) => {
198+
if !status.success() {
199+
warn!(
200+
profile = %profile_name,
201+
status = ?status.code(),
202+
"training subprocess exited unsuccessfully"
203+
);
204+
}
205+
}
206+
Err(err) => {
207+
warn!(
208+
profile = %profile_name,
209+
error = %err,
210+
"failed to wait for detached training subprocess"
211+
);
212+
}
213+
});
214+
Ok(pid)
215+
}
216+
166217
pub async fn spawn_training_subprocess(
167218
settings: &Settings,
168219
profile_name: &str,
169-
) -> Result<std::process::ExitStatus> {
220+
) -> Result<u32> {
170221
let command = build_training_subprocess_command(
171222
&settings.root_dir.display().to_string(),
172223
profile_name,
173224
&settings.training_subprocess_allowed_cpus,
174225
)?;
175-
let status = std::process::Command::new(&command.program)
176-
.args(&command.args)
177-
.stdin(std::process::Stdio::null())
178-
.stdout(std::process::Stdio::inherit())
179-
.stderr(std::process::Stdio::inherit())
180-
.status()
181-
.with_context(|| format!("failed to run training subprocess for {}", profile_name))?;
182-
Ok(status)
226+
spawn_detached_command(&command.program, &command.args, profile_name)
183227
}
184228

185229
pub async fn run_scheduler_once(settings: &Settings) -> Result<Vec<PlannedTrainingAction>> {
@@ -197,14 +241,8 @@ pub async fn run_scheduler_once(settings: &Settings) -> Result<Vec<PlannedTraini
197241
"scheduler selected profile for training"
198242
);
199243

200-
let status = spawn_training_subprocess(settings, &action.profile_name).await?;
201-
if !status.success() {
202-
warn!(
203-
profile = %action.profile_name,
204-
status = ?status.code(),
205-
"training subprocess exited unsuccessfully"
206-
);
207-
}
244+
let pid = spawn_training_subprocess(settings, &action.profile_name).await?;
245+
info!(profile = %action.profile_name, pid, "scheduler started detached training subprocess");
208246
}
209247

210248
Ok(planned)

tests/scheduler_tests.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use profile::ModerationProfile;
1919
use serde_json::json;
2020
use std::path::PathBuf;
2121
use std::sync::{Mutex, OnceLock};
22-
use std::time::{Duration, SystemTime, UNIX_EPOCH};
22+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2323
use sample_rpc::{
2424
serve_unix_requests_until_shutdown, SampleRpcRequest, SampleRpcResponse,
2525
};
@@ -187,6 +187,44 @@ fn scheduler_allows_profile_after_failure_cooldown_elapses() {
187187
assert!(decision);
188188
}
189189

190+
/// A profile whose written training status says "running" must be refused
/// by `training_launch_allowed`.
#[test]
fn scheduler_skips_profile_when_training_is_already_running() {
    // The process id keeps the fixture name distinct between test processes.
    let fixture_name = format!("scheduler-training-running-{}", std::process::id());
    let profile = write_profile(&fixture_name, json!({"local_model_type": "hashlinear"}));

    let running_status = json!({
        "status": "running",
        "message": "training in progress",
        "timestamp": current_unix_secs(),
        "profile": profile.profile_name,
        "model_type": "hashlinear"
    });
    write_training_status(&profile, running_status);

    let launch_allowed =
        scheduler::training_launch_allowed(&profile).expect("training launch decision");

    assert!(!launch_allowed);
}
211+
212+
/// `spawn_detached_command` must hand back a pid well before a one-second
/// child process has finished.
#[test]
fn scheduler_spawn_detached_command_returns_before_child_exits() {
    let clock = Instant::now();
    let shell_args = ["-c".to_string(), "sleep 1".to_string()];

    let pid = scheduler::spawn_detached_command("/bin/sh", &shell_args, "detached-test")
        .expect("spawn detached command");
    let spawn_latency = clock.elapsed();

    assert!(pid > 0);
    assert!(spawn_latency < Duration::from_millis(500));

    // Stay alive past the child's one-second sleep so it can exit before the test returns.
    std::thread::sleep(Duration::from_millis(1200));
}
227+
190228
#[test]
191229
fn scheduler_command_uses_single_core_systemd_scope() {
192230
let root_dir = repo_root();

0 commit comments

Comments
 (0)