Skip to content

Commit 2de2db5

Browse files
author
LorenzoTettamanti
committed
Merge new core updates
2 parents 2ed0765 + 1c781e1 commit 2de2db5

File tree

9 files changed

+338
-236
lines changed

9 files changed

+338
-236
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,7 @@ docker-compose.yaml
165165
admission-webhook-with-cert.yaml
166166
proxy-injector-with-cert.yaml
167167
kafka.rs
168-
node-debugger.yaml
168+
node-debugger.yaml
169+
notes.txt
170+
dev-notes.txt
171+
skbuff.rs

core/src/components/conntracker/src/main.rs

Lines changed: 140 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,40 @@
66
* Functionalities:
77
* 1. Creates a PacketLog structure to track incoming packets
88
* 2. Tracking Parameters: SRC_IP.SRC_PORT,DST_IP,DST_PORT,PROTOCOL,HASH
9-
* 3. Store HASH_ID in a BPF HASHMAP
9+
* 3. Compute the EVENT_ID and CONNECTION_ID using a byte XOR
10+
* 4. Store CONNECTION_ID in a BPF LRU HASHMAP and pass EVENT_ID to the user space
1011
*/
1112

1213
// Imports
1314
#![no_std]
1415
#![no_main]
16+
#![allow(warnings)]
1517

