Skip to content

Commit c0f239c

Browse files
committed
feat(executor): implement graceful job timeout with SIGTERM/SIGKILL
- Change default timeout from 10s to 1h for long-running jobs
- Send SIGTERM first on timeout, wait 10s grace period, then SIGKILL
- Refactor stdout/stderr handling to use async tasks (prevents deadlock)
- Add nix crate for Unix signal handling
1 parent 9aa4455 commit c0f239c

File tree

4 files changed

+107
-10
lines changed

4 files changed

+107
-10
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"
2121
tracing = "0.1"
2222
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2323

24+
[target.'cfg(unix)'.dependencies]
25+
nix = { version = "0.29", features = ["signal", "process"] }
26+
2427
[dev-dependencies]
2528
tempfile = "3"
2629
serde_json = "1"

src/actor/job/executor.rs

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ use crate::webhook::{self, JobFailure};
1616
/// Default jitter ratio when not explicitly configured (25% of base delay)
1717
const AUTO_JITTER_RATIO: u32 = 25;
1818

19+
/// Grace period to wait after SIGTERM before sending SIGKILL
20+
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
21+
1922
pub async fn execute_job(job: &Job, sot_path: &Path, runner: &RunnerConfig) -> bool {
2023
let job_dir = git::get_job_dir(sot_path, &job.id);
2124
let work_dir = resolve_work_dir(sot_path, &job.id, &job.working_dir);
@@ -155,27 +158,105 @@ async fn run_command(
155158
cmd.args(["-c", &job.command])
156159
.current_dir(work_dir)
157160
.stdout(std::process::Stdio::piped())
158-
.stderr(std::process::Stdio::piped())
159-
.kill_on_drop(true);
161+
.stderr(std::process::Stdio::piped());
160162

161163
for (key, value) in env_vars {
162164
cmd.env(key, value);
163165
}
164166

165-
let child = match cmd.spawn() {
167+
let mut child = match cmd.spawn() {
166168
Ok(c) => c,
167169
Err(e) => return CommandResult::ExecError(e.to_string()),
168170
};
169171

170-
let result = tokio::time::timeout(job.timeout, child.wait_with_output()).await;
171-
172-
match result {
173-
Ok(Ok(output)) => CommandResult::Completed(output),
172+
// Take stdout/stderr handles before waiting
173+
let stdout = child.stdout.take();
174+
let stderr = child.stderr.take();
175+
176+
// Spawn tasks to read output concurrently (prevents buffer deadlock)
177+
let stdout_task = tokio::spawn(async move {
178+
match stdout {
179+
Some(mut out) => {
180+
let mut buf = Vec::new();
181+
let _ = tokio::io::AsyncReadExt::read_to_end(&mut out, &mut buf).await;
182+
buf
183+
}
184+
None => Vec::new(),
185+
}
186+
});
187+
let stderr_task = tokio::spawn(async move {
188+
match stderr {
189+
Some(mut err) => {
190+
let mut buf = Vec::new();
191+
let _ = tokio::io::AsyncReadExt::read_to_end(&mut err, &mut buf).await;
192+
buf
193+
}
194+
None => Vec::new(),
195+
}
196+
});
197+
198+
// Wait for process with timeout
199+
let wait_result = tokio::time::timeout(job.timeout, child.wait()).await;
200+
201+
match wait_result {
202+
Ok(Ok(status)) => {
203+
let stdout = stdout_task.await.unwrap_or_default();
204+
let stderr = stderr_task.await.unwrap_or_default();
205+
CommandResult::Completed(std::process::Output {
206+
status,
207+
stdout,
208+
stderr,
209+
})
210+
}
174211
Ok(Err(e)) => CommandResult::ExecError(e.to_string()),
175-
Err(_) => CommandResult::Timeout,
212+
Err(_) => {
213+
// Timeout occurred - attempt graceful shutdown
214+
graceful_kill(&mut child, &job.id).await;
215+
CommandResult::Timeout
216+
}
176217
}
177218
}
178219

220+
/// Attempts graceful shutdown: SIGTERM first, then SIGKILL after grace period.
221+
#[cfg(unix)]
222+
async fn graceful_kill(child: &mut tokio::process::Child, job_id: &str) {
223+
use nix::sys::signal::{kill, Signal};
224+
use nix::unistd::Pid;
225+
226+
let Some(pid) = child.id() else {
227+
return; // Process already exited
228+
};
229+
let pid = Pid::from_raw(pid as i32);
230+
231+
// Send SIGTERM for graceful shutdown
232+
if kill(pid, Signal::SIGTERM).is_ok() {
233+
debug!(target: "rollcron::job", job_id = %job_id, "Sent SIGTERM, waiting for graceful exit");
234+
235+
// Wait for process to exit gracefully
236+
if tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, child.wait())
237+
.await
238+
.is_ok()
239+
{
240+
debug!(target: "rollcron::job", job_id = %job_id, "Process exited gracefully after SIGTERM");
241+
return;
242+
}
243+
244+
// Grace period expired - force kill
245+
warn!(target: "rollcron::job", job_id = %job_id, "Grace period expired, sending SIGKILL");
246+
}
247+
248+
// Send SIGKILL
249+
let _ = child.kill().await;
250+
let _ = child.wait().await;
251+
}
252+
253+
/// Terminates a timed-out child process on platforms without Unix signals.
///
/// There is no SIGTERM step here: the child is killed right away and then waited on.
/// Failures of either call are ignored. `_job_id` is unused and exists only so the
/// signature matches the Unix implementation.
#[cfg(not(unix))]
async fn graceful_kill(child: &mut tokio::process::Child, _job_id: &str) {
    // Kill immediately; any error is deliberately discarded.
    child.kill().await.ok();
    // Wait on the child afterwards, again discarding the outcome.
    child.wait().await.ok();
}
259+
179260
/// Load runner-level env vars for webhook URL expansion.
180261
/// Returns None on error (webhook will fall back to process env).
181262
fn load_runner_env_vars(

src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ pub struct ScheduleConfig {
151151
}
152152

153153
fn default_timeout() -> String {
154-
"10s".to_string()
154+
"1h".to_string()
155155
}
156156

157157
fn default_log_max_size() -> String {
@@ -349,7 +349,7 @@ jobs:
349349
assert_eq!(jobs[0].id, "hello");
350350
assert_eq!(jobs[0].name, "hello");
351351
assert_eq!(jobs[0].command, "echo hello");
352-
assert_eq!(jobs[0].timeout, Duration::from_secs(10));
352+
assert_eq!(jobs[0].timeout, Duration::from_secs(3600));
353353
}
354354

355355
#[test]

0 commit comments

Comments
 (0)