Skip to content

Commit 6a32bcc

Browse files
authored
Add db entry metrics (#1412)
2 parents 9c3ac24 + ca80b75 commit 6a32bcc

4 files changed

Lines changed: 159 additions & 12 deletions

File tree

payjoin-mailroom/src/db/mod.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use tower::util::BoxCloneSyncService;
88
use tower::{Service, ServiceExt};
99

1010
pub mod files;
11+
use crate::metrics::{MetricsService, PayjoinVersion};
1112

1213
pub trait SendableError:
1314
std::error::Error + std::marker::Send + std::marker::Sync + std::convert::Into<anyhow::Error>
@@ -235,3 +236,55 @@ impl Db for DbServiceAdapter {
235236
}
236237

237238
pub use files::FilesDb;
239+
240+
/// Db decorator that records write metrics
241+
#[derive(Clone)]
242+
pub struct MetricsDb<D: Db> {
243+
inner: D,
244+
metrics: MetricsService,
245+
}
246+
247+
impl<D: Db> MetricsDb<D> {
248+
pub fn new(inner: D, metrics: MetricsService) -> Self { Self { inner, metrics } }
249+
}
250+
251+
impl<D: Db> Db for MetricsDb<D> {
252+
type OperationalError = D::OperationalError;
253+
254+
async fn post_v2_payload(
255+
&self,
256+
mailbox_id: &ShortId,
257+
data: Vec<u8>,
258+
) -> Result<Option<()>, Error<Self::OperationalError>> {
259+
let result = self.inner.post_v2_payload(mailbox_id, data).await?;
260+
if result.is_some() {
261+
self.metrics.record_db_entry(PayjoinVersion::Two);
262+
}
263+
Ok(result)
264+
}
265+
266+
async fn wait_for_v2_payload(
267+
&self,
268+
mailbox_id: &ShortId,
269+
) -> Result<Arc<Vec<u8>>, Error<Self::OperationalError>> {
270+
self.inner.wait_for_v2_payload(mailbox_id).await
271+
}
272+
273+
async fn post_v1_response(
274+
&self,
275+
mailbox_id: &ShortId,
276+
data: Vec<u8>,
277+
) -> Result<(), Error<Self::OperationalError>> {
278+
self.inner.post_v1_response(mailbox_id, data).await
279+
}
280+
281+
async fn post_v1_request_and_wait_for_response(
282+
&self,
283+
mailbox_id: &ShortId,
284+
data: Vec<u8>,
285+
) -> Result<Arc<Vec<u8>>, Error<Self::OperationalError>> {
286+
let result = self.inner.post_v1_request_and_wait_for_response(mailbox_id, data).await?;
287+
self.metrics.record_db_entry(PayjoinVersion::One);
288+
Ok(result)
289+
}
290+
}

payjoin-mailroom/src/directory.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,4 +806,63 @@ mod tests {
806806
let html = landing_page_html();
807807
assert!(!html.contains("{{VERSION_STRING}}"));
808808
}
809+
810+
// MetricsDb decorator
811+
812+
#[tokio::test]
813+
async fn post_mailbox_increments_v2_db_entry_metric() {
814+
use opentelemetry_sdk::metrics::{
815+
InMemoryMetricExporter, PeriodicReader, SdkMeterProvider,
816+
};
817+
818+
use crate::db::MetricsDb;
819+
use crate::metrics::{MetricsService, DB_ENTRIES};
820+
821+
let exporter = InMemoryMetricExporter::default();
822+
let reader = PeriodicReader::builder(exporter.clone()).build();
823+
let provider = SdkMeterProvider::builder().with_reader(reader).build();
824+
let metrics = MetricsService::new(Some(provider.clone()));
825+
826+
let dir = tempfile::tempdir().expect("tempdir");
827+
let db = FilesDb::init(Duration::from_millis(100), dir.keep()).await.expect("db init");
828+
let db = MetricsDb::new(db, metrics);
829+
let ohttp: ohttp::Server =
830+
crate::key_config::gen_ohttp_server_config().expect("ohttp config").into();
831+
let svc = Service::new(db, ohttp, SentinelTag::new([0u8; 32]), None);
832+
833+
let id = valid_short_id_path();
834+
let res = svc
835+
.post_mailbox(&id, Body::from(b"small payload under 65k limit".to_vec()))
836+
.await
837+
.expect("post_mailbox should succeed");
838+
assert_eq!(res.status(), StatusCode::OK);
839+
840+
provider.force_flush().expect("flush failed");
841+
let finished = exporter.get_finished_metrics().expect("metrics");
842+
let db_metric = finished
843+
.iter()
844+
.flat_map(|rm| rm.scope_metrics())
845+
.flat_map(|sm| sm.metrics())
846+
.find(|m| m.name() == DB_ENTRIES)
847+
.expect("missing db_entries_total metric");
848+
849+
use opentelemetry::KeyValue;
850+
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
851+
852+
// This checks that counter value is 1 as post_mailbox was called once
853+
// Also confirms the v2 label is recorded
854+
match db_metric.data() {
855+
AggregatedMetrics::U64(MetricData::Sum(sum)) => {
856+
let points: Vec<_> = sum.data_points().collect();
857+
assert_eq!(points.len(), 1, "expected exactly one data point");
858+
assert_eq!(points[0].value(), 1, "expected counter value of 1");
859+
let attrs: Vec<_> = points[0].attributes().collect();
860+
assert!(
861+
attrs.contains(&&KeyValue::new("version", "2")),
862+
"expected version=2 attribute"
863+
);
864+
}
865+
other => panic!("expected U64 Sum, got {other:?}"),
866+
}
867+
}
809868
}