16-
use bytemuck::{Pod,Zeroable};
18+
//mod skbuff;
19+
20+
use bytemuck::{ Pod, Zeroable };
1721
use aya_ebpf::{
18-
bindings::TC_ACT_OK,
19-
macros::{ classifier, map },
22+
bindings::{ TC_ACT_OK, TC_ACT_SHOT },
23+
macros::{ classifier, map, kprobe, tracepoint },
2024
maps::PerfEventArray,
21-
programs::TcContext,
25+
maps::LruPerCpuHashMap,
26+
programs::{ TcContext, TracePointContext },
27+
helpers::{ bpf_probe_read_kernel, bpf_ktime_get_ns },
2228
};
29+
use aya_ebpf::EbpfContext;
30+
//use crate::skbuff::{ sock, sock_common };
2331
use aya_log_ebpf::info;
24-
use core::mem;
25-
26-
32+
use core::{ mem, ptr };
33+
//use crate::skbuff::proto;
34+
//use crate::skbuff::{ iphdr };
35+
//use crate::skbuff::sk_buff;
36+
use network_types::{
37+
eth::{ EthHdr, EtherType },
38+
ip::{ IpProto, Ipv4Hdr },
39+
tcp::TcpHdr,
40+
udp::UdpHdr,
41+
};
42+
use core::ptr::addr_of;
2743
/*
2844
* ETHERNET TYPE II FRAME:
2945
* Reference: https://it.wikipedia.org/wiki/Frame_Ethernet
@@ -41,18 +57,7 @@ use core::mem;
4157
* https://en.wikipedia.org/wiki/IPv4#Header
4258
*
4359
* Original reference:
44-
* https://datatracker.ietf.org/doc/html/rfc791
45-
*
46-
*
47-
* Ipv4 header datagram
4860
49-
0 1 2 3 TOT BYTES OFFSET (full length)
50-
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 32 bit
51-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
52-
|Version| IHL |Type of Service| Total Length | 4 bytes 4
53-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
54-
| Identification |Flags| Fragment Offset | 4 bytes 8
55-
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
5661
| Time to Live | Protocol | Header Checksum | 4 bytes 12
5762
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
5863
| Source Address | 4 bytes 16
@@ -86,22 +91,35 @@ use core::mem;
8691
pub struct PacketLog {
8792
pub proto: u8,
8893
pub src_ip: u32,
89-
pub src_port: u32,
94+
pub src_port: u16,
9095
pub dst_ip: u32,
91-
pub dst_port: u32,
92-
pub hash_id: u16,
96+
pub dst_port: u16,
97+
pub event_id: u16,
9398
}
9499

95100
#[repr(C)]
96-
#[derive(Clone, Copy, Pod, Zeroable)]
97-
pub struct ConnArray{
98-
pub hash_id: u16
101+
#[derive(Clone, Copy)]
102+
pub struct ConnArray {
103+
pub src_ip: u32,
104+
pub dst_ip: u32,
105+
pub src_port: u16,
106+
pub dst_port: u16,
107+
pub proto: u8,
99108
}
100109

101-
#[map]
110+
#[map(name = "EventsMap")]
102111
static mut EVENTS: PerfEventArray<PacketLog> = PerfEventArray::new(0);
103-
#[map(name = "ConnectionArray")]
104-
pub static mut CONNARRAY: PerfEventArray<ConnArray> = PerfEventArray::new(0);
112+
#[map(name = "ConnectionMap")]
113+
pub static mut ACTIVE_CONNECTIONS: LruPerCpuHashMap<
114+
u16,
115+
ConnArray
116+
> = LruPerCpuHashMap::with_max_entries(65536, 0);
117+
118+
#[map(name = "ConnectionTrackerMap")]
119+
pub static mut CONNTRACKER: LruPerCpuHashMap<ConnArray, u8> = LruPerCpuHashMap::with_max_entries(
120+
65536,
121+
0
122+
);
105123

106124
const IPV4_ETHERTYPE: u16 = 0x0800;
107125

@@ -116,22 +134,38 @@ const DST_MAC: usize = 6;
116134
const ETHERTYPE_BYTES: usize = 2;
117135

118136
//TCP UDP Stack
119-
const SRC_PORT: usize = 34;
120-
const DST_PORT: usize = 36;
137+
const SRC_PORT_OFFSET_FROM_IP_HEADER: usize = 0;
138+
const DST_PORT_OFFSET_FROM_IP_HEADER: usize = 2;
139+
140+
static ETH_STACK_BYTES: usize = SRC_MAC + DST_MAC + ETHERTYPE_BYTES;
141+
static DST_T0TAL_BYTES_OFFSET: usize = ETH_STACK_BYTES + DST_BYTE_OFFSET;
142+
static SRC_T0TAL_BYTES_OFFSET: usize = ETH_STACK_BYTES + SRC_BYTE_OFFSET;
143+
static PROTOCOL_T0TAL_BYTES_OFFSET: usize = ETH_STACK_BYTES + IPV4_PROTOCOL_OFFSET;
121144

122-
static ETH_STACK_BYTES :usize = SRC_MAC+DST_MAC+ETHERTYPE_BYTES;
123-
static DST_T0TAL_BYTES_OFFSET :usize = ETH_STACK_BYTES + DST_BYTE_OFFSET;
124-
static SRC_T0TAL_BYTES_OFFSET :usize = ETH_STACK_BYTES + SRC_BYTE_OFFSET;
125-
static PROTOCOL_T0TAL_BYTES_OFFSET :usize = ETH_STACK_BYTES + IPV4_PROTOCOL_OFFSET;
145+
const AF_INET: u16 = 2; //ipv4
146+
const AF_INET6: u16 = 10; //ipv6
126147

148+
const IPPROTO_UDP: u8 = 17;
149+
const IPPROTO_TCP: u8 = 6;
127150

151+
//TODO: add kprobe tracing for process ID
152+
//kprobe docs: https://docs.kernel.org/trace/kprobes.html
128153

154+
/* constants */
155+
const HOST_NETNS_INUM: u32 = 4026531993;
156+
const KUBE_POD_CIDR: u32 = 0x0af40000; // 10.244.0.0/16
157+
158+
/* Helper Functions */
159+
#[inline]
160+
unsafe fn is_kube_internal(ip: u32) -> bool {
161+
(ip & 0xffff0000) == KUBE_POD_CIDR
162+
}
129163

130164
#[classifier]
131165
pub fn identity_classifier(ctx: TcContext) -> i32 {
132166
match try_identity_classifier(ctx) {
133167
Ok(_) => TC_ACT_OK,
134-
Err(_) => TC_ACT_OK,
168+
Err(_) => TC_ACT_SHOT, //block packets that returns errors
135169
}
136170
}
137171

