Skip to content

Commit d3d47a4

Browse files
authored
Merge pull request #16 from AstroHQ/feature/signpost-metric-sessions
Feature/signpost metric sessions & summary/average API
2 parents bbbf2ad + 48f9ae0 commit d3d47a4

6 files changed

Lines changed: 262 additions & 59 deletions

File tree

.github/workflows/rust.yml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ on:
88

99
env:
1010
CARGO_TERM_COLOR: always
11+
RUSTFLAGS: "-Dwarnings"
1112

1213
jobs:
1314
build:
@@ -17,8 +18,12 @@ jobs:
1718
runs-on: ${{ matrix.os }}
1819

1920
steps:
20-
- uses: actions/checkout@v3
21-
- name: Build
22-
run: cargo build --verbose
23-
- name: Run tests
24-
run: cargo test --verbose
21+
- uses: actions/checkout@v4
22+
- name: Formatting
23+
run: cargo fmt --check
24+
- name: Build
25+
run: cargo build --all --all-features
26+
- name: Run tests
27+
run: cargo test --verbose
28+
- name: Run Clippy
29+
run: cargo clippy --all-targets --all-features

Cargo.toml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,20 @@ documentation = "https://docs.rs/metrics-sqlite"
1212
repository = "https://github.com/AstroHQ/metrics-sqlite/"
1313

1414
[dependencies]
15-
diesel = { version = "2.0.3", features = ["sqlite"] }
16-
diesel_migrations = "2.0.0"
15+
diesel = { version = "2.3", features = ["sqlite"] }
16+
diesel_migrations = "2.3"
1717
# this seems to let us force bundled sqlite in diesel
18-
libsqlite3-sys = { version = "0.31.0", features = ["bundled"] }
19-
metrics = "0.24.1"
20-
thiserror = "2.0.11"
18+
libsqlite3-sys = { version = "0.35.0", features = ["bundled"] }
19+
metrics = "0.24"
20+
thiserror = "2.0"
2121
tracing = "0.1"
2222
csv = { version = "1.1.6", optional = true }
23-
serde = { version = "1.0.125", optional = true }
23+
serde = { version = "1.0", optional = true }
24+
tokio = { version = "1.20", features = ["sync"] }
2425

2526
[dev-dependencies]
26-
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
27-
27+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
28+
anyhow = "1.0"
2829

2930
[features]
3031
default = []

examples/summary.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use metrics_sqlite::{MetricsDb, Session};
2+
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
3+
4+
fn main() -> anyhow::Result<()> {
5+
let fmt_layer = fmt::layer();
6+
let filter_layer = EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("info"))?;
7+
tracing_subscriber::registry()
8+
.with(filter_layer)
9+
.with(fmt_layer)
10+
.init();
11+
let mut db = MetricsDb::new("metrics.db")?;
12+
let session = db.session_from_signpost("liquid.connected")?;
13+
let rtt = average(&mut db, "net_quality.rtt", &session)?;
14+
let throughput = average(&mut db, "net_quality.throughput", &session)? / 1024.0; // KB/s
15+
let metered_throughput =
16+
average(&mut db, "rate_control.metered_throughput", &session)? / 1024.0; // KB/s
17+
println!("Last connection: {session:#?}");
18+
println!("RTT: Average {rtt:.2}ms");
19+
println!("Throughput: Average {throughput:.2}KB/s");
20+
println!("Metered: Average {metered_throughput:.2}KB/s");
21+
22+
Ok(())
23+
}
24+
/// Returns the arithmetic mean of every sample recorded for `key` within
/// `session`, or an error when the session contains no samples for that key.
fn average(db: &mut MetricsDb, key: &str, session: &Session) -> anyhow::Result<f64> {
    let metrics = db.metrics_for_key(key, Some(session))?;
    // Guard the empty case explicitly: `0.0 / 0 as f64` would silently
    // produce NaN, which then prints as "NaN" in the summary output.
    if metrics.is_empty() {
        anyhow::bail!("no metrics recorded for `{key}` in requested session");
    }
    let sum: f64 = metrics.iter().map(|m| m.value).sum();
    Ok(sum / metrics.len() as f64)
}

