Skip to content

Commit 9a115b9

Browse files
[#105]: added netns and pid parameter in identity service (user space). Better concurrency management. added bpf maps preliminary checks in main.rs
1 parent 47cd33f commit 9a115b9

File tree

3 files changed

+90
-45
lines changed

3 files changed

+90
-45
lines changed

core/src/components/identity/src/helpers.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ use tokio::{fs, signal};
3333
* decleare bpf path env variable
3434
*/
3535
const BPF_PATH: &str = "BPF_PATH";
36-
const IFACE: &str = "IFACE";
37-
use std::result::Result::Ok as Okk;
3836

3937
/*
4038
* TryFrom Trait implementation for IpProtocols enum
@@ -62,7 +60,7 @@ pub async fn display_events<T: BorrowMut<MapData>>(
6260
while running.load(Ordering::SeqCst) {
6361
for buf in perf_buffers.iter_mut() {
6462
match buf.read_events(&mut buffers) {
65-
Ok(events) => {
63+
std::result::Result::Ok(events) => {
6664
for i in 0..events.read {
6765
let data = &buffers[i];
6866
if data.len() >= std::mem::size_of::<PacketLog>() {
@@ -72,10 +70,10 @@ pub async fn display_events<T: BorrowMut<MapData>>(
7270
let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
7371
let src_port = u16::from_be(pl.src_port as u16);
7472
let dst_port = u16::from_be(pl.dst_port as u16);
75-
let event_id = pl.event_id;
73+
let event_id = pl.pid;
7674

7775
match IpProtocols::try_from(pl.proto) {
78-
Ok(proto) => {
76+
std::result::Result::Ok(proto) => {
7977
info!(
8078
"Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
8179
event_id, proto, src, src_port, dst, dst_port
@@ -109,7 +107,7 @@ pub async fn display_veth_events<T: BorrowMut<MapData>>(
109107
while running.load(Ordering::SeqCst) {
110108
for buf in perf_buffers.iter_mut() {
111109
match buf.read_events(&mut buffers) {
112-
Ok(events) => {
110+
std::result::Result::Ok(events) => {
113111
for i in 0..events.read {
114112
let data = &buffers[i];
115113
if data.len() >= std::mem::size_of::<VethLog>() {
@@ -123,6 +121,7 @@ pub async fn display_veth_events<T: BorrowMut<MapData>>(
123121
let state = vethlog.state;
124122

125123
let dev_addr = dev_addr_bytes;
124+
let netns = vethlog.netns;
126125
let mut event_type = String::new();
127126
match vethlog.event_type {
128127
1 => {
@@ -134,15 +133,22 @@ pub async fn display_veth_events<T: BorrowMut<MapData>>(
134133
_ => warn!("unknown event_type"),
135134
}
136135
match name {
137-
Ok(veth_name) => {
136+
std::result::Result::Ok(veth_name) => {
138137
info!(
139-
"Triggered action: register_netdevice event_type:{:?} Manipulated veth: {:?} state:{:?} dev_addr:{:?}",
138+
"[{}] Triggered action: register_netdevice event_type:{:?} Manipulated veth: {:?} state:{:?} dev_addr:{:?}",
139+
netns,
140140
event_type,
141141
veth_name.trim_end_matches("\0").to_string(),
142142
state,
143143
dev_addr
144144
);
145-
attach_detach_veth(bpf.clone(), vethlog.event_type, veth_name, link_ids.clone()).await;
145+
attach_detach_veth(
146+
bpf.clone(),
147+
vethlog.event_type,
148+
veth_name,
149+
link_ids.clone(),
150+
)
151+
.await;
146152
}
147153
Err(_) => info!("Unknown name or corrupted field"),
148154
}
@@ -173,8 +179,7 @@ pub fn get_veth_channels() -> Vec<String> {
173179
if let Ok(ifaces) = if_nameindex() {
174180
for iface in &ifaces {
175181
let iface_name = iface.name().to_str().unwrap().to_owned();
176-
if !ignore_iface(&iface_name)
177-
{
182+
if !ignore_iface(&iface_name) {
178183
interfaces.push(iface_name);
179184
} else {
180185
info!("skipping interface {:?}", iface_name);
@@ -185,8 +190,16 @@ pub fn get_veth_channels() -> Vec<String> {
185190
interfaces
186191
}
187192

188-
async fn attach_detach_veth(bpf: Arc<Mutex<Bpf>>, event_type: u8, iface: &str, link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>) -> Result<(), anyhow::Error> {
189-
info!("attach_detach_veth called: event_type={}, iface={}", event_type, iface);
193+
async fn attach_detach_veth(
194+
bpf: Arc<Mutex<Bpf>>,
195+
event_type: u8,
196+
iface: &str,
197+
link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
198+
) -> Result<(), anyhow::Error> {
199+
info!(
200+
"attach_detach_veth called: event_type={}, iface={}",
201+
event_type, iface
202+
);
190203
match event_type {
191204
1 => {
192205
let mut bpf = bpf.lock().unwrap();
@@ -204,11 +217,13 @@ async fn attach_detach_veth(bpf: Arc<Mutex<Bpf>>, event_type: u8, iface: &str, l
204217

205218
let mut link_ids = link_ids.lock().unwrap();
206219
match program.attach(iface, TcAttachType::Ingress) {
207-
Ok(link_id) => {
208-
info!("Program 'identity_classifier' attached to interface {}", iface);
220+
std::result::Result::Ok(link_id) => {
221+
info!(
222+
"Program 'identity_classifier' attached to interface {}",
223+
iface
224+
);
209225
link_ids.insert(iface.to_string(), link_id);
210-
211-
},
226+
}
212227
Err(e) => error!("Error attaching program to interface {}: {:?}", iface, e),
213228
}
214229
}
@@ -230,4 +245,4 @@ async fn attach_detach_veth(bpf: Arc<Mutex<Bpf>>, event_type: u8, iface: &str, l
230245
}
231246
}
232247
Ok(())
233-
}
248+
}

core/src/components/identity/src/main.rs

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,40 @@
77
* 3. Track veth creation and deletion events
88
*
99
*/
10-
#![allow(warnings)]
1110
#![allow(unused_mut)]
11+
#![allow(warnings)]
1212

1313
mod enums;
1414
mod helpers;
1515
mod structs;
1616
use aya::{
17+
Bpf,
1718
maps::{
18-
perf::{PerfEventArray, PerfEventArrayBuffer}, Map, MapData
19-
}, programs::{tc::SchedClassifierLinkId, KProbe, SchedClassifier, TcAttachType}, util::online_cpus, Bpf, Ebpf
19+
Map, MapData,
20+
perf::{PerfEventArray, PerfEventArrayBuffer},
21+
},
22+
programs::{KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId},
23+
util::online_cpus,
2024
};
2125

