Skip to content

Commit 96aaaad

Browse files
authored
Merge pull request #18 from AstroHQ/jfro/fix-avg-metrics-test
fix: average_metrics_from_signpost using wrong start signpost, causing long durations
2 parents 1039cc4 + b0868fb commit 96aaaad

4 files changed

Lines changed: 185 additions & 20 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ tokio = { version = "1.20", features = ["sync"] }
2626
[dev-dependencies]
2727
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2828
anyhow = "1.0"
29+
clap = { version = "4", features = ["derive"] }
30+
chrono = "0.4"
31+
tempfile = "3"
2932

3033
[features]
3134
default = []

examples/summary.rs

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,72 @@
1-
use metrics_sqlite::{MetricsDb, Session};
1+
use std::time::Duration;
2+
3+
use chrono::{DateTime, Local, TimeZone};
4+
use clap::Parser;
5+
use metrics_sqlite::MetricsDb;
26
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
37

8+
#[derive(Parser)]
9+
struct Args {
10+
/// Path to the metrics database file
11+
db_path: String,
12+
}
13+
414
fn main() -> anyhow::Result<()> {
15+
let args = Args::parse();
516
let fmt_layer = fmt::layer();
617
let filter_layer = EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("info"))?;
718
tracing_subscriber::registry()
819
.with(filter_layer)
920
.with(fmt_layer)
1021
.init();
11-
let mut db = MetricsDb::new("metrics.db")?;
22+
23+
let mut db = MetricsDb::new(&args.db_path)?;
1224
let session = db.session_from_signpost("liquid.connected")?;
13-
let rtt = average(&mut db, "net_quality.rtt", &session)?;
14-
let throughput = average(&mut db, "net_quality.throughput", &session)? / 1024.0; // KB/s
15-
let metered_throughput =
16-
average(&mut db, "rate_control.metered_throughput", &session)? / 1024.0; // KB/s
17-
println!("Last connection: {session:#?}");
18-
println!("RTT: Average {rtt:.2}ms");
19-
println!("Throughput: Average {throughput:.2}KB/s");
20-
println!("Metered: Average {metered_throughput:.2}KB/s");
25+
let averages = db.average_metrics_from_signpost(
26+
"liquid.connected",
27+
&[
28+
"net_quality.rtt",
29+
"net_quality.throughput",
30+
// "rate_control.metered_throughput",
31+
],
32+
)?;
33+
34+
println!("Last connection:");
35+
println!(" Start: {}", format_timestamp(session.start_time));
36+
println!(" End: {}", format_timestamp(session.end_time));
37+
println!(" Duration: {}", format_duration(session.duration));
38+
39+
if let Some(&rtt) = averages.get("net_quality.rtt") {
40+
println!("RTT: Average {rtt:.2}ms");
41+
}
42+
if let Some(&throughput) = averages.get("net_quality.throughput") {
43+
println!("Throughput: Average {:.2}KB/s", throughput / 1024.0);
44+
}
45+
if let Some(&metered) = averages.get("rate_control.metered_throughput") {
46+
println!("Metered: Average {:.2}KB/s", metered / 1024.0);
47+
}
2148

