Skip to content

Commit 4a44655

Browse files
authored
feat(relay): Overhaul for production-grade reliability, performance, and observability
This commit introduces a major architectural overhaul of the RMB Relay, focusing on self-healing resilience, high-load performance, and comprehensive monitoring. Core components have been re-engineered to be more robust, efficient, and transparent in a production environment. Architectural & Reliability Overhaul: * Federation on Redis Streams: The inter-relay federation mechanism is completely rebuilt on Redis Streams. This provides at-least-once delivery semantics and automatic recovery for failed or pending messages, replacing the previous, less reliable queue. * Self-Healing Substrate Client: The Substrate client and event listener are now highly resilient to network failures. They feature automatic, lock-free reconnections using `arc-swap`, capped exponential backoff, and a "single-flight" pattern to prevent thundering herd scenarios during cache misses. Performance & Efficiency: * Local Peer Circuit-Breaker: Message routing now uses a fast, in-memory check for locally connected peers, bypassing expensive lookups and federation logic to significantly speed up local traffic. * Binary Serialization: The Redis cache now uses `bincode` instead of `serde_json`, reducing data size and CPU overhead for serialization and deserialization. * Batched Message ACKs: WebSocket message acknowledgments are now batched, drastically reducing Redis round-trips and improving throughput under heavy load. * Fail-Fast Tag: A new `fail-fast` message tag provides immediate feedback to senders if a remote destination is known to be offline, avoiding unnecessary queuing and timeouts. Networking & Protocol Modernization: * Hyper 1.0 & HTTP/2: The entire networking stack has been upgraded to the modern Hyper 1.0 API. Federation traffic is now explicitly configured for HTTP/2 to leverage its performance benefits. * Protocol Field Deprecation: The `federation` and `relays` fields in the `Envelope` protobuf message have been formally marked as deprecated, aligning the protocol with the new chain-based discovery mechanism. Observability: * Comprehensive Prometheus Metrics: New, detailed Prometheus metrics have been added across the application, tracking the Event Listener's status (reconnects, processed blocks), cache performance (hits, misses), and session evictions.
1 parent 020c891 commit 4a44655

23 files changed

Lines changed: 2222 additions & 984 deletions

File tree

Cargo.lock

Lines changed: 705 additions & 476 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,33 @@ path = "src/bins/rmb-relay.rs"
1313

1414

