Skip to content

Commit 1b52af5

Browse files
feat: support agent governance (#11446)
* feat: add AiAgentConfig for AI agent governance * feat: add BIZ_TYPE_AI_AGENT constant * feat: add ai_agent stub in enterprise-utils * feat: add AI agent URL detection hook in the HTTP parser * feat: add biz_type to ProcessData and ProcessInfo * feat: exempt AI agent traffic from flow reassembly limits * feat: add access_permission and AI agent hook points for eBPF * feat: integrate AiAgentRegistry into the agent lifecycle * fix: add missing L7ProtocolInfoInterface import and remove the unused variable * fix: add a null statement after skip_latency_filter * feat: complete PRD 2.2 updates for AI agent governance * fix: resolve blockers in the AI agent governance event pipeline * feat: add AI agent reassembly and process biz_type handling * feat: add ai_agent config and bump the database version * feat: extend endpoints for AI agent identification * fix: align AI agent tracepoint hooks * feat: add chmod, chown, and unlink tracepoints for AI agents * fix: clean up AI agents with full process scanning * fix: correct AI agent pid_tgid usage in socket_trace * fix: reduce AI agent stack usage in data submission * fix: sync biz_type for gprocess in multi-controller mode * fix: add missing is_ai_agent handling in socket_trace * fix: avoid BPF stack usage for the ai_agent flag * fix: address AI agent governance review feedback * fix: resolve the proc scan hook warning and HTTP endpoint borrow issue * feat: auto-sync AI agent gprocess_info * feat: mark AI agents with biz_type in gprocess * feat: support gprocess.biz_type tag queries * feat: log AI agent PIDs during gprocess sync * feat: inherit child process lifecycle for AI agents * feat: expose process event start time for AI agents * fix: correct gprocess fallback and captured bytes for proc lifecycle * fix: guard AI reassembly bytes on invalid socket info * fix: refresh proc.gprocess_info on process change * fix: enable reassembly after protocol inference * fix: enable reassembly for existing sockets after protocol inference * fix: inherit gprocess_id for AI agent descendants * fix: preserve gprocess inheritance on proc exec and add tests * fix: include AI agent PIDs in socket list sync * fix: propagate reasm_bytes during eBPF merge * fix: remove the record_endpoint_hit stub from AiAgentRegistry * feat: add ai_agent_root_pid for child gprocess_id resolution * fix: normalize file_op output to match IoEvent * fix: bypass collect_mode filtering for child AI agent file I/O events * chore: add deepflow-ctl support for ai-agent process show * fix: keep AI endpoint extraction independent of http_endpoint_disabled * feat: split AI agent event tables * fix: register metrics for AI agent event tables * fix: add localized tag descriptions for AI agent event tables * fix: compact management events and disable 1-second aggregation * fix: reduce AI agent governance sync log noise * fix: scope event reducers by agent and gpid * fix: guard AI agent proc scan hook for non-Linux builds * fix: refine file agg event windowing * fix: clarify AI agent event semantics * fix: route biz_type queries through gprocess tags * fix: avoid duplicate AI agent exec and exit probes * fix: correct captured byte accounting for ebpf reassembly * fix: address latest pr11446 review feedback * fix: restrict file agg events to ai agents * feat: link AI event data source retention updates * fix: add raw biz_type fallback for enum filters * fix: preserve ai agent endpoint and retention sync * fix: harden ai agent ebpf reassembly and limits * docs: clarify gprocess id sync timing * test: harden ai agent review regressions * fix: harden ai agent event reducers and cache * fix: clean up ai agent review follow-ups * fix: harden ai agent ebpf and proc scan follow-ups * fix: tighten ai agent endpoint and pid fetch paths * fix: avoid endpoint borrow conflicts during ai-agent matching * fix: align gprocess biz_type query translators * refactor: avoid extra endpoint copies in http parser * fix(cli): align Go version with server requirement * fix(cli): tidy module metadata for Go 1.26
1 parent f0b70b4 commit 1b52af5

140 files changed

Lines changed: 5720 additions & 231 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/cli-build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
- name: Set up Go
3333
uses: actions/setup-go@master
3434
with:
35-
go-version: 1.24.x
35+
go-version: 1.26.x
3636

3737
- name: Set up GOPATH env
3838
run: echo "GOPATH=$(go env GOPATH)" >> "$GITHUB_ENV"

.github/workflows/cli-verify.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
- name: Set up Go
2020
uses: actions/setup-go@master
2121
with:
22-
go-version: 1.24.x
22+
go-version: 1.26.x
2323

2424
- name: Set up GOPATH env
2525
run: echo "GOPATH=$(go env GOPATH)" >> "$GITHUB_ENV"

agent/crates/enterprise-utils/src/lib.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,3 +509,89 @@ pub mod rpc {
509509
}
510510
}
511511
}
512+
513+
pub mod ai_agent {
514+
use std::sync::Arc;
515+
use std::time::Duration;
516+
517+
#[derive(Debug, Clone, Default)]
518+
pub struct AgentMeta {
519+
pub first_seen: Duration,
520+
pub last_seen: Duration,
521+
pub matched_endpoint: String,
522+
pub root_pid: u32,
523+
}
524+
525+
#[derive(Debug, Clone, Default)]
526+
pub struct AiAgentRegistry;
527+
528+
impl AiAgentRegistry {
529+
pub fn new() -> Self {
530+
AiAgentRegistry
531+
}
532+
533+
pub fn register(&self, _pid: u32, _endpoint: &str, _now: Duration) -> bool {
534+
false
535+
}
536+
537+
pub fn is_ai_agent(&self, _pid: u32) -> bool {
538+
false
539+
}
540+
541+
pub fn get_root_pid(&self, _pid: u32) -> u32 {
542+
0
543+
}
544+
545+
pub fn register_child(&self, _parent_pid: u32, _child_pid: u32, _now: Duration) -> bool {
546+
false
547+
}
548+
549+
pub fn get_all_pids(&self) -> Vec<u32> {
550+
vec![]
551+
}
552+
553+
pub fn cleanup_dead_pids(&self, _alive_pids: &[u32]) -> Vec<u32> {
554+
vec![]
555+
}
556+
557+
pub fn len(&self) -> usize {
558+
0
559+
}
560+
561+
pub fn is_empty(&self) -> bool {
562+
true
563+
}
564+
565+
pub fn sync_bpf_map_add(&self, _pid: u32) {}
566+
567+
pub fn sync_bpf_map_remove(&self, _pid: u32) {}
568+
569+
#[cfg(target_os = "linux")]
570+
pub fn set_bpf_map_fd(&self, _fd: i32) {}
571+
572+
pub fn set_file_io_enabled(&self, _enabled: bool) {}
573+
}
574+
575+
/// Check if a URL path matches an AI Agent endpoint pattern.
576+
pub fn match_ai_agent_endpoint(
577+
_endpoints: &[String],
578+
_path: &str,
579+
_pid: u32,
580+
_socket_role: u8,
581+
_now: Duration,
582+
) -> Option<String> {
583+
None
584+
}
585+
586+
/// Initialize the global AI Agent registry. Returns the registry Arc.
587+
/// Stub: returns a no-op registry.
588+
pub fn init_global_registry() -> Arc<AiAgentRegistry> {
589+
Arc::new(AiAgentRegistry::new())
590+
}
591+
592+
/// Get a reference to the global AI Agent registry.
593+
/// Stub: always returns None.
594+
pub fn global_registry() -> Option<&'static Arc<AiAgentRegistry>> {
595+
None
596+
}
597+
}

