Skip to content

Commit b7e532e

Browse files
[#105]: added bpf maps pinnner function
1 parent 0bac72d commit b7e532e

File tree

2 files changed

+66
-15
lines changed

2 files changed

+66
-15
lines changed

core/src/components/identity/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ COPY conntracker /usr/src/cortexbrain-identity-service/conntracker
3030

3131
# Set environment variable
3232
ENV BPF_PATH="/usr/src/cortexbrain-identity-service/conntracker"
33+
ENV PIN_MAP_PATH="/sys/fs/bpf/cortexbrain-identity-service/"
3334

3435
# Default command
3536
CMD ["cortexflow-identity-service"]

core/src/components/identity/src/main.rs

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,27 @@ use aya::{
2222
programs::{KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId},
2323
util::online_cpus,
2424
};
25+
use libc::signal;
2526

2627
use crate::helpers::{display_events, display_veth_events, get_veth_channels};
2728
use bytes::BytesMut;
2829
use std::{
2930
convert::TryInto,
30-
path::Path,
31+
path::{Path, PathBuf},
3132
sync::{
3233
Arc, Mutex,
3334
atomic::{AtomicBool, Ordering},
3435
},
3536
};
3637

37-
use anyhow::{Context, Ok};
38+
use anyhow::{Context, Error, Ok};
3839
use tokio::{fs, signal};
3940
use tracing::{error, info};
4041
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};
4142

4243
const BPF_PATH: &str = "BPF_PATH"; //BPF env path
44+
const PIN_MAP_PATH: &str = "PIN_MAP_PATH";
45+
4346
use std::collections::HashMap;
4447

4548
#[tokio::main]
@@ -70,24 +73,38 @@ async fn main() -> Result<(), anyhow::Error> {
7073

7174
//init bpf data
7275
let bpf = Arc::new(Mutex::new(Bpf::load(&data)?));
76+
let bpf_map_save_path =
77+
std::env::var(PIN_MAP_PATH).context("BPF_PATH environment variable required")?;
7378

7479
match init_bpf_maps(bpf.clone()) {
7580
std::result::Result::Ok(bpf_maps) => {
7681
info!("Successfully loaded bpf maps");
7782

78-
//load veth_trace program ref veth_trace.rs
79-
init_veth_tracer(bpf.clone()).await?;
80-
81-
let interfaces = get_veth_channels();
82-
83-
info!("Found interfaces: {:?}", interfaces);
84-
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone())
85-
.await
86-
.context("An error occured during the execution of attach_bpf_program function")?;
87-
88-
event_listener(bpf_maps, link_ids.clone(), bpf.clone())
89-
.await
90-
.context("Error initializing event_listener")?;
83+
//TODO: save the bpf maps in a Vec instead of using a tuple
84+
match map_pinner(&bpf_maps, &bpf_map_save_path.into()).await {
85+
std::result::Result::Ok(_) => {
86+
info!("maps pinned successfully");
87+
//load veth_trace program ref veth_trace.rs
88+
init_veth_tracer(bpf.clone()).await?;
89+
90+
let interfaces = get_veth_channels();
91+
92+
info!("Found interfaces: {:?}", interfaces);
93+
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone())
94+
.await
95+
.context(
96+
"An error occured during the execution of attach_bpf_program function",
97+
)?;
98+
99+
event_listener(bpf_maps, link_ids.clone(), bpf.clone())
100+
.await
101+
.context("Error initializing event_listener")?;
102+
}
103+
Err(e) => {
104+
error!("Error while pinning bpf_maps: {}", e);
105+
signal::ctrl_c();
106+
}
107+
}
91108
}
92109
Err(e) => {
93110
error!("Error while loading bpf maps {}", e);
@@ -218,6 +235,14 @@ async fn event_listener(
218235
*/
219236

220237
info!("Preparing perf_buffers and perf_arrays");
238+
239+
//TODO: try to change from PerfEventArray to a RingBuffer data structure
240+
//let m0=bpf_maps[0];
241+
//let m1 = bpf_maps[1];
242+
//let mut ring1=RingBuf::try_from(m0)?;
243+
//let mut ring2=RingBuf::try_from(m1)?;
244+
245+
//TODO:create an helper function that initialize the data structures and the running
221246
// init PerfEventArrays
222247
let mut perf_veth_array: PerfEventArray<MapData> = PerfEventArray::try_from(bpf_maps.1)?;
223248
let mut perf_net_events_array: PerfEventArray<MapData> = PerfEventArray::try_from(bpf_maps.0)?;
@@ -287,3 +312,28 @@ async fn event_listener(
287312

288313
Ok(())
289314
}
315+
316+
//TODO: save bpf maps path in the cli metadata
317+
//takes an array of bpf maps and pin them to persiste session data
318+
//TODO: change maps type with a Vec<Map> instead of (Map,Map). This method is only for fast development and it's not optimized
319+
320+
//chmod 700 <path> to setup the permissions to pin maps TODO:add this permission in the CLI
321+
async fn map_pinner(maps: &(Map, Map), path: &PathBuf) -> Result<(), Error> {
322+
323+
//FIXME: add exception for already pinned maps
324+
if !path.exists() {
325+
error!("Pin path {:?} does not exist. Creating it...", path);
326+
let _ = fs::create_dir_all(path)
327+
.await
328+
.map_err(|e| error!("Failed to create directory: {}", e));
329+
}
330+
331+
// Costruisci i path completi per le due mappe
332+
let map1_path = path.join("events_map");
333+
let map2_path = path.join("veth_map");
334+
335+
maps.0.pin(&map1_path)?;
336+
maps.1.pin(&map2_path)?;
337+
338+
Ok(())
339+
}

0 commit comments

Comments
 (0)