Skip to content

Commit 50db518

Browse files
committed
fix: align l7 perf stats with session merge and flow-end endpoint flush
Keep protocol-merge partial fragments until the full session ends so HTTP2/go-uprobe response stats are not dropped early. Preserve a unique cached endpoint when flushing flow-end L7 stats, and add regression tests for partial merge and mixed-endpoint cases.
1 parent a45be9a commit 50db518

3 files changed

Lines changed: 376 additions & 25 deletions

File tree

agent/src/common/l7_protocol_log.rs

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@
1414
* limitations under the License.
1515
*/
1616

17-
use std::cell::RefCell;
18-
use std::fmt;
19-
use std::net::IpAddr;
20-
use std::rc::Rc;
21-
use std::sync::{
22-
atomic::{AtomicU64, Ordering},
23-
Arc,
17+
use std::{
18+
cell::RefCell,
19+
collections::HashSet,
20+
fmt,
21+
net::IpAddr,
22+
rc::Rc,
23+
sync::{
24+
atomic::{AtomicU64, Ordering},
25+
Arc,
26+
},
27+
time::Duration,
2428
};
25-
use std::time::Duration;
2629

2730
use enum_dispatch::enum_dispatch;
2831
use log::debug;
@@ -451,6 +454,20 @@ pub struct L7PerfCacheCounter {
451454
pub timeout_cache_len: Arc<AtomicU64>,
452455
}
453456

457+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
458+
pub struct FlowPerfStatsWithEndpoint {
459+
pub stats: L7PerfStats,
460+
pub endpoint: Option<String>,
461+
}
462+
463+
fn resolve_unique_endpoint(endpoints: HashSet<Option<String>>) -> Option<String> {
464+
if endpoints.len() == 1 {
465+
endpoints.into_iter().next().flatten()
466+
} else {
467+
None
468+
}
469+
}
470+
454471
pub struct RrtCache {
455472
// lru cache previous rrt
456473
logs: LruCache<LogCacheKey, LogCache>,
@@ -526,13 +543,18 @@ impl RrtCache {
526543
ret
527544
}
528545

529-
pub fn collect_flow_perf_stats(&mut self, flow_id: u64) -> Option<(L7PerfStats, L7PerfStats)> {
546+
pub fn collect_flow_perf_stats(
547+
&mut self,
548+
flow_id: u64,
549+
) -> Option<(FlowPerfStatsWithEndpoint, FlowPerfStatsWithEndpoint)> {
530550
let Some(keys) = self.flows.pop(&flow_id) else {
531551
return None;
532552
};
533553

534-
let mut forward = L7PerfStats::default();
535-
let mut backward = L7PerfStats::default();
554+
let mut forward = FlowPerfStatsWithEndpoint::default();
555+
let mut backward = FlowPerfStatsWithEndpoint::default();
556+
let mut forward_endpoints = HashSet::new();
557+
let mut backward_endpoints = HashSet::new();
536558
for (key, _) in keys {
537559
if let Some(cache) = self.logs.pop(&key) {
538560
// Requests were already counted (req=1) when they first entered the cache;
@@ -541,15 +563,26 @@ impl RrtCache {
541563
if cache.msg_type != LogMessageType::Response {
542564
continue;
543565
}
566+
let stats = L7PerfStats::from(&cache);
567+
if stats == L7PerfStats::default() {
568+
continue;
569+
}
544570
if key.is_reversed() {
545-
backward.sequential_merge(&L7PerfStats::from(&cache));
571+
backward.stats.sequential_merge(&stats);
572+
backward_endpoints.insert(cache.endpoint);
546573
} else {
547-
forward.sequential_merge(&L7PerfStats::from(&cache));
574+
forward.stats.sequential_merge(&stats);
575+
forward_endpoints.insert(cache.endpoint);
548576
}
549577
}
550578
}
551579

552-
if forward == L7PerfStats::default() && backward == L7PerfStats::default() {
580+
forward.endpoint = resolve_unique_endpoint(forward_endpoints);
581+
backward.endpoint = resolve_unique_endpoint(backward_endpoints);
582+
583+
if forward == FlowPerfStatsWithEndpoint::default()
584+
&& backward == FlowPerfStatsWithEndpoint::default()
585+
{
553586
None
554587
} else {
555588
Some((forward, backward))

0 commit comments

Comments
 (0)