Skip to content

Commit 326d2d6

Browse files
authored
feat(agent): add configurable socket syscall hook list (#11662) (#11702) (#11705)
* feat(agent): add configurable socket syscall hook list (#11662) * feat(agent): add configurable socket syscall hook list Add inputs.ebpf.socket.tunning.hooked_socket_syscalls so users can select which supported socket syscalls install eBPF hooks, while preserving the existing backend selection logic for mixed, pure-kprobe, and kfunc modes. * Fix hooked syscall config handling * Modify the format of HOOKED_SOCKET_SYSCALLS Conflicts: server/agent_config/README-CH.md server/agent_config/README.md * Add tests for syscall hook configuration
1 parent 223bbd9 commit 326d2d6

10 files changed

Lines changed: 561 additions & 54 deletions

File tree

agent/src/config/config.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,35 @@ use enterprise_utils::l7::custom_policy::config::{CustomFieldPolicy, CustomProto
5757
pub const K8S_CA_CRT_PATH: &str = "/run/secrets/kubernetes.io/serviceaccount/ca.crt";
5858
const MINUTE: Duration = Duration::from_secs(60);
5959
const DEFAULT_STANDALONE_CONFIG: &str = "/etc/deepflow-agent-standalone.yaml";
60+
#[rustfmt::skip]
61+
const HOOKED_SOCKET_SYSCALLS: [&str; 10] = [
62+
"read",
63+
"readv",
64+
"recvfrom",
65+
"recvmsg",
66+
"recvmmsg",
67+
"sendmsg",
68+
"sendmmsg",
69+
"sendto",
70+
"write",
71+
"writev",
72+
];
73+
74+
fn normalize_hooked_socket_syscalls<T: AsRef<str>>(syscalls: &[T]) -> Vec<String> {
75+
HOOKED_SOCKET_SYSCALLS
76+
.iter()
77+
.filter(|supported| {
78+
syscalls
79+
.iter()
80+
.any(|syscall| syscall.as_ref() == **supported)
81+
})
82+
.map(|syscall| syscall.to_string())
83+
.collect()
84+
}
85+
86+
fn default_hooked_socket_syscalls() -> Vec<String> {
87+
normalize_hooked_socket_syscalls(&HOOKED_SOCKET_SYSCALLS)
88+
}
6089

6190
#[derive(Debug, Error)]
6291
pub enum ConfigError {
@@ -980,13 +1009,30 @@ pub struct EbpfSocketKprobe {
9801009
pub whitelist: EbpfSocketKprobePorts,
9811010
}
9821011

983-
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)]
1012+
#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
9841013
#[serde(default)]
9851014
pub struct EbpfSocketTunning {
9861015
pub max_capture_rate: u64,
9871016
pub syscall_trace_id_disabled: bool,
9881017
pub map_prealloc_disabled: bool,
9891018
pub fentry_enabled: bool,
1019+
#[serde(
1020+
default = "default_hooked_socket_syscalls",
1021+
deserialize_with = "deser_hooked_socket_syscalls"
1022+
)]
1023+
pub hooked_socket_syscalls: Vec<String>,
1024+
}
1025+
1026+
impl Default for EbpfSocketTunning {
1027+
fn default() -> Self {
1028+
Self {
1029+
max_capture_rate: 0,
1030+
syscall_trace_id_disabled: false,
1031+
map_prealloc_disabled: false,
1032+
fentry_enabled: false,
1033+
hooked_socket_syscalls: default_hooked_socket_syscalls(),
1034+
}
1035+
}
9901036
}
9911037

9921038
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Eq)]
@@ -3612,6 +3658,22 @@ where
36123658
Ok(v)
36133659
}
36143660

