Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 122 additions & 31 deletions crates/core/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,79 @@ fn reload_config<S>(conf_file: &ConfigToml, reload_handle: &reload::Handle<EnvFi
// processes running - this should probably be some sort of flag.
#[must_use]
pub fn pin_threads() -> Cores {
Cores::get().unwrap_or_default()
pin_threads_with_reservations(CoreReservations::default())
}

/// Like [`pin_threads`], but with a custom [`CoreReservations`].
#[must_use]
pub fn pin_threads_with_reservations(reservations: CoreReservations) -> Cores {
Cores::get(reservations).unwrap_or_default()
}

/// The desired distribution of available cores to purposes.
///
/// Note that, in addition to `reserved`, [`Cores`] reserves two additional
/// cores for the operating system. That is, the denominator for fractions
/// given below is `num_cpus - reserved - 2`.
pub struct CoreReservations {
/// Cores to run database instances on.
///
/// Default: 1/8
pub databases: f64,
/// Cores to run tokio worker threads on.
///
/// Default: 4/8
pub tokio_workers: f64,
/// Cores to run rayon threads on.
///
/// Default: 1/8
pub rayon: f64,
/// Extra reserved cores.
///
/// If greater than zero, this many cores will be reserved _before_
/// any of the other reservations are made (but after reserving the OS cores).
///
/// Default: 0
pub reserved: usize,
}

impl Default for CoreReservations {
fn default() -> Self {
Self {
databases: 1.0 / 8.0,
tokio_workers: 4.0 / 8.0,
rayon: 1.0 / 8.0,
reserved: 0,
}
}
}

impl CoreReservations {
/// Apply this reservation to an arbitrary list of core ids.
///
/// Returns the allocated cores in the order:
///
/// - reserved
/// - databases
/// - tokio_workers
/// - rayon
///
/// Left public for testing and debugging purposes.
pub fn apply(&self, cores: &mut Vec<CoreId>) -> [Vec<CoreId>; 4] {
let reserved = cores.drain(..self.reserved).collect_vec();

let total = cores.len() as f64;
let frac = |frac: f64| (total * frac).ceil() as usize;
fn claim(cores: &mut Vec<CoreId>, n: usize) -> impl Iterator<Item = CoreId> + '_ {
cores.drain(..n.min(cores.len()))
}

let databases = claim(cores, frac(self.databases)).collect_vec();
let tokio_workers = claim(cores, frac(self.tokio_workers)).collect_vec();
let rayon = claim(cores, frac(self.rayon)).collect_vec();

[reserved, databases, tokio_workers, rayon]
}
}

/// A type holding cores divvied up into different sets.
Expand All @@ -178,48 +250,43 @@ pub fn pin_threads() -> Cores {
#[derive(Default)]
pub struct Cores {
/// The cores to run database instances on.
///
/// Currently, this is 1/8 of num_cpus.
pub databases: JobCores,
/// The cores to run tokio worker and blocking threads on.
///
/// Currently, tokio worker threads are 4/8 of num_cpus, and tokio blocking
/// threads are pinned non-exclusively to 2/8 of num_cpus.
/// The cores to run tokio worker threads on.
pub tokio: TokioCores,
/// The cores to run rayon threads on.
///
/// Currently, this is 1/8 of num_cpus.
pub rayon: RayonCores,
/// Extra cores if a [`CoreReservations`] with `reserved > 0` was used.
///
/// If `Some`, the boxed array is non-empty.
pub reserved: Option<Box<[CoreId]>>,
/// Cores shared between tokio runtimes to schedule blocking tasks on.
///
/// All remaining cores after [`CoreReservations`] have been made become
/// blocking cores.
///
/// See `Tokio.blocking` for more context.
#[cfg(target_os = "linux")]
pub blocking: Option<nix::sched::CpuSet>,
}

impl Cores {
fn get() -> Option<Self> {
let cores = &mut core_affinity::get_core_ids()
.filter(|cores| cores.len() >= 10)?
.into_iter()
// We reserve the first two cores for the OS.
// This allows us to pin interrupt handlers (IRQs) to these cores,
// particularly those for incoming network traffic,
// preventing them from preempting the main reducer threads.
.filter(|core_id| core_id.id > 1)
.collect_vec()
.into_iter();

let total = cores.len() as f64;
let frac = |frac: f64| (total * frac).ceil() as usize;
fn get(reservations: CoreReservations) -> Option<Self> {
let mut cores = Self::get_core_ids()?;

let databases = cores.take(frac(1.0 / 8.0)).collect();
let [reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores);

let tokio_workers = cores.take(frac(4.0 / 8.0)).collect();

let rayon = RayonCores(Some(cores.take(frac(1.0 / 8.0)).collect()));
let reserved = (!reserved.is_empty()).then(|| reserved.into());
let databases = databases.into_iter().collect::<JobCores>();
let rayon = RayonCores((!rayon.is_empty()).then_some(rayon));

// see comment on `TokioCores.blocking`
#[cfg(target_os = "linux")]
let remaining = cores.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
cpuset.set(core.id).ok()?;
Some(cpuset)
});
let remaining = cores
.into_iter()
.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
cpuset.set(core.id).ok()?;
Some(cpuset)
});

let tokio = TokioCores {
workers: Some(tokio_workers),
Expand All @@ -231,8 +298,32 @@ impl Cores {
databases,
tokio,
rayon,
reserved,
#[cfg(target_os = "linux")]
blocking: remaining,
})
}

/// Get the cores of the local host, as reported by the operating system.
///
/// Cores 0 and 1 are not included in the returned vec, as we reserve them
/// for the operating system.
///
/// Returns `None` if `num_cpus - 2` is less than 8.
/// If `Some` is returned, the `Vec` is non-empty.
pub fn get_core_ids() -> Option<Vec<CoreId>> {
let cores = core_affinity::get_core_ids()
.filter(|cores| cores.len() >= 10)?
.into_iter()
// We reserve the first two cores for the OS.
// This allows us to pin interrupt handlers (IRQs) to these cores,
// particularly those for incoming network traffic,
// preventing them from preempting the main reducer threads.
.filter(|core_id| core_id.id > 1)
.collect_vec();

(!cores.is_empty()).then_some(cores)
}
}

#[derive(Default)]
Expand Down
Loading