Skip to content

Commit 72e4e56

Browse files
committed
Use sched_setaffinity on tokio blocking threads
1 parent 511a6a1 commit 72e4e56

5 files changed

Lines changed: 63 additions & 24 deletions

File tree

Cargo.lock

Lines changed: 14 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ log = "0.4.17"
200200
memchr = "2"
201201
mimalloc = "0.1.39"
202202
nohash-hasher = "0.2"
203+
nix = "0.30"
203204
once_cell = "1.16"
204205
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
205206
parse-size = "1.1.0"

crates/core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ core_affinity = "0.8"
120120
tikv-jemallocator = {workspace = true}
121121
tikv-jemalloc-ctl = {workspace = true}
122122

123+
[target.'cfg(target_os = "linux")'.dependencies]
124+
nix = { workspace = true, features = ["sched"] }
125+
123126
[features]
124127
# Print a warning when doing an unindexed `iter_by_col_range` on a large table.
125128
unindexed_iter_by_col_range_warn = []

crates/core/src/startup.rs

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ fn reload_config<S>(conf_file: &ConfigToml, reload_handle: &reload::Handle<EnvFi
155155
#[derive(Default)]
156156
pub struct Cores {
157157
pub databases: JobCores,
158-
pub tokio_workers: ThreadPoolCores,
159-
pub rayon: ThreadPoolCores,
158+
pub tokio: TokioCores,
159+
pub rayon: RayonCores,
160160
}
161161

162162
impl Cores {
@@ -170,13 +170,25 @@ impl Cores {
170170

171171
let databases = cores.take(frac(1.0 / 8.0)).collect();
172172

173-
let tokio_workers = ThreadPoolCores(Some(cores.take(frac(4.0 / 8.0)).collect()));
173+
let tokio_workers = cores.take(frac(4.0 / 8.0)).collect();
174174

175-
let rayon = ThreadPoolCores(Some(cores.take(frac(1.0 / 8.0)).collect()));
175+
let rayon = RayonCores(Some(cores.take(frac(1.0 / 8.0)).collect()));
176+
177+
#[cfg(target_os = "linux")]
178+
let remaining = cores.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| {
179+
cpuset.set(core.id).ok()?;
180+
Some(cpuset)
181+
});
182+
183+
let tokio = TokioCores {
184+
workers: Some(tokio_workers),
185+
#[cfg(target_os = "linux")]
186+
blocking: remaining,
187+
};
176188

177189
Some(Self {
178190
databases,
179-
tokio_workers,
191+
tokio,
180192
rayon,
181193
})
182194
}
@@ -192,30 +204,40 @@ fn vec_to_queue(cores: Vec<CoreId>) -> CoreQueue {
192204
}
193205

194206
#[derive(Default)]
195-
pub struct ThreadPoolCores(Option<Vec<CoreId>>);
207+
pub struct TokioCores {
208+
workers: Option<Vec<CoreId>>,
209+
#[cfg(target_os = "linux")]
210+
blocking: Option<nix::sched::CpuSet>,
211+
}
196212

197-
impl ThreadPoolCores {
198-
fn into_setup_fn(self) -> Option<(usize, impl Fn())> {
199-
self.0.map(|cores| {
213+
impl TokioCores {
214+
pub fn configure(self, builder: &mut tokio::runtime::Builder) {
215+
if let Some(cores) = self.workers {
216+
let num = cores.len();
200217
let cores = vec_to_queue(cores);
201-
(cores.len(), move || {
202-
if let Some(core) = cores.pop() {
203-
core_affinity::set_for_current(core);
204-
}
205-
})
206-
})
207-
}
208-
209-
pub fn configure_tokio(self, builder: &mut tokio::runtime::Builder) {
210-
if let Some((num, setup)) = self.into_setup_fn() {
211218
// `on_thread_start` gets called for both async worker threads and blocking threads,
212219
// but the first `worker_threads` threads that tokio spawns are worker threads,
213220
// so this ends up working fine
214-
builder.worker_threads(num).on_thread_start(setup);
221+
builder.worker_threads(num).on_thread_start(move || {
222+
if let Some(core) = cores.pop() {
223+
core_affinity::set_for_current(core);
224+
} else {
225+
#[cfg(target_os = "linux")]
226+
if let Some(cpuset) = &self.blocking {
227+
let this = nix::unistd::Pid::from_raw(0);
228+
let _ = nix::sched::sched_setaffinity(this, cpuset);
229+
}
230+
}
231+
});
215232
}
216233
}
234+
}
235+
236+
#[derive(Default)]
237+
pub struct RayonCores(Option<Vec<CoreId>>);
217238

218-
pub fn configure_rayon(self, tokio_handle: &tokio::runtime::Handle) {
239+
impl RayonCores {
240+
pub fn configure(self, tokio_handle: &tokio::runtime::Handle) {
219241
rayon_core::ThreadPoolBuilder::new()
220242
.thread_name(|_idx| "rayon-worker".to_string())
221243
.spawn_handler(thread_spawn_handler(tokio_handle))

crates/standalone/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ fn main() -> anyhow::Result<()> {
7373
// Create a multi-threaded run loop
7474
let mut builder = Builder::new_multi_thread();
7575
builder.enable_all();
76-
cores.tokio_workers.configure_tokio(&mut builder);
76+
cores.tokio.configure(&mut builder);
7777
let rt = builder.build().unwrap();
78-
cores.rayon.configure_rayon(rt.handle());
78+
cores.rayon.configure(rt.handle());
7979
rt.block_on(async_main(cores.databases))
8080
}

0 commit comments

Comments
 (0)