Skip to content

Commit 1c781e1

Browse files
author
LorenzoTettamanti
committed
[#93]: identity code refactoring. improved file organization
1 parent 32b6aae commit 1c781e1

File tree

6 files changed

+304
-236
lines changed

6 files changed

+304
-236
lines changed

core/src/components/conntracker/src/main.rs

Lines changed: 119 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,33 @@
1313
// Imports
1414
#![no_std]
1515
#![no_main]
16+
#![allow(warnings)]
1617

17-
use bytemuck::{Pod,Zeroable};
18+
//mod skbuff;
19+
20+
use bytemuck::{ Pod, Zeroable };
1821
use aya_ebpf::{
19-
bindings::{TC_ACT_OK,TC_ACT_SHOT},
20-
macros::{ classifier, map },
22+
bindings::{ TC_ACT_OK, TC_ACT_SHOT },
23+
macros::{ classifier, map, kprobe, tracepoint },
2124
maps::PerfEventArray,
2225
maps::LruPerCpuHashMap,
23-
programs::TcContext,
26+
programs::{ TcContext, TracePointContext },
27+
helpers::{ bpf_probe_read_kernel, bpf_ktime_get_ns },
2428
};
29+
use aya_ebpf::EbpfContext;
30+
//use crate::skbuff::{ sock, sock_common };
2531
use aya_log_ebpf::info;
26-
use core::mem;
27-
28-
32+
use core::{ mem, ptr };
33+
//use crate::skbuff::proto;
34+
//use crate::skbuff::{ iphdr };
35+
//use crate::skbuff::sk_buff;
36+
use network_types::{
37+
eth::{ EthHdr, EtherType },
38+
ip::{ IpProto, Ipv4Hdr },
39+
tcp::TcpHdr,
40+
udp::UdpHdr,
41+
};
42+
use core::ptr::addr_of;
2943
/*
3044
* ETHERNET TYPE II FRAME:
3145
* Reference: https://it.wikipedia.org/wiki/Frame_Ethernet
@@ -43,18 +57,7 @@ use core::mem;
4357
* https://en.wikipedia.org/wiki/IPv4#Header
4458
*
4559
* Original reference:
46-
* https://datatracker.ietf.org/doc/html/rfc791
47-
*
48-
*
49-
* Ipv4 header datagram
5060
51-
0 1 2 3 TOT BYTES OFFSET (full length)
52-
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 32 bit
53-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
54-
|Version| IHL |Type of Service| Total Length | 4 bytes 4
55-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
56-
| Identification |Flags| Fragment Offset | 4 bytes 8
57-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
5861
| Time to Live | Protocol | Header Checksum | 4 bytes 12
5962
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
6063
| Source Address | 4 bytes 16
@@ -95,16 +98,28 @@ pub struct PacketLog {
9598
}
9699

97100
#[repr(C)]
98-
#[derive(Clone, Copy, Pod, Zeroable)]
99-
pub struct ConnArray{
100-
pub event_id: u16,
101-
pub connection_id: u16
101+
#[derive(Clone, Copy)]
102+
pub struct ConnArray {
103+
pub src_ip: u32,
104+
pub dst_ip: u32,
105+
pub src_port: u16,
106+
pub dst_port: u16,
107+
pub proto: u8,
102108
}
103109

104-
#[map(name="EventsMap")]
110+
#[map(name = "EventsMap")]
105111
static mut EVENTS: PerfEventArray<PacketLog> = PerfEventArray::new(0);
106112
#[map(name = "ConnectionMap")]
107-
pub static mut CONNMAP: LruPerCpuHashMap<u8,ConnArray> = LruPerCpuHashMap::with_max_entries(1024,0); //TODO: modify this to a LRU HASHMAP
113+
pub static mut ACTIVE_CONNECTIONS: LruPerCpuHashMap<
114+
u16,
115+
ConnArray
116+
> = LruPerCpuHashMap::with_max_entries(65536, 0);
117+
118+
#[map(name = "ConnectionTrackerMap")]
119+
pub static mut CONNTRACKER: LruPerCpuHashMap<ConnArray, u8> = LruPerCpuHashMap::with_max_entries(
120+
65536,
121+
0
122+
);
108123

109124
const IPV4_ETHERTYPE: u16 = 0x0800;
110125

@@ -122,20 +137,35 @@ const ETHERTYPE_BYTES: usize = 2;
122137
const SRC_PORT_OFFSET_FROM_IP_HEADER: usize = 0;
123138
const DST_PORT_OFFSET_FROM_IP_HEADER: usize = 2;
124139

140+
static ETH_STACK_BYTES: usize = SRC_MAC + DST_MAC + ETHERTYPE_BYTES;
141+
static DST_T0TAL_BYTES_OFFSET: usize = ETH_STACK_BYTES + DST_BYTE_OFFSET;
142+
static SRC_T0TAL_BYTES_OFFSET: usize = ETH_STACK_BYTES + SRC_BYTE_OFFSET;
143+
static PROTOCOL_T0TAL_BYTES_OFFSET: usize = ETH_STACK_BYTES + IPV4_PROTOCOL_OFFSET;
144+
145+
const AF_INET: u16 = 2; //ipv4
146+
const AF_INET6: u16 = 10; //ipv6
125147

126-
static ETH_STACK_BYTES :usize = SRC_MAC+DST_MAC+ETHERTYPE_BYTES;
127-
static DST_T0TAL_BYTES_OFFSET :usize = ETH_STACK_BYTES + DST_BYTE_OFFSET;
128-
static SRC_T0TAL_BYTES_OFFSET :usize = ETH_STACK_BYTES + SRC_BYTE_OFFSET;
129-
static PROTOCOL_T0TAL_BYTES_OFFSET :usize = ETH_STACK_BYTES + IPV4_PROTOCOL_OFFSET;
148+
const IPPROTO_UDP: u8 = 17;
149+
const IPPROTO_TCP: u8 = 6;
130150

151+
//TODO: add kprobe tracing for process ID
152+
//kprobe docs: https://docs.kernel.org/trace/kprobes.html
131153

154+
/* constants */
155+
const HOST_NETNS_INUM: u32 = 4026531993;
156+
const KUBE_POD_CIDR: u32 = 0x0af40000; // 10.244.0.0/16
132157

158+
/* Helper Functions */
159+
#[inline]
160+
unsafe fn is_kube_internal(ip: u32) -> bool {
161+
(ip & 0xffff0000) == KUBE_POD_CIDR
162+
}
133163

134164
#[classifier]
135165
pub fn identity_classifier(ctx: TcContext) -> i32 {
136166
match try_identity_classifier(ctx) {
137167
Ok(_) => TC_ACT_OK,
138-
Err(_) => TC_ACT_SHOT,//block packets that returns errors
168+
Err(_) => TC_ACT_SHOT, //block packets that returns errors
139169
}
140170
}
141171

@@ -145,59 +175,81 @@ fn try_identity_classifier(ctx: TcContext) -> Result<(), i64> {
145175
//only ipv4 protcol allowed
146176
if eth_proto != IPV4_ETHERTYPE {
147177
return Ok(());
148-
}
149-
178+
}
179+
150180
//read if the packets has Options
151181
let first_ipv4_byte = u8::from_be(ctx.load::<u8>(ETH_STACK_BYTES).map_err(|_| 1)?);
152-
let ihl = (first_ipv4_byte & 0x0F) as usize; /* 0x0F=00001111 &=AND bit a bit operator to extract the last 4 bit*/
153-
let ip_header_len= ihl*4; //returns the header lenght in bytes
182+
let ihl = (first_ipv4_byte &
183+
0x0f) as usize; /* 0x0F=00001111 &=AND bit a bit operator to extract the last 4 bit*/
184+
let ip_header_len = ihl * 4; //returns the header lenght in bytes
154185

155186
//get the source ip,destination ip and connection id
156187
let src_ip = u32::from_be(ctx.load::<u32>(SRC_T0TAL_BYTES_OFFSET).map_err(|_| 1)?); // ETH+SOURCE_ADDRESS
157-
let src_port = u16::from_be(ctx.load::<u16>(ETH_STACK_BYTES+ip_header_len+SRC_PORT_OFFSET_FROM_IP_HEADER).map_err(|_| 1)?); //14+IHL-Lenght+0
188+
let src_port = u16::from_be(
189+
ctx
190+
.load::<u16>(ETH_STACK_BYTES + ip_header_len + SRC_PORT_OFFSET_FROM_IP_HEADER)
191+
.map_err(|_| 1)?
192+
); //14+IHL-Lenght+0
158193
let dst_ip = u32::from_be(ctx.load::<u32>(DST_T0TAL_BYTES_OFFSET).map_err(|_| 1)?); // ETH+ DESTINATION_ADDRESS
159-
let dst_port = u16::from_be(ctx.load::<u16>(ETH_STACK_BYTES+ip_header_len+DST_PORT_OFFSET_FROM_IP_HEADER).map_err(|_| 1)?); //14+IHL-Lenght+0
194+
let dst_port = u16::from_be(
195+
ctx
196+
.load::<u16>(ETH_STACK_BYTES + ip_header_len + DST_PORT_OFFSET_FROM_IP_HEADER)
197+
.map_err(|_| 1)?
198+
); //14+IHL-Lenght+0
160199
let proto = u8::from_be(ctx.load::<u8>(PROTOCOL_T0TAL_BYTES_OFFSET).map_err(|_| 1)?);
161-
162200

163201
//not logging internal communication packets
164202
//TODO: do not log internal communications such as minikube dashboard packets or kubectl api packets
165-
let ip_to_block = u32::from_be_bytes([192,168,49,1]); //inverted requence
166-
let dst_ip_to_block = u32::from_be_bytes([192,168,49,2]);
167-
168-
169-
// XOR to generate the hash id for the given connection
170-
let event_id = (src_ip ^ dst_ip ^ (src_port as u32) ^ (dst_port as u32) ^ (proto as u32)) as u16; //generate one for every event using a 'byte XOR' operation
203+
let ip_to_block = u32::from_be_bytes([192, 168, 49, 1]); //inverted requence
204+
let dst_ip_to_block = u32::from_be_bytes([192, 168, 49, 2]);
171205

172-
let connection_id = (src_ip ^ dst_ip ^(proto as u32)) as u16; //added host_id to track the host to count every all the different connections
206+
let key = ConnArray {
207+
src_ip,
208+
dst_ip,
209+
src_port,
210+
dst_port,
211+
proto,
212+
};
173213

174-
if src_ip == ip_to_block && dst_ip == dst_ip_to_block {
175-
return Ok(());
176-
}
177-
else{
178-
//log all other packets
179-
let log = PacketLog {
180-
proto,
181-
src_ip,
182-
src_port,
183-
dst_ip,
184-
dst_port,
185-
event_id,
186-
};
187-
let connections = ConnArray{
188-
event_id,
189-
connection_id
190-
};
191-
unsafe {
192-
EVENTS.output(&ctx, &log, 0); //output to userspace
193-
//TODO: add more parameters to better identify the active connection (maybe timestamp?)
194-
CONNMAP.insert(&proto,&connections, 0) //save hash_id to kernel space lru per cpu hashmap
195-
};
214+
// XOR to generate the hash id for the given connection
215+
let event_id = (src_ip ^
216+
dst_ip ^
217+
(src_port as u32) ^
218+
(dst_port as u32) ^
219+
(proto as u32)) as u16; //generate one for every event using a 'byte XOR' operation
220+
221+
//let connection_id = (src_ip ^ dst_ip ^(proto as u32)) as u16; //added host_id to track the host to count every all the different connections
222+
223+
//if
224+
// (unsafe { is_kube_internal(src_ip) }) ||
225+
//(unsafe { is_kube_internal(dst_ip) }) ||
226+
// src_ip == ip_to_block ||
227+
// src_ip == dst_ip_to_block
228+
//{
229+
// return Ok(());
230+
//} else {
231+
//log all other packets
232+
let log = PacketLog {
233+
proto,
234+
src_ip,
235+
src_port,
236+
dst_ip,
237+
dst_port,
238+
event_id,
239+
};
240+
//let connections = ConnArray{
241+
// event_id,
242+
//connection_id
243+
//};
244+
unsafe {
245+
EVENTS.output(&ctx, &log, 0); //output to userspace
246+
//TODO: add more parameters to better identify the active connection (maybe timestamp?)
247+
ACTIVE_CONNECTIONS.insert(&event_id, &key, 0);
196248
}
197-
249+
//}
250+
198251
Ok(())
199252
}
200-
201253
#[panic_handler]
202254
fn panic(_info: &core::panic::PanicInfo) -> ! {
203255
loop {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* IpProtocols enum to reconstruct the packet protocol based on the
3+
* IPV4 Header Protocol code
4+
*/
5+
#[derive(Debug)]
6+
#[repr(u8)]
7+
pub enum IpProtocols {
8+
ICMP = 1,
9+
TCP = 6,
10+
UDP = 17,
11+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use aya::{ maps::{ perf::{ PerfEventArrayBuffer }, MapData } };
2+
use crate::structs::PacketLog;
3+
use bytes::BytesMut;
4+
use std::{ borrow::BorrowMut, net::Ipv4Addr, sync::{ atomic::{ AtomicBool, Ordering }, Arc } };
5+
use crate::enums::IpProtocols;
6+
use tracing::{ info, error, warn };
7+
8+
pub async fn display_events<T: BorrowMut<MapData>>(
9+
mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
10+
running: Arc<AtomicBool>,
11+
mut buffers: Vec<BytesMut>
12+
) {
13+
while running.load(Ordering::SeqCst) {
14+
for buf in perf_buffers.iter_mut() {
15+
match buf.read_events(&mut buffers) {
16+
Ok(events) => {
17+
for i in 0..events.read {
18+
let data = &buffers[i];
19+
if data.len() >= std::mem::size_of::<PacketLog>() {
20+
let pl: PacketLog = unsafe {
21+
std::ptr::read(data.as_ptr() as *const _)
22+
};
23+
let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
24+
let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
25+
let src_port = u16::from_be(pl.src_port as u16);
26+
let dst_port = u16::from_be(pl.dst_port as u16);
27+
let event_id = pl.event_id;
28+
29+
match IpProtocols::try_from(pl.proto) {
30+
Ok(proto) => {
31+
info!(
32+
"Hash: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
33+
event_id,
34+
proto,
35+
src,
36+
src_port,
37+
dst,
38+
dst_port
39+
);
40+
}
41+
Err(_) =>
42+
info!("Hash: {} Protocol: Unknown ({})", event_id, pl.proto),
43+
};
44+
} else {
45+
warn!("Received packet data too small: {} bytes", data.len());
46+
}
47+
}
48+
}
49+
Err(e) => {
50+
error!("Error reading events: {:?}", e);
51+
}
52+
}
53+
}
54+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
55+
}
56+
}

0 commit comments

Comments
 (0)