Skip to content

Commit 56f1f60

Browse files
authored
core: Allow adjusting CPU reservation pools (#2907)
1 parent 022ca68 commit 56f1f60

1 file changed

Lines changed: 122 additions & 31 deletions

File tree

crates/core/src/startup.rs

Lines changed: 122 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,79 @@ fn reload_config<S>(conf_file: &ConfigToml, reload_handle: &reload::Handle<EnvFi
169169
// processes running - this should probably be some sort of flag.
170170
#[must_use]
171171
pub fn pin_threads() -> Cores {
172-
Cores::get().unwrap_or_default()
172+
pin_threads_with_reservations(CoreReservations::default())
173+
}
174+
175+
/// Like [`pin_threads`], but with a custom [`CoreReservations`].
176+
#[must_use]
177+
pub fn pin_threads_with_reservations(reservations: CoreReservations) -> Cores {
178+
Cores::get(reservations).unwrap_or_default()
179+
}
180+
181+
/// The desired distribution of available cores to purposes.
182+
///
183+
/// Note that, in addition to `reserved`, [`Cores`] reserves two additional
184+
/// cores for the operating system. That is, the denominator for fractions
185+
/// given below is `num_cpus - reserved - 2`.
186+
pub struct CoreReservations {
187+
/// Cores to run database instances on.
188+
///
189+
/// Default: 1/8
190+
pub databases: f64,
191+
/// Cores to run tokio worker threads on.
192+
///
193+
/// Default: 4/8
194+
pub tokio_workers: f64,
195+
/// Cores to run rayon threads on.
196+
///
197+
/// Default: 1/8
198+
pub rayon: f64,
199+
/// Extra reserved cores.
200+
///
201+
/// If greater than zero, this many cores will be reserved _before_
202+
/// any of the other reservations are made (but after reserving the OS cores).
203+
///
204+
/// Default: 0
205+
pub reserved: usize,
206+
}
207+
208+
impl Default for CoreReservations {
209+
fn default() -> Self {
210+
Self {
211+
databases: 1.0 / 8.0,
212+
tokio_workers: 4.0 / 8.0,
213+
rayon: 1.0 / 8.0,
214+
reserved: 0,
215+
}
216+
}
217+
}
218+
219+
impl CoreReservations {
220+
/// Apply this reservation to an arbitrary list of core ids.
221+
///
222+
/// Returns the allocated cores in the order:
223+
///
224+
/// - reserved
225+
/// - databases
226+
/// - tokio_workers
227+
/// - rayon
228+
///
229+
/// Left public for testing and debugging purposes.
230+
pub fn apply(&self, cores: &mut Vec<CoreId>) -> [Vec<CoreId>; 4] {
231+
let reserved = cores.drain(..self.reserved).collect_vec();
232+
233+
let total = cores.len() as f64;
234+
let frac = |frac: f64| (total * frac).ceil() as usize;
235+
fn claim(cores: &mut Vec<CoreId>, n: usize) -> impl Iterator<Item = CoreId> + '_ {
236+
cores.drain(..n.min(cores.len()))
237+
}
238+
239+
let databases = claim(cores, frac(self.databases)).collect_vec();
240+
let tokio_workers = claim(cores, frac(self.tokio_workers)).collect_vec();
241+
let rayon = claim(cores, frac(self.rayon)).collect_vec();
242+
243+
[reserved, databases, tokio_workers, rayon]
244+
}
173245
}
174246

175247
/// A type holding cores divvied up into different sets.
@@ -178,48 +250,43 @@ pub fn pin_threads() -> Cores {
178250
#[derive(Default)]
179251
pub struct Cores {
180252
/// The cores to run database instances on.
181-
///
182-
/// Currently, this is 1/8 of num_cpus.
183253
pub databases: JobCores,
184-
/// The cores to run tokio worker and blocking threads on.
185-
///
186-
/// Currently, tokio worker threads are 4/8 of num_cpus, and tokio blocking
187-
/// threads are pinned non-exclusively to 2/8 of num_cpus.
254+
/// The cores to run tokio worker threads on.
188255
pub tokio: TokioCores,
189256
/// The cores to run rayon threads on.
190-
///
191-
/// Currently, this is 1/8 of num_cpus.
192257
pub rayon: RayonCores,
258+
/// Extra cores if a [`CoreReservations`] with `reserved > 0` was used.
259+
///
260+
/// If `Some`, the boxed array is non-empty.
261+
pub reserved: Option<Box<[CoreId]>>,
262+
/// Cores shared between tokio runtimes to schedule blocking tasks on.
263+
///
264+
/// All remaining cores after [`CoreReservations`] have been made become
265+
/// blocking cores.
266+
///
267+
/// See `Tokio.blocking` for more context.
268+
#[cfg(target_os = "linux")]
269+
pub blocking: Option<nix::sched::CpuSet>,
193270
}
194271

195272
impl Cores {
196-
fn get() -> Option<Self> {
197-
let cores = &mut core_affinity::get_core_ids()
198-
.filter(|cores| cores.len() >= 10)?
199-
.into_iter()
200-
// We reserve the first two cores for the OS.
201-
// This allows us to pin interrupt handlers (IRQs) to these cores,
202-
// particularly those for incoming network traffic,
203-
// preventing them from preempting the main reducer threads.
204-
.filter(|core_id| core_id.id > 1)
205-
.collect_vec()
206-
.into_iter();
207-
208-
let total = cores.len() as f64;
209-
let frac = |frac: f64| (total * frac).ceil() as usize;
273+
fn get(reservations: CoreReservations) -> Option<Self> {
274+
let mut cores = Self::get_core_ids()?;
210275

211-
let databases = cores.take(frac(1.0 / 8.0)).collect();
276+
let [reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores);
212277

213-
let tokio_workers = cores.take(frac(4.0 / 8.0)).collect();
214-
215-
let rayon = RayonCores(Some(cores.take(frac(1.0 / 8.0)).collect()));
278+
let reserved = (!reserved.is_empty()).then(|| reserved.into());
279+
let databases = databases.into_iter().collect::<JobCores>();
280+
let rayon = RayonCores((!rayon.is_empty()).then_some(rayon));
216281

217282
// see comment on `TokioCores.blocking`
218283
#[cfg(target_os = "linux")]
219-
let remaining = cores.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
220-
cpuset.set(core.id).ok()?;
221-
Some(cpuset)
222-
});
284+
let remaining = cores
285+
.into_iter()
286+
.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
287+
cpuset.set(core.id).ok()?;
288+
Some(cpuset)
289+
});
223290

224291
let tokio = TokioCores {
225292
workers: Some(tokio_workers),
@@ -231,8 +298,32 @@ impl Cores {
231298
databases,
232299
tokio,
233300
rayon,
301+
reserved,
302+
#[cfg(target_os = "linux")]
303+
blocking: remaining,
234304
})
235305
}
306+
307+
/// Get the cores of the local host, as reported by the operating system.
308+
///
309+
/// Cores 0 and 1 are not included in the returned vec, as we reserve them
310+
/// for the operating system.
311+
///
312+
/// Returns `None` if `num_cpus - 2` is less than 8.
313+
/// If `Some` is returned, the `Vec` is non-empty.
314+
pub fn get_core_ids() -> Option<Vec<CoreId>> {
315+
let cores = core_affinity::get_core_ids()
316+
.filter(|cores| cores.len() >= 10)?
317+
.into_iter()
318+
// We reserve the first two cores for the OS.
319+
// This allows us to pin interrupt handlers (IRQs) to these cores,
320+
// particularly those for incoming network traffic,
321+
// preventing them from preempting the main reducer threads.
322+
.filter(|core_id| core_id.id > 1)
323+
.collect_vec();
324+
325+
(!cores.is_empty()).then_some(cores)
326+
}
236327
}
237328

238329
#[derive(Default)]

0 commit comments

Comments
 (0)