22-
use crate::enums::IpProtocols;
2326
use crate::helpers::{display_events, display_veth_events, get_veth_channels};
2427
use bytes::BytesMut;
2528
use std::{
2629
convert::TryInto,
2730
path::Path,
2831
sync::{
29-
atomic::{AtomicBool, Ordering}, Arc, Mutex
32+
Arc, Mutex,
33+
atomic::{AtomicBool, Ordering},
3034
},
3135
};
3236

3337
use anyhow::{Context, Ok};
34-
use tokio::{fs, signal, sync::broadcast::error};
35-
use tracing::{error, info, warn};
38+
use tokio::{fs, signal};
39+
use tracing::{error, info};
3640
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};
3741

3842
const BPF_PATH: &str = "BPF_PATH"; //BPF env path
3943
use std::collections::HashMap;
40-
use std::result::Result::Ok as Okk;
4144

4245
#[tokio::main]
4346
async fn main() -> Result<(), anyhow::Error> {
@@ -68,25 +71,35 @@ async fn main() -> Result<(), anyhow::Error> {
6871
//init bpf data
6972
let bpf = Arc::new(Mutex::new(Bpf::load(&data)?));
7073

71-
//load veth_trace program ref veth_trace.rs
72-
init_veth_tracer(bpf.clone());
73-
let bpf_maps = init_bpf_maps(bpf.clone()).unwrap();
74+
match init_bpf_maps(bpf.clone()) {
75+
std::result::Result::Ok(bpf_maps) => {
76+
info!("Successfully loaded bpf maps");
7477

75-
let interfaces = get_veth_channels();
78+
//load veth_trace program ref veth_trace.rs
79+
init_veth_tracer(bpf.clone()).await?;
7680

77-
info!("Found interfaces: {:?}", interfaces);
78-
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone())
79-
.context("An error occured during the execution of attach_bpf_program function")?;
81+
let interfaces = get_veth_channels();
8082

81-
event_listener(bpf_maps, link_ids.clone(), bpf.clone())
82-
.await
83-
.context("Error initializing event_listener")?;
83+
info!("Found interfaces: {:?}", interfaces);
84+
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone())
85+
.await
86+
.context("An error occured during the execution of attach_bpf_program function")?;
87+
88+
event_listener(bpf_maps, link_ids.clone(), bpf.clone())
89+
.await
90+
.context("Error initializing event_listener")?;
91+
}
92+
Err(e) => {
93+
error!("Error while loading bpf maps {}", e);
94+
signal::ctrl_c();
95+
}
96+
}
8497