payjoin-mailroom/src/lib.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@ pub mod ohttp_relay;
2929
use crate::metrics::MetricsService;
3030
use crate::middleware::{track_connections, track_metrics};
3131

32+
type DirectoryService =
33+
crate::directory::Service<crate::db::MetricsDb<crate::db::DbServiceAdapter>>;
34+
3235
#[derive(Clone)]
3336
struct Services {
34-
directory: crate::directory::Service<crate::db::DbServiceAdapter>,
37+
directory: DirectoryService,
3538
relay: crate::ohttp_relay::Service,
3639
metrics: MetricsService,
3740
#[cfg(feature = "access-control")]
@@ -40,16 +43,17 @@ struct Services {
4043

4144
pub async fn serve(config: Config, meter_provider: Option<SdkMeterProvider>) -> anyhow::Result<()> {
4245
let sentinel_tag = generate_sentinel_tag();
46+
let metrics = MetricsService::new(meter_provider);
4347

4448
#[cfg(feature = "access-control")]
4549
let geoip = init_geoip(&config).await?;
4650

47-
let directory = init_directory(&config, sentinel_tag).await?;
51+
let directory = init_directory(&config, sentinel_tag, &metrics).await?;
4852

4953
let services = Services {
5054
directory,
5155
relay: crate::ohttp_relay::Service::new(sentinel_tag).await,
52-
metrics: MetricsService::new(meter_provider),
56+
metrics,
5357
#[cfg(feature = "access-control")]
5458
geoip,
5559
};
@@ -84,11 +88,12 @@ pub async fn serve_manual_tls(
8488
use std::net::SocketAddr;
8589

8690
let sentinel_tag = generate_sentinel_tag();
91+
let metrics = MetricsService::new(None);
8792

8893
#[cfg(feature = "access-control")]
8994
let geoip = init_geoip(&config).await?;
9095

91-
let directory = init_directory(&config, sentinel_tag).await?;
96+
let directory = init_directory(&config, sentinel_tag, &metrics).await?;
9297

9398
let services = Services {
9499
directory,
@@ -98,7 +103,7 @@ pub async fn serve_manual_tls(
98103
default_gateway,
99104
)
100105
.await,
101-
metrics: MetricsService::new(None),
106+
metrics,
102107
#[cfg(feature = "access-control")]
103108
geoip,
104109
};
@@ -153,16 +158,17 @@ pub async fn serve_acme(
153158
.ok_or_else(|| anyhow::anyhow!("ACME configuration is required for serve_acme"))?;
154159

155160
let sentinel_tag = generate_sentinel_tag();
161+
let metrics = MetricsService::new(meter_provider);
156162

157163
#[cfg(feature = "access-control")]
158164
let geoip = init_geoip(&config).await?;
159165

160-
let directory = init_directory(&config, sentinel_tag).await?;
166+
let directory = init_directory(&config, sentinel_tag, &metrics).await?;
161167

162168
let services = Services {
163169
directory,
164170
relay: crate::ohttp_relay::Service::new(sentinel_tag).await,
165-
metrics: MetricsService::new(meter_provider),
171+
metrics,
166172
#[cfg(feature = "access-control")]
167173
geoip,
168174
};
@@ -220,10 +226,11 @@ impl Connected<IncomingStream<'_, Listener>> for middleware::MaybePeerIp {
220226
async fn init_directory(
221227
config: &Config,
222228
sentinel_tag: SentinelTag,
223-
) -> anyhow::Result<crate::directory::Service<crate::db::DbServiceAdapter>> {
229+
metrics: &MetricsService,
230+
) -> anyhow::Result<DirectoryService> {
224231
let files_db = crate::db::FilesDb::init(config.timeout, config.storage_dir.clone()).await?;
225232
files_db.spawn_background_prune().await;
226-
let db = crate::db::DbServiceAdapter::new(files_db);
233+
let db = crate::db::MetricsDb::new(crate::db::DbServiceAdapter::new(files_db), metrics.clone());
227234

228235
let ohttp_keys_dir = config.storage_dir.join("ohttp-keys");
229236
let ohttp_config = init_ohttp_config(&ohttp_keys_dir)?;
@@ -538,10 +545,11 @@ mod tests {
538545
);
539546

540547
let sentinel_tag = generate_sentinel_tag();
548+
let metrics = MetricsService::new(Some(provider.clone()));
541549
let services = Services {
542-
directory: init_directory(&config, sentinel_tag).await.unwrap(),
550+
directory: init_directory(&config, sentinel_tag, &metrics).await.unwrap(),
543551
relay: crate::ohttp_relay::Service::new(sentinel_tag).await,
544-
metrics: MetricsService::new(Some(provider.clone())),
552+
metrics,
545553
#[cfg(feature = "access-control")]
546554
geoip: None,
547555
};

payjoin-mailroom/src/metrics.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
use std::fmt;
2+
13
use opentelemetry::metrics::{Counter, MeterProvider, UpDownCounter};
24
use opentelemetry::KeyValue;
35
use opentelemetry_sdk::metrics::SdkMeterProvider;
46

57
pub(crate) const TOTAL_CONNECTIONS: &str = "total_connections";
68
pub(crate) const ACTIVE_CONNECTIONS: &str = "active_connections";
79
pub(crate) const HTTP_REQUESTS: &str = "http_request_total";
10+
pub(crate) const DB_ENTRIES: &str = "db_entries_total";
811

912
#[derive(Clone)]
1013
pub struct MetricsService {
@@ -14,6 +17,21 @@ pub struct MetricsService {
1417
total_connections: Counter<u64>,
1518
/// Number of active connections right now
1619
active_connections: UpDownCounter<i64>,
20+
/// Total v1/v2 mailbox entries written, labelled by `version`
21+
db_entries_total: Counter<u64>,
22+
}
23+
24+
#[repr(u8)]
25+
#[derive(Clone, Copy, PartialEq, Eq)]
26+
pub enum PayjoinVersion {
27+
/// BIP 78 Payjoin
28+
One = 1,
29+
/// BIP 77 Async Payjoin
30+
Two = 2,
31+
}
32+
33+
impl fmt::Display for PayjoinVersion {
34+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { (*self as u8).fmt(f) }
1735
}
1836

1937
impl MetricsService {
@@ -36,7 +54,12 @@ impl MetricsService {
3654
.with_description("Number of active connections")
3755
.build();
3856

39-
Self { http_requests_total, total_connections, active_connections }
57+
let db_entries_total = meter
58+
.u64_counter(DB_ENTRIES)
59+
.with_description("Total mailbox entries stored by protocol version")
60+
.build();
61+
62+
Self { http_requests_total, total_connections, active_connections, db_entries_total }
4063
}
4164

4265
pub fn record_http_request(&self, endpoint: &str, method: &str, status_code: u16) {
@@ -56,4 +79,8 @@ impl MetricsService {
5679
}
5780

5881
pub fn record_connection_close(&self) { self.active_connections.add(-1, &[]); }
82+
83+
pub fn record_db_entry(&self, version: PayjoinVersion) {
84+
self.db_entries_total.add(1, &[KeyValue::new("version", version.to_string())]);
85+
}
5986
}

0 commit comments

Comments
 (0)