Skip to content

Commit 811b516

Browse files
authored
Support multiple relay domains (#217)
* fix dependency_on_unit_never_type_fallback compiler warnings * feat: support_multiple_relay_domains * Update readme * Update readme
1 parent 09896bf commit 811b516

8 files changed

Lines changed: 58 additions & 29 deletions

File tree

docs/readme.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,3 +272,19 @@ Example:
272272
```bash
273273
rmb-peer -m "{MNEMONIC}" --substrate wss://tfchain.dev.grid.tf:443 --relay wss://r1.dev.grid.tf --relay wss://r2.dev.grid.tf
274274
```
275+
276+
## Relay Domains
277+
278+
Starting from v1.3.1, the relay supports multiple domains (when needed), which are used to determine whether a destination twin is directly connected to this relay. these domains should be points to the public IP of this relay.
279+
280+
```bash
281+
rmb-relay --domain relay.example.com --domain relay1.example.com
282+
```
283+
284+
or short name
285+
286+
```bash
287+
rmb-relay -m relay.example.com -m relay1.example.com
288+
```
289+
290+
The relay will internally route messages over existing WebSocket connections to twins that use one of the provided domains as their relay domain. If the destination twin isn't associated with any of these domains, the message will be forwarded to one of the twin's relays via federation (https protocol).

src/bins/rmb-relay.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use tokio::sync::oneshot;
2020
#[derive(Parser, Debug)]
2121
#[clap(name ="rmb-rely", author, version = env!("GIT_VERSION"), about, long_about = None)]
2222
struct Args {
23-
/// domain of this relay or it's public IP. used to identify
23+
/// domains of this relay or it's public IPs. used to identify
2424
/// if a twin is on this relay or not.
25-
#[clap(short = 'm', long)]
26-
domain: String,
25+
#[clap(short = 'm', long = "domain", num_args = 1..)]
26+
domains: Vec<String>,
2727

2828
/// redis address
2929
#[clap(short, long, default_value_t = String::from("redis://localhost:6379"))]
@@ -167,9 +167,16 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
167167
)
168168
};
169169
let ranker = relay::ranker::RelayRanker::new(Duration::from_secs(args.ranker_period));
170-
let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter, ranker)
171-
.await
172-
.unwrap();
170+
let r = relay::Relay::new(
171+
args.domains.iter().cloned().collect(),
172+
twins,
173+
opt,
174+
federation,
175+
limiter,
176+
ranker,
177+
)
178+
.await
179+
.unwrap();
173180

