Skip to content

Commit d7dc4fe

Browse files
LorenzoTettamantisiddh34
authored andcommitted
#107]: fixed problem with veth_tracer execution and tc_classifier execution. Code refactoring. Logging system partial improvements
1 parent 2efb18a commit d7dc4fe

2 files changed

Lines changed: 113 additions & 94 deletions

File tree

core/src/components/identity/src/helpers.rs

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
1+
use crate::enums::IpProtocols;
2+
use crate::structs::{PacketLog, VethLog};
13
use aya::{
2-
maps::{ perf::{ PerfEventArray, PerfEventArrayBuffer }, MapData },
3-
programs::{ SchedClassifier, TcAttachType },
4-
util::online_cpus,
54
Bpf,
5+
maps::{
6+
MapData,
7+
perf::{PerfEventArray, PerfEventArrayBuffer},
8+
},
9+
programs::{SchedClassifier, TcAttachType},
10+
util::online_cpus,
611
};
7-
use crate::structs::{ PacketLog, VethLog };
812
use bytes::BytesMut;
13+
use nix::net::if_::if_nameindex;
914
use std::{
1015
ascii,
1116
borrow::BorrowMut,
1217
net::Ipv4Addr,
1318
string,
14-
sync::{ atomic::{ AtomicBool, Ordering }, Arc },
19+
sync::{
20+
Arc,
21+
atomic::{AtomicBool, Ordering},
22+
},
1523
};
16-
use crate::enums::IpProtocols;
17-
use tracing::{ error, event, info, warn };
18-
use nix::net::if_::if_nameindex;
24+
use tracing::{error, event, info, warn};
1925