agent/src/common/ebpf.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ pub const GO_HTTP2_UPROBE_DATA: u8 = 5;
4040
pub const SOCKET_CLOSE_EVENT: u8 = 6;
4141
// unix socket
4242
pub const UNIX_SOCKET: u8 = 8;
43+
// AI Agent governance event types
44+
pub const FILE_OP_EVENT: u8 = 9;
45+
pub const PERM_OP_EVENT: u8 = 10;
46+
pub const PROC_LIFECYCLE_EVENT: u8 = 11;
4347

4448
const EBPF_TYPE_TRACEPOINT: u8 = 0;
4549
const EBPF_TYPE_TLS_UPROBE: u8 = 1;

agent/src/common/flow.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,10 @@ impl From<FlowPerfStats> for flow_log::FlowPerfStats {
537537
}
538538
}
539539

540+
// Business type constants for process classification
541+
pub const BIZ_TYPE_DEFAULT: u8 = 0;
542+
pub const BIZ_TYPE_AI_AGENT: u8 = 1;
543+
540544
#[derive(Clone, Debug, Default)]
541545
pub struct L7Stats {
542546
pub stats: L7PerfStats,

agent/src/common/l7_protocol_log.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use std::sync::{
2424
};
2525
use std::time::Duration;
2626

27+
use crate::common::l7_protocol_info::L7ProtocolInfoInterface;
28+
2729
use enum_dispatch::enum_dispatch;
2830
use log::debug;
2931
use lru::LruCache;
@@ -252,6 +254,16 @@ impl L7ParseResult {
252254
L7ParseResult::None => panic!("parse result is none but unwrap multi"),
253255
}
254256
}
257+
258+
/// Check if any parsed result has the given biz_type.
259+
/// Used to detect AI Agent flows after parsing.
260+
pub fn has_biz_type(&self, biz_type: u8) -> bool {
261+
match self {
262+
L7ParseResult::Single(info) => info.get_biz_type() == biz_type,
263+
L7ParseResult::Multi(infos) => infos.iter().any(|i| i.get_biz_type() == biz_type),
264+
L7ParseResult::None => false,
265+
}
266+
}
255267
}
256268

