Skip to content

Commit 0c6adfc

Browse files
committed
feat(bpf): move bpf worker from tokio to sync thread
Recent performance tests show that the tokio worker threads struggle to keep up because our workload is very CPU-intensive. For this kind of workload, the recommendation is to use regular threads so that cycles are not wasted on the tokio scheduler. This change is a first step toward reducing our use of tokio to the absolute minimum (at the moment it is pretty much only needed for the gRPC output): it switches the BPF worker to a dedicated thread that reads and parses events coming from the kernel.
1 parent c83080d commit 0c6adfc

5 files changed

Lines changed: 83 additions & 50 deletions

File tree

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ aya = { version = "0.13.1", default-features = false }
1616
anyhow = { version = "1", default-features = false, features = ["std", "backtrace"] }
1717
clap = { version = "4.5.41", features = ["derive", "env"] }
1818
env_logger = { version = "0.11.5", default-features = false, features = ["humantime"] }
19+
epoll = "4.4.0"
1920
http-body-util = "0.1.3"
2021
hyper = { version = "1.6.0", default-features = false }
2122
hyper-tls = "0.6.0"

fact/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ serde = { workspace = true }
2929
serde_json = { workspace = true }
3030
uuid = { workspace = true }
3131
yaml-rust2 = { workspace = true }
32+
epoll = { workspace = true }
3233

3334
fact-api = { path = "../fact-api" }
3435
fact-ebpf = { path = "../fact-ebpf" }

fact/src/bpf/mod.rs

Lines changed: 59 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use std::{io, path::PathBuf};
1+
use std::{
2+
io,
3+
os::fd::AsRawFd,
4+
path::PathBuf,
5+
thread::{self, JoinHandle},
6+
};
27