20-
use tokio::{ fs, signal };
21-
use std::path::Path;
2226
use anyhow::Context;
27+
use std::path::Path;
28+
use tokio::{fs, signal};
2329
/*
2430
* decleare bpf path env variable
2531
*/
@@ -47,19 +53,17 @@ impl TryFrom<u8> for IpProtocols {
4753
pub async fn display_events<T: BorrowMut<MapData>>(
4854
mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
4955
running: Arc<AtomicBool>,
50-
mut buffers: Vec<BytesMut>
56+
mut buffers: Vec<BytesMut>,
5157
) {
52-
info!("Triggering network events:");
5358
while running.load(Ordering::SeqCst) {
5459
for buf in perf_buffers.iter_mut() {
5560
match buf.read_events(&mut buffers) {
5661
Ok(events) => {
5762
for i in 0..events.read {
5863
let data = &buffers[i];
5964
if data.len() >= std::mem::size_of::<PacketLog>() {
60-
let pl: PacketLog = unsafe {
61-
std::ptr::read(data.as_ptr() as *const _)
62-
};
65+
let pl: PacketLog =
66+
unsafe { std::ptr::read(data.as_ptr() as *const _) };
6367
let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
6468
let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
6569
let src_port = u16::from_be(pl.src_port as u16);
@@ -70,20 +74,12 @@ pub async fn display_events<T: BorrowMut<MapData>>(
7074
Ok(proto) => {
7175
info!(
7276
"Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
73-
event_id,
74-
proto,
75-
src,
76-
src_port,
77-
dst,
78-
dst_port
77+
event_id, proto, src, src_port, dst, dst_port
7978
);
8079
}
81-
Err(_) =>
82-
info!(
83-
"Event Id: {} Protocol: Unknown ({})",
84-
event_id,
85-
pl.proto
86-
),
80+
Err(_) => {
81+
info!("Event Id: {} Protocol: Unknown ({})", event_id, pl.proto)
82+
}
8783
};
8884
} else {
8985
warn!("Received packet data too small: {} bytes", data.len());
@@ -104,17 +100,15 @@ pub async fn display_veth_events<T: BorrowMut<MapData>>(
104100
running: Arc<AtomicBool>,
105101
mut buffers: Vec<BytesMut>
106102
) {
107-
info!("Triggering veth events:");
108103
while running.load(Ordering::SeqCst) {
109104
for buf in perf_buffers.iter_mut() {
110105
match buf.read_events(&mut buffers) {
111106
Ok(events) => {
112107
for i in 0..events.read {
113108
let data = &buffers[i];
114109
if data.len() >= std::mem::size_of::<VethLog>() {
115-
let vethlog: VethLog = unsafe {
116-
std::ptr::read(data.as_ptr() as *const _)
117-
};
110+
let vethlog: VethLog =
111+
unsafe { std::ptr::read(data.as_ptr() as *const _) };
118112

119113
let name_bytes = vethlog.name;
120114

@@ -167,11 +161,10 @@ pub fn get_veth_channels() -> Vec<String> {
167161
if let Ok(ifaces) = if_nameindex() {
168162
for iface in &ifaces {
169163
let iface_name = iface.name().to_str().unwrap().to_owned();
170-
if
171-
iface_name != "eth0" &&
172-
iface_name != "docker0" &&
173-
iface_name != "tunl0" &&
174-
iface_name != "lo"
164+
if iface_name != "eth0"
165+
&& iface_name != "docker0"
166+
&& iface_name != "tunl0"
167+
&& iface_name != "lo"
175168
{
176169
interfaces.push(iface_name);
177170
} else {

core/src/components/identity/src/main.rs

Lines changed: 84 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,48 @@
33
* Features:
44
* 1. TCP, UDP , ICMP events tracker
55
* 2. Track Connections using a PerfEventArray named ConnArray
6+
* 3. Track veth creation and deletion events
67
*
78
*/
89
#![allow(warnings)]
910
#![allow(unused_mut)]
1011

12+
mod enums;
1113
mod helpers;
1214
mod structs;
13-
mod enums;
1415
use aya::{
15-
maps::{ perf::{ PerfEventArray, PerfEventArrayBuffer }, Map, MapData },
16-
programs::{ KProbe, SchedClassifier, TcAttachType },
16+
Bpf, Ebpf,
17+
maps::{
18+
Map, MapData,
19+
perf::{PerfEventArray, PerfEventArrayBuffer},
20+
},
21+
programs::{KProbe, SchedClassifier, TcAttachType},
1722
util::online_cpus,
18-
Bpf,
19-
Ebpf,
2023
};
2124

22-
use bytes::BytesMut;
23-
use std::{ convert::TryInto, sync::{ atomic::{ AtomicBool, Ordering }, Arc }, path::Path };
24-
use crate::helpers::{ display_events, display_veth_events, get_veth_channels };
2525
use crate::enums::IpProtocols;
26+
use crate::helpers::{display_events, display_veth_events, get_veth_channels};
27+
use bytes::BytesMut;
28+
use std::{
29+
convert::TryInto,
30+
path::Path,
31+
sync::{
32+
Arc,
33+
atomic::{AtomicBool, Ordering},
34+
},
35+
};
2636

27-
use tokio::{ fs, signal, sync::broadcast::error };
28-
use anyhow::{ Context, Ok };
29-
use tracing_subscriber::{ fmt::format::FmtSpan, EnvFilter };
30-
use tracing::{ info, error, warn };
37+
use anyhow::{Context, Ok};
38+
use tokio::{fs, signal, sync::broadcast::error};
39+
use tracing::{error, info, warn};
40+
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};
3141

3242
const BPF_PATH: &str = "BPF_PATH"; //BPF env path
3343

3444
#[tokio::main]
3545
async fn main() -> Result<(), anyhow::Error> {
3646
//init tracing subscriber
37-
tracing_subscriber
38-
::fmt()
47+
tracing_subscriber::fmt()
3948
.with_max_level(tracing::Level::INFO)
4049
.with_target(false)
4150
.with_level(true)
@@ -51,7 +60,9 @@ async fn main() -> Result<(), anyhow::Error> {
5160

5261
//init conntracker data path
5362
let bpf_path = std::env::var(BPF_PATH).context("BPF_PATH environment variable required")?;
54-
let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?;
63+
let data = fs::read(Path::new(&bpf_path))
64+
.await
65+
.context("failed to load file from path")?;
5566

5667
//init bpf data
5768
let mut bpf = Bpf::load(&data)?;
@@ -66,35 +77,46 @@ async fn main() -> Result<(), anyhow::Error> {
6677
// everytime a new interface enters the InterfacesRegistry attach a bpf program with the attach_bpf_program function below
6778

6879
info!("Found interfaces: {:?}", interfaces);
69-
attach_bpf_program(&data, interfaces).await?;
80+
init_tc_classifier(&mut bpf, interfaces)
81+
.context("An error occured during the execution of attach_bpf_program function")?;
7082

71-
event_listener(bpf_maps).await?;
83+
event_listener(bpf_maps)
84+
.await
85+
.context("Error initializing event_listener")?;
7286

7387
Ok(())
7488
}
7589

76-
//attach a program to a vector of interfaces
77-
async fn attach_bpf_program(data: &[u8], ifaces: Vec<String>) -> Result<(), anyhow::Error> {
78-
// this function attach a bpf program to a vector of network interfaces
79-
90+
//attach the tc classifier program to a vector of interfaces
91+
pub fn init_tc_classifier(bpf: &mut Ebpf, ifaces: Vec<String>) -> Result<(), anyhow::Error> {
92+
//this funtion initialize the tc classifier program
8093
info!("Loading programs");
8194

82-
for interface in ifaces.iter() {
83-
let mut bpf = Bpf::load(&data)?;
84-
let program: &mut SchedClassifier = bpf
85-
.program_mut("identity_classifier")
86-
.ok_or_else(|| anyhow::anyhow!("program 'identity_classifier' not found"))?
87-
.try_into()?;
88-
program.load()?;
89-
90-
program.attach(&interface, TcAttachType::Ingress)?;
95+
let program: &mut SchedClassifier = bpf
96+
.program_mut("identity_classifier")
97+
.ok_or_else(|| anyhow::anyhow!("program 'identity_classifier' not found"))?
98+
.try_into()
99+
.context("Failed to init SchedClassifier program")?;
100+
101+
program
102+
.load()
103+
.context("Failed to load identity_classifier program")?;
104+
105+
for interface in ifaces {
106+
match program.attach(&interface, TcAttachType::Ingress) {
107+
std::result::Result::Ok(_) => info!(
108+
"Program 'identity_classifier' attached to interface {}",
109+
interface
110+
),
111+
Err(e) => error!(
112+
"Error attaching program to interface {}: {:?}",
113+
interface, e
114+
),
115+
}
91116
}
92117

93-
info!("Programs attached to interfaces successfully");
94-
95118
Ok(())
96119
}
97-
98120
fn init_veth_tracer(bpf: &mut Ebpf) -> Result<(), anyhow::Error> {
99121
//this functions init the veth_tracer used to make the InterfacesRegistry
100122

@@ -112,20 +134,24 @@ fn init_veth_tracer(bpf: &mut Ebpf) -> Result<(), anyhow::Error> {
112134
.program_mut("veth_deletion_trace")
113135
.ok_or_else(|| anyhow::anyhow!("program 'veth_deletion_trace' not found"))?
114136
.try_into()?;
115-
veth_deletion_tracer.load().context("Failed to load deletetion_tracer program")?;
116-
117137
veth_deletion_tracer
118-
.attach("unregister_netdevice_queue", 0)
119-
.context("Failed to attach to unregister_netdevice_queue")?;
138+
.load()
139+
.context("Failed to load deletetion_tracer program")?;
140+
141+
match veth_deletion_tracer.attach("unregister_netdevice_queue", 0) {
142+
std::result::Result::Ok(_) => info!("veth_deletion_trace program attached successfully"),
143+
Err(e) => error!("Error attaching veth_deletetion_trace program {:?}", e),
144+
}
145+
120146
Ok(())
121147
}
122148

123149
fn init_bpf_maps(bpf: &mut Ebpf) -> Result<(Map, Map), anyhow::Error> {
124150
// this function init the bpfs maps used in the main program
125-
/*
126-
index 0: events_map
127-
index 1: veth_map
128-
*/
151+
/*
152+
index 0: events_map
153+
index 1: veth_map
154+
*/
129155
let events_map = bpf
130156
.take_map("EventsMap")
131157
.ok_or_else(|| anyhow::anyhow!("EventsMap map not found"))?;
@@ -135,28 +161,28 @@ fn init_bpf_maps(bpf: &mut Ebpf) -> Result<(Map, Map), anyhow::Error> {
135161
.ok_or_else(|| anyhow::anyhow!("veth_identity_map map not found"))?;
136162

137163
/* EDIT: this part is paused right now
138-
info!("loading bpf connections map");
164+
info!("loading bpf connections map");
139165
140-
//init connection map
141-
let connections_map_raw = bpf
142-
.take_map("ConnectionMap")
143-
.context("failed to take connections map")?;
166+
//init connection map
167+
let connections_map_raw = bpf
168+
.take_map("ConnectionMap")
169+
.context("failed to take connections map")?;
144170
145-
let connection_tracker_map = bpf
146-
.take_map("ConnectionTrackerMap")
147-
.context("failed to take ConnectionTrackerMap map")?;
148-
*/
171+
let connection_tracker_map = bpf
172+
.take_map("ConnectionTrackerMap")
173+
.context("failed to take ConnectionTrackerMap map")?;
174+
*/
149175
Ok((events_map, veth_map))
150176
}
151177

152178
async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
153179
// this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications)
154180
/* Doc:
155-
156-
perf_net_events_array: contains is associated with the network events stored in the events_map (EventsMap)
157-
perf_veth_array: contains is associated with the network events stored in the veth_map (veth_identity_map)
158-
159-
*/
181+
182+
perf_net_events_array: contains is associated with the network events stored in the events_map (EventsMap)
183+
perf_veth_array: contains is associated with the network events stored in the veth_map (veth_identity_map)
184+
185+
*/
160186

161187
info!("Preparing perf_buffers and perf_arrays");
162188
// init PerfEventArrays
@@ -183,7 +209,7 @@ async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
183209
let net_events_running = Arc::new(AtomicBool::new(true));
184210

185211
let mut veth_buffers = vec![BytesMut::with_capacity(1024); 10];
186-
let mut events_buffers = vec![BytesMut::with_capacity(1024); 10];
212+
let mut events_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()];
187213
// let mut connections_buffers = vec![BytesMut::with_capacity(1024); 10];
188214

189215
let veth_running_signal = veth_running.clone();
@@ -201,14 +227,14 @@ async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
201227
result = veth_events_displayer=>{
202228
match result{
203229
Err(e)=>error!("veth_event_displayer panicked {:?}",e),
204-
std::result::Result::Ok(_) => todo!(),
230+
std::result::Result::Ok(_) => info!("Found new veth_event"),
205231
}
206232
}
207233

208234
result = net_events_displayer=>{
209235
match result{
210236
Err(e)=>error!("net_event_displayer panicked {:?}",e),
211-
std::result::Result::Ok(_) => todo!(),
237+
std::result::Result::Ok(_) => info!("Found new net_event"),
212238
}
213239
}
214240
_= signal::ctrl_c()=>{

0 commit comments

Comments
 (0)