257269
#[enum_dispatch]
@@ -690,12 +702,14 @@ pub struct ParseParam<'a> {
690702

691703
// the config of `l7_log_packet_size`, must set in parse_payload and check_payload
692704
pub buf_size: u16,
693-
pub captured_byte: u16,
705+
pub captured_byte: u32,
694706

695707
pub oracle_parse_conf: OracleConfig,
696708
pub iso8583_parse_conf: Iso8583ParseConfig,
697709
pub web_sphere_mq_parse_conf: WebSphereMqParseConfig,
698710
pub net_sign_parse_conf: NetSignParseConfig,
711+
pub process_id: u32,
712+
pub socket_role: u8,
699713
}
700714

701715
impl<'a> fmt::Debug for ParseParam<'a> {
@@ -729,6 +743,8 @@ impl<'a> fmt::Debug for ParseParam<'a> {
729743
.field("iso8583_parse_conf", &self.iso8583_parse_conf)
730744
.field("web_sphere_mq_parse_conf", &self.web_sphere_mq_parse_conf)
731745
.field("net_sign_parse_conf", &self.net_sign_parse_conf)
746+
.field("process_id", &self.process_id)
747+
.field("socket_role", &self.socket_role)
732748
.finish()
733749
}
734750
}
@@ -803,6 +819,8 @@ impl<'a> ParseParam<'a> {
803819
iso8583_parse_conf: Iso8583ParseConfig::default(),
804820
web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(),
805821
net_sign_parse_conf: NetSignParseConfig::default(),
822+
process_id: packet.process_id,
823+
socket_role: packet.socket_role,
806824
}
807825
}
808826
}
@@ -821,11 +839,17 @@ impl<'a> ParseParam<'a> {
821839
}
822840

823841
pub fn set_buf_size(&mut self, buf_size: usize) {
824-
self.buf_size = buf_size as u16;
842+
// Saturate to u16::MAX to avoid overflow when AI Agent flows use larger payload sizes.
843+
// buf_size is informational for plugins; actual payload truncation uses the usize value directly.
844+
self.buf_size = if buf_size > u16::MAX as usize {
845+
u16::MAX
846+
} else {
847+
buf_size as u16
848+
};
825849
}
826850

827851
pub fn set_captured_byte(&mut self, captured_byte: usize) {
828-
self.captured_byte = captured_byte as u16;
852+
self.captured_byte = u32::try_from(captured_byte).unwrap_or(u32::MAX);
829853
}
830854

