Skip to content

Commit 43b0909

Browse files
committed
refactor: improve error handling and metrics in relay components
clippy fixes
1 parent 18c66e2 commit 43b0909

11 files changed

Lines changed: 169 additions & 38 deletions

File tree

src/bins/rmb-relay.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::Duration;
33

44
use anyhow::{Context, Result};
55
use clap::{builder::ArgAction, Parser};
6+
use prometheus::default_registry;
67
use rmb::cache::RedisCache;
78
use rmb::events;
89
use rmb::redis;
@@ -170,6 +171,7 @@ async fn app(args: Args) -> Result<JoinHandle<()>> {
170171
)
171172
};
172173
let ranker = relay::ranker::RelayRanker::new(Duration::from_secs(args.ranker_period));
174+
rmb::cache::register_cache_metrics(default_registry());
173175
let r = relay::Relay::new(
174176
args.domains.iter().cloned().collect(),
175177
twins,

src/cache/memory.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use std::sync::Arc;
33
use std::time::Duration;
44
use tokio::sync::RwLock;
55

6-
use super::Cache;
6+
#[cfg(feature = "tracker")]
7+
use super::RMB_CACHE_HITS_TOTAL;
8+
use super::{Cache, RMB_CACHE_ENTRIES, RMB_CACHE_FLUSHES_TOTAL, RMB_CACHE_MISSES_TOTAL};
79
use anyhow::Result;
810
use ttl_cache::TtlCache;
911

@@ -38,21 +40,35 @@ where
3840
{
3941
async fn set<K: ToString + Send + Sync>(&self, key: K, obj: T) -> Result<()> {
4042
let mut mem = self.mem.write().await;
41-
mem.insert(key.to_string(), obj, self.ttl);
43+
let prev = mem.insert(key.to_string(), obj, self.ttl);
44+
if prev.is_none() {
45+
RMB_CACHE_ENTRIES.inc();
46+
}
4247

4348
Ok(())
4449
}
4550

4651
async fn get<K: ToString + Send + Sync>(&self, key: K) -> Result<Option<T>> {
4752
let mem = self.mem.read().await;
4853
match mem.get(&key.to_string()) {
49-
None => Ok(None),
50-
Some(v) => Ok(Some(v.clone())),
54+
None => {
55+
RMB_CACHE_MISSES_TOTAL.inc();
56+
Ok(None)
57+
}
58+
Some(v) => {
59+
#[cfg(feature = "tracker")]
60+
{
61+
RMB_CACHE_HITS_TOTAL.inc();
62+
}
63+
Ok(Some(v.clone()))
64+
}
5165
}
5266
}
5367
async fn flush(&self) -> Result<()> {
5468
let mut mem = self.mem.write().await;
5569
mem.clear();
70+
RMB_CACHE_ENTRIES.set(0);
71+
RMB_CACHE_FLUSHES_TOTAL.inc();
5672

5773
Ok(())
5874
}

src/cache/mod.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,45 @@ use std::{
99
marker::{Send, Sync},
1010
};
1111

12+
use lazy_static::lazy_static;
13+
use prometheus::{IntCounter, IntGauge, Registry};
14+
15+
#[cfg(feature = "tracker")]
16+
lazy_static! {
17+
static ref RMB_CACHE_HITS_TOTAL: IntCounter = IntCounter::new(
18+
"rmb_cache_hits_total",
19+
"Number of cache hits when fetching twins",
20+
)
21+
.unwrap();
22+
}
23+
24+
lazy_static! {
25+
static ref RMB_CACHE_MISSES_TOTAL: IntCounter = IntCounter::new(
26+
"rmb_cache_misses_total",
27+
"Number of cache misses when fetching twins (RPC fallback likely)",
28+
)
29+
.unwrap();
30+
static ref RMB_CACHE_FLUSHES_TOTAL: IntCounter = IntCounter::new(
31+
"rmb_cache_flushes_total",
32+
"Number of times the twin cache has been flushed",
33+
)
34+
.unwrap();
35+
static ref RMB_CACHE_ENTRIES: IntGauge = IntGauge::new(
36+
"rmb_cache_entries",
37+
"Current number of entries in the twin cache",
38+
)
39+
.unwrap();
40+
}
41+
42+
pub fn register_cache_metrics(registry: &Registry) {
43+
// Best-effort register into provided registry; ignore duplicate registration errors.
44+
#[cfg(feature = "tracker")]
45+
let _ = registry.register(Box::new(RMB_CACHE_HITS_TOTAL.clone()));
46+
let _ = registry.register(Box::new(RMB_CACHE_MISSES_TOTAL.clone()));
47+
let _ = registry.register(Box::new(RMB_CACHE_FLUSHES_TOTAL.clone()));
48+
let _ = registry.register(Box::new(RMB_CACHE_ENTRIES.clone()));
49+
}
50+
1251
pub trait Cache<T>: Send + Sync + 'static {
1352
fn set<S: ToString + Send + Sync>(
1453
&self,

src/cache/redis.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use super::Cache;
1+
#[cfg(feature = "tracker")]
2+
use super::RMB_CACHE_HITS_TOTAL;
3+
use super::{Cache, RMB_CACHE_ENTRIES, RMB_CACHE_FLUSHES_TOTAL, RMB_CACHE_MISSES_TOTAL};
24

35
use anyhow::{Context, Result};
46
use bb8_redis::{
@@ -47,12 +49,15 @@ where
4749
async fn set<S: ToString + Send + Sync>(&self, key: S, obj: T) -> Result<()> {
4850
let mut conn = self.get_connection().await?;
4951
let obj = serde_json::to_vec(&obj).context("unable to serialize twin object for redis")?;
50-
let _: () = cmd("HSET")
52+
let added: i64 = cmd("HSET")
5153
.arg(&self.prefix)
5254
.arg(key.to_string())
5355
.arg(obj)
54-
.query_async::<_, ()>(&mut *conn)
56+
.query_async::<_, i64>(&mut *conn)
5557
.await?;
58+
if added == 1 {
59+
RMB_CACHE_ENTRIES.inc();
60+
}
5661

5762
Ok(())
5863
}
@@ -70,14 +75,23 @@ where
7075
let ret: T = serde_json::from_slice(&val)
7176
.context("unable to deserialize redis value to twin object")?;
7277

78+
#[cfg(feature = "tracker")]
79+
{
80+
RMB_CACHE_HITS_TOTAL.inc();
81+
}
7382
Ok(Some(ret))
7483
}
75-
None => Ok(None),
84+
None => {
85+
RMB_CACHE_MISSES_TOTAL.inc();
86+
Ok(None)
87+
}
7688
}
7789
}
7890
async fn flush(&self) -> Result<()> {
7991
let mut conn = self.get_connection().await?;
8092
let _: () = cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;
93+
RMB_CACHE_ENTRIES.set(0);
94+
RMB_CACHE_FLUSHES_TOTAL.inc();
8195

8296
Ok(())
8397
}

src/identity/ed25519.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ mod tests {
103103
Ed25519Signer::try_from(WORDS).expect("key must be loaded");
104104

105105
let err = Ed25519Signer::try_from("invalid words");
106-
assert_eq!(err.is_err(), true);
106+
assert!(err.is_err());
107107
}
108108

109109
#[test]
@@ -115,6 +115,6 @@ mod tests {
115115
let _: [u8; 32] = result.as_slice().try_into().expect("key of size 32");
116116

117117
let err = Ed25519Signer::try_from("0xinvalidseed");
118-
assert_eq!(err.is_err(), true);
118+
assert!(err.is_err());
119119
}
120120
}