38
use anyhow::{bail, Context};
49
use aya::{
@@ -9,11 +14,7 @@ use aya::{
914
use checks::Checks;
1015
use libc::c_char;
1116
use log::{error, info};
12-
use tokio::{
13-
io::unix::AsyncFd,
14-
sync::{mpsc, watch},
15-
task::JoinHandle,
16-
};
17+
use tokio::sync::{mpsc, watch};
1718

1819
use crate::{event::Event, host_info, metrics::EventCounter};
1920

@@ -172,25 +173,44 @@ impl Bpf {
172173
// Gather events from the ring buffer and print them out.
173174
pub fn start(
174175
mut self,
175-
mut running: watch::Receiver<bool>,
176+
running: watch::Receiver<bool>,
176177
event_counter: EventCounter,
177178
) -> JoinHandle<anyhow::Result<()>> {
178179
info!("Starting BPF worker...");
179180

180-
tokio::spawn(async move {
181+
thread::spawn(move || {
181182
self.attach_progs()
182183
.context("Failed to attach ebpf programs")?;
183184

184-
let rb = self.take_ringbuffer()?;
185-
let mut fd = AsyncFd::new(rb)?;
185+
let mut rb = self.take_ringbuffer()?;
186+
187+
let rb_event = epoll::Event::new(epoll::Events::EPOLLIN, 0);
188+
let poller = match epoll::create(false) {
189+
Ok(p) => p,
190+
Err(e) => bail!("Failed to create epoll: {e:?}"),
191+
};
192+
if let Err(e) = epoll::ctl(
193+
poller,
194+
epoll::ControlOptions::EPOLL_CTL_ADD,
195+
rb.as_raw_fd(),
196+
rb_event,
197+
) {
198+
bail!("Failed to add ringbuffer to epoll: {e:?}");
199+
}
186200

187201
loop {
188-
tokio::select! {
189-
guard = fd.readable_mut() => {
190-
let mut guard = guard
191-
.context("ringbuffer guard held while runtime is stopping")?;
192-
let ringbuf = guard.get_inner_mut();
193-
while let Some(event) = ringbuf.next() {
202+
if running.has_changed()? && !*running.borrow() {
203+
info!("Stopping BPF worker...");
204+
break;
205+
}
206+
207+
if self.paths_config.has_changed()? {
208+
self.load_paths().context("Failed to load paths")?;
209+
}
210+
211+
match epoll::wait(poller, 100, &mut [rb_event]) {
212+
Ok(n) if n != 0 => {
213+
while let Some(event) = rb.next() {
194214
let event: &event_t = unsafe { &*(event.as_ptr() as *const _) };
195215
let event = match Event::try_from(event) {
196216
Ok(event) => event,
@@ -202,25 +222,18 @@ impl Bpf {
202222
};
203223

204224
event_counter.added();
205-
if self.tx.send(event).await.is_err() {
225+
if self.tx.blocking_send(event).is_err() {
206226
info!("No BPF consumers left, stopping...");
207227
break;
208228
}
209229
}
210-
guard.clear_ready();
211-
},
212-
_ = self.paths_config.changed() => {
213-
self.load_paths().context("Failed to load paths")?;
214-
},
215-
_ = running.changed() => {
216-
if !*running.borrow() {
217-
info!("Stopping BPF worker...");
218-
break;
219-
}
220-
},
230+
}
231+
Ok(_) => {}
232+
Err(e) => bail!("Failed to wait for ringbuffer events: {e:?}"),
221233
}
222234
}
223235

236+
info!("Stopping BPF worker...");
224237
Ok(())
225238
})
226239
}
@@ -242,8 +255,8 @@ mod bpf_tests {
242255

243256
use super::*;
244257

245-
#[tokio::test]
246-
async fn test_basic() {
258+
#[test]
259+
fn test_basic() {
247260
if let Ok(value) = std::env::var("FACT_LOGLEVEL") {
248261
let value = value.to_lowercase();
249262
if value == "debug" || value == "trace" {
@@ -266,7 +279,7 @@ mod bpf_tests {
266279

267280
let handle = bpf.start(run_rx, exporter.metrics.bpf_worker.clone());
268281

269-
tokio::time::sleep(Duration::from_millis(500)).await;
282+
thread::sleep(Duration::from_millis(500));
270283

271284
// Create a file
272285
let file = NamedTempFile::new_in(monitored_path).expect("Failed to create temporary file");
@@ -316,24 +329,26 @@ mod bpf_tests {
316329
// Close the file, removing it
317330
file.close().expect("Failed to close temp file");
318331

319-
let wait = timeout(Duration::from_secs(1), async move {
320-
for expected in expected_events {
321-
println!("expected: {expected:#?}");
322-
while let Some(event) = rx.recv().await {
323-
println!("{event:#?}");
324-
if event == expected {
325-
println!("Found!");
326-
break;
332+
tokio::runtime::Runtime::new().unwrap().block_on(async {
333+
let wait = timeout(Duration::from_secs(1), async {
334+
for expected in expected_events {
335+
println!("expected: {expected:#?}");
336+
while let Some(event) = rx.recv().await {
337+
println!("{event:#?}");
338+
if event == expected {
339+
println!("Found!");
340+
break;
341+
}
327342
}
328343
}
344+
});
345+
346+
tokio::select! {
347+
res = wait => res.unwrap(),
329348
}
330349
});
331350

332-
tokio::select! {
333-
res = wait => res.unwrap(),
334-
res = handle => res.unwrap().unwrap(),
335-
}
336-
337351
run_tx.send(false).unwrap();
352+
handle.join().unwrap().unwrap();
338353
}
339354
}

fact/src/lib.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,27 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {
9494
)?;
9595
let mut host_scanner_handle = host_scanner.start();
9696
endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start();
97-
let mut bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone());
97+
let bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone());
9898
reloader.start(running.subscribe());
9999

100+
let (bpf_shutdown_tx, mut bpf_shutdown_rx) = mpsc::channel::<anyhow::Result<()>>(1);
101+
tokio::task::spawn_blocking(move || {
102+
let res = bpf_handle.join().unwrap();
103+
bpf_shutdown_tx.blocking_send(res).unwrap();
104+
});
105+
100106
let mut sigterm = signal(SignalKind::terminate())?;
101107
let mut sighup = signal(SignalKind::hangup())?;
102108
loop {
103109
tokio::select! {
104110
_ = tokio::signal::ctrl_c() => break,
105111
_ = sigterm.recv() => break,
106112
_ = sighup.recv() => config_trigger.notify_one(),
107-
res = bpf_handle.borrow_mut() => {
113+
res = bpf_shutdown_rx.recv() => {
108114
match res {
109-
Ok(res) => if let Err(e) = res {
110-
warn!("BPF worker errored out: {e:?}");
111-
}
112-
Err(e) => warn!("BPF task errored out: {e:?}"),
115+
Some(Ok(())) => info!("BPF worker finished"),
116+
Some(Err(e)) => warn!("BPF worker errored out: {e:?}"),
117+
None => warn!("BPF worker channel closed"),
113118
}
114119
break;
115120
}

0 commit comments

Comments
 (0)