Skip to content

Commit 2e82ec9

Browse files
committed
add bfp filter to pushack sockets
1 parent 2881208 commit 2e82ec9

2 files changed

Lines changed: 48 additions & 9 deletions

File tree

forwarder/src/poll/pushack.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use crate::{
88
};
99
use parking_lot::RwLock;
1010
use smoltcp::wire::{IPV4_HEADER_LEN, TCP_HEADER_LEN};
11-
use std::{net::SocketAddr, sync::Arc};
11+
use std::{
12+
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
13+
sync::Arc,
14+
};
1215

1316
#[derive(Debug)]
1417
pub struct PushackPoll {
@@ -25,11 +28,21 @@ impl Poll for PushackPoll {
2528
peers: Arc<RwLock<PeerManager>>,
2629
on_peer_recv: Box<dyn Fn(&Peer, &mut [u8])>,
2730
) -> anyhow::Result<()> {
28-
let socket = PushackSocket::bind(&"0.0.0.0:0".parse().unwrap())?;
31+
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
32+
let socket = PushackSocket::inner_bind(addr.into())?;
2933
let buffer = create_socket_buffer!(MAX_PACKET_SIZE);
3034

35+
#[cfg(target_os = "linux")]
36+
{
37+
use crate::socket::pushack::{create_bfp_filter, TcpBpfFilter};
38+
let filter = create_bfp_filter(TcpBpfFilter::SrcPort(self.remote_addr.port()));
39+
if let Err(error) = socket.attach_filter(&filter) {
40+
log::warn!("couldn't attach bpf filter: {error:?}");
41+
}
42+
}
43+
3144
loop {
32-
let Ok(size) = socket.inner_socket().recv(cast_maybe_uninit(buffer)) else {
45+
let Ok(size) = socket.recv(cast_maybe_uninit(buffer)) else {
3346
continue;
3447
};
3548
let Some(tcp_packet) = parse_pushack_packet(&buffer[..size]) else {

forwarder/src/socket/pushack.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,15 @@ impl PushackSocket {
2323
pub fn bind(addr: &SocketAddr) -> io::Result<Self> {
2424
let _udp_socket = UdpSocket::bind(addr)?;
2525
let addr = _udp_socket.local_addr()?;
26-
2726
let socket = Self::inner_bind(addr)?;
27+
28+
#[cfg(target_os = "linux")]
29+
if let Err(error) =
30+
socket.attach_filter(&create_bfp_filter(TcpBpfFilter::DstPort(addr.port())))
31+
{
32+
log::warn!("couldn't attach bpf filter: {error:?}");
33+
}
34+
2835
Ok(Self {
2936
_udp_socket,
3037
addr,
@@ -33,11 +40,7 @@ impl PushackSocket {
3340
}
3441

3542
pub fn inner_bind(addr: SocketAddr) -> io::Result<socket2::Socket> {
36-
let socket = if addr.is_ipv4() {
37-
socket2::Socket::new(Domain::IPV4, Type::RAW, Some(Protocol::TCP))
38-
} else {
39-
socket2::Socket::new(Domain::IPV6, Type::RAW, Some(Protocol::TCP))
40-
}?;
43+
let socket = socket2::Socket::new(Domain::IPV4, Type::RAW, Some(Protocol::TCP))?;
4144
socket.bind(&addr.into())?;
4245
Ok(socket)
4346
}
@@ -166,3 +169,26 @@ pub fn parse_pushack_packet(packet: &[u8]) -> Option<TcpPacketPort> {
166169
};
167170
Some(ports)
168171
}
172+
173+
#[cfg(target_os = "linux")]
174+
#[derive(Debug)]
175+
pub enum TcpBpfFilter {
176+
SrcPort(u16),
177+
DstPort(u16),
178+
}
179+
180+
#[cfg(target_os = "linux")]
181+
pub fn create_bfp_filter(filter: TcpBpfFilter) -> [libc::sock_filter; 4] {
182+
let (filter_offset, value) = match filter {
183+
TcpBpfFilter::SrcPort(port) => (0, port),
184+
TcpBpfFilter::DstPort(port) => (2, port),
185+
};
186+
let total_offset = IPV4_HEADER_LEN + filter_offset;
187+
[
188+
(0x28, 0, 0, total_offset as u32), // ldh [offset]
189+
(0x15, 0, 1, value as u32), // jne val, drop
190+
(0x06, 0, 0, 0xffffffff), // ret #-1
191+
(0x06, 0, 0, 0000000000), // drop: ret #0
192+
]
193+
.map(|(code, jt, jf, k)| libc::sock_filter { code, jt, jf, k })
194+
}

0 commit comments

Comments
 (0)