174181
let mut l = events::Listener::new(args.substrate, redis_cache).await?;
175182
tokio::spawn(async move {

src/cache/redis.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ where
4949
async fn set<S: ToString + Send + Sync>(&self, key: S, obj: T) -> Result<()> {
5050
let mut conn = self.get_connection().await?;
5151
let obj = serde_json::to_vec(&obj).context("unable to serialize twin object for redis")?;
52-
cmd("HSET")
52+
let _: () = cmd("HSET")
5353
.arg(&self.prefix)
5454
.arg(key.to_string())
5555
.arg(obj)
@@ -79,7 +79,7 @@ where
7979
}
8080
async fn flush(&self) -> Result<()> {
8181
let mut conn = self.get_connection().await?;
82-
cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;
82+
let _: () = cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;
8383

8484
Ok(())
8585
}

src/peer/storage/redis_storage.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ impl Storage for RedisStorage {
152152
}
153153
let mut conn = self.get_connection().await?;
154154
let key = BacklogKey(&backlog.uid);
155-
conn.set_ex(&key, backlog, backlog.ttl as usize).await?;
155+
let _: () = conn.set_ex(&key, backlog, backlog.ttl as usize).await?;
156156

157157
Ok(())
158158
}
@@ -167,8 +167,8 @@ impl Storage for RedisStorage {
167167
request.reply_to = Queue::Response.to_string();
168168

169169
let key = RunKey(&request.command);
170-
conn.lpush(&key, &request).await?;
171-
conn.ltrim(&key, 0, self.max_commands - 1).await?;
170+
let _: () = conn.lpush(&key, &request).await?;
171+
let _: () = conn.ltrim(&key, 0, self.max_commands - 1).await?;
172172

173173
Ok(())
174174
}
@@ -177,8 +177,8 @@ impl Storage for RedisStorage {
177177
let mut conn = self.get_connection().await?;
178178
// set reply queue
179179

180-
conn.lpush(queue, &response).await?;
181-
conn.ltrim(queue, 0, self.max_commands - 1).await?;
180+
let _: () = conn.lpush(queue, &response).await?;
181+
let _: () = conn.ltrim(queue, 0, self.max_commands - 1).await?;
182182

183183
Ok(())
184184
}

src/relay/api.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use hyper_tungstenite::{HyperWebsocket, WebSocketStream};
1515
use prometheus::Encoder;
1616
use prometheus::TextEncoder;
1717
use protobuf::Message as ProtoMessage;
18+
use std::collections::HashSet;
1819
use std::fmt::Display;
1920
use std::pin::Pin;
2021
use std::sync::Arc;
@@ -28,7 +29,7 @@ use super::HttpError;
2829
pub(crate) struct AppData<D: TwinDB, R: RateLimiter> {
2930
switch: Arc<Switch<RelayHook>>,
3031
twins: D,
31-
domain: String,
32+
domains: HashSet<String>,
3233
federator: Arc<Federator>,
3334
limiter: R,
3435
}
@@ -38,15 +39,15 @@ where
3839
D: TwinDB,
3940
R: RateLimiter,
4041
{
41-
pub(crate) fn new<S: Into<String>>(
42-
domain: S,
42+
pub(crate) fn new(
43+
domains: HashSet<String>,
4344
switch: Arc<Switch<RelayHook>>,
4445
twins: D,
4546
federator: Federator,
4647
limiter: R,
4748
) -> Self {
4849
Self {
49-
domain: domain.into(),
50+
domains: domains,
5051
switch,
5152
twins,
5253
federator: Arc::new(federator),
@@ -134,7 +135,7 @@ async fn entry<D: TwinDB, R: RateLimiter>(
134135
let stream = Stream::new(
135136
// todo: improve the domain clone
136137
claims,
137-
data.domain.clone(),
138+
data.domains.clone(),
138139
Arc::clone(&data.switch),
139140
Arc::clone(&data.federator),
140141
metrics,
@@ -275,7 +276,7 @@ impl Hook for RelayHook {
275276

276277
struct Stream<M: Metrics, D: TwinDB> {
277278
id: StreamID,
278-
domain: String,
279+
domains: HashSet<String>,
279280
switch: Arc<Switch<RelayHook>>,
280281
federator: Arc<Federator>,
281282
metrics: M,
@@ -284,7 +285,7 @@ struct Stream<M: Metrics, D: TwinDB> {
284285
impl<M: Metrics, D: TwinDB> Stream<M, D> {
285286
fn new(
286287
claims: Claims,
287-
domain: String,
288+
domains: HashSet<String>,
288289
switch: Arc<Switch<RelayHook>>,
289290
federator: Arc<Federator>,
290291
metrics: M,
@@ -293,7 +294,7 @@ impl<M: Metrics, D: TwinDB> Stream<M, D> {
293294
let id: StreamID = (claims.id, claims.sid).into();
294295
Self {
295296
id,
296-
domain,
297+
domains,
297298
switch,
298299
federator,
299300
metrics,
@@ -338,7 +339,7 @@ impl<M: Metrics, D: TwinDB> Stream<M, D> {
338339
if !twin
339340
.relay
340341
.ok_or_else(|| anyhow::Error::msg("relay info is not set for this twin"))?
341-
.contains(&self.domain)
342+
.has_common(&self.domains)
342343
{
343344
log::debug!("got an foreign message");
344345
// push message to the (relay.federation) queue

src/relay/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use self::ranker::RelayRanker;
1515
use api::RelayHook;
1616
use federation::Federation;
1717
pub use federation::FederationOptions;
18+
use std::collections::HashSet;
1819
use std::sync::Arc;
1920
use switch::Switch;
2021
pub use switch::SwitchOptions;
@@ -23,7 +24,7 @@ pub mod ranker;
2324
pub struct Relay<D: TwinDB, R: RateLimiter> {
2425
switch: Arc<Switch<RelayHook>>,
2526
twins: D,
26-
domain: String,
27+
domains: HashSet<String>,
2728
federation: Federation<D>,
2829
limiter: R,
2930
}
@@ -33,8 +34,8 @@ where
3334
D: TwinDB + Clone,
3435
R: RateLimiter,
3536
{
36-
pub async fn new<S: Into<String>>(
37-
domain: S,
37+
pub async fn new(
38+
domains: HashSet<String>,
3839
twins: D,
3940
opt: SwitchOptions,
4041
federation: FederationOptions<D>,
@@ -46,7 +47,7 @@ where
4647
Ok(Self {
4748
switch: Arc::new(switch),
4849
twins,
49-
domain: domain.into(),
50+
domains: domains,
5051
federation,
5152
limiter,
5253
})
@@ -56,7 +57,7 @@ where
5657
let tcp_listener = TcpListener::bind(address).await?;
5758
let federator = self.federation.start();
5859
let http = api::HttpService::new(api::AppData::new(
59-
self.domain,
60+
self.domains,
6061
self.switch,
6162
self.twins,
6263
federator,

src/relay/switch/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ where
436436
c = c.arg(id);
437437
}
438438

439-
c.query_async(&mut *con).await?;
439+
let _: () = c.query_async(&mut *con).await?;
440440

441441
Ok(())
442442
}
@@ -493,7 +493,7 @@ async fn send<'a>(
493493
.query_async(&mut *con)
494494
.await?;
495495

496-
cmd("EXPIRE")
496+
let _: () = cmd("EXPIRE")
497497
.arg(stream_id)
498498
.arg(QUEUE_EXPIRE)
499499
.query_async(&mut *con)

src/twin/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,8 @@ impl RelayDomains {
9797
pub fn iter(&self) -> std::collections::hash_set::Iter<String> {
9898
self.0.iter()
9999
}
100+
101+
pub fn has_common(&self, other: &HashSet<String>) -> bool {
102+
!self.0.is_disjoint(other)
103+
}
100104
}

0 commit comments

Comments
 (0)