@@ -141,38 +175,81 @@ fn try_identity_classifier(ctx: TcContext) -> Result<(), i64> {
141175
//only ipv4 protcol allowed
142176
if eth_proto != IPV4_ETHERTYPE {
143177
return Ok(());
144-
} else {
145-
146-
//get the source ip,destination ip and connection id
147-
let src_ip = u32::from_be(ctx.load::<u32>(SRC_T0TAL_BYTES_OFFSET).map_err(|_| 1)?); // ETH+SOURCE_ADDRESS
148-
let src_port = u32::from_be(ctx.load::<u32>(SRC_PORT).map_err(|_| 1)?);
149-
let dst_ip = u32::from_be(ctx.load::<u32>(DST_T0TAL_BYTES_OFFSET).map_err(|_| 1)?); // ETH+ DESTINATION_ADDRESS
150-
let dst_port = u32::from_be(ctx.load::<u32>(DST_PORT).map_err(|_| 1)?);
151-
let proto = u8::from_be(ctx.load::<u8>(PROTOCOL_T0TAL_BYTES_OFFSET).map_err(|_| 1)?);
152-
153-
// XOR to generate the hash id for the given connection
154-
let hash_id = (src_ip ^ dst_ip ^ (src_port as u32) ^ (dst_port as u32) ^ (proto as u32)) as u16;
155-
156-
let log = PacketLog {
157-
proto,
158-
src_ip,
159-
src_port,
160-
dst_ip,
161-
dst_port,
162-
hash_id,
163-
};
164-
let connections = ConnArray{
165-
hash_id
166-
};
167-
unsafe {
168-
EVENTS.output(&ctx, &log, 0); //output to userspace
169-
CONNARRAY.output(&ctx,&connections, 0) //save hash_id to kernel space array
170-
};
171178
}
172179

180+
//read if the packets has Options
181+
let first_ipv4_byte = u8::from_be(ctx.load::<u8>(ETH_STACK_BYTES).map_err(|_| 1)?);
182+
let ihl = (first_ipv4_byte &
183+
0x0f) as usize; /* 0x0F=00001111 &=AND bit a bit operator to extract the last 4 bit*/
184+
let ip_header_len = ihl * 4; //returns the header lenght in bytes
185+
186+
//get the source ip,destination ip and connection id
187+
let src_ip = u32::from_be(ctx.load::<u32>(SRC_T0TAL_BYTES_OFFSET).map_err(|_| 1)?); // ETH+SOURCE_ADDRESS
188+
let src_port = u16::from_be(
189+
ctx
190+
.load::<u16>(ETH_STACK_BYTES + ip_header_len + SRC_PORT_OFFSET_FROM_IP_HEADER)
191+
.map_err(|_| 1)?
192+
); //14+IHL-Lenght+0
193+
let dst_ip = u32::from_be(ctx.load::<u32>(DST_T0TAL_BYTES_OFFSET).map_err(|_| 1)?); // ETH+ DESTINATION_ADDRESS
194+
let dst_port = u16::from_be(
195+
ctx
196+
.load::<u16>(ETH_STACK_BYTES + ip_header_len + DST_PORT_OFFSET_FROM_IP_HEADER)
197+
.map_err(|_| 1)?
198+
); //14+IHL-Lenght+0
199+
let proto = u8::from_be(ctx.load::<u8>(PROTOCOL_T0TAL_BYTES_OFFSET).map_err(|_| 1)?);
200+
201+
//not logging internal communication packets
202+
//TODO: do not log internal communications such as minikube dashboard packets or kubectl api packets
203+
let ip_to_block = u32::from_be_bytes([192, 168, 49, 1]); //inverted requence
204+
let dst_ip_to_block = u32::from_be_bytes([192, 168, 49, 2]);
205+
206+
let key = ConnArray {
207+
src_ip,
208+
dst_ip,
209+
src_port,
210+
dst_port,
211+
proto,
212+
};
213+
214+
// XOR to generate the hash id for the given connection
215+
let event_id = (src_ip ^
216+
dst_ip ^
217+
(src_port as u32) ^
218+
(dst_port as u32) ^
219+
(proto as u32)) as u16; //generate one for every event using a 'byte XOR' operation
220+
221+
//let connection_id = (src_ip ^ dst_ip ^(proto as u32)) as u16; //added host_id to track the host to count every all the different connections
222+
223+
//if
224+
// (unsafe { is_kube_internal(src_ip) }) ||
225+
//(unsafe { is_kube_internal(dst_ip) }) ||
226+
// src_ip == ip_to_block ||
227+
// src_ip == dst_ip_to_block
228+
//{
229+
// return Ok(());
230+
//} else {
231+
//log all other packets
232+
let log = PacketLog {
233+
proto,
234+
src_ip,
235+
src_port,
236+
dst_ip,
237+
dst_port,
238+
event_id,
239+
};
240+
//let connections = ConnArray{
241+
// event_id,
242+
//connection_id
243+
//};
244+
unsafe {
245+
EVENTS.output(&ctx, &log, 0); //output to userspace
246+
//TODO: add more parameters to better identify the active connection (maybe timestamp?)
247+
ACTIVE_CONNECTIONS.insert(&event_id, &key, 0);
248+
}
249+
//}
250+
173251
Ok(())
174252
}
175-
176253
#[panic_handler]
177254
fn panic(_info: &core::panic::PanicInfo) -> ! {
178255
loop {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* IpProtocols enum to reconstruct the packet protocol based on the
3+
* IPV4 Header Protocol code
4+
*/
5+
#[derive(Debug)]
6+
#[repr(u8)]
7+
pub enum IpProtocols {
8+
ICMP = 1,
9+
TCP = 6,
10+
UDP = 17,
11+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use aya::{ maps::{ perf::{ PerfEventArrayBuffer }, MapData } };
2+
use crate::structs::PacketLog;
3+
use bytes::BytesMut;
4+
use std::{ borrow::BorrowMut, net::Ipv4Addr, sync::{ atomic::{ AtomicBool, Ordering }, Arc } };
5+
use crate::enums::IpProtocols;
6+
use tracing::{ info, error, warn };
7+
8+
pub async fn display_events<T: BorrowMut<MapData>>(
9+
mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
10+
running: Arc<AtomicBool>,
11+
mut buffers: Vec<BytesMut>
12+
) {
13+
while running.load(Ordering::SeqCst) {
14+
for buf in perf_buffers.iter_mut() {
15+
match buf.read_events(&mut buffers) {
16+
Ok(events) => {
17+
for i in 0..events.read {
18+
let data = &buffers[i];
19+
if data.len() >= std::mem::size_of::<PacketLog>() {
20+
let pl: PacketLog = unsafe {
21+
std::ptr::read(data.as_ptr() as *const _)
22+
};
23+
let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
24+
let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
25+
let src_port = u16::from_be(pl.src_port as u16);
26+
let dst_port = u16::from_be(pl.dst_port as u16);
27+
let event_id = pl.event_id;
28+
29+
match IpProtocols::try_from(pl.proto) {
30+
Ok(proto) => {
31+
info!(
32+
"Hash: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
33+
event_id,
34+
proto,
35+
src,
36+
src_port,
37+
dst,
38+
dst_port
39+
);
40+
}
41+
Err(_) =>
42+
info!("Hash: {} Protocol: Unknown ({})", event_id, pl.proto),
43+
};
44+
} else {
45+
warn!("Received packet data too small: {} bytes", data.len());
46+
}
47+
}
48+
}
49+
Err(e) => {
50+
error!("Error reading events: {:?}", e);
51+
}
52+
}
53+
}
54+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
55+
}
56+
}

0 commit comments

Comments
 (0)