-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdistributed_performance_analysis_demo.rs
More file actions
150 lines (132 loc) · 6.65 KB
/
distributed_performance_analysis_demo.rs
File metadata and controls
150 lines (132 loc) · 6.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//! Distributed performance analysis demonstration.
//!
//! This example shows how to use the distributed analysis tools to detect
//! bottlenecks, correlations, and anomalies in a multi‑agent system.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use profiling::{
DistributedPerformanceAnalyzer, BottleneckDetector, CorrelationAnalyzer, AnomalyDetector,
init_distributed_analysis,
};
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Distributed Performance Analysis Demo ===");
// 1. Initialize distributed analyzer
println!("1. Initializing distributed performance analyzer...");
let analyzer = init_distributed_analysis().await;
// 2. Simulate metrics from multiple agents
println!("2. Simulating metrics from 3 agents...");
let agents = vec![1001, 1002, 1003];
let components = vec!["mesh-transport", "state-sync", "agent-core"];
for step in 0..50 {
println!(" Step {}:", step + 1);
for &agent_id in &agents {
for &component in &components {
let mut metrics = HashMap::new();
// Generate realistic metrics with some noise and trends
match component {
"mesh-transport" => {
let latency = 80.0 + (step as f64).sin() * 20.0 + rand::random::<f64>() * 10.0;
let queue_len = 500.0 + (step as f64).cos() * 200.0 + rand::random::<f64>() * 50.0;
let throughput = 15.0 + (step as f64).sin() * 5.0 + rand::random::<f64>() * 2.0;
metrics.insert("message_latency_ms".to_string(), latency);
metrics.insert("queue_length".to_string(), queue_len);
metrics.insert("throughput_mbps".to_string(), throughput);
}
"state-sync" => {
let sync_latency = 300.0 + (step as f64).sin() * 100.0 + rand::random::<f64>() * 30.0;
let conflict_rate = 0.05 + (step as f64).cos() * 0.03 + rand::random::<f64>() * 0.01;
metrics.insert("sync_latency_ms".to_string(), sync_latency);
metrics.insert("conflict_rate".to_string(), conflict_rate);
}
"agent-core" => {
let cpu_usage = 70.0 + (step as f64).sin() * 20.0 + rand::random::<f64>() * 10.0;
let memory_usage = 800.0 + (step as f64).cos() * 200.0 + rand::random::<f64>() * 50.0;
metrics.insert("cpu_usage".to_string(), cpu_usage);
metrics.insert("memory_usage_mb".to_string(), memory_usage);
}
_ => {}
}
// Inject an anomaly at step 25 for agent 1002
if step == 25 && agent_id == 1002 && component == "mesh-transport" {
metrics.insert("message_latency_ms".to_string(), 300.0); // spike
}
// Update analyzer
let anomalies = analyzer.update_metrics(agent_id, component, metrics).await;
if !anomalies.is_empty() {
println!(" ! Anomaly detected for agent {}: {:?}", agent_id, anomalies[0].metric);
}
}
}
// Sleep to simulate real‑time interval
sleep(Duration::from_millis(100)).await;
}
// 3. Generate and display report
println!("3. Generating performance report...");
let report = analyzer.generate_report().await;
println!(" Report generated at {:?}", report.generated_at);
println!(" Total agents monitored: {}", report.total_agents);
println!(" Total metrics collected: {}", report.total_metrics);
println!(" Active bottlenecks: {}", report.bottleneck_count);
println!(" Strong correlations: {}", report.strong_correlation_count);
if !report.top_bottlenecks.is_empty() {
println!(" Top bottlenecks:");
for (i, bottleneck) in report.top_bottlenecks.iter().enumerate() {
println!(" {}. {}: {} = {:.1} (threshold {:.1})",
i + 1, bottleneck.component, bottleneck.metric,
bottleneck.current_value, bottleneck.threshold);
}
}
if !report.top_correlations.is_empty() {
println!(" Top correlations:");
for (i, corr) in report.top_correlations.iter().enumerate() {
println!(" {}. {} ↔ {}: r = {:.3} ({} samples)",
i + 1, corr.metric_a, corr.metric_b,
corr.correlation, corr.sample_count);
}
}
// 4. Stand‑alone detectors demonstration
println!("4. Stand‑alone detectors demonstration...");
// Bottleneck detector
println!(" a) Bottleneck detector:");
let mut bottleneck_detector = BottleneckDetector::new();
let mut test_metrics = HashMap::new();
test_metrics.insert("cpu_usage".to_string(), 95.0);
test_metrics.insert("memory_usage_mb".to_string(), 512.0);
let bottlenecks = bottleneck_detector.analyze("agent-core", &test_metrics);
for bottleneck in &bottlenecks {
println!(" - {} bottleneck: {:.1} > {:.1} (severity {:.2})",
bottleneck.metric, bottleneck.current_value,
bottleneck.threshold, bottleneck.severity);
}
// Correlation analyzer
println!(" b) Correlation analyzer:");
let mut correlation_analyzer = CorrelationAnalyzer::new(100);
let now = SystemTime::now();
for i in 0..30 {
let t = now + Duration::from_secs(i as u64);
correlation_analyzer.record_sample("cpu_usage", t, i as f64 * 0.5 + 30.0);
correlation_analyzer.record_sample("memory_usage", t, i as f64 * 0.3 + 200.0);
}
let corr = correlation_analyzer.compute_correlation("cpu_usage", "memory_usage");
if let Some(c) = corr {
println!(" CPU ↔ Memory correlation: r = {:.3} (significant: {})",
c.correlation, c.significant);
}
// Anomaly detector
println!(" c) Anomaly detector:");
let mut anomaly_detector = AnomalyDetector::new(20, 2.5);
for i in 0..19 {
anomaly_detector.add_value("latency", 50.0 + i as f64 * 0.5);
}
let anomaly = anomaly_detector.add_value("latency", 120.0); // outlier
match anomaly {
Some(a) => println!(" Anomaly detected: {} = {:.1} (expected {:.1}, σ = {:.1})",
a.metric, a.observed_value, a.expected_value, a.deviation_sigma),
None => println!(" No anomaly detected (unexpected)"),
}
println!("=== Demo completed successfully ===");
Ok(())
}