2249
Ok(())
2350
}
24-
fn average(db: &mut MetricsDb, key: &str, session: &Session) -> anyhow::Result<f64> {
25-
let metrics = db.metrics_for_key(key, Some(session))?;
26-
let sum: f64 = metrics.iter().map(|m| m.value).sum();
27-
let samples = metrics.len();
28-
let average = sum / samples as f64;
29-
Ok(average)
51+
52+
fn format_timestamp(secs: f64) -> String {
53+
let dt: DateTime<Local> = Local
54+
.timestamp_opt(secs as i64, ((secs.fract()) * 1_000_000_000.0) as u32)
55+
.unwrap();
56+
dt.format("%Y-%m-%d %H:%M:%S").to_string()
57+
}
58+
59+
/// Renders a duration for display, with millisecond precision.
///
/// The largest non-zero unit decides the layout:
/// - one hour or more: `"{h}h {mm}m {ss}.{mmm}s"`
/// - one minute or more: `"{m}m {ss}.{mmm}s"`
/// - otherwise: `"{s}.{mmm}s"`
///
/// Sub-millisecond precision is truncated, not rounded.
fn format_duration(d: Duration) -> String {
    let total_secs = d.as_secs();
    let hours = total_secs / 3600;
    let minutes = (total_secs % 3600) / 60;
    let seconds = total_secs % 60;
    let millis = d.subsec_millis();

    match (hours, minutes) {
        (0, 0) => format!("{seconds}.{millis:03}s"),
        (0, _) => format!("{minutes}m {seconds:02}.{millis:03}s"),
        _ => format!("{hours}h {minutes:02}m {seconds:02}.{millis:03}s"),
    }
}

src/metrics_db.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl MetricsDb {
6161
self.sessions.clone()
6262
}
6363

64-
/// Returns a session (timestamp range) from the first occurrence of the signpost to the latest metric
64+
/// Returns a session (timestamp range) from the last occurrence of the signpost to the latest metric
6565
pub fn session_from_signpost(&mut self, metric: &str) -> Result<Session> {
6666
query::session_from_signpost(&mut self.db, metric)
6767
}
@@ -102,6 +102,21 @@ impl MetricsDb {
102102
Ok(r)
103103
}
104104

105+
/// Returns average values for the given metric keys within the session defined by a signpost.
106+
///
107+
/// Also includes `session.duration` in the results.
108+
pub fn average_metrics_from_signpost(
109+
&mut self,
110+
signpost: &str,
111+
keys: &[&str],
112+
) -> Result<std::collections::HashMap<String, f64>> {
113+
query::metrics_summary_for_signpost_and_keys(
114+
&mut self.db,
115+
signpost,
116+
keys.iter().map(|s| s.to_string()).collect(),
117+
)
118+
}
119+
105120
/// Returns all metrics for given key in ascending timestamp order
106121
pub fn metrics_for_key(
107122
&mut self,

src/metrics_db/query.rs

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ pub(crate) fn metric_key_for_name<'a>(
1717
pub(crate) fn session_from_signpost(db: &mut SqliteConnection, metric: &str) -> Result<Session> {
1818
use crate::schema::metrics::dsl::*;
1919
let metric_key = metric_key_for_name(db, metric)?;
20-
let query = metrics
21-
.order(timestamp.asc())
20+
// Use the most recent signpost occurrence as the session start
21+
let start_query = metrics
22+
.order(timestamp.desc())
2223
.filter(metric_key_id.eq(metric_key.id))
2324
.limit(1);
24-
let start = query.first::<Metric>(db)?;
25+
let start = start_query.first::<Metric>(db)?;
26+
// End is the latest metric in the DB (approximates NOW)
2527
let end_query = metrics.order(timestamp.desc()).limit(1);
2628
let end = end_query.first::<Metric>(db)?;
2729
if end.timestamp <= start.timestamp {
@@ -87,3 +89,106 @@ pub(crate) fn metrics_summary_for_signpost_and_keys(
8789
results.insert("session.duration".to_string(), duration_secs);
8890
Ok(results)
8991
}
92+
93+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{MetricKey, NewMetric};

    /// Create a temp DB and return the connection + temp dir.
    ///
    /// The `TempDir` must be kept alive by the caller for as long as the
    /// connection is used; dropping it removes the directory holding the DB file.
    fn setup_test_db() -> (SqliteConnection, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let db = crate::setup_db(&db_path).unwrap();
        (db, dir)
    }

    /// Resolve a metric key by name and return its id
    /// (presumably creating the key if it does not exist yet -- see `MetricKey::key_by_name`).
    fn insert_key(db: &mut SqliteConnection, name: &str) -> i64 {
        let key = MetricKey::key_by_name(name, db).unwrap();
        key.id
    }

    /// Insert a metric row with the given key id, timestamp and value
    fn insert_metric(db: &mut SqliteConnection, key_id: i64, ts: f64, val: f64) {
        use crate::schema::metrics::dsl::*;
        diesel::insert_into(metrics)
            .values(&NewMetric {
                timestamp: ts,
                metric_key_id: key_id,
                value: val,
            })
            .execute(db)
            .unwrap();
    }

    /// Sets up a DB simulating stale signpost data from a previous session:
    /// - "app.started" signpost at t=100.0 (old/stale session)
    /// - "app.started" signpost at t=500.0 (current session)
    /// - "cpu" metrics at t=500, 510, 520 with values 50, 60, 70 (current session)
    /// - "cpu" metrics at t=100, 110 with values 10, 20 (old session)
    /// - latest metric in DB is at t=520
    fn populate_test_db(db: &mut SqliteConnection) {
        let signpost_id = insert_key(db, "app.started");
        let cpu_id = insert_key(db, "cpu");

        // Old session data (stale)
        insert_metric(db, signpost_id, 100.0, 1.0);
        insert_metric(db, cpu_id, 100.0, 10.0);
        insert_metric(db, cpu_id, 110.0, 20.0);

        // Current session data
        insert_metric(db, signpost_id, 500.0, 1.0);
        insert_metric(db, cpu_id, 500.0, 50.0);
        insert_metric(db, cpu_id, 510.0, 60.0);
        insert_metric(db, cpu_id, 520.0, 70.0);
    }

    #[test]
    fn test_session_from_signpost_uses_latest_signpost() {
        // With stale signpost data at t=100 and current at t=500,
        // start should be 500 (latest signpost), not 100 (oldest).
        let (mut db, _dir) = setup_test_db();
        populate_test_db(&mut db);

        let session = session_from_signpost(&mut db, "app.started").unwrap();
        assert_eq!(
            session.start_time, 500.0,
            "start should be the most recent signpost (t=500), not the stale one (t=100)"
        );
        assert_eq!(session.end_time, 520.0);
    }

    #[test]
    fn test_summary_duration_not_inflated_by_stale_signpost() {
        // Duration should be based on the latest signpost, not a stale one from a previous session.
        let (mut db, _dir) = setup_test_db();
        populate_test_db(&mut db);

        let results =
            metrics_summary_for_signpost_and_keys(&mut db, "app.started", vec!["cpu".to_string()])
                .unwrap();

        // cpu average across t=500..520 (current session only): (50+60+70)/3 = 60
        assert!(
            (results["cpu"] - 60.0).abs() < f64::EPSILON,
            "cpu average should only include current session metrics"
        );
        // session.duration should be 520 - 500 = 20, not 520 - 100 = 420
        assert_eq!(
            results["session.duration"], 20.0,
            "duration should be 20s (t=500..520), not inflated to 420s by stale signpost"
        );
    }

    #[test]
    fn test_average_for_session_with_explicit_range() {
        let (mut db, _dir) = setup_test_db();
        populate_test_db(&mut db);

        // With a manually bounded session t=500..510, avg = (50+60)/2 = 55
        let session = Session::new(500.0, 510.0);
        let avg = average_for_session(&mut db, "cpu", &session).unwrap();
        assert!((avg - 55.0).abs() < f64::EPSILON);
    }
}

0 commit comments

Comments
 (0)