3661+
fn deser_hooked_socket_syscalls<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
3662+
where
3663+
D: Deserializer<'de>,
3664+
{
3665+
let raw = Vec::<String>::deserialize(deserializer)?;
3666+
for syscall in &raw {
3667+
if !HOOKED_SOCKET_SYSCALLS.contains(&syscall.as_str()) {
3668+
return Err(de::Error::invalid_value(
3669+
Unexpected::Str(syscall),
3670+
&"one of read|readv|recvfrom|recvmsg|recvmmsg|sendmsg|sendmmsg|sendto|write|writev",
3671+
));
3672+
}
3673+
}
3674+
Ok(normalize_hooked_socket_syscalls(&raw))
3675+
}
3676+
36153677
#[cfg(test)]
36163678
mod tests {
36173679
use super::*;
@@ -3730,4 +3792,39 @@ processors:
37303792
assert_eq!(apps[1].protocol, L7Protocol::Grpc);
37313793
assert_eq!(apps[1].timeout, Duration::from_secs(130));
37323794
}
3795+
3796+
#[test]
3797+
fn parse_hooked_socket_syscalls_normalizes_order_and_duplicates() {
3798+
let yaml = r#"
3799+
inputs:
3800+
ebpf:
3801+
socket:
3802+
tunning:
3803+
hooked_socket_syscalls: [write, read, write, sendto]
3804+
"#;
3805+
let cfg: UserConfig = serde_yaml::from_str(yaml).unwrap();
3806+
3807+
assert_eq!(
3808+
cfg.inputs.ebpf.socket.tunning.hooked_socket_syscalls,
3809+
vec!["read", "sendto", "write"]
3810+
);
3811+
}
3812+
3813+
#[test]
3814+
fn parse_hooked_socket_syscalls_rejects_invalid_values() {
3815+
let yaml = r#"
3816+
inputs:
3817+
ebpf:
3818+
socket:
3819+
tunning:
3820+
hooked_socket_syscalls: [read, invalid_syscall]
3821+
"#;
3822+
let result: Result<UserConfig, _> = serde_yaml::from_str(yaml);
3823+
3824+
assert!(result.is_err());
3825+
let error = result.unwrap_err().to_string();
3826+
assert!(error.contains("invalid value: string \"invalid_syscall\""));
3827+
assert!(error
3828+
.contains("read|readv|recvfrom|recvmsg|recvmmsg|sendmsg|sendmmsg|sendto|write|writev"));
3829+
}
37333830
}

