Skip to content

Commit 975f4f8

Browse files
[#158]: added program handlers function in the common crate. Remove duplicated code in metrics module
1 parent 2e451fd commit 975f4f8

File tree

10 files changed

+165
-191
lines changed

10 files changed

+165
-191
lines changed

core/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ aya = "0.13.1"
1919

2020
[features]
2121
map-handlers = []
22+
program-handlers = []

core/common/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
pub mod constants;
22
pub mod formatters;
33
pub mod logger;
4+
#[cfg(feature = "map-handlers")]
45
pub mod map_handlers;
6+
#[cfg(feature = "program-handlers")]
7+
pub mod program_handlers;

core/common/src/map_handlers.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ use tracing::{error, info};
1717
//
1818
// this function init the bpfs maps used in the main program
1919
//
20-
// index 0: events_map
21-
// index 1: veth_map
22-
// index 2: blocklist map
23-
// index 3: tcp_registry map
2420
//
2521

2622
#[cfg(feature = "map-handlers")]
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use aya::{Ebpf, programs::KProbe};
2+
use std::convert::TryInto;
3+
use std::sync::{Arc, Mutex};
4+
use tracing::{error, info};
5+
6+
#[cfg(feature = "program-handlers")]
7+
pub fn load_program(
8+
bpf: Arc<Mutex<Ebpf>>,
9+
program_name: &str,
10+
actual_program: &str,
11+
) -> Result<(), anyhow::Error> {
12+
let mut bpf_new = bpf.lock().expect("Cannot get value from lock");
13+
14+
// Load and attach the eBPF programs
15+
let program: &mut KProbe = bpf_new
16+
.program_mut(program_name)
17+
.ok_or_else(|| anyhow::anyhow!("Program {} not found", program_name))?
18+
.try_into()
19+
.map_err(|e| anyhow::anyhow!("Failed to convert program: {:?}", e))?;
20+
21+
program
22+
.load()
23+
.map_err(|e| anyhow::anyhow!("Cannot load program: {}. Error: {}", &program_name, e))?;
24+
25+
match program.attach(actual_program, 0) {
26+
Ok(_) => info!("{} program attached successfully", actual_program),
27+
Err(e) => {
28+
error!("Error attaching {} program {:?}", actual_program, e);
29+
return Err(anyhow::anyhow!(
30+
"Failed to attach {}: {:?}",
31+
actual_program,
32+
e
33+
));
34+
}
35+
};
36+
37+
info!(
38+
"eBPF program {} loaded and attached successfully",
39+
program_name
40+
);
41+
Ok(())
42+
}

core/src/components/metrics/Cargo.toml

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,21 @@ edition = "2024"
77
aya = "0.13.1"
88
aya-log = "0.2.1"
99
bytes = "1.4"
10-
tokio = { version = "1.48.0", features = ["rt","macros","time","fs","signal","rt-multi-thread"] }
10+
tokio = { version = "1.48.0", features = [
11+
"rt",
12+
"macros",
13+
"time",
14+
"fs",
15+
"signal",
16+
"rt-multi-thread",
17+
] }
1118
anyhow = "1.0"
1219
tracing = "0.1.41"
1320
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
1421
libc = "0.2.172"
1522
bytemuck = "1.23.0"
16-
cortexbrain-common = { path = "../../../common" }
17-
nix ={version="0.30.1",features=["net"]}
23+
cortexbrain-common = { path = "../../../common", features = [
24+
"map-handlers",
25+
"program-handlers",
26+
] }
27+
nix = { version = "0.30.1", features = ["net"] }

core/src/components/metrics/src/helpers.rs

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1-
use aya::{maps::{
2-
perf::PerfEventArrayBuffer, Map, MapData, PerfEventArray
3-
}, util::online_cpus};
1+
use aya::{
2+
maps::{Map, MapData, PerfEventArray, perf::PerfEventArrayBuffer},
3+
util::online_cpus,
4+
};
45

56
use bytes::BytesMut;
6-
use tokio::signal;
7-
use std::{
8-
sync::{
9-
Arc,
10-
atomic::{AtomicBool, Ordering},
11-
},
7+
use std::sync::{
8+
Arc,
9+
atomic::{AtomicBool, Ordering},
1210
};
11+
use tokio::signal;
1312

14-
use tracing::{error, info};
13+
use tracing::{debug, error, info};
1514

1615
use crate::structs::NetworkMetrics;
1716
use crate::structs::TimeStampMetrics;
1817

1918
pub async fn display_metrics_map(
2019
mut perf_buffers: Vec<PerfEventArrayBuffer<MapData>>,
21-
running: Arc<AtomicBool>, // Changed to Arc<AtomicBool>
20+
running: Arc<AtomicBool>, // Changed to Arc<AtomicBool>
2221
mut buffers: Vec<BytesMut>,
2322
) {
2423
info!("Starting metrics event listener...");
@@ -46,10 +45,23 @@ pub async fn display_metrics_map(
4645
let sk_receive_buffer_size = net_metrics.sk_receive_buffer_size;
4746
info!(
4847
"tgid: {}, comm: {}, ts_us: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}",
49-
tgid, comm, ts_us, sk_drop_count, sk_err, sk_err_soft, sk_backlog_len, sk_write_memory_queued, sk_ack_backlog, sk_receive_buffer_size
48+
tgid,
49+
comm,
50+
ts_us,
51+
sk_drop_count,
52+
sk_err,
53+
sk_err_soft,
54+
sk_backlog_len,
55+
sk_write_memory_queued,
56+
sk_ack_backlog,
57+
sk_receive_buffer_size
5058
);
5159
} else {
52-
info!("Received data too small: {} bytes, expected: {}", data.len(), std::mem::size_of::<NetworkMetrics>());
60+
info!(
61+
"Received data too small: {} bytes, expected: {}",
62+
data.len(),
63+
std::mem::size_of::<NetworkMetrics>()
64+
);
5365
}
5466
}
5567
}
@@ -65,7 +77,7 @@ pub async fn display_metrics_map(
6577

6678
pub async fn display_time_stamp_events_map(
6779
mut perf_buffers: Vec<PerfEventArrayBuffer<MapData>>,
68-
running: Arc<AtomicBool>, // Changed to Arc<AtomicBool>
80+
running: Arc<AtomicBool>, // Changed to Arc<AtomicBool>
6981
mut buffers: Vec<BytesMut>,
7082
) {
7183
info!("Starting timestamp event listener...");
@@ -107,48 +119,67 @@ pub async fn display_time_stamp_events_map(
107119
info!("Timestamp event listener stopped");
108120
}
109121

110-
pub async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
122+
pub async fn event_listener(bpf_maps: Vec<Map>) -> Result<(), anyhow::Error> {
111123
info!("Getting CPU count...");
112-
let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?.len();
113-
info!("CPU count: {}", cpu_count);
114-
124+
125+
let mut perf_event_arrays = Vec::new(); // contains a vector of PerfEventArrays
126+
let mut event_buffers = Vec::new(); // contains a vector of buffers
127+
115128
info!("Creating perf buffers...");
116-
let mut net_perf_buffer: Vec<PerfEventArrayBuffer<MapData>> = Vec::new();
117-
let mut net_perf_array: PerfEventArray<MapData> = PerfEventArray::try_from(bpf_maps.0)?;
118-
let mut time_stamp_events_perf_buffer: Vec<PerfEventArrayBuffer<MapData>> = Vec::new();
119-
let mut time_stamp_events_perf_array: PerfEventArray<MapData> =
120-
PerfEventArray::try_from(bpf_maps.1)?;
121-
122-
info!("Opening perf buffers for {} CPUs...", cpu_count);
123-
for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? {
124-
let buf: PerfEventArrayBuffer<MapData> = net_perf_array.open(cpu_id, None)?;
125-
net_perf_buffer.push(buf);
129+
for map in bpf_maps {
130+
debug!("Debugging map type:{:?}", map);
131+
let perf_event_array = PerfEventArray::try_from(map).map_err(|e| {
132+
error!("Cannot create perf_event_array for map.Reason: {}", e);
133+
anyhow::anyhow!("Cannot create perf_event_array for map.Reason: {}", e)
134+
})?;
135+
perf_event_arrays.push(perf_event_array); // this is step 1
136+
let perf_event_array_buffer = Vec::new();
137+
event_buffers.push(perf_event_array_buffer); //this is step 2
126138
}
127-
for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? {
128-
let buf: PerfEventArrayBuffer<MapData> = time_stamp_events_perf_array.open(cpu_id, None)?;
129-
time_stamp_events_perf_buffer.push(buf);
139+
140+
let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?;
141+
142+
//info!("CPU count: {}", cpu_count);
143+
for (perf_evt_array, perf_evt_array_buffer) in
144+
perf_event_arrays.iter_mut().zip(event_buffers.iter_mut())
145+
{
146+
for cpu_id in &cpu_count {
147+
let single_buffer = perf_evt_array.open(*cpu_id, None)?;
148+
perf_evt_array_buffer.push(single_buffer);
149+
}
130150
}
151+
152+
//info!("Opening perf buffers for {} CPUs...", cpu_count);
131153
info!("Perf buffers created successfully");
154+
let mut event_buffers = event_buffers.into_iter();
155+
156+
let time_stamp_events_perf_buffer = event_buffers.next().expect("");
157+
let net_perf_buffer = event_buffers.next().expect("");
132158

133159
// Create shared running flags
134160
let net_metrics_running = Arc::new(AtomicBool::new(true));
135161
let time_stamp_events_running = Arc::new(AtomicBool::new(true));
136-
162+
137163
// Create proper sized buffers
138-
let net_metrics_buffers = vec![BytesMut::with_capacity(1024); cpu_count];
139-
let time_stamp_events_buffers = vec![BytesMut::with_capacity(1024); cpu_count];
140-
164+
let net_metrics_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()];
165+
let time_stamp_events_buffers = vec![BytesMut::with_capacity(1024); cpu_count.len()];
166+
141167
// Clone for the signal handler
142168
let net_metrics_running_signal = net_metrics_running.clone();
143169
let time_stamp_events_running_signal = time_stamp_events_running.clone();
144-
170+
145171
info!("Starting event listener tasks...");
146172
let metrics_map_displayer = tokio::spawn(async move {
147173
display_metrics_map(net_perf_buffer, net_metrics_running, net_metrics_buffers).await;
148174
});
149175

150176
let time_stamp_events_displayer = tokio::spawn(async move {
151-
display_time_stamp_events_map(time_stamp_events_perf_buffer, time_stamp_events_running, time_stamp_events_buffers).await
177+
display_time_stamp_events_map(
178+
time_stamp_events_perf_buffer,
179+
time_stamp_events_running,
180+
time_stamp_events_buffers,
181+
)
182+
.await
152183
});
153184

154185
info!("Event listeners started, entering main loop...");
@@ -176,4 +207,4 @@ pub async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
176207

177208
// return success
178209
Ok(())
179-
}
210+
}
Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,18 @@
1-
use aya::{
2-
Ebpf
3-
};
4-
1+
use anyhow::{Context, Ok};
2+
use aya::Ebpf;
3+
use cortexbrain_common::{constants, logger};
54
use std::{
65
env, fs,
76
path::Path,
8-
sync::{
9-
Arc, Mutex,
10-
},
7+
sync::{Arc, Mutex},
118
};
12-
13-
use anyhow::{Context, Ok};
149
use tracing::{error, info};
15-
use cortexbrain_common::{constants, logger};
1610

1711
mod helpers;
18-
use crate::{helpers::event_listener, maps_handlers::map_pinner, program_handlers::load_and_attach_tcp_programs};
19-
20-
mod maps_handlers;
21-
use crate::maps_handlers::init_ebpf_maps;
12+
use crate::helpers::event_listener;
2213

23-
mod program_handlers;
24-
use crate::program_handlers::load_program;
14+
use cortexbrain_common::map_handlers::{init_bpf_maps, map_pinner};
15+
use cortexbrain_common::program_handlers::load_program;
2516

2617
mod structs;
2718

@@ -33,41 +24,50 @@ async fn main() -> Result<(), anyhow::Error> {
3324
info!("Starting metrics service...");
3425
info!("fetching data");
3526

36-
let bpf_path = env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?;
27+
let bpf_path =
28+
env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?;
3729
let data = fs::read(Path::new(&bpf_path)).context("Failed to load file from path")?;
3830
let bpf = Arc::new(Mutex::new(Ebpf::load(&data)?));
3931
let tcp_bpf = bpf.clone();
4032
let tcp_rev_bpf = bpf.clone();
33+
let tcp_v6_bpf = bpf.clone();
4134

4235
info!("Running Ebpf logger");
4336
info!("loading programs");
44-
let bpf_map_save_path =
45-
std::env::var(constants::PIN_MAP_PATH).context("PIN_MAP_PATH environment variable required")?;
37+
let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH)
38+
.context("PIN_MAP_PATH environment variable required")?;
4639

47-
match init_ebpf_maps(bpf.clone()) {
48-
std::result::Result::Ok(maps) => {
40+
let map_data = vec!["time_stamp_events".to_string(), "net_metrics".to_string()];
41+
42+
match init_bpf_maps(bpf.clone(), map_data) {
43+
std::result::Result::Ok(bpf_maps) => {
4944
info!("BPF maps loaded successfully");
5045
let pin_path = std::path::PathBuf::from(&bpf_map_save_path);
5146
info!("About to call map_pinner with path: {:?}", pin_path);
52-
match map_pinner(&maps, &pin_path).await {
53-
std::result::Result::Ok(_) => {
47+
match map_pinner(bpf_maps, &pin_path) {
48+
std::result::Result::Ok(maps) => {
5449
info!("BPF maps pinned successfully to {}", bpf_map_save_path);
5550

5651
{
5752
load_program(bpf.clone(), "metrics_tracer", "tcp_identify_packet_loss")
58-
.context("An error occured during the execution of load_program function")?;
59-
}
60-
61-
{
62-
load_and_attach_tcp_programs(tcp_bpf.clone())
63-
.context("An error occured during the execution of load_and_attach_tcp_programs function")?;
53+
.context(
54+
"An error occured during the execution of load_program function",
55+
)?;
56+
57+
load_program(tcp_bpf,"tcp_connect","tcp_v4_connect")
58+
.context("An error occured during the execution of load_and_attach_tcp_programs function")?;
59+
load_program(tcp_v6_bpf,"tcp_connect","tcp_v6_connect")
60+
.context("An error occured during the execution of load_and_attach_tcp_programs function")?;
61+
62+
load_program(
63+
tcp_rev_bpf,
64+
"tcp_rcv_state_process",
65+
"tcp_rcv_state_process",
66+
)
67+
.context(
68+
"An error occured during the execution of load_program function",
69+
)?;
6470
}
65-
66-
{
67-
load_program(tcp_rev_bpf.clone(), "tcp_rcv_state_process", "tcp_rcv_state_process")
68-
.context("An error occured during the execution of load_program function")?;
69-
}
70-
7171
event_listener(maps).await?;
7272
}
7373
Err(e) => {
@@ -83,4 +83,4 @@ async fn main() -> Result<(), anyhow::Error> {
8383
}
8484

8585
Ok(())
86-
}
86+
}

0 commit comments

Comments
 (0)