Skip to content

Commit 519ff95

Browse files
committed
feat(bpf): move bpf worker from tokio to sync thread
Some performance tests that we have done lately show that the tokio worker threads are chugging because our workload is very CPU intensive. In this case the recommendation is to use regular threads to prevent wasting cycles on the tokio scheduler. This change is a first step to push tokio down to the absolute minimum requirement (pretty much just needed for the gRPC output at the moment) by switching the bpf worker to a dedicated thread that will read and parse events coming from the kernel.
1 parent 1a2fa30 commit 519ff95

5 files changed

Lines changed: 88 additions & 53 deletions

File tree

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env_logger = { version = "0.11.5", default-features = false, features = ["humant
1919
glob = "0.3.3"
2020
globset = "0.4.18"
2121
governor = "0.10.4"
22+
epoll = "4.4.0"
2223
http-body-util = "0.1.3"
2324
hyper = { version = "1.6.0", default-features = false }
2425
hyper-tls = "0.6.0"

fact/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ serde_json = { workspace = true }
3333
shlex = { workspace = true }
3434
uuid = { workspace = true }
3535
yaml-rust2 = { workspace = true }
36+
epoll = { workspace = true }
3637

3738
fact-api = { path = "../fact-api" }
3839
fact-ebpf = { path = "../fact-ebpf" }

fact/src/bpf/mod.rs

Lines changed: 63 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use std::{io, path::PathBuf};
1+
use std::{
2+
io,
3+
os::fd::AsRawFd,
4+
path::PathBuf,
5+
thread::{self, JoinHandle},
6+
};
27

38
use anyhow::{Context, bail};
49
use aya::{
@@ -10,11 +15,7 @@ use checks::Checks;
1015
use globset::{Glob, GlobSet, GlobSetBuilder};
1116
use libc::c_char;
1217
use log::{error, info, warn};
13-
use tokio::{
14-
io::unix::AsyncFd,
15-
sync::{mpsc, watch},
16-
task::JoinHandle,
17-
};
18+
use tokio::sync::{mpsc, watch};
1819

1920
use crate::{config::BpfConfig, event::Event, host_info, metrics::EventCounter};
2021

@@ -211,36 +212,56 @@ impl Bpf {
211212
// Gather events from the ring buffer and print them out.
212213
pub fn start(
213214
mut self,
214-
mut running: watch::Receiver<bool>,
215+
running: watch::Receiver<bool>,
215216
event_counter: EventCounter,
216217
) -> JoinHandle<anyhow::Result<()>> {
217218
info!("Starting BPF worker...");
218219

219-
tokio::spawn(async move {
220-
let rb = self.take_ringbuffer()?;
221-
let mut fd = AsyncFd::new(rb)?;
220+
thread::spawn(move || {
221+
let mut rb = self.take_ringbuffer()?;
222+
223+
let rb_event = epoll::Event::new(epoll::Events::EPOLLIN, 0);
224+
let poller = match epoll::create(false) {
225+
Ok(p) => p,
226+
Err(e) => bail!("Failed to create epoll: {e:?}"),
227+
};
228+
if let Err(e) = epoll::ctl(
229+
poller,
230+
epoll::ControlOptions::EPOLL_CTL_ADD,
231+
rb.as_raw_fd(),
232+
rb_event,
233+
) {
234+
bail!("Failed to add ringbuffer to epoll: {e:?}");
235+
}
222236

223237
loop {
224-
tokio::select! {
225-
guard = fd.readable_mut() => {
226-
let mut guard = guard
227-
.context("ringbuffer guard held while runtime is stopping")?;
228-
let ringbuf = guard.get_inner_mut();
229-
while let Some(event) = ringbuf.next() {
238+
if running.has_changed()? && !*running.borrow() {
239+
info!("Stopping BPF worker...");
240+
break;
241+
}
242+
243+
if self.paths_config.has_changed()? {
244+
self.load_paths().context("Failed to load paths")?;
245+
}
246+
247+
match epoll::wait(poller, 100, &mut [rb_event]) {
248+
Ok(n) if n != 0 => {
249+
while let Some(event) = rb.next() {
230250
let event: &event_t = unsafe { &*(event.as_ptr() as *const _) };
231251
let event = match Event::try_from(event) {
232252
Ok(event) => {
233253
// If the event is monitored by parent, we need to check
234254
// its host path, but we don't have that context here,
235255
// so we let the event go into HostScanner and make the
236256
// decision there.
237-
if !event.is_monitored_by_parent() &&
238-
event.is_ignored(&self.paths_globset) {
257+
if !event.is_monitored_by_parent()
258+
&& event.is_ignored(&self.paths_globset)
259+
{
239260
event_counter.dropped();
240261
continue;
241262
}
242263
event
243-
},
264+
}
244265
Err(e) => {
245266
error!("Failed to parse event: '{e}'");
246267
event_counter.dropped();
@@ -249,25 +270,18 @@ impl Bpf {
249270
};
250271

251272
event_counter.added();
252-
if self.tx.send(event).await.is_err() {
273+
if self.tx.blocking_send(event).is_err() {
253274
info!("No BPF consumers left, stopping...");
254275
break;
255276
}
256277
}
257-
guard.clear_ready();
258-
},
259-
_ = self.paths_config.changed() => {
260-
self.load_paths().context("Failed to load paths")?;
261-
},
262-
_ = running.changed() => {
263-
if !*running.borrow() {
264-
info!("Stopping BPF worker...");
265-
break;
266-
}
267-
},
278+
}
279+
Ok(_) => {}
280+
Err(e) => bail!("Failed to wait for ringbuffer events: {e:?}"),
268281
}
269282
}
270283

284+
info!("Stopping BPF worker...");
271285
Ok(())
272286
})
273287
}
@@ -289,8 +303,8 @@ mod bpf_tests {
289303

290304
use super::*;
291305

292-
#[tokio::test]
293-
async fn test_basic() {
306+
#[test]
307+
fn test_basic() {
294308
if let Ok(value) = std::env::var("FACT_LOGLEVEL") {
295309
let value = value.to_lowercase();
296310
if value == "debug" || value == "trace" {
@@ -312,7 +326,7 @@ mod bpf_tests {
312326

313327
let handle = bpf.start(run_rx, exporter.metrics.bpf_worker.clone());
314328

315-
tokio::time::sleep(Duration::from_millis(500)).await;
329+
thread::sleep(Duration::from_millis(500));
316330

317331
// Create a file
318332
let file = NamedTempFile::new_in(&monitored_path).expect("Failed to create temporary file");
@@ -383,24 +397,26 @@ mod bpf_tests {
383397
// Close the file, removing it
384398
file.close().expect("Failed to close temp file");
385399

386-
let wait = timeout(Duration::from_secs(1), async move {
387-
for expected in expected_events {
388-
println!("expected: {expected:#?}");
389-
while let Some(event) = rx.recv().await {
390-
println!("{event:#?}");
391-
if event == expected {
392-
println!("Found!");
393-
break;
400+
tokio::runtime::Runtime::new().unwrap().block_on(async {
401+
let wait = timeout(Duration::from_secs(1), async {
402+
for expected in expected_events {
403+
println!("expected: {expected:#?}");
404+
while let Some(event) = rx.recv().await {
405+
println!("{event:#?}");
406+
if event == expected {
407+
println!("Found!");
408+
break;
409+
}
394410
}
395411
}
412+
});
413+
414+
tokio::select! {
415+
res = wait => res.unwrap(),
396416
}
397417
});
398418

399-
tokio::select! {
400-
res = wait => res.unwrap(),
401-
res = handle => res.unwrap().unwrap(),
402-
}
403-
404419
run_tx.send(false).unwrap();
420+
handle.join().unwrap().unwrap();
405421
}
406422
}

fact/src/lib.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,22 +109,28 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> {
109109
let mut host_scanner_handle = host_scanner.start();
110110
let mut rate_limiter_handle = rate_limiter.start();
111111
endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start();
112-
let mut bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone());
112+
let bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone());
113113
reloader.start(running.subscribe());
114114

115+
let (bpf_shutdown_tx, mut bpf_shutdown_rx) =
116+
tokio::sync::mpsc::channel::<anyhow::Result<()>>(1);
117+
tokio::task::spawn_blocking(move || {
118+
let res = bpf_handle.join().unwrap();
119+
bpf_shutdown_tx.blocking_send(res).unwrap();
120+
});
121+
115122
let mut sigterm = signal(SignalKind::terminate())?;
116123
let mut sighup = signal(SignalKind::hangup())?;
117124
loop {
118125
tokio::select! {
119126
_ = tokio::signal::ctrl_c() => break,
120127
_ = sigterm.recv() => break,
121128
_ = sighup.recv() => config_trigger.notify_one(),
122-
res = bpf_handle.borrow_mut() => {
129+
res = bpf_shutdown_rx.recv() => {
123130
match res {
124-
Ok(res) => if let Err(e) = res {
125-
warn!("BPF worker errored out: {e:?}");
126-
}
127-
Err(e) => warn!("BPF task errored out: {e:?}"),
131+
Some(Ok(())) => info!("BPF worker finished"),
132+
Some(Err(e)) => warn!("BPF worker errored out: {e:?}"),
133+
None => warn!("BPF worker channel closed"),
128134
}
129135
break;
130136
}

0 commit comments

Comments
 (0)