Skip to content

Commit c8cbb54

Browse files
committed
[#107]: dynamic tc veth ingress attachment
1 parent b6c5a7a commit c8cbb54

4 files changed

Lines changed: 113 additions & 36 deletions

File tree

core/src/components/conntracker/build.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ use which::which;
1212
///
1313
/// [bindeps]: https://doc.rust-lang.org/nightly/cargo/reference/unstable.html?highlight=feature#artifact-dependencies
1414
fn main() {
15+
// TODO: Try generating bindings using bindgen
16+
// bindgen::Builder::default()
17+
// .header("header.h")
18+
// .generate()
19+
// .expect("Unable to generate bindings")
20+
// .write_to_file("src/bindings.rs")
21+
// .expect("Couldn't write bindings!");
22+
1523
let bpf_linker = which("bpf-linker").unwrap();
1624
println!("cargo:rerun-if-changed={}", bpf_linker.to_str().unwrap());
1725
}

core/src/components/identity/src/helpers.rs

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::enums::IpProtocols;
22
use crate::structs::{PacketLog, VethLog};
3+
use aya::programs::tc::SchedClassifierLinkId;
34
use aya::{
45
Bpf,
56
maps::{
@@ -11,6 +12,8 @@ use aya::{
1112
};
1213
use bytes::BytesMut;
1314
use nix::net::if_::if_nameindex;
15+
use std::collections::HashMap;
16+
use std::sync::Mutex;
1417
use std::{
1518
ascii,
1619
borrow::BorrowMut,
@@ -31,6 +34,7 @@ use tokio::{fs, signal};
3134
*/
3235
const BPF_PATH: &str = "BPF_PATH";
3336
const IFACE: &str = "IFACE";
37+
use std::result::Result::Ok as Okk;
3438

3539
/*
3640
* TryFrom Trait implementation for IpProtocols enum
@@ -96,9 +100,11 @@ pub async fn display_events<T: BorrowMut<MapData>>(
96100
}
97101

98102
pub async fn display_veth_events<T: BorrowMut<MapData>>(
103+
bpf: Arc<Mutex<Bpf>>,
99104
mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
100105
running: Arc<AtomicBool>,
101-
mut buffers: Vec<BytesMut>
106+
mut buffers: Vec<BytesMut>,
107+
mut link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
102108
) {
103109
while running.load(Ordering::SeqCst) {
104110
for buf in perf_buffers.iter_mut() {
@@ -136,6 +142,7 @@ pub async fn display_veth_events<T: BorrowMut<MapData>>(
136142
state,
137143
dev_addr
138144
);
145+
attach_detach_veth(bpf.clone(), vethlog.event_type, veth_name, link_ids.clone()).await;
139146
}
140147
Err(_) => info!("Unknown name or corrupted field"),
141148
}
@@ -153,6 +160,11 @@ pub async fn display_veth_events<T: BorrowMut<MapData>>(
153160
}
154161
}
155162

163+
pub fn ignore_iface(iface: &str) -> bool {
164+
let ignored_interfaces = ["eth0", "docker0", "tunl0", "lo"];
165+
ignored_interfaces.contains(&iface)
166+
}
167+
156168
//filter the interfaces,exclude docker0,eth0,lo interfaces
157169
pub fn get_veth_channels() -> Vec<String> {
158170
//filter interfaces and save the output in the
@@ -161,10 +173,7 @@ pub fn get_veth_channels() -> Vec<String> {
161173
if let Ok(ifaces) = if_nameindex() {
162174
for iface in &ifaces {
163175
let iface_name = iface.name().to_str().unwrap().to_owned();
164-
if iface_name != "eth0"
165-
&& iface_name != "docker0"
166-
&& iface_name != "tunl0"
167-
&& iface_name != "lo"
176+
if !ignore_iface(&iface_name)
168177
{
169178
interfaces.push(iface_name);
170179
} else {
@@ -175,3 +184,50 @@ pub fn get_veth_channels() -> Vec<String> {
175184

176185
interfaces
177186
}
187+
188+
async fn attach_detach_veth(bpf: Arc<Mutex<Bpf>>, event_type: u8, iface: &str, link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>) -> Result<(), anyhow::Error> {
189+
info!("attach_detach_veth called: event_type={}, iface={}", event_type, iface);
190+
match event_type {
191+
1 => {
192+
let mut bpf = bpf.lock().unwrap();
193+
let program: &mut SchedClassifier = bpf
194+
.program_mut("identity_classifier")
195+
.ok_or_else(|| anyhow::anyhow!("program 'identity_classifier' not found"))?
196+
.try_into()?;
197+
198+
let iface = iface.trim_end_matches('\0');
199+
200+
if ignore_iface(iface) {
201+
info!("Skipping ignored interface: {}", iface);
202+
return Ok(());
203+
}
204+
205+
let mut link_ids = link_ids.lock().unwrap();
206+
match program.attach(iface, TcAttachType::Ingress) {
207+
Ok(link_id) => {
208+
info!("Program 'identity_classifier' attached to interface {}", iface);
209+
link_ids.insert(iface.to_string(), link_id);
210+
211+
},
212+
Err(e) => error!("Error attaching program to interface {}: {:?}", iface, e),
213+
}
214+
}
215+
2 => {
216+
// INFO: Detaching occurs automatically when veth is deleted by kernel itsel
217+
let mut link_ids = link_ids.lock().unwrap();
218+
match link_ids.remove(iface) {
219+
Some(_) => {
220+
info!("Successfully detached program from interface {}", iface);
221+
}
222+
None => {
223+
error!("Interface {} not found in link_ids", iface);
224+
return Err(anyhow::anyhow!("Interface {} not found in link_ids", iface));
225+
}
226+
}
227+
}
228+
_ => {
229+
error!("Unknown event type: {}", event_type);
230+
}
231+
}
232+
Ok(())
233+
}

core/src/components/identity/src/main.rs

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,9 @@ mod enums;
1414
mod helpers;
1515
mod structs;
1616
use aya::{
17-
Bpf, Ebpf,
1817
maps::{
19-
Map, MapData,
20-
perf::{PerfEventArray, PerfEventArrayBuffer},
21-
},
22-
programs::{KProbe, SchedClassifier, TcAttachType},
23-
util::online_cpus,
18+
perf::{PerfEventArray, PerfEventArrayBuffer}, Map, MapData
19+
}, programs::{tc::SchedClassifierLinkId, KProbe, SchedClassifier, TcAttachType}, util::online_cpus, Bpf, Ebpf
2420
};
2521

2622
use crate::enums::IpProtocols;
@@ -30,8 +26,7 @@ use std::{
3026
convert::TryInto,
3127
path::Path,
3228
sync::{
33-
Arc,
34-
atomic::{AtomicBool, Ordering},
29+
atomic::{AtomicBool, Ordering}, Arc, Mutex
3530
},
3631
};
3732

@@ -41,6 +36,8 @@ use tracing::{error, info, warn};
4136
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};
4237

4338
const BPF_PATH: &str = "BPF_PATH"; //BPF env path
39+
use std::collections::HashMap;
40+
use std::result::Result::Ok as Okk;
4441

4542
#[tokio::main]
4643
async fn main() -> Result<(), anyhow::Error> {
@@ -59,41 +56,47 @@ async fn main() -> Result<(), anyhow::Error> {
5956
info!("Starting identity service...");
6057
info!("fetching data");
6158

59+
// To Store link_ids they can be used to detach tc
60+
let link_ids = Arc::new(Mutex::new(HashMap::<String, SchedClassifierLinkId>::new()));
61+
6262
//init conntracker data path
6363
let bpf_path = std::env::var(BPF_PATH).context("BPF_PATH environment variable required")?;
6464
let data = fs::read(Path::new(&bpf_path))
6565
.await
6666
.context("failed to load file from path")?;
6767

6868
//init bpf data
69-
let mut bpf = Bpf::load(&data)?;
69+
let bpf = Arc::new(Mutex::new(Bpf::load(&data)?));
7070

7171
//load veth_trace program ref veth_trace.rs
72-
init_veth_tracer(&mut bpf);
73-
let bpf_maps = init_bpf_maps(&mut bpf).unwrap();
72+
init_veth_tracer(bpf.clone());
73+
let bpf_maps = init_bpf_maps(bpf.clone()).unwrap();
7474

7575
let interfaces = get_veth_channels();
7676

77-
//TODO: store the results from the veth_tracer in a hashmap (InterfacesRegistry) to make sure that the creation and deletion of veth are up to date
78-
// everytime a new interface enters the InterfacesRegistry attach a bpf program with the attach_bpf_program function below
79-
8077
info!("Found interfaces: {:?}", interfaces);
81-
init_tc_classifier(&mut bpf, interfaces)
78+
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone())
8279
.context("An error occured during the execution of attach_bpf_program function")?;
8380

84-
event_listener(bpf_maps)
81+
event_listener(bpf_maps, link_ids.clone(), bpf.clone())
8582
.await
8683
.context("Error initializing event_listener")?;
8784

8885
Ok(())
8986
}
9087

9188
//attach the tc classifier program to a vector of interfaces
92-
pub fn init_tc_classifier(bpf: &mut Ebpf, ifaces: Vec<String>) -> Result<(), anyhow::Error> {
89+
pub fn init_tc_classifier(
90+
bpf: Arc<Mutex<Bpf>>,
91+
ifaces: Vec<String>,
92+
link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
93+
) -> Result<(), anyhow::Error> {
9394
//this funtion initialize the tc classifier program
9495
info!("Loading programs");
9596

96-
let program: &mut SchedClassifier = bpf
97+
let mut bpf_new = bpf.lock().unwrap();
98+
99+
let program: &mut SchedClassifier = bpf_new
97100
.program_mut("identity_classifier")
98101
.ok_or_else(|| anyhow::anyhow!("program 'identity_classifier' not found"))?
99102
.try_into()
@@ -105,10 +108,14 @@ pub fn init_tc_classifier(bpf: &mut Ebpf, ifaces: Vec<String>) -> Result<(), any
105108

106109
for interface in ifaces {
107110
match program.attach(&interface, TcAttachType::Ingress) {
108-
std::result::Result::Ok(_) => info!(
109-
"Program 'identity_classifier' attached to interface {}",
110-
interface
111-
),
111+
Okk(link_id) => {
112+
info!(
113+
"Program 'identity_classifier' attached to interface {}",
114+
interface
115+
);
116+
let mut map = link_ids.lock().unwrap();
117+
map.insert(interface.clone(), link_id);
118+
}
112119
Err(e) => error!(
113120
"Error attaching program to interface {}: {:?}",
114121
interface, e
@@ -118,11 +125,14 @@ pub fn init_tc_classifier(bpf: &mut Ebpf, ifaces: Vec<String>) -> Result<(), any
118125

119126
Ok(())
120127
}
121-
fn init_veth_tracer(bpf: &mut Ebpf) -> Result<(), anyhow::Error> {
128+
129+
fn init_veth_tracer(bpf: Arc<Mutex<Bpf>>) -> Result<(), anyhow::Error> {
122130
//this functions init the veth_tracer used to make the InterfacesRegistry
123131

132+
let mut bpf_new = bpf.lock().unwrap();
133+
124134
//creation tracer
125-
let veth_creation_tracer: &mut KProbe = bpf
135+
let veth_creation_tracer: &mut KProbe = bpf_new
126136
.program_mut("veth_creation_trace")
127137
.ok_or_else(|| anyhow::anyhow!("program 'veth_creation_trace' not found"))?
128138
.try_into()?;
@@ -131,7 +141,7 @@ fn init_veth_tracer(bpf: &mut Ebpf) -> Result<(), anyhow::Error> {
131141
veth_creation_tracer.attach("register_netdevice", 0)?;
132142

133143
//deletion tracer
134-
let veth_deletion_tracer: &mut KProbe = bpf
144+
let veth_deletion_tracer: &mut KProbe = bpf_new
135145
.program_mut("veth_deletion_trace")
136146
.ok_or_else(|| anyhow::anyhow!("program 'veth_deletion_trace' not found"))?
137147
.try_into()?;
@@ -147,17 +157,19 @@ fn init_veth_tracer(bpf: &mut Ebpf) -> Result<(), anyhow::Error> {
147157
Ok(())
148158
}
149159

150-
fn init_bpf_maps(bpf: &mut Ebpf) -> Result<(Map, Map), anyhow::Error> {
160+
fn init_bpf_maps(bpf: Arc<Mutex<Bpf>>) -> Result<(Map, Map), anyhow::Error> {
151161
// this function init the bpfs maps used in the main program
152162
/*
153163
index 0: events_map
154164
index 1: veth_map
155165
*/
156-
let events_map = bpf
166+
let mut bpf_new = bpf.lock().unwrap();
167+
168+
let events_map = bpf_new
157169
.take_map("EventsMap")
158170
.ok_or_else(|| anyhow::anyhow!("EventsMap map not found"))?;
159171

160-
let veth_map = bpf
172+
let veth_map = bpf_new
161173
.take_map("veth_identity_map")
162174
.ok_or_else(|| anyhow::anyhow!("veth_identity_map map not found"))?;
163175

@@ -176,7 +188,7 @@ fn init_bpf_maps(bpf: &mut Ebpf) -> Result<(Map, Map), anyhow::Error> {
176188
Ok((events_map, veth_map))
177189
}
178190

179-
async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
191+
async fn event_listener(bpf_maps: (Map, Map), link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>, bpf: Arc<Mutex<Bpf>>) -> Result<(), anyhow::Error> {
180192
// this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications)
181193
/* Doc:
182194
@@ -214,10 +226,11 @@ async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
214226

215227
let veth_running_signal = veth_running.clone();
216228
let net_events_running_signal = net_events_running.clone();
229+
let veth_link_ids = link_ids.clone();
217230

218231
//display_events(perf_buffers, running, buffers).await;
219232
let veth_events_displayer = tokio::spawn(async move {
220-
display_veth_events(perf_veth_buffer, veth_running, veth_buffers).await;
233+
display_veth_events(bpf.clone(), perf_veth_buffer, veth_running, veth_buffers, veth_link_ids, ).await;
221234
});
222235
let net_events_displayer = tokio::spawn(async move {
223236
display_events(perf_net_events_buffer, net_events_running, events_buffers).await;

core/src/components/identity/src/structs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,4 @@ pub struct VethLog {
3434
pub state: u64,
3535
pub dev_addr: [u32;8],
3636
pub event_type: u8,
37-
}
37+
}

0 commit comments

Comments
 (0)