src/lib.rs

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub enum MetricsError {
3939
/// Error querying metrics DB
4040
#[error("Error querying DB: {0}")]
4141
QueryError(#[from] diesel::result::Error),
42-
/// Error if path given is invalid
42+
/// Error if the path given is invalid
4343
#[error("Invalid database path")]
4444
InvalidDatabasePath,
4545
/// IO Error with reader/writer
@@ -50,12 +50,21 @@ pub enum MetricsError {
5050
#[cfg(feature = "csv")]
5151
#[error("CSV Error: {0}")]
5252
CsvError(#[from] csv::Error),
53-
/// Attempted to query database but found no records
53+
/// Attempted to query the database but found no records
5454
#[error("Database has no metrics stored in it")]
5555
EmptyDatabase,
5656
/// Given metric key name wasn't found in the DB
5757
#[error("Metric key {0} not found in database")]
5858
KeyNotFound(String),
59+
/// Attempting to communicate with exporter but it's gone away
60+
#[error("Exporter task has been stopped or crashed")]
61+
ExporterUnavailable,
62+
/// Session derived from the signpost has zero duration
63+
#[error("Session for signpost `{0}` has zero duration")]
64+
ZeroLengthSession(String),
65+
/// No metrics available for the requested key inside the derived session
66+
#[error("No metrics recorded for `{0}` in requested session")]
67+
NoMetricsForKey(String),
5968
}
6069
/// Metrics result type
6170
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
@@ -65,6 +74,7 @@ mod models;
6574
mod recorder;
6675
mod schema;
6776

77+
use crate::metrics_db::query;
6878
use crate::recorder::Handle;
6979
pub use metrics_db::{MetricsDb, Session};
7080
pub use models::{Metric, MetricKey, NewMetric};
@@ -78,7 +88,7 @@ fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
7888
.ok_or(MetricsError::InvalidDatabasePath)?;
7989
let mut db = SqliteConnection::establish(url)?;
8090
db.run_pending_migrations(MIGRATIONS)
81-
.map_err(|e| MetricsError::MigrationError(e))?;
91+
.map_err(MetricsError::MigrationError)?;
8292

8393
Ok(db)
8494
}
@@ -101,9 +111,40 @@ enum Event {
101111
housekeeping_period: Option<Duration>,
102112
record_limit: Option<usize>,
103113
},
114+
RequestSummaryFromSignpost {
115+
signpost_key: String,
116+
keys: Vec<String>,
117+
tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
118+
},
119+
}
120+
121+
/// Handle for continued communication with sqlite exporter
122+
pub struct SqliteExporterHandle {
123+
sender: SyncSender<Event>,
124+
}
125+
impl SqliteExporterHandle {
126+
/// Request average metrics from a signpost to latest from exporter's DB
127+
pub fn request_average_metrics(
128+
&self,
129+
from_signpost: &str,
130+
with_keys: &[&str],
131+
) -> Result<HashMap<String, f64>> {
132+
let (tx, rx) = tokio::sync::oneshot::channel();
133+
self.sender
134+
.send(Event::RequestSummaryFromSignpost {
135+
signpost_key: from_signpost.to_string(),
136+
keys: with_keys.iter().map(|s| s.to_string()).collect(),
137+
tx,
138+
})
139+
.map_err(|_| MetricsError::ExporterUnavailable)?;
140+
match rx.blocking_recv() {
141+
Ok(metrics) => Ok(metrics?),
142+
Err(_) => Err(MetricsError::ExporterUnavailable),
143+
}
144+
}
104145
}
105146

106-
/// Exports metrics by storing them in a SQLite database at a periodic interval
147+
/// Exports metrics by storing them in an SQLite database at a periodic interval
107148
pub struct SqliteExporter {
108149
thread: Option<JoinHandle<()>>,
109150
sender: SyncSender<Event>,
@@ -199,6 +240,16 @@ impl InnerState {
199240
self.queue.push_back(metric);
200241
Ok(())
201242
}
243+
244+
// --- Summary/Average additions
245+
246+
pub fn metrics_summary_for_signpost_and_keys(
247+
&mut self,
248+
signpost: String,
249+
metrics: Vec<String>,
250+
) -> Result<HashMap<String, f64>> {
251+
query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
252+
}
202253
}
203254

204255
fn run_worker(
@@ -292,6 +343,44 @@ fn run_worker(
292343

293344
(state.should_flush(), false)
294345
}
346+
Ok(Event::RequestSummaryFromSignpost {
347+
signpost_key,
348+
keys,
349+
tx,
350+
}) => {
351+
match state.flush() {
352+
Ok(()) => match state
353+
.metrics_summary_for_signpost_and_keys(signpost_key, keys)
354+
{
355+
Ok(metrics) => {
356+
if tx.send(Ok(metrics)).is_err() {
357+
error!(
358+
"Failed to respond with metrics results, discarding"
359+
);
360+
}
361+
}
362+
Err(e) => {
363+
if let Err(e) = tx.send(Err(e)) {
364+
error!(
365+
"Failed to respond with metrics error result, discarding: {e:?}"
366+
);
367+
}
368+
}
369+
},
370+
Err(e) => {
371+
let err = MetricsError::from(e);
372+
error!(
373+
"Failed to flush pending metrics before summary request: {err:?}"
374+
);
375+
if let Err(send_err) = tx.send(Err(err)) {
376+
error!(
377+
"Failed to respond with metrics flush error result, discarding: {send_err:?}"
378+
);
379+
}
380+
}
381+
}
382+
(false, false)
383+
}
295384
Err(RecvTimeoutError::Timeout) => {
296385
debug!("Flushing due to {}s timeout", flush_duration.as_secs());
297386
(true, false)
@@ -320,7 +409,7 @@ fn run_worker(
320409
}
321410

322411
impl SqliteExporter {
323-
/// Creates a new `SqliteExporter` that stores metrics in a SQLite database file.
412+
/// Creates a new `SqliteExporter` that stores metrics in an SQLite database file.
324413
///
325414
/// `flush_interval` specifies how often metrics are flushed to SQLite/disk
326415
///
@@ -341,10 +430,10 @@ impl SqliteExporter {
341430
Ok(exporter)
342431
}
343432

344-
/// Sets optional periodic house keeping, None to disable (disabled by default)
433+
/// Sets optional periodic housekeeping, None to disable (disabled by default)
345434
/// ## Notes
346-
/// Periodic house keeping can affect metric recording, causing some data to be dropped during house keeping.
347-
/// Record limit if set will cause anything over limit + 25% of limit to be removed
435+
/// Periodic housekeeping can affect metric recording, causing some data to be dropped during housekeeping.
436+
/// Record limit if set will cause anything over limit + 25% of the limit to be removed
348437
pub fn set_periodic_housekeeping(
349438
&self,
350439
periodic_duration: Option<Duration>,
@@ -409,7 +498,7 @@ impl SqliteExporter {
409498
record_limit,
410499
excess
411500
);
412-
let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {});", excess);
501+
let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});");
413502
if let Err(e) = sql_query(query).execute(db) {
414503
error!("Failed to delete excessive records: {:?}", e);
415504
}
@@ -423,8 +512,12 @@ impl SqliteExporter {
423512
}
424513

425514
/// Install recorder as `metrics` crate's Recorder
426-
pub fn install(self) -> Result<(), SetRecorderError<Self>> {
427-
metrics::set_global_recorder(self)
515+
pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
516+
let handle = SqliteExporterHandle {
517+
sender: self.sender.clone(),
518+
};
519+
metrics::set_global_recorder(self)?;
520+
Ok(handle)
428521
}
429522
}
430523
impl Drop for SqliteExporter {

0 commit comments

Comments (0)