831855
pub fn set_rrt_timeout(&mut self, t: usize) {
@@ -955,3 +979,28 @@ impl fmt::Debug for L7ProtocolBitmap {
955979
f.write_str(format!("{:#?}", p).as_str())
956980
}
957981
}
982+
983+
#[cfg(test)]
984+
mod tests {
985+
use super::*;
986+
use std::cell::RefCell;
987+
use std::rc::Rc;
988+
989+
#[test]
990+
fn captured_byte_should_not_truncate_large_payloads() {
991+
let packet = MetaPacket::default();
992+
let mut param = ParseParam::new(
993+
&packet,
994+
None,
995+
Rc::new(RefCell::new(None)),
996+
#[cfg(any(target_os = "linux", target_os = "android"))]
997+
Rc::new(RefCell::new(None)),
998+
false,
999+
false,
1000+
);
1001+
1002+
let captured: u32 = 200_000;
1003+
param.set_captured_byte(captured as usize);
1004+
assert_eq!(param.captured_byte as u32, captured);
1005+
}
1006+
}

agent/src/common/meta_packet.rs

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ pub struct MetaPacket<'a> {
245245
pub socket_id: u64,
246246
pub cap_start_seq: u64,
247247
pub cap_end_seq: u64,
248+
pub reasm_bytes: u32,
248249
pub l7_protocol_from_ebpf: L7Protocol,
249250
// 流结束标识, 目前只有 go http2 uprobe 用到
250251
pub is_request_end: bool,
@@ -1035,6 +1036,16 @@ impl<'a> MetaPacket<'a> {
10351036
#[inline]
10361037
pub fn get_captured_byte(&self) -> usize {
10371038
if self.tap_port.is_from(TapPort::FROM_EBPF) {
1039+
// For eBPF reassembly segments, upper layers merge multiple packets
1040+
// by accumulating each segment's captured length. `reasm_bytes` is a
1041+
// cumulative counter on the socket, so using it here would double
1042+
// count across segment merges (100 + 200 + 300 ...).
1043+
if self.is_reassembly_segment() {
1044+
return self.l4_payload_len as usize;
1045+
}
1046+
if self.reasm_bytes > 0 {
1047+
return self.reasm_bytes as usize;
1048+
}
10381049
return self.packet_len as usize - 54;
10391050
}
10401051

@@ -1061,6 +1072,16 @@ impl<'a> MetaPacket<'a> {
10611072
0
10621073
}
10631074

1075+
#[cfg(all(unix, feature = "libtrace"))]
1076+
fn is_reassembly_segment(&self) -> bool {
1077+
self.segment_flags != SegmentFlags::None
1078+
}
1079+
1080+
#[cfg(not(all(unix, feature = "libtrace")))]
1081+
fn is_reassembly_segment(&self) -> bool {
1082+
false
1083+
}
1084+
10641085
#[inline]
10651086
pub fn merge(&mut self, packet: &mut MetaPacket) {
10661087
if self.ebpf_type == EbpfType::None {
@@ -1083,6 +1104,13 @@ impl<'a> MetaPacket<'a> {
10831104
self.payload_len += packet.payload_len;
10841105
self.l4_payload_len += packet.l4_payload_len;
10851106
self.cap_end_seq = packet.cap_start_seq;
1107+
// eBPF reassembly: propagate the latest cumulative reassembly bytes.
1108+
// `reasm_bytes` reflects the total bytes reassembled in the kernel for
1109+
// this flow. When we merge multiple MSG_REASM_* segments, we must keep
1110+
// the newest cumulative value, otherwise `get_captured_byte()` will
1111+
// stay at the first segment's size (e.g., HTTP headers only) and
1112+
// `captured_request_byte` will be incorrect for large bodies.
1113+
self.reasm_bytes = self.reasm_bytes.max(packet.reasm_bytes);
10861114
}
10871115

10881116
#[cfg(all(unix, feature = "libtrace"))]
@@ -1147,6 +1175,7 @@ impl<'a> MetaPacket<'a> {
11471175
packet.signal_source = SignalSource::EBPF;
11481176
packet.cap_start_seq = data.cap_seq;
11491177
packet.cap_end_seq = data.cap_seq;
1178+
packet.reasm_bytes = data.reasm_bytes;
11501179
packet.process_id = data.process_id;
11511180
packet.thread_id = data.thread_id;
11521181
packet.coroutine_id = data.coroutine_id;
@@ -1424,12 +1453,12 @@ impl CacheItem for MetaPacket<'static> {
14241453
self.l7_protocol_from_ebpf
14251454
}
14261455

1427-
#[cfg(feature = "libtrace")]
1456+
#[cfg(all(unix, feature = "libtrace"))]
14281457
fn is_segment_start(&self) -> bool {
14291458
self.segment_flags == SegmentFlags::Start
14301459
}
14311460

1432-
#[cfg(not(feature = "libtrace"))]
1461+
#[cfg(not(all(unix, feature = "libtrace")))]
14331462
fn is_segment_start(&self) -> bool {
14341463
false
14351464
}
@@ -1598,4 +1627,62 @@ mod tests {
15981627
pkt
15991628
);
16001629
}
1630+
1631+
#[test]
1632+
fn get_captured_byte_prefers_reasm_bytes_for_ebpf() {
1633+
let mut pkt = MetaPacket::default();
1634+
pkt.tap_port = TapPort::from_ebpf(1, 0);
1635+
pkt.packet_len = 54 + 16;
1636+
pkt.reasm_bytes = 200_000;
1637+
1638+
assert_eq!(pkt.get_captured_byte(), 200_000);
1639+
}
1640+
1641+
#[cfg(all(unix, feature = "libtrace"))]
1642+
#[test]
1643+
fn get_captured_byte_for_ebpf_reasm_segment_should_use_current_segment_len() {
1644+
let mut pkt = MetaPacket::default();
1645+
pkt.tap_port = TapPort::from_ebpf(1, 0);
1646+
pkt.segment_flags = SegmentFlags::Seg;
1647+
pkt.reasm_bytes = 200_000;
1648+
pkt.raw_from_ebpf = vec![0u8; 4096];
1649+
pkt.l4_payload_len = 4096;
1650+
1651+
assert_eq!(pkt.get_captured_byte(), 4096);
1652+
}
1653+
1654+
#[cfg(all(unix, feature = "libtrace"))]
1655+
#[test]
1656+
fn get_captured_byte_for_merged_ebpf_reasm_start_should_use_merged_payload_len() {
1657+
let mut start = MetaPacket::default();
1658+
start.tap_port = TapPort::from_ebpf(1, 0);
1659+
start.ebpf_type = EbpfType::GoHttp2Uprobe;
1660+
start.segment_flags = SegmentFlags::Start;
1661+
start.reasm_bytes = 16_384;
1662+
start.raw_from_ebpf = vec![0u8; 16_384];
1663+
start.l4_payload_len = 16_384;
1664+
start.payload_len = 16_384;
1665+
start.packet_len = 16_384 + 54;
1666+
start.cap_start_seq = 10;
1667+
start.cap_end_seq = 10;
1668+
start.sub_packets.push(SubPacket::default());
1669+
1670+
let mut seg = MetaPacket::default();
1671+
seg.tap_port = TapPort::from_ebpf(1, 0);
1672+
seg.ebpf_type = EbpfType::GoHttp2Uprobe;
1673+
seg.segment_flags = SegmentFlags::Seg;
1674+
seg.reasm_bytes = 32_768;
1675+
seg.raw_from_ebpf = vec![0u8; 16_384];
1676+
seg.l4_payload_len = 16_384;
1677+
seg.payload_len = 16_384;
1678+
seg.packet_len = 16_384 + 54;
1679+
seg.cap_start_seq = 11;
1680+
seg.cap_end_seq = 11;
1681+
1682+
start.merge(&mut seg);
1683+
1684+
assert_eq!(start.l4_payload_len, 32_768);
1685+
assert_eq!(start.reasm_bytes, 32_768);
1686+
assert_eq!(start.get_captured_byte(), 32_768);
1687+
}
16011688
}

0 commit comments

Comments
 (0)