Skip to content

Commit e58db44

Browse files
committed
fix: align l7 perf stats with session merge and flow-end endpoint flush
Keep protocol-merge partial fragments until the full session ends so HTTP2/go-uprobe response stats are not dropped early. Preserve a unique cached endpoint when flushing flow-end L7 stats, and add regression tests for partial merge and mixed-endpoint cases.
1 parent a45be9a commit e58db44

3 files changed

Lines changed: 375 additions & 24 deletions

File tree

agent/src/common/l7_protocol_log.rs

Lines changed: 47 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -14,15 +14,18 @@
1414
* limitations under the License.
1515
*/
1616

17-
use std::cell::RefCell;
18-
use std::fmt;
19-
use std::net::IpAddr;
20-
use std::rc::Rc;
21-
use std::sync::{
22-
atomic::{AtomicU64, Ordering},
23-
Arc,
17+
use std::{
18+
cell::RefCell,
19+
collections::HashSet,
20+
fmt,
21+
net::IpAddr,
22+
rc::Rc,
23+
sync::{
24+
atomic::{AtomicU64, Ordering},
25+
Arc,
26+
},
27+
time::Duration,
2428
};
25-
use std::time::Duration;
2629

2730
use enum_dispatch::enum_dispatch;
2831
use log::debug;
@@ -451,6 +454,20 @@ pub struct L7PerfCacheCounter {
451454
pub timeout_cache_len: Arc<AtomicU64>,
452455
}
453456

457+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
458+
pub struct FlowPerfStatsWithEndpoint {
459+
pub stats: L7PerfStats,
460+
pub endpoint: Option<String>,
461+
}
462+
463+
fn resolve_unique_endpoint(endpoints: HashSet<Option<String>>) -> Option<String> {
464+
if endpoints.len() == 1 {
465+
endpoints.into_iter().next().flatten()
466+
} else {
467+
None
468+
}
469+
}
470+
454471
pub struct RrtCache {
455472
// lru cache previous rrt
456473
logs: LruCache<LogCacheKey, LogCache>,
@@ -526,13 +543,18 @@ impl RrtCache {
526543
ret
527544
}
528545

529-
pub fn collect_flow_perf_stats(&mut self, flow_id: u64) -> Option<(L7PerfStats, L7PerfStats)> {
546+
pub fn collect_flow_perf_stats(
547+
&mut self,
548+
flow_id: u64,
549+
) -> Option<(FlowPerfStatsWithEndpoint, FlowPerfStatsWithEndpoint)> {
530550
let Some(keys) = self.flows.pop(&flow_id) else {
531551
return None;
532552
};
533553

534-
let mut forward = L7PerfStats::default();
535-
let mut backward = L7PerfStats::default();
554+
let mut forward = FlowPerfStatsWithEndpoint::default();
555+
let mut backward = FlowPerfStatsWithEndpoint::default();
556+
let mut forward_endpoints = HashSet::new();
557+
let mut backward_endpoints = HashSet::new();
536558
for (key, _) in keys {
537559
if let Some(cache) = self.logs.pop(&key) {
538560
// Requests were already counted (req=1) when they first entered the cache;
@@ -541,15 +563,26 @@ impl RrtCache {
541563
if cache.msg_type != LogMessageType::Response {
542564
continue;
543565
}
566+
let stats = L7PerfStats::from(&cache);
567+
if stats == L7PerfStats::default() {
568+
continue;
569+
}
544570
if key.is_reversed() {
545-
backward.sequential_merge(&L7PerfStats::from(&cache));
571+
backward.stats.sequential_merge(&stats);
572+
backward_endpoints.insert(cache.endpoint);
546573
} else {
547-
forward.sequential_merge(&L7PerfStats::from(&cache));
574+
forward.stats.sequential_merge(&stats);
575+
forward_endpoints.insert(cache.endpoint);
548576
}
549577
}
550578
}
551579

552-
if forward == L7PerfStats::default() && backward == L7PerfStats::default() {
580+
forward.endpoint = resolve_unique_endpoint(forward_endpoints);
581+
backward.endpoint = resolve_unique_endpoint(backward_endpoints);
582+
583+
if forward == FlowPerfStatsWithEndpoint::default()
584+
&& backward == FlowPerfStatsWithEndpoint::default()
585+
{
553586
None
554587
} else {
555588
Some((forward, backward))

agent/src/flow_generator/flow_map.rs

Lines changed: 235 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,9 @@ use crate::{
6262
L7Protocol, L7Stats, PacketDirection, SignalSource, TunnelField,
6363
},
6464
l7_protocol_info::{L7ProtocolInfo, L7ProtocolInfoInterface},
65-
l7_protocol_log::{L7PerfCache, L7PerfCacheCounter, L7ProtocolBitmap},
65+
l7_protocol_log::{
66+
FlowPerfStatsWithEndpoint, L7PerfCache, L7PerfCacheCounter, L7ProtocolBitmap,
67+
},
6668
lookup_key::LookupKey,
6769
meta_packet::{MetaPacket, MetaPacketTcpHeader, ProtocolData},
6870
tagged_flow::TaggedFlow,
@@ -2021,7 +2023,7 @@ impl FlowMap {
20212023
collect_stats: bool,
20222024
tagged_flow: Arc<BatchedBox<TaggedFlow>>,
20232025
flow_end: bool,
2024-
cached_l7_perf_stats: Option<(L7PerfStats, L7PerfStats)>,
2026+
cached_l7_perf_stats: Option<(FlowPerfStatsWithEndpoint, FlowPerfStatsWithEndpoint)>,
20252027
) {
20262028
if collect_stats {
20272029
let flow = &tagged_flow.flow;
@@ -2035,7 +2037,10 @@ impl FlowMap {
20352037
.unwrap_or_default()
20362038
})
20372039
} else {
2038-
(L7PerfStats::default(), L7PerfStats::default())
2040+
(
2041+
FlowPerfStatsWithEndpoint::default(),
2042+
FlowPerfStatsWithEndpoint::default(),
2043+
)
20392044
};
20402045
let l7_stats = L7Stats {
20412046
stats: L7PerfStats {
@@ -2044,8 +2049,9 @@ impl FlowMap {
20442049
flow_end,
20452050
false,
20462051
) as u32,
2047-
..forward
2052+
..forward.stats
20482053
},
2054+
endpoint: forward.endpoint,
20492055
flow_id: flow.flow_id,
20502056
signal_source: flow.signal_source,
20512057
time_in_second: self.start_time,
@@ -2064,8 +2070,9 @@ impl FlowMap {
20642070
flow_end,
20652071
true,
20662072
) as u32,
2067-
..backward
2073+
..backward.stats
20682074
},
2075+
endpoint: backward.endpoint,
20692076
flow_id: flow.flow_id,
20702077
signal_source: flow.signal_source,
20712078
time_in_second: self.start_time,
@@ -2117,8 +2124,8 @@ impl FlowMap {
21172124
.rrt_cache
21182125
.collect_flow_perf_stats(flow.flow_id);
21192126
if let Some((forward, backward)) = cached {
2120-
perf_stats.l7.sequential_merge(&forward);
2121-
perf_stats.l7.sequential_merge(&backward);
2127+
perf_stats.l7.sequential_merge(&forward.stats);
2128+
perf_stats.l7.sequential_merge(&backward.stats);
21222129
Some((forward, backward))
21232130
} else {
21242131
None
@@ -2836,6 +2843,227 @@ mod tests {
28362843

28372844
const DEFAULT_DURATION: Duration = Duration::from_millis(10);
28382845

2846+
fn new_flow_map_and_l7_stats_receiver(
2847+
agent_type: AgentType,
2848+
flow_timeout: Option<FlowTimeout>,
2849+
ignore_idc_vlan: bool,
2850+
) -> (ModuleConfig, FlowMap, Receiver<BatchedBox<L7Stats>>) {
2851+
let (_, mut policy_getter) = Policy::new(1, 0, 1 << 10, 1 << 14, false, false);
2852+
policy_getter.disable();
2853+
let queue_debugger = QueueDebugger::new();
2854+
let (output_queue_sender, _, _) = queue::bounded_with_debug(256, "", &queue_debugger);
2855+
let (l7_stats_output_queue_sender, l7_stats_output_receiver, _) =
2856+
queue::bounded_with_debug(256, "", &queue_debugger);
2857+
let (app_proto_log_queue, _, _) = queue::bounded_with_debug(256, "", &queue_debugger);
2858+
let (packet_sequence_queue, _, _) = queue::bounded_with_debug(256, "", &queue_debugger);
2859+
let mut module_config = ModuleConfig {
2860+
flow: FlowConfig {
2861+
agent_type,
2862+
collector_enabled: true,
2863+
l4_performance_enabled: true,
2864+
l7_metrics_enabled: true,
2865+
app_proto_log_enabled: true,
2866+
ignore_idc_vlan,
2867+
flow_timeout: flow_timeout.unwrap_or(super::TcpTimeout::default().into()),
2868+
..(&UserConfig::standalone_default()).into()
2869+
},
2870+
..Default::default()
2871+
};
2872+
module_config.flow.l7_log_tap_types[0] = true;
2873+
module_config.flow.agent_type = agent_type;
2874+
let flow_map = FlowMap::new(
2875+
0,
2876+
Some(output_queue_sender),
2877+
l7_stats_output_queue_sender,
2878+
policy_getter,
2879+
app_proto_log_queue,
2880+
Arc::new(AtomicI64::new(0)),
2881+
&module_config.flow,
2882+
Some(packet_sequence_queue),
2883+
Arc::new(stats::Collector::new("", Arc::new(AtomicI64::new(0)))),
2884+
false,
2885+
);
2886+
2887+
(module_config, flow_map, l7_stats_output_receiver)
2888+
}
2889+
2890+
fn new_l7_flow_node<'a>(
2891+
flow_map: &mut FlowMap,
2892+
config: &Config,
2893+
packet: &mut MetaPacket<'a>,
2894+
) -> Box<FlowNode> {
2895+
let mut node = flow_map.init_flow(config, packet);
2896+
node.flow_state = FlowState::Reset;
2897+
node.tagged_flow.flow.flow_key.proto = IpProtocol::TCP;
2898+
node.tagged_flow.flow.flow_perf_stats = Some(FlowPerfStats {
2899+
l7_protocol: L7Protocol::Http1,
2900+
..Default::default()
2901+
});
2902+
node.meta_flow_log = FlowLog::new(
2903+
false,
2904+
&mut flow_map.tcp_perf_pool,
2905+
true,
2906+
flow_map.perf_cache.clone(),
2907+
L4Protocol::Tcp,
2908+
L7ProtocolEnum::L7Protocol(L7Protocol::Http1),
2909+
false,
2910+
flow_map.flow_perf_counter.clone(),
2911+
0,
2912+
Rc::clone(&flow_map.wasm_vm),
2913+
#[cfg(any(target_os = "linux", target_os = "android"))]
2914+
Rc::clone(&flow_map.so_plugin),
2915+
flow_map.stats_counter.clone(),
2916+
config.log_parser.l7_log_session_aggr_max_timeout.as_secs() as usize,
2917+
config.flow.l7_protocol_inference_ttl.try_into().unwrap(),
2918+
None,
2919+
flow_map.ntp_diff.clone(),
2920+
flow_map.obfuscate_cache.clone(),
2921+
)
2922+
.map(Box::new);
2923+
node
2924+
}
2925+
2926+
#[test]
2927+
fn flow_end_l7_stats_keep_unique_cached_endpoint() {
2928+
let (module_config, mut flow_map, l7_stats_output) =
2929+
new_flow_map_and_l7_stats_receiver(AgentType::TtProcess, None, false);
2930+
let config = Config {
2931+
flow: &module_config.flow,
2932+
log_parser: &module_config.log_parser,
2933+
collector: &module_config.collector,
2934+
#[cfg(any(target_os = "linux", target_os = "android"))]
2935+
ebpf: None,
2936+
};
2937+
let mut packet = _new_meta_packet();
2938+
let mut node = new_l7_flow_node(&mut flow_map, &config, &mut packet);
2939+
let flow_id = node.tagged_flow.flow.flow_id;
2940+
packet.flow_id = flow_id;
2941+
packet.lookup_key.direction = PacketDirection::ClientToServer;
2942+
let key = LogCacheKey::new(
2943+
&ParseParam::new(
2944+
&packet,
2945+
Some(flow_map.perf_cache.clone()),
2946+
Rc::clone(&flow_map.wasm_vm),
2947+
#[cfg(any(target_os = "linux", target_os = "android"))]
2948+
Rc::clone(&flow_map.so_plugin),
2949+
true,
2950+
true,
2951+
),
2952+
Some(1),
2953+
false,
2954+
);
2955+
flow_map.perf_cache.borrow_mut().rrt_cache.put(
2956+
key,
2957+
LogCache {
2958+
msg_type: LogMessageType::Response,
2959+
time: packet.lookup_key.timestamp.as_micros() as u64,
2960+
resp_status: L7ResponseStatus::Ok,
2961+
endpoint: Some("/foo".to_string()),
2962+
..Default::default()
2963+
},
2964+
);
2965+
2966+
flow_map.node_removed_aftercare(&config, node, Duration::from_secs(1), None);
2967+
flow_map.flush_queue(config.flow, Duration::from_secs(2));
2968+
2969+
let forward = l7_stats_output.recv(Some(TIME_UNIT)).unwrap();
2970+
let backward = l7_stats_output.recv(Some(TIME_UNIT)).unwrap();
2971+
assert!(!forward.is_reversed);
2972+
assert_eq!(forward.stats.response_count, 1);
2973+
assert_eq!(forward.endpoint.as_deref(), Some("/foo"));
2974+
assert!(backward.is_reversed);
2975+
assert!(backward.endpoint.is_none());
2976+
}
2977+
2978+
#[test]
2979+
fn flow_end_l7_stats_drop_mixed_cached_endpoints() {
2980+
let (module_config, mut flow_map, l7_stats_output) =
2981+
new_flow_map_and_l7_stats_receiver(AgentType::TtProcess, None, false);
2982+
let config = Config {
2983+
flow: &module_config.flow,
2984+
log_parser: &module_config.log_parser,
2985+
collector: &module_config.collector,
2986+
#[cfg(any(target_os = "linux", target_os = "android"))]
2987+
ebpf: None,
2988+
};
2989+
let mut packet = _new_meta_packet();
2990+
let mut node = new_l7_flow_node(&mut flow_map, &config, &mut packet);
2991+
let flow_id = node.tagged_flow.flow.flow_id;
2992+
packet.flow_id = flow_id;
2993+
packet.lookup_key.direction = PacketDirection::ClientToServer;
2994+
2995+
for (session_id, endpoint) in [(1, "/foo"), (2, "/bar")] {
2996+
let key = LogCacheKey::new(
2997+
&ParseParam::new(
2998+
&packet,
2999+
Some(flow_map.perf_cache.clone()),
3000+
Rc::clone(&flow_map.wasm_vm),
3001+
#[cfg(any(target_os = "linux", target_os = "android"))]
3002+
Rc::clone(&flow_map.so_plugin),
3003+
true,
3004+
true,
3005+
),
3006+
Some(session_id),
3007+
false,
3008+
);
3009+
flow_map.perf_cache.borrow_mut().rrt_cache.put(
3010+
key,
3011+
LogCache {
3012+
msg_type: LogMessageType::Response,
3013+
time: packet.lookup_key.timestamp.as_micros() as u64,
3014+
resp_status: L7ResponseStatus::Ok,
3015+
endpoint: Some(endpoint.to_string()),
3016+
..Default::default()
3017+
},
3018+
);
3019+
}
3020+
3021+
flow_map.node_removed_aftercare(&config, node, Duration::from_secs(1), None);
3022+
flow_map.flush_queue(config.flow, Duration::from_secs(2));
3023+
3024+
let forward = l7_stats_output.recv(Some(TIME_UNIT)).unwrap();
3025+
let backward = l7_stats_output.recv(Some(TIME_UNIT)).unwrap();
3026+
assert!(!forward.is_reversed);
3027+
assert_eq!(forward.stats.response_count, 2);
3028+
assert!(forward.endpoint.is_none());
3029+
assert!(backward.is_reversed);
3030+
assert!(backward.endpoint.is_none());
3031+
}
3032+
3033+
#[test]
3034+
fn flow_end_timeout_only_stats_remain_endpoint_less() {
3035+
let (module_config, mut flow_map, l7_stats_output) =
3036+
new_flow_map_and_l7_stats_receiver(AgentType::TtProcess, None, false);
3037+
let config = Config {
3038+
flow: &module_config.flow,
3039+
log_parser: &module_config.log_parser,
3040+
collector: &module_config.collector,
3041+
#[cfg(any(target_os = "linux", target_os = "android"))]
3042+
ebpf: None,
3043+
};
3044+
let mut packet = _new_meta_packet();
3045+
let mut node = new_l7_flow_node(&mut flow_map, &config, &mut packet);
3046+
let flow_id = node.tagged_flow.flow.flow_id;
3047+
flow_map
3048+
.perf_cache
3049+
.borrow_mut()
3050+
.timeout_cache
3051+
.get_or_insert_mut(flow_id)
3052+
.timeout[0] = 1;
3053+
3054+
flow_map.node_removed_aftercare(&config, node, Duration::from_secs(1), None);
3055+
flow_map.flush_queue(config.flow, Duration::from_secs(2));
3056+
3057+
let forward = l7_stats_output.recv(Some(TIME_UNIT)).unwrap();
3058+
let backward = l7_stats_output.recv(Some(TIME_UNIT)).unwrap();
3059+
assert!(!forward.is_reversed);
3060+
assert_eq!(forward.stats.err_timeout, 1);
3061+
assert_eq!(forward.stats.response_count, 0);
3062+
assert!(forward.endpoint.is_none());
3063+
assert!(backward.is_reversed);
3064+
assert!(backward.endpoint.is_none());
3065+
}
3066+
28393067
#[test]
28403068
fn syn_rst() {
28413069
let (module_config, mut flow_map, output_queue_receiver) =

0 commit comments

Comments
 (0)