Skip to content

Commit dcf94a9

Browse files
committed
Use a slightly better data structure for JobCores
1 parent 655b48f commit dcf94a9

1 file changed

Lines changed: 45 additions & 35 deletions

File tree

crates/core/src/util/jobs.rs

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::cmp;
21
use std::sync::{Arc, Mutex, Weak};
32

43
use core_affinity::CoreId;
@@ -28,18 +27,19 @@ pub struct JobCores {
2827
struct JobCoresInner {
2928
/// A map to the repin_tx for each job thread
3029
job_threads: HashMap<JobThreadId, watch::Sender<CoreId>>,
31-
/// Kept sorted by `CoreInfo.jobs.len()`, in ascending order
3230
cores: IndexMap<CoreId, CoreInfo>,
31+
/// An index into `cores` of the next core to put a new job onto.
32+
///
33+
/// This acts as a partition point in `cores`; all cores in `..index` have
34+
/// one fewer job on them than the cores in `index..`.
35+
next_core: usize,
3336
next_id: JobThreadId,
3437
}
3538

3639
#[derive(Default)]
3740
struct CoreInfo {
3841
jobs: SmallVec<[JobThreadId; 4]>,
3942
}
40-
fn cores_cmp(_: &CoreId, v1: &CoreInfo, _: &CoreId, v2: &CoreInfo) -> cmp::Ordering {
41-
Ord::cmp(&v1.jobs.len(), &v2.jobs.len())
42-
}
4343

4444
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
4545
struct JobThreadId(usize);
@@ -49,18 +49,7 @@ impl JobCores {
4949
pub fn take(&self) -> JobCore {
5050
let inner = if let Some(inner) = &self.inner {
5151
let cores = Arc::downgrade(inner);
52-
let mut inner = inner.lock().unwrap();
53-
54-
let id = inner.next_id;
55-
inner.next_id.0 += 1;
56-
57-
let (&core_id, core) = inner.cores.first_mut().unwrap();
58-
core.jobs.push(id);
59-
inner.cores.sort_by(cores_cmp);
60-
61-
let (repin_tx, repin_rx) = watch::channel(core_id);
62-
inner.job_threads.insert(id, repin_tx);
63-
52+
let (id, repin_rx) = inner.lock().unwrap().allocate();
6453
Some(JobCoreInner {
6554
repin_rx,
6655
_guard: JobCoreGuard { cores, id },
@@ -80,6 +69,7 @@ impl FromIterator<CoreId> for JobCores {
8069
Arc::new(Mutex::new(JobCoresInner {
8170
job_threads: HashMap::default(),
8271
cores,
72+
next_core: 0,
8373
next_id: JobThreadId(0),
8474
}))
8575
});
@@ -88,31 +78,51 @@ impl FromIterator<CoreId> for JobCores {
8878
}
8979

9080
impl JobCoresInner {
81+
fn allocate(&mut self) -> (JobThreadId, watch::Receiver<CoreId>) {
82+
let id = self.next_id;
83+
self.next_id.0 += 1;
84+
85+
let (&core_id, core) = self.cores.get_index_mut(self.next_core).unwrap();
86+
core.jobs.push(id);
87+
self.next_core = (self.next_core + 1) % self.cores.len();
88+
89+
let (repin_tx, repin_rx) = watch::channel(core_id);
90+
self.job_threads.insert(id, repin_tx);
91+
92+
(id, repin_rx)
93+
}
94+
9195
/// Run when a `JobThread` exits.
92-
fn on_thread_exit(&mut self, id: JobThreadId) {
96+
fn deallocate(&mut self, id: JobThreadId) {
9397
let core_id = *self.job_threads.remove(&id).unwrap().borrow();
9498

9599
let core_index = self.cores.get_index_of(&core_id).unwrap();
96-
let last_index = self.cores.len() - 1;
97-
98-
// `last_core` will be Some if `core_index` is not `last_index`
99-
// FIXME(noa): this will fail to level sometimes; we should keep a partition point and
100-
// manually move cores before it when they're low and above it when they're high.
101-
let (core, last_core) = match self.cores.get_disjoint_indices_mut([core_index, last_index]) {
102-
Ok([(_, core), (_, last)]) => (core, Some(last)),
103-
Err(_) => (&mut self.cores[core_index], None),
104-
};
105100

106-
let job_pos = core.jobs.iter().position(|x| *x == id).unwrap();
101+
// This core is now less busy than it should be - bump `next_core` back
102+
// by 1 and steal a thread from the core there.
103+
//
104+
// This wraps around in the 0 case, so the partition point is simply
105+
// moved to the end of the ring buffer.
106+
107+
let steal_from = self.next_core.checked_sub(1).unwrap_or(self.cores.len() - 1);
108+
109+
if let Ok([(_, core), (_, steal_from)]) = self.cores.get_disjoint_indices_mut([core_index, steal_from]) {
110+
// This unwrap will never fail, since cores below `next_core` always have
111+
// at least 1 thread on them. Edge case: if `next_core` is 0, `steal_from`
112+
// would wrap around to the end - but when `next_core` is 0, every core has
113+
// the same number of threads; so, if the last core is empty, all the cores
114+
// would be empty, but we know that's impossible because we're deallocating
115+
// a thread right now.
116+
let stolen = steal_from.jobs.pop().unwrap();
117+
118+
let pos = core.jobs.iter().position(|x| *x == id).unwrap();
119+
core.jobs[pos] = stolen;
107120

108-
if let Some(job) = last_core.and_then(|last| last.jobs.pop()) {
109-
core.jobs[job_pos] = job;
110-
let sender = self.job_threads.get_mut(&job).unwrap();
111-
sender.send_replace(core_id);
121+
self.job_threads[&stolen].send_replace(core_id);
112122
} else {
113-
core.jobs.remove(job_pos);
123+
// this core was already at `next_core - 1` - nothing needs to be done!
124+
self.next_core = steal_from;
114125
}
115-
self.cores.sort_by(cores_cmp);
116126
}
117127
}
118128

@@ -191,7 +201,7 @@ struct JobCoreGuard {
191201
impl Drop for JobCoreGuard {
192202
fn drop(&mut self) {
193203
if let Some(cores) = self.cores.upgrade() {
194-
cores.lock().unwrap().on_thread_exit(self.id);
204+
cores.lock().unwrap().deallocate(self.id);
195205
}
196206
}
197207
}

0 commit comments

Comments
 (0)