Skip to content

Commit c77b5ab

Browse files
committed
refactor: add job completion notifications to Runner
- Job Actor notifies Runner on completion (JobCompleted/JobFailed)
- Runner handlers use fire-and-forget (tokio::spawn) to avoid blocking
- executor::execute_job now returns bool for success/failure
1 parent 9168b79 commit c77b5ab

File tree

3 files changed

+99
-17
lines changed

3 files changed

+99
-17
lines changed

src/actor/job/executor.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ pub struct WebhookPayload {
2727
pub attempts: u32,
2828
}
2929

30-
pub async fn execute_job(job: &Job, sot_path: &PathBuf, runner: &RunnerConfig) {
30+
pub async fn execute_job(job: &Job, sot_path: &PathBuf, runner: &RunnerConfig) -> bool {
3131
let job_dir = git::get_job_dir(sot_path, &job.id);
3232
let work_dir = resolve_work_dir(sot_path, &job.id, &job.working_dir);
3333
let mut log_file = job
@@ -75,7 +75,7 @@ pub async fn execute_job(job: &Job, sot_path: &PathBuf, runner: &RunnerConfig) {
7575
let success = handle_result(job, &result, log_file.as_mut());
7676

7777
if success {
78-
return;
78+
return true;
7979
}
8080

8181
last_result = Some(result);
@@ -113,6 +113,8 @@ pub async fn execute_job(job: &Job, sot_path: &PathBuf, runner: &RunnerConfig) {
113113
send_webhook(&webhook.to_url(), &payload).await;
114114
}
115115
}
116+
117+
false
116118
}
117119

118120
fn resolve_work_dir(sot_path: &PathBuf, job_id: &str, working_dir: &Option<String>) -> PathBuf {

src/actor/job/mod.rs

Lines changed: 43 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,14 @@
11
mod executor;
22
mod tick;
33

4+
use crate::actor::runner::{JobCompleted, JobFailed, RunnerActor};
45
use crate::config::{Concurrency, Job, RunnerConfig};
56
use crate::git;
67
use std::path::PathBuf;
78
use tokio::task::JoinHandle;
89
use tracing::{error, info};
910
use xtra::prelude::*;
11+
use xtra::refcount::Weak;
1012

1113
use executor::execute_job;
1214
use tick::is_job_due;
@@ -16,18 +18,25 @@ pub struct JobActor {
1618
job: Job,
1719
sot_path: PathBuf,
1820
runner: RunnerConfig,
21+
runner_addr: Option<Address<RunnerActor, Weak>>,
1922
pending_sync: bool,
2023
handles: Vec<JoinHandle<()>>,
2124
tick_handle: Option<JoinHandle<()>>,
2225
stopping: bool,
2326
}
2427

2528
impl JobActor {
26-
pub fn new(job: Job, sot_path: PathBuf, runner: RunnerConfig) -> Self {
29+
pub fn new(
30+
job: Job,
31+
sot_path: PathBuf,
32+
runner: RunnerConfig,
33+
runner_addr: Option<Address<RunnerActor, Weak>>,
34+
) -> Self {
2735
Self {
2836
job,
2937
sot_path,
3038
runner,
39+
runner_addr,
3140
pending_sync: true, // Initial sync needed
3241
handles: Vec::new(),
3342
tick_handle: None,
@@ -222,30 +231,61 @@ impl JobActor {
222231
}
223232

224233
fn spawn_job(&mut self) {
234+
let job_id = self.job.id.clone();
225235
let job = self.job.clone();
226236
let sot_path = self.sot_path.clone();
227237
let runner = self.runner.clone();
238+
let runner_addr = self.runner_addr.clone();
228239

229240
let handle = tokio::spawn(async move {
230-
execute_job(&job, &sot_path, &runner).await;
241+
let success = execute_job(&job, &sot_path, &runner).await;
242+
if let Some(addr) = runner_addr {
243+
let msg = if success {
244+
Either::Left(JobCompleted { job_id })
245+
} else {
246+
Either::Right(JobFailed { job_id })
247+
};
248+
match msg {
249+
Either::Left(m) => { let _ = addr.send(m).await; }
250+
Either::Right(m) => { let _ = addr.send(m).await; }
251+
}
252+
}
231253
});
232254

233255
self.handles.push(handle);
234256
}
235257

236258
fn spawn_waiting_job(&mut self) {
259+
let job_id = self.job.id.clone();
237260
let job = self.job.clone();
238261
let sot_path = self.sot_path.clone();
239262
let runner = self.runner.clone();
263+
let runner_addr = self.runner_addr.clone();
240264
let previous_handles = std::mem::take(&mut self.handles);
241265

242266
let handle = tokio::spawn(async move {
243267
for prev_handle in previous_handles {
244268
let _ = prev_handle.await;
245269
}
246-
execute_job(&job, &sot_path, &runner).await;
270+
let success = execute_job(&job, &sot_path, &runner).await;
271+
if let Some(addr) = runner_addr {
272+
let msg = if success {
273+
Either::Left(JobCompleted { job_id })
274+
} else {
275+
Either::Right(JobFailed { job_id })
276+
};
277+
match msg {
278+
Either::Left(m) => { let _ = addr.send(m).await; }
279+
Either::Right(m) => { let _ = addr.send(m).await; }
280+
}
281+
}
247282
});
248283

249284
self.handles.push(handle);
250285
}
251286
}
287+
288+
enum Either<L, R> {
289+
Left(L),
290+
Right(R),
291+
}

src/actor/runner/mod.rs

Lines changed: 52 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ use std::time::Duration;
1010
use tokio::task::JoinHandle;
1111
use tracing::{error, info, warn};
1212
use xtra::prelude::*;
13+
use xtra::refcount::Weak;
1314

1415
const CONFIG_FILE: &str = "rollcron.yaml";
1516

@@ -22,6 +23,7 @@ pub struct RunnerActor {
2223
job_actors: HashMap<String, Address<JobActor>>,
2324
poll_handle: Option<JoinHandle<()>>,
2425
supervisor_handle: Option<JoinHandle<()>>,
26+
self_addr: Option<Address<Self, Weak>>,
2527
}
2628

2729
impl RunnerActor {
@@ -39,12 +41,19 @@ impl RunnerActor {
3941
job_actors: HashMap::new(),
4042
poll_handle: None,
4143
supervisor_handle: None,
44+
self_addr: None,
4245
}
4346
}
4447

4548
fn spawn_job_actor(&mut self, job: Job) {
4649
let job_id = job.id.clone();
47-
let actor = JobActor::new(job, self.sot_path.clone(), self.runner_config.clone());
50+
let runner_addr = self.self_addr.clone();
51+
let actor = JobActor::new(
52+
job,
53+
self.sot_path.clone(),
54+
self.runner_config.clone(),
55+
runner_addr,
56+
);
4857
let addr = xtra::spawn_tokio(actor, Mailbox::unbounded());
4958
self.job_actors.insert(job_id, addr);
5059
}
@@ -55,6 +64,7 @@ impl Actor for RunnerActor {
5564

5665
async fn started(&mut self, mailbox: &Mailbox<Self>) -> Result<(), Self::Stop> {
5766
let addr = mailbox.address();
67+
self.self_addr = Some(addr.clone());
5868

5969
// Start git poll loop
6070
let source = self.source.clone();
@@ -82,9 +92,11 @@ impl Actor for RunnerActor {
8292
handle.abort();
8393
}
8494

85-
// Shutdown all job actors
95+
// Shutdown all job actors (fire-and-forget)
8696
for (_, addr) in self.job_actors.drain() {
87-
let _ = addr.send(Shutdown).await;
97+
tokio::spawn(async move {
98+
let _ = addr.send(Shutdown).await;
99+
});
88100
}
89101

90102
info!(target: "rollcron::runner", "Runner actor stopped");
@@ -142,24 +154,26 @@ impl Handler<ConfigUpdate> for RunnerActor {
142154
.cloned()
143155
.collect();
144156

145-
// Remove deleted jobs
157+
// Remove deleted jobs (fire-and-forget)
146158
for job_id in to_remove {
147159
if let Some(addr) = self.job_actors.remove(&job_id) {
148160
info!(target: "rollcron::runner", job_id = %job_id, "Removing job actor");
149-
let _ = addr.send(Shutdown).await;
161+
tokio::spawn(async move {
162+
let _ = addr.send(Shutdown).await;
163+
});
150164
}
151165
}
152166

153167
// Update or create jobs
154168
for (job_id, job) in new_job_ids {
155169
if let Some(addr) = self.job_actors.get(&job_id) {
156-
// Update existing job
157-
let _ = addr
158-
.send(SyncNeeded {
159-
sot_path: msg.sot_path.clone(),
160-
})
161-
.await;
162-
let _ = addr.send(Update(job)).await;
170+
// Update existing job (fire-and-forget)
171+
let addr = addr.clone();
172+
let sot_path = msg.sot_path.clone();
173+
tokio::spawn(async move {
174+
let _ = addr.send(SyncNeeded { sot_path }).await;
175+
let _ = addr.send(Update(job)).await;
176+
});
163177
} else {
164178
// Create new job
165179
let job_dir = git::get_job_dir(&self.sot_path, &job_id);
@@ -238,3 +252,29 @@ impl Handler<GracefulShutdown> for RunnerActor {
238252
ctx.stop_self();
239253
}
240254
}
255+
256+
/// Job execution completed successfully
257+
pub struct JobCompleted {
258+
pub job_id: String,
259+
}
260+
261+
impl Handler<JobCompleted> for RunnerActor {
262+
type Return = ();
263+
264+
async fn handle(&mut self, msg: JobCompleted, _ctx: &mut Context<Self>) {
265+
info!(target: "rollcron::runner", job_id = %msg.job_id, "Job completed");
266+
}
267+
}
268+
269+
/// Job execution failed after all retries
270+
pub struct JobFailed {
271+
pub job_id: String,
272+
}
273+
274+
impl Handler<JobFailed> for RunnerActor {
275+
type Return = ();
276+
277+
async fn handle(&mut self, msg: JobFailed, _ctx: &mut Context<Self>) {
278+
warn!(target: "rollcron::runner", job_id = %msg.job_id, "Job failed");
279+
}
280+
}

0 commit comments

Comments
 (0)