1515
[dependencies]
16-
anyhow = "1.0.56"
16+
anyhow = "1"
1717
async-trait = "0.1"
18-
base64 = "0.13.0"
19-
bb8-redis = "0.13"
18+
base64 = "0.13"
19+
bb8-redis = "0.16"
20+
redis = { version = "0.26", features = ["streams", "aio", "tokio-comp"] }
2021
clap = { version = "4", features = ["derive"] }
2122
futures = "0.3"
23+
arc-swap = "1.5"
2224
hex = { version = "0.4", features = ["alloc"] }
23-
http = "0.2.7"
25+
http = "1"
2426
log = "0.4"
2527
md5 = "0.7.0"
26-
nix = "0.24.1"
27-
serde = { version = "1.0.136", features = ["derive"] }
28-
serde_json = { version = "1.0.81" }
28+
nix = "0.24"
29+
serde = { version = "1", features = ["derive"] }
30+
serde_json = { version = "1" }
31+
bincode = "1"
2932
simple_logger = "2.1"
3033
thiserror = "1.0.31"
3134
tokio = { version = "1", features = ["full"] }
3235
tokio-retry = "0.3"
33-
uuid = { version = "1.1.0", features = ["v4"] }
36+
uuid = { version = "1", features = ["v4"] }
3437
jsonrpsee-core = "0.14.0"
3538
mime = "0.3"
3639
mpart-async = "0.6.0"
3740
url = "2.3.1"
38-
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
39-
futures-util = "0.3.25"
41+
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
42+
futures-util = "0.3"
4043
jwt = "0.16"
4144
subxt = { version = "0.28.0", features = ["substrate-compat"] }
4245
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = [
@@ -53,13 +56,16 @@ openssl = { version = "0.10", features = ["vendored"] }
5356
workers = { git = "https://github.com/threefoldtech/tokio-worker-pool", branch = "main" }
5457
protobuf = { version = "3.2.0", features = ["with-bytes"] }
5558

56-
hyper = { version = "0.14", features = ["full"] }
57-
hyper-tungstenite = "0.11"
59+
hyper = { version = "1", features = ["full"] }
60+
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
61+
http-body-util = "0.1"
62+
hyper-tungstenite = "0.14"
5863
lazy_static = "1.4.0"
59-
prometheus = { version = "0.13.3", features = ["process"] }
64+
prometheus = { version = "0.13", features = ["process"] }
6065

6166
tfchain-client = { git = "https://github.com/threefoldtech/tfchain.git", version = "0.2.0", branch = "development" }
62-
reqwest = "0.11"
67+
reqwest = "0.12"
68+
bytes = "1"
6369

6470
# for e2e
6571
bip39 = { version = "2.0.0", default-features = false }

proto/types.proto

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ message Envelope {
6262
optional string schema = 10;
6363

6464
// a federation url (domain)
65-
// this filed ignored by relays but left for back compatibilty with old peers
66-
optional string federation = 11;
65+
// deprecated: this filed ignored by relays but left for back compatibilty with old peers
66+
optional string federation = 11 [deprecated = true];
6767

6868
// pyload of the message is interpreted differently based
6969
// on the message filed
@@ -72,5 +72,6 @@ message Envelope {
7272
bytes cipher = 14;
7373
}
7474

75-
repeated string relays = 17;
75+
// deprecated: relays discovery is now driven by chain/cache; this field will be removed
76+
repeated string relays = 17 [deprecated = true];
7677
}

src/bins/rmb-relay.rs

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::Duration;
33

44
use anyhow::{Context, Result};
55
use clap::{builder::ArgAction, Parser};
6+
use prometheus::default_registry;
67
use rmb::cache::RedisCache;
78
use rmb::events;
89
use rmb::redis;
@@ -11,7 +12,7 @@ use rmb::relay::{
1112
limiter::{FixedWindowOptions, Limiters},
1213
};
1314
use rmb::twin::SubstrateTwinDB;
14-
use tokio::sync::oneshot;
15+
use tokio::task::JoinHandle;
1516

1617
/// A peer requires only which rely to connect to, and
1718
/// which identity (mnemonics)
@@ -65,6 +66,10 @@ struct Args {
6566
/// failures that occurred outside this specified period will be disregarded.
6667
#[clap(short = 'p', long, default_value_t = 3600)]
6768
ranker_period: u64,
69+
70+
/// maximum number of blocks to catch up without flushing cache (on reconnect)
71+
#[clap(long, default_value_t = 600)]
72+
catchup_threshold: u64,
6873
}
6974

7075
fn set_limits() -> Result<()> {
@@ -92,7 +97,7 @@ fn set_limits() -> Result<()> {
9297
Ok(())
9398
}
9499

95-
async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
100+
async fn app(args: Args) -> Result<JoinHandle<()>> {
96101
if args.workers == 0 {
97102
anyhow::bail!("number of workers cannot be zero");
98103
}
@@ -122,12 +127,22 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
122127
// push to the queue that depends on how fast messages are sent but we can assume an extra 10%
123128
// of number of workers is needed
124129

125-
// a wiggle is 10% of number of workers with min of 1
126-
let wiggle = std::cmp::max((args.workers * 10) / 100, 1);
127-
let pool_size = args.workers + wiggle;
130+
// a wiggle is 10% of number of workers with min of 8
131+
let wiggle = std::cmp::max((args.workers * 10) / 100, 8);
132+
// federation workers scale with wiggle (existing behavior)
128133
let fed_size = wiggle * 2;
134+
// blocking connections are from switch workers and federation consumers
135+
let blocking = args.workers + fed_size;
136+
// explicit headroom for ops (XADD/XACK/etc.) to avoid starvation
137+
let ops_headroom = std::cmp::max(blocking / 4, 16);
138+
let pool_size = blocking + ops_headroom;
129139

130-
log::info!("redis pool size: {}", pool_size);
140+
log::info!(
141+
"redis pool size: {} (blocking: {}, ops_headroom: {})",
142+
pool_size,
143+
blocking,
144+
ops_headroom
145+
);
131146
log::info!("switch workers: {}", args.workers);
132147
log::info!("federation workers: {}", fed_size);
133148
log::info!(
@@ -166,6 +181,7 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
166181
)
167182
};
168183
let ranker = relay::ranker::RelayRanker::new(Duration::from_secs(args.ranker_period));
184+
rmb::cache::register_cache_metrics(default_registry());
169185
let r = relay::Relay::new(
170186
args.domains.iter().cloned().collect(),
171187
twins,
@@ -176,68 +192,55 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
176192
)
177193
.context("failed to initialize relay")?;
178194

179-
let mut l = events::Listener::new(args.substrate, redis_cache).await?;
180-
tokio::spawn(async move {
181-
let max_retries = 9; // max wait is 2^9 = 512 seconds ( 5 minutes )
182-
let mut attempt = 0;
183-
let mut backoff = Duration::from_secs(1);
184-
let mut got_hit = false;
185-
186-
loop {
187-
match l
188-
.listen(&mut got_hit)
189-
.await
190-
.context("failed to listen to chain events")
191-
{
192-
Ok(_) => break,
193-
Err(e) => {
194-
if got_hit {
195-
log::warn!("Listener got a hit, but failed to listen to chain events before no attempts will be reset");
196-
got_hit = false;
197-
attempt = 0;
198-
backoff = Duration::from_secs(1);
199-
}
200-
attempt += 1;
201-
if attempt > max_retries {
202-
log::error!("Listener failed after {} attempts: {:?}", attempt - 1, e);
203-
let _ = tx.send(());
204-
break;
205-
}
206-
log::warn!(
207-
"Listener failed on attempt {}: {:?}. Retrying in {:?}...",
208-
attempt,
209-
e,
210-
backoff
211-
);
212-
tokio::time::sleep(backoff).await;
213-
backoff *= 2;
195+
let mut l = events::EventListenerOptions::new()
196+
.with_catchup_threshold(args.catchup_threshold)
197+
.build(args.substrate, redis_cache)
198+
.await?;
199+
let listener_handler = tokio::spawn(async move {
200+
// The listener self-heals and retries internally; this task should run indefinitely.
201+
if let Err(e) = l.listen().await.context("failed to listen to chain events") {
202+
log::error!("Listener exited with error: {:?}", e);
203+
}
204+
});
205+
206+
let relay_handler = tokio::spawn(async move {
207+
r.start(&args.listen).await.unwrap();
208+
});
209+
210+
let main_handler = tokio::spawn(async move {
211+
tokio::select! {
212+
_ = relay_handler => {
213+
log::info!("Relay is closing successfully.");
214+
}
215+
result = listener_handler => {
216+
match result {
217+
Ok(_) => log::warn!("Listener task finished unexpectedly."),
218+
Err(e) => log::error!("Listener panicked: {:?}", e),
214219
}
215220
}
216221
}
217222
});
218223

219-
r.start(&args.listen).await.unwrap();
220-
Ok(())
224+
Ok(main_handler)
221225
}
222226

223227
#[tokio::main]
224228
async fn main() {
225229
let args = Args::parse();
226-
let (tx, rx) = oneshot::channel();
227-
let app_handle = tokio::spawn(async move {
228-
if let Err(e) = app(args, tx).await {
230+
let app_handle = match app(args).await {
231+
Ok(handles) => handles,
232+
Err(e) => {
229233
eprintln!("{:#}", e);
230234
std::process::exit(1);
231235
}
232-
});
236+
};
233237

234238
tokio::select! {
235239
_ = app_handle => {
236240
log::info!("Application is closing successfully.");
237241
}
238-
_ = rx => {
239-
log::error!("Listener shutdown signal received. Exiting application.");
240-
std::process::exit(1);
242+
_ = tokio::signal::ctrl_c() => {
243+
log::info!("Ctrl-C received. Shutting down...");
241244
}
242245
}
243246
}

src/cache/memory.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use std::sync::Arc;
33
use std::time::Duration;
44
use tokio::sync::RwLock;
55

6-
use super::Cache;
6+
#[cfg(feature = "tracker")]
7+
use super::RMB_CACHE_HITS_TOTAL;
8+
use super::{Cache, RMB_CACHE_ENTRIES, RMB_CACHE_FLUSHES_TOTAL, RMB_CACHE_MISSES_TOTAL};
79
use anyhow::Result;
810
use ttl_cache::TtlCache;
911

@@ -38,21 +40,35 @@ where
3840
{
3941
async fn set<K: ToString + Send + Sync>(&self, key: K, obj: T) -> Result<()> {
4042
let mut mem = self.mem.write().await;
41-
mem.insert(key.to_string(), obj, self.ttl);
43+
let prev = mem.insert(key.to_string(), obj, self.ttl);
44+
if prev.is_none() {
45+
RMB_CACHE_ENTRIES.inc();
46+
}
4247

4348
Ok(())
4449
}
4550

4651
async fn get<K: ToString + Send + Sync>(&self, key: K) -> Result<Option<T>> {
4752
let mem = self.mem.read().await;
4853
match mem.get(&key.to_string()) {
49-
None => Ok(None),
50-
Some(v) => Ok(Some(v.clone())),
54+
None => {
55+
RMB_CACHE_MISSES_TOTAL.inc();
56+
Ok(None)
57+
}
58+
Some(v) => {
59+
#[cfg(feature = "tracker")]
60+
{
61+
RMB_CACHE_HITS_TOTAL.inc();
62+
}
63+
Ok(Some(v.clone()))
64+
}
5165
}
5266
}
5367
async fn flush(&self) -> Result<()> {
5468
let mut mem = self.mem.write().await;
5569
mem.clear();
70+
RMB_CACHE_ENTRIES.set(0);
71+
RMB_CACHE_FLUSHES_TOTAL.inc();
5672

5773
Ok(())
5874
}

src/cache/mod.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,45 @@ use std::{
99
marker::{Send, Sync},
1010
};
1111

12+
use lazy_static::lazy_static;
13+
use prometheus::{IntCounter, IntGauge, Registry};
14+
15+
#[cfg(feature = "tracker")]
16+
lazy_static! {
17+
static ref RMB_CACHE_HITS_TOTAL: IntCounter = IntCounter::new(
18+
"rmb_cache_hits_total",
19+
"Number of cache hits when fetching twins",
20+
)
21+
.unwrap();
22+
}
23+
24+
lazy_static! {
25+
static ref RMB_CACHE_MISSES_TOTAL: IntCounter = IntCounter::new(
26+
"rmb_cache_misses_total",
27+
"Number of cache misses when fetching twins (RPC fallback likely)",
28+
)
29+
.unwrap();
30+
static ref RMB_CACHE_FLUSHES_TOTAL: IntCounter = IntCounter::new(
31+
"rmb_cache_flushes_total",
32+
"Number of times the twin cache has been flushed",
33+
)
34+
.unwrap();
35+
static ref RMB_CACHE_ENTRIES: IntGauge = IntGauge::new(
36+
"rmb_cache_entries",
37+
"Current number of entries in the twin cache",
38+
)
39+
.unwrap();
40+
}
41+
42+
pub fn register_cache_metrics(registry: &Registry) {
43+
// Best-effort register into provided registry; ignore duplicate registration errors.
44+
#[cfg(feature = "tracker")]
45+
let _ = registry.register(Box::new(RMB_CACHE_HITS_TOTAL.clone()));
46+
let _ = registry.register(Box::new(RMB_CACHE_MISSES_TOTAL.clone()));
47+
let _ = registry.register(Box::new(RMB_CACHE_FLUSHES_TOTAL.clone()));
48+
let _ = registry.register(Box::new(RMB_CACHE_ENTRIES.clone()));
49+
}
50+
1251
pub trait Cache<T>: Send + Sync + 'static {
1352
fn set<S: ToString + Send + Sync>(
1453
&self,

0 commit comments

Comments
 (0)