diff --git a/Cargo-minimal.lock b/Cargo-minimal.lock index f05545562..cb10c5dbc 100644 --- a/Cargo-minimal.lock +++ b/Cargo-minimal.lock @@ -1977,6 +1977,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyperloglogplus" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "621debdf94dcac33e50475fdd76d34d5ea9c0362a834b9db08c3024696c1fbe3" +dependencies = [ + "serde", +] + [[package]] name = "iana-time-zone" version = "0.1.64" @@ -2831,6 +2840,7 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", + "hyperloglogplus", "ipnet", "maxminddb", "mockito", diff --git a/Cargo-recent.lock b/Cargo-recent.lock index f05545562..cb10c5dbc 100644 --- a/Cargo-recent.lock +++ b/Cargo-recent.lock @@ -1977,6 +1977,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyperloglogplus" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "621debdf94dcac33e50475fdd76d34d5ea9c0362a834b9db08c3024696c1fbe3" +dependencies = [ + "serde", +] + [[package]] name = "iana-time-zone" version = "0.1.64" @@ -2831,6 +2840,7 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", + "hyperloglogplus", "ipnet", "maxminddb", "mockito", diff --git a/payjoin-mailroom/Cargo.toml b/payjoin-mailroom/Cargo.toml index 86e20bef0..8d30f63d5 100644 --- a/payjoin-mailroom/Cargo.toml +++ b/payjoin-mailroom/Cargo.toml @@ -38,6 +38,7 @@ config = "0.15" flate2 = { version = "1.1", optional = true } futures = "0.3" hex = { package = "hex-conservative", version = "0.1.1" } +hyperloglogplus = "0.4.1" http = "1.3.1" http-body-util = "0.1.3" hyper = { version = "1.6.0", features = ["http1", "server"] } diff --git a/payjoin-mailroom/src/db/mod.rs b/payjoin-mailroom/src/db/mod.rs index 707b6347a..210bac4e2 100644 --- a/payjoin-mailroom/src/db/mod.rs +++ b/payjoin-mailroom/src/db/mod.rs @@ -256,6 +256,7 @@ impl Db for MetricsDb { mailbox_id: &ShortId, data: Vec, ) -> Result, Error> { + 
self.metrics.record_short_id(mailbox_id); let result = self.inner.post_v2_payload(mailbox_id, data).await?; if result.is_some() { self.metrics.record_db_entry(PayjoinVersion::Two); @@ -267,6 +268,7 @@ impl Db for MetricsDb { &self, mailbox_id: &ShortId, ) -> Result>, Error> { + self.metrics.record_short_id(mailbox_id); self.inner.wait_for_v2_payload(mailbox_id).await } @@ -275,6 +277,7 @@ impl Db for MetricsDb { mailbox_id: &ShortId, data: Vec, ) -> Result<(), Error> { + self.metrics.record_short_id(mailbox_id); self.inner.post_v1_response(mailbox_id, data).await } @@ -283,6 +286,7 @@ impl Db for MetricsDb { mailbox_id: &ShortId, data: Vec, ) -> Result>, Error> { + self.metrics.record_short_id(mailbox_id); let result = self.inner.post_v1_request_and_wait_for_response(mailbox_id, data).await?; self.metrics.record_db_entry(PayjoinVersion::One); Ok(result) diff --git a/payjoin-mailroom/src/directory.rs b/payjoin-mailroom/src/directory.rs index 0cd51ed0a..fea002d79 100644 --- a/payjoin-mailroom/src/directory.rs +++ b/payjoin-mailroom/src/directory.rs @@ -865,4 +865,58 @@ mod tests { other => panic!("expected U64 Sum, got {other:?}"), } } + + #[tokio::test] + async fn post_mailbox_records_short_id_cardinality() { + use opentelemetry_sdk::metrics::{ + InMemoryMetricExporter, PeriodicReader, SdkMeterProvider, + }; + + use crate::db::MetricsDb; + use crate::metrics::{MetricsService, UNIQUE_SHORT_IDS}; + + let exporter = InMemoryMetricExporter::default(); + let reader = PeriodicReader::builder(exporter.clone()).build(); + let provider = SdkMeterProvider::builder().with_reader(reader).build(); + let metrics = MetricsService::new(Some(provider.clone())); + + let dir = tempfile::tempdir().expect("tempdir"); + let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init"); + let db = MetricsDb::new(db, metrics); + let ohttp: ohttp::Server = + crate::key_config::gen_ohttp_server_config().expect("ohttp config").into(); + let svc = Service::new(db, ohttp, 
SentinelTag::new([0u8; 32]), None); + + let id = valid_short_id_path(); + let res = svc + .post_mailbox(&id, Body::from(b"payload".to_vec())) + .await + .expect("post_mailbox should succeed"); + assert_eq!(res.status(), StatusCode::OK); + + provider.force_flush().expect("flush failed"); + let finished = exporter.get_finished_metrics().expect("metrics"); + let gauge = finished + .iter() + .flat_map(|rm| rm.scope_metrics()) + .flat_map(|sm| sm.metrics()) + .find(|m| m.name() == UNIQUE_SHORT_IDS) + .expect("missing unique_short_ids metric"); + + use opentelemetry::KeyValue; + use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData}; + + match gauge.data() { + AggregatedMetrics::U64(MetricData::Gauge(g)) => { + let points: Vec<_> = g.data_points().collect(); + assert!(!points.is_empty(), "expected at least one data point"); + let hourly = points.iter().find(|dp| { + dp.attributes().any(|kv| kv == &KeyValue::new("interval", "hourly")) + }); + assert!(hourly.is_some(), "expected hourly data point"); + assert!(hourly.unwrap().value() >= 1, "expected at least 1 unique short ID"); + } + other => panic!("expected U64 Gauge, got {other:?}"), + } + } } diff --git a/payjoin-mailroom/src/metrics.rs b/payjoin-mailroom/src/metrics.rs index c807b9cfd..6c0722594 100644 --- a/payjoin-mailroom/src/metrics.rs +++ b/payjoin-mailroom/src/metrics.rs @@ -1,13 +1,116 @@ +use std::collections::hash_map::RandomState; +use std::collections::HashMap; use std::fmt; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; -use opentelemetry::metrics::{Counter, MeterProvider, UpDownCounter}; +use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; +use opentelemetry::metrics::{Counter, MeterProvider, ObservableGauge, UpDownCounter}; use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::SdkMeterProvider; +use payjoin::directory::ShortId; pub(crate) const TOTAL_CONNECTIONS: &str = "total_connections"; pub(crate) const ACTIVE_CONNECTIONS: &str = "active_connections"; 
pub(crate) const HTTP_REQUESTS: &str = "http_request_total";
 pub(crate) const DB_ENTRIES: &str = "db_entries_total";