agent/src/config/handler.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3610,6 +3610,11 @@ impl ConfigHandler {
36103610
new_tunning.fentry_enabled,
36113611
"inputs.ebpf.socket.tunning.fentry_enabled"
36123612
),
3613+
(
3614+
tunning.hooked_socket_syscalls,
3615+
new_tunning.hooked_socket_syscalls,
3616+
"inputs.ebpf.socket.tunning.hooked_socket_syscalls"
3617+
),
36133618
(
36143619
tunning.map_prealloc_disabled,
36153620
new_tunning.map_prealloc_disabled,

agent/src/ebpf/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,7 @@ extern "C" {
806806
pub fn enable_unix_socket_feature();
807807
pub fn disable_fentry();
808808
pub fn enable_fentry();
809+
pub fn set_hooked_socket_syscalls(bitmap: c_ulonglong);
809810
pub fn set_virtual_file_collect(enabled: bool) -> c_int;
810811
cfg_if::cfg_if! {
811812
if #[cfg(feature = "extended_observability")] {

agent/src/ebpf/samples/rust/socket-tracer/src/main.rs

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use chrono::prelude::DateTime;
1818
use chrono::FixedOffset;
1919
use chrono::Utc;
20+
use log::info;
2021
use socket_tracer::ebpf::*;
2122
use std::convert::TryInto;
2223
use std::env;
@@ -26,7 +27,6 @@ use std::net::IpAddr;
2627
use std::sync::Mutex;
2728
use std::thread;
2829
use std::time::{Duration, UNIX_EPOCH};
29-
use log::info;
3030

3131
extern "C" {
3232
fn print_uprobe_http2_info(data: *mut c_char, len: c_uint);
@@ -42,6 +42,88 @@ lazy_static::lazy_static! {
4242
static ref COUNTER: Mutex<u32> = Mutex::new(0);
4343
}
4444

45+
#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
46+
struct HookedSocketSyscallBitmap(c_ulonglong);
47+
48+
impl HookedSocketSyscallBitmap {
49+
fn set_enabled(&mut self, bit: c_ulonglong) {
50+
self.0 |= bit;
51+
}
52+
}
53+
54+
impl<T: AsRef<str>> From<&[T]> for HookedSocketSyscallBitmap {
55+
fn from(vs: &[T]) -> Self {
56+
let mut bitmap = HookedSocketSyscallBitmap(0);
57+
for v in vs.iter() {
58+
match v.as_ref() {
59+
"read" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_READ),
60+
"readv" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_READV),
61+
"recvfrom" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_RECVFROM),
62+
"recvmsg" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_RECVMSG),
63+
"recvmmsg" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_RECVMMSG),
64+
"sendmsg" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_SENDMSG),
65+
"sendmmsg" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_SENDMMSG),
66+
"sendto" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_SENDTO),
67+
"write" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_WRITE),
68+
"writev" => bitmap.set_enabled(HOOKED_SOCKET_SYSCALL_WRITEV),
69+
_ => {}
70+
}
71+
}
72+
bitmap
73+
}
74+
}
75+
76+
const HOOKED_SOCKET_SYSCALL_READ: c_ulonglong = 1 << 0;
77+
const HOOKED_SOCKET_SYSCALL_READV: c_ulonglong = 1 << 1;
78+
const HOOKED_SOCKET_SYSCALL_RECVFROM: c_ulonglong = 1 << 2;
79+
const HOOKED_SOCKET_SYSCALL_RECVMSG: c_ulonglong = 1 << 3;
80+
const HOOKED_SOCKET_SYSCALL_RECVMMSG: c_ulonglong = 1 << 4;
81+
const HOOKED_SOCKET_SYSCALL_SENDMSG: c_ulonglong = 1 << 5;
82+
const HOOKED_SOCKET_SYSCALL_SENDMMSG: c_ulonglong = 1 << 6;
83+
const HOOKED_SOCKET_SYSCALL_SENDTO: c_ulonglong = 1 << 7;
84+
const HOOKED_SOCKET_SYSCALL_WRITE: c_ulonglong = 1 << 8;
85+
const HOOKED_SOCKET_SYSCALL_WRITEV: c_ulonglong = 1 << 9;
86+
87+
#[cfg(test)]
88+
mod tests {
89+
use super::*;
90+
91+
#[test]
92+
fn hooked_socket_syscall_bitmap_sets_expected_bits() {
93+
let syscalls = ["read", "recvfrom", "sendto", "write"];
94+
let bitmap = HookedSocketSyscallBitmap::from(syscalls.as_slice());
95+
96+
assert_eq!(
97+
bitmap,
98+
HookedSocketSyscallBitmap(
99+
HOOKED_SOCKET_SYSCALL_READ
100+
| HOOKED_SOCKET_SYSCALL_RECVFROM
101+
| HOOKED_SOCKET_SYSCALL_SENDTO
102+
| HOOKED_SOCKET_SYSCALL_WRITE,
103+
),
104+
);
105+
}
106+
107+
#[test]
108+
fn set_hooked_socket_syscalls_accepts_bitmap_from_syscall_names() {
109+
let syscalls = ["write", "read", "write", "sendto"];
110+
let bitmap = HookedSocketSyscallBitmap::from(syscalls.as_slice());
111+
112+
assert_eq!(
113+
bitmap,
114+
HookedSocketSyscallBitmap(
115+
HOOKED_SOCKET_SYSCALL_READ
116+
| HOOKED_SOCKET_SYSCALL_SENDTO
117+
| HOOKED_SOCKET_SYSCALL_WRITE,
118+
),
119+
);
120+
121+
unsafe {
122+
set_hooked_socket_syscalls(bitmap.0);
123+
}
124+
}
125+
}
126+
45127
#[allow(dead_code)]
46128
fn increment_counter(num: u32, counter_type: u32) {
47129
if counter_type == 0 {
@@ -201,7 +283,11 @@ extern "C" fn debug_callback(_data: *mut c_char, len: c_int) {
201283
}
202284
}
203285

204-
extern "C" fn socket_trace_callback(_: *mut c_void, queue_id: c_int, sd: *mut SK_BPF_DATA) -> c_int {
286+
extern "C" fn socket_trace_callback(
287+
_: *mut c_void,
288+
queue_id: c_int,
289+
sd: *mut SK_BPF_DATA,
290+
) -> c_int {
205291
unsafe {
206292
let mut proto_tag = String::from("");
207293
if sk_proto_safe(sd) == SOCK_DATA_OTHER {
@@ -411,7 +497,10 @@ fn main() {
411497
if e.kind() == std::io::ErrorKind::NotFound {
412498
println!("numad process not found, skipping CPU affinity protection (normal)");
413499
} else {
414-
println!("Failed to protect CPU affinity due to unexpected error: {}", e);
500+
println!(
501+
"Failed to protect CPU affinity due to unexpected error: {}",
502+
e
503+
);
415504
}
416505
}
417506
}
@@ -640,10 +729,20 @@ fn main() {
640729
.as_c_str()
641730
.as_ptr(),
642731
);
643-
// dpdk enable
644-
// set_dpdk_trace_enabled(true);
645-
// disable_kprobe_feature();
646-
// set_virtual_file_collect(true);
732+
//let hooked_socket_syscalls = [
733+
// "read", "readv", "recvfrom", "recvmsg", "recvmmsg", "sendmsg", "sendmmsg", "sendto",
734+
// "write", "writev",
735+
//];
736+
let hooked_socket_syscalls = [
737+
"readv", "recvfrom", "recvmsg", "recvmmsg", "sendmsg", "sendmmsg", "sendto", "writev",
738+
];
739+
set_hooked_socket_syscalls(
740+
HookedSocketSyscallBitmap::from(hooked_socket_syscalls.as_slice()).0,
741+
);
742+
// dpdk enable
743+
// set_dpdk_trace_enabled(true);
744+
// disable_kprobe_feature();
745+
// set_virtual_file_collect(true);
647746
if running_socket_tracer(
648747
socket_trace_callback, /* Callback interface rust -> C */
649748
1, /* Number of worker threads, indicating how many user-space threads participate in data processing */

0 commit comments

Comments
 (0)