8598
Ok(())
8699
}
87100

88101
//attach the tc classifier program to a vector of interfaces
89-
pub fn init_tc_classifier(
102+
async fn init_tc_classifier(
90103
bpf: Arc<Mutex<Bpf>>,
91104
ifaces: Vec<String>,
92105
link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
@@ -108,7 +121,7 @@ pub fn init_tc_classifier(
108121

109122
for interface in ifaces {
110123
match program.attach(&interface, TcAttachType::Ingress) {
111-
Okk(link_id) => {
124+
std::result::Result::Ok(link_id) => {
112125
info!(
113126
"Program 'identity_classifier' attached to interface {}",
114127
interface
@@ -126,7 +139,7 @@ pub fn init_tc_classifier(
126139
Ok(())
127140
}
128141

129-
fn init_veth_tracer(bpf: Arc<Mutex<Bpf>>) -> Result<(), anyhow::Error> {
142+
async fn init_veth_tracer(bpf: Arc<Mutex<Bpf>>) -> Result<(), anyhow::Error> {
130143
//this functions init the veth_tracer used to make the InterfacesRegistry
131144

132145
let mut bpf_new = bpf.lock().unwrap();
@@ -138,7 +151,10 @@ fn init_veth_tracer(bpf: Arc<Mutex<Bpf>>) -> Result<(), anyhow::Error> {
138151
.try_into()?;
139152
veth_creation_tracer.load()?;
140153

141-
veth_creation_tracer.attach("register_netdevice", 0)?;
154+
match veth_creation_tracer.attach("register_netdevice", 0) {
155+
std::result::Result::Ok(_) => info!("veth_creation_tracer program attached successfully"),
156+
Err(e) => error!("Error attaching veth_creation_tracer program {:?}", e),
157+
}
142158

143159
//deletion tracer
144160
let veth_deletion_tracer: &mut KProbe = bpf_new
@@ -188,7 +204,11 @@ fn init_bpf_maps(bpf: Arc<Mutex<Bpf>>) -> Result<(Map, Map), anyhow::Error> {
188204
Ok((events_map, veth_map))
189205
}
190206

191-
async fn event_listener(bpf_maps: (Map, Map), link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>, bpf: Arc<Mutex<Bpf>>) -> Result<(), anyhow::Error> {
207+
async fn event_listener(
208+
bpf_maps: (Map, Map),
209+
link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
210+
bpf: Arc<Mutex<Bpf>>,
211+
) -> Result<(), anyhow::Error> {
192212
// this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications)
193213
/* Doc:
194214
@@ -230,7 +250,14 @@ async fn event_listener(bpf_maps: (Map, Map), link_ids: Arc<Mutex<HashMap<String
230250

231251
//display_events(perf_buffers, running, buffers).await;
232252
let veth_events_displayer = tokio::spawn(async move {
233-
display_veth_events(bpf.clone(), perf_veth_buffer, veth_running, veth_buffers, veth_link_ids, ).await;
253+
display_veth_events(
254+
bpf.clone(),
255+
perf_veth_buffer,
256+
veth_running,
257+
veth_buffers,
258+
veth_link_ids,
259+
)
260+
.await;
234261
});
235262
let net_events_displayer = tokio::spawn(async move {
236263
display_events(perf_net_events_buffer, net_events_running, events_buffers).await;

core/src/components/identity/src/structs.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
#[derive(Clone, Copy)]
77
pub struct PacketLog {
88
pub proto: u8,
9-
pub src_ip: u32,
10-
pub src_port: u16,
9+
pub src_ip: u32,
10+
pub src_port: u16,
1111
pub dst_ip: u32,
1212
pub dst_port: u16,
1313
pub event_id: u16,
14+
pub pid : u32
1415
}
1516
/*
1617
* Connection Array that contains the hash_id associated with an active connection
@@ -32,6 +33,8 @@ unsafe impl aya::Pod for ConnArray {}
3233
pub struct VethLog {
3334
pub name: [u8; 16],
3435
pub state: u64,
35-
pub dev_addr: [u32;8],
36+
pub dev_addr: [u32; 8],
3637
pub event_type: u8,
38+
pub netns: u32,
39+
pub pid: u32,
3740
}

0 commit comments

Comments
 (0)