src/identity/sr25519.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,14 @@ mod tests {
100100
Sr25519Signer::try_from(WORDS).expect("key must be loaded");
101101

102102
let err = Sr25519Signer::try_from("invalid words");
103-
assert_eq!(err.is_err(), true);
103+
assert!(err.is_err());
104104
}
105105

106106
#[test]
107107
fn test_load_from_seed() {
108108
Sr25519Signer::try_from(SEED).expect("key must be loaded");
109109

110110
let err = Sr25519Signer::try_from("0xinvalidseed");
111-
assert_eq!(err.is_err(), true);
111+
assert!(err.is_err());
112112
}
113113
}

src/peer/e2e/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ mod test {
141141
.parse()
142142
.unwrap();
143143

144-
let shared1 = sk1.shared(&sk2.public()).unwrap();
145-
let shared2 = sk2.shared(&sk1.public()).unwrap();
144+
let shared1 = sk1.shared(sk2.public()).unwrap();
145+
let shared2 = sk2.shared(sk1.public()).unwrap();
146146

147147
assert_eq!(shared1, shared2);
148148

src/relay/api.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ async fn federation<D: TwinDB, R: RateLimiter>(
209209

210210
let dst: SessionID = (&envelope.destination).into();
211211

212-
// fast-fail: if requested and destination is not local, return error immediately
212+
// fast-fail path: if requested and destination is not local, return error immediately
213213
if has_fast_fail(&envelope) && !data.switch.is_local(&dst).await {
214214
// destination session doesn’t exist on this relay
215215
return Response::builder()
@@ -376,18 +376,15 @@ impl<M: Metrics, D: TwinDB> Session<M, D> {
376376
}
377377

378378
let is_local = self.switch.is_local(&dst).await;
379-
// fast-fail: if requested and destination is not local, return error immediately
380-
if has_fast_fail(envelope) && !is_local {
381-
anyhow::bail!("destination offline");
382-
}
383379

384380
// check if the dst twin id is already connected locally
385381
// if so, we don't have to check federation and directly
386382
// switch the message
387383
if is_local {
388-
log::debug!("found local session for '{}' , forwarding message", dst);
384+
log::trace!("found local session for '{}' , forwarding message", dst);
389385
if let Err(err) = self.switch.send(&dst, &msg).await {
390386
log::error!("failed to route message to peer '{}' : {}", dst, err);
387+
anyhow::bail!("failed to route message to peer '{}': {}", dst, err);
391388
}
392389
return Ok(());
393390
}
@@ -414,13 +411,18 @@ impl<M: Metrics, D: TwinDB> Session<M, D> {
414411
// push message to the (relay.federation) queue
415412
return Ok(self.federator.send(&msg).await?);
416413
}
417-
414+
// fast-fail path: we confirmed that this is not a foreign message, and if it is fast-fail tagged and the destination session is not connected
415+
// then the peer must be offline, return error immediately
416+
if has_fast_fail(envelope) && !is_local {
417+
anyhow::bail!("destination offline");
418+
}
418419
// we don't return an error because when we return an error
419420
// we will send this error back to the sender user. Hence
420421
// calling the switch.send again
421422
// this is an internal error anyway, and should not happen
422423
if let Err(err) = self.switch.send(&dst, &msg).await {
423424
log::error!("failed to route message to peer '{}': {}", dst, err);
425+
anyhow::bail!("failed to route message to peer '{}': {}", dst, err);
424426
}
425427

426428
Ok(())
@@ -474,7 +476,7 @@ impl<M: Metrics, D: TwinDB> Session<M, D> {
474476
// TODO: throttling to avoid sending too many messages in short time!
475477
match message {
476478
Message::Text(_) => {
477-
log::trace!("received unsupported (text) message. disconnecting!");
479+
log::debug!("received unsupported (text) message. disconnecting!");
478480
break;
479481
}
480482
Message::Binary(msg) => {
@@ -487,16 +489,21 @@ impl<M: Metrics, D: TwinDB> Session<M, D> {
487489
super::switch::MESSAGE_RX_TWIN.with_label_values(&[&format!("{}", envelope.source.twin)]).inc();
488490

489491
if !self.metrics.measure(msg.len()).await {
490-
log::trace!("twin with stream id {} exceeded its request limits, dropping message", self.id);
491-
self.send_error(envelope, "exceeded rate limits, dropping message").await;
492-
continue
492+
log::debug!("twin with stream id {} exceeded its request limits, dropping message", self.id);
493+
if envelope.has_request() {
494+
self.send_error(envelope, "exceeded rate limits, dropping message").await;
495+
}
496+
continue;
493497
}
494498

495499
// if we failed to route back the message to the user
496500
// for any reason we send an error message back
497501
// server error has no source address set.
498502
if let Err(err) = self.route(&mut envelope, msg).await {
499-
self.send_error(envelope, err).await;
503+
// check if the envelope is a request or a response
504+
if envelope.has_request() {
505+
self.send_error(envelope, err).await;
506+
}
500507
}
501508
}
502509
Message::Ping(_) => {

src/relay/switch/mod.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@ use tokio::time::Duration;
2727
use tokio_util::sync::{CancellationToken, DropGuard};
2828
use worker::WorkerJob;
2929

30-
use prometheus::{IntCounter, IntGaugeVec, Opts, Registry};
31-
32-
#[cfg(feature = "tracker")]
33-
use prometheus::IntCounterVec;
30+
use prometheus::{IntCounter, IntCounterVec, IntGaugeVec, Opts, Registry};
3431

3532
lazy_static::lazy_static! {
3633
static ref CON_PER_WORKER: IntGaugeVec = IntGaugeVec::new(
@@ -45,6 +42,16 @@ lazy_static::lazy_static! {
4542
static ref MESSAGE_RX_BYTES: IntCounter = IntCounter::new("relay_message_rx_bytes", "size of messages received by relay in bytes").unwrap();
4643

4744
static ref MESSAGE_TX_BYTES: IntCounter = IntCounter::new("relay_message_tx_bytes", "size of messages forwarded by relay in bytes").unwrap();
45+
46+
// Total number of session evictions, labeled by reason
47+
// reason ∈ { "closed", "back_pressure_timeout", "cancelled" }
48+
static ref RELAY_SESSION_EVICTIONS_TOTAL: IntCounterVec = IntCounterVec::new(
49+
Opts::new(
50+
"relay_session_evictions_total",
51+
"Total number of session evictions from the relay, labeled by reason",
52+
),
53+
&["reason"],
54+
).unwrap();
4855
}
4956

5057
#[cfg(feature = "tracker")]
@@ -201,6 +208,8 @@ where
201208
opts.registry.register(Box::new(MESSAGE_TX.clone()))?;
202209
opts.registry.register(Box::new(MESSAGE_RX_BYTES.clone()))?;
203210
opts.registry.register(Box::new(MESSAGE_TX_BYTES.clone()))?;
211+
opts.registry
212+
.register(Box::new(RELAY_SESSION_EVICTIONS_TOTAL.clone()))?;
204213
#[cfg(feature = "tracker")]
205214
opts.registry.register(Box::new(MESSAGE_RX_TWIN.clone()))?;
206215

@@ -277,6 +286,9 @@ where
277286
}
278287

279288
/// checks if a session is connected locally
289+
/// it doesn't verify whether the twin belongs to this relay
290+
/// so it should be treated as a cheap way to route to local online twins, bypassing twin lookup
291+
/// if the session is not found, further routing verification should be done
280292
pub async fn is_local(&self, id: &SessionID) -> bool {
281293
self.sessions.lock().await.contains_key(id)
282294
}
@@ -382,7 +394,7 @@ mod test {
382394
.expect("can connect");
383395

384396
let switch = Switch::<MessageSender>::new(super::SwitchOptions {
385-
pool: pool,
397+
pool,
386398
workers: 2,
387399
max_users: 100,
388400
registry: Registry::new(),

0 commit comments

Comments
 (0)