Skip to content

Commit 2ae78d9

Browse files
committed
Simplify timeout checks and remove a race condition.
This commit simplifies how we calculate when `poll` should return. Whilst doing this I realised that the HTTP server check is in the wrong place and is subject to a race condition: we could empty the queue, and then get stuck not processing anything for an arbitrary period of time.
1 parent 80fd5d0 commit 2ae78d9

1 file changed

Lines changed: 45 additions & 37 deletions

File tree

src/jobrunner.rs

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -88,24 +88,35 @@ impl JobRunner {
8888
let mut check_queue = false;
8989
// A scratch buffer used to read from files.
9090
let mut buf = Box::new([0; READBUF]);
91-
// The earliest finish_by time of any running process (i.e. the process that will timeout
92-
// the soonest).
93-
let mut next_finish_by: Option<Instant> = None;
9491
loop {
9592
assert!(!tmp_failure || check_queue);
96-
// If there are jobs on the queue we haven't been able to run for temporary reasons,
97-
// then wait a short amount of time and try again.
98-
let mut timeout = if tmp_failure { WAIT_TIMEOUT * 1000 } else { -1 };
99-
// If any processes will exceed their timeout then, if that's shorter than the above
100-
// timeout, only wait for enough time to pass before we need to send them SIGTERM.
101-
if let Some(fby) = next_finish_by {
102-
let fby_timeout = fby.saturating_duration_since(Instant::now());
103-
if timeout == -1
104-
|| fby_timeout < Duration::from_millis(timeout.try_into().unwrap_or(0))
105-
{
106-
timeout = fby_timeout.as_millis().try_into().unwrap_or(c_int::MAX);
93+
let timeout = if tmp_failure {
94+
// If there are jobs on the queue we haven't been able to run for temporary
95+
// reasons, then wait a short amount of time and try again.
96+
WAIT_TIMEOUT * 1000
97+
} else {
98+
// The earliest finish_by time of any running process (i.e. the process that will time out
99+
// the soonest).
100+
let mut next_finish_by = None;
101+
for i in 0..self.running.len() {
102+
if let Some(Job { finish_by, .. }) = self.running[i] {
103+
if next_finish_by.is_none() || Some(finish_by) < next_finish_by {
104+
next_finish_by = Some(finish_by);
105+
}
106+
}
107107
}
108-
}
108+
109+
if let Some(x) = next_finish_by {
110+
x.saturating_duration_since(Instant::now())
111+
.as_millis()
112+
.try_into()
113+
.unwrap_or(c_int::MAX)
114+
} else {
115+
// No running jobs. Wait indefinitely until someone tells us to do something.
116+
-1
117+
}
118+
};
119+
109120
poll(&mut self.pollfds, timeout).ok();
110121

111122
self.check_for_sighup();
@@ -174,11 +185,27 @@ impl JobRunner {
174185
}
175186
}
176187

188+
// Has the HTTP server told us that we should check for new jobs, and/or has a
189+
// SIGCHLD/SIGHUP been received?
190+
match self.pollfds[self.maxjobs * 2].revents() {
191+
Some(flags) if flags == PollFlags::POLLIN => {
192+
check_queue = true;
193+
// It's fine for us to drain the event pipe completely: we'll process all the
194+
// events it contains below.
195+
loop {
196+
match nix::unistd::read(self.snare.event_read_fd, &mut *buf) {
197+
Ok(0) | Err(_) => break,
198+
Ok(_) => (),
199+
}
200+
}
201+
}
202+
_ => (),
203+
}
204+
177205
// Iterate over the running jobs and:
178206
// * If any jobs have exceeded their timeout, send them SIGTERM.
179207
// * If there are jobs whose stderr/stdout have closed, keep waiting on them until
180208
// they exit.
181-
next_finish_by = None;
182209
for i in 0..self.running.len() {
183210
if let Some(Job {
184211
finish_by,
@@ -188,8 +215,6 @@ impl JobRunner {
188215
{
189216
if finish_by <= Instant::now() {
190217
kill(Pid::from_raw(child.id() as i32), Signal::SIGTERM).ok();
191-
} else if next_finish_by.is_none() || Some(finish_by) < next_finish_by {
192-
next_finish_by = Some(finish_by);
193218
}
194219
}
195220

@@ -254,35 +279,18 @@ impl JobRunner {
254279
}
255280
}
256281

257-
// Has the HTTP server told us that we should check for new jobs and/or SIGCHLD/SIGHUP
258-
// has been received?
259-
match self.pollfds[self.maxjobs * 2].revents() {
260-
Some(flags) if flags == PollFlags::POLLIN => {
261-
check_queue = true;
262-
// It's fine for us to drain the event pipe completely: we'll process all the
263-
// events it contains.
264-
loop {
265-
match nix::unistd::read(self.snare.event_read_fd, &mut *buf) {
266-
Ok(0) | Err(_) => break,
267-
Ok(_) => (),
268-
}
269-
}
270-
}
271-
_ => (),
272-
}
273-
274282
// Should we check the queue? This could be because we were previously unable to empty
275283
// it fully, or because the HTTP server has told us that there might be new jobs.
276284
// However, it's only worth us checking the queue (which requires a lock) if there's
277285
// space for us to run further jobs.
278286
if check_queue && self.num_running < self.maxjobs {
279287
match self.try_pop_queue() {
280288
(false, false) => {
281-
check_queue = false;
289+
check_queue = true;
282290
tmp_failure = false;
283291
}
284292
(true, false) => {
285-
check_queue = true;
293+
check_queue = false;
286294
tmp_failure = false;
287295
}
288296
(true, true) => {

0 commit comments

Comments
 (0)