+pub(crate) const UNIQUE_SHORT_IDS: &str = "unique_short_ids";
+
+const HLL_PRECISION: u8 = 14;
+const HOURLY_RETENTION_HOURS: u64 = 168; // 7 days
+const DAILY_RETENTION_DAYS: u64 = 90;
+
+type HllSketch = HyperLogLogPlus<[u8; 8], RandomState>;
+
+/// Creates an empty sketch that hashes with a clone of `hasher`.
+///
+/// Merging sketches is only meaningful when they hash identically, so every
+/// sketch of one tracker must be built from the same `RandomState`.
+fn new_sketch(hasher: &RandomState) -> HllSketch {
+    HyperLogLogPlus::new(HLL_PRECISION, hasher.clone()).expect("precision 14 is always valid")
+}
+
+/// Whole seconds elapsed since the UNIX epoch according to the system clock.
+fn unix_secs() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("system clock before UNIX epoch")
+        .as_secs()
+}
+
+/// Estimates the number of unique ShortIds seen per time window.
+/// Two tiers of HLL sketches:
+/// - **Hourly** -- one sketch per hour, pruned after 7 days.
+/// - **Daily** -- one sketch per day, pruned after 90 days.
+///
+/// All sketches hash with clones of `hasher`, which keeps the daily sketches
+/// mergeable for the weekly estimate.
+struct HllSketches {
+    hasher: RandomState,
+    hourly: HashMap<u64, HllSketch>,
+    daily: HashMap<u64, HllSketch>,
+}
+
+impl HllSketches {
+    fn new() -> Self {
+        Self { hasher: RandomState::new(), hourly: HashMap::new(), daily: HashMap::new() }
+    }
+
+    /// Records `id` in the current hour and day sketches, then drops expired sketches.
+    fn add_id(&mut self, id: &ShortId) {
+        let secs = unix_secs();
+        let hour = secs / 3600;
+        let day = secs / 86400;
+
+        let hasher = &self.hasher;
+        self.hourly.entry(hour).or_insert_with(|| new_sketch(hasher)).insert(&id.0);
+        self.daily.entry(day).or_insert_with(|| new_sketch(hasher)).insert(&id.0);
+
+        self.hourly.retain(|&k, _| hour.saturating_sub(HOURLY_RETENTION_HOURS) <= k);
+        self.daily.retain(|&k, _| day.saturating_sub(DAILY_RETENTION_DAYS) <= k);
+    }
+
+    /// Estimated unique IDs recorded during the current hour (0 if none).
+    fn hourly_count(&mut self) -> u64 {
+        let hour = unix_secs() / 3600;
+        self.hourly.get_mut(&hour).map(|hll| hll.count().trunc() as u64).unwrap_or(0)
+    }
+
+    /// Estimated unique IDs recorded during the current day (0 if none).
+    fn daily_count(&mut self) -> u64 {
+        let day = unix_secs() / 86400;
+        self.daily.get_mut(&day).map(|hll| hll.count().trunc() as u64).unwrap_or(0)
+    }
+
+    /// Estimated unique IDs across today and the six preceding days.
+    fn weekly_count(&mut self) -> u64 {
+        let today = unix_secs() / 86400;
+        let mut union = new_sketch(&self.hasher);
+        // `checked_sub` skips days before the epoch instead of underflowing.
+        let days = (0..7u64).filter_map(|offset| today.checked_sub(offset));
+        for sketch in days.filter_map(|day| self.daily.get(&day)) {
+            union.merge(sketch).expect("same precision");
+        }
+        union.count().trunc() as u64
+    }
+}
+
+/// Cloneable, thread-safe handle to one shared set of sketches.
+#[derive(Clone)]
+pub struct UniqueShortIdTracker {
+    inner: Arc<Mutex<HllSketches>>,
+}
+
+impl UniqueShortIdTracker {
+    pub fn new() -> Self { Self { inner: Arc::new(Mutex::new(HllSketches::new())) } }
+
+    pub fn add_id(&self, id: &ShortId) {
+        self.inner.lock().expect("tracker lock poisoned").add_id(id);
+    }
+
+    pub fn hourly_count(&self) -> u64 {
+        self.inner.lock().expect("tracker lock poisoned").hourly_count()
+    }
+
+    pub fn daily_count(&self) -> u64 {
+        self.inner.lock().expect("tracker lock poisoned").daily_count()
+    }
+
+    pub fn weekly_count(&self) -> u64 {
+        self.inner.lock().expect("tracker lock poisoned").weekly_count()
+    }
+}
+
 #[derive(Clone)]
 pub struct MetricsService {
@@ -19,6 +131,10 @@ pub struct MetricsService {
     active_connections: UpDownCounter<i64>,
     /// Total v1/v2 mailbox entries written, labelled by `version`
     db_entries_total: Counter<u64>,
+    /// Cardinality estimator fed by `record_short_id`
+    tracker: UniqueShortIdTracker,
+    /// Observable gauge reporting the tracker's estimates; `None` when no provider was supplied
+    _unique_ids_gauge: Option<Arc<ObservableGauge<u64>>>,
 }
 
 #[repr(u8)]
@@ -36,6 +152,7 @@ impl fmt::Display for PayjoinVersion {
 
 impl MetricsService {
     pub fn new(provider: Option<SdkMeterProvider>) -> Self {
+        let has_reader = provider.is_some();
         let provider = provider.unwrap_or_default();
         let meter = provider.meter("payjoin-mailroom");
 
@@ -59,7 +176,42 @@ impl MetricsService {
             .with_description("Total mailbox entries stored by protocol version")
             .build();
 
-        Self { http_requests_total, total_connections, active_connections, db_entries_total }
+        let tracker = UniqueShortIdTracker::new();
+
+        let unique_ids_gauge = if has_reader {
+            let gauge_tracker = tracker.clone();
+            Some(Arc::new(
+                meter
+                    .u64_observable_gauge(UNIQUE_SHORT_IDS)
+                    .with_description("Estimated unique short IDs")
+                    .with_callback(move |observer| {
+                        observer.observe(
+                            gauge_tracker.hourly_count(),
+                            &[KeyValue::new("interval", "hourly")],
+                        );
+                        observer.observe(
+                            gauge_tracker.daily_count(),
+                            &[KeyValue::new("interval", "daily")],
+                        );
+                        observer.observe(
+                            gauge_tracker.weekly_count(),
+                            &[KeyValue::new("interval", "weekly")],
+                        );
+                    })
+                    .build(),
+            ))
+        } else {
+            None
+        };
+
+        Self {
+            http_requests_total,
+            total_connections,
+            active_connections,
+            db_entries_total,
+            tracker,
+            _unique_ids_gauge: unique_ids_gauge,
+        }
     }
 
     pub fn record_http_request(&self, endpoint: &str, method: &str, status_code: u16) {
@@ -83,4 +235,7 @@ impl MetricsService {
     pub fn record_db_entry(&self, version: PayjoinVersion) {
         self.db_entries_total.add(1, &[KeyValue::new("version", version.to_string())]);
     }
+
+    /// Feeds `id` into the unique-short-ID cardinality tracker.
+    pub fn record_short_id(&self, id: &ShortId) { self.tracker.add_id(id); }
 }