Skip to content

Commit 020c891

Browse files
authored
Better internal message switching (#213)
* Run cargo update + fix compiler warnings * Minor improvments: - Add todos that need to be processed - Rename some structs - Handle callback errors (drop the connection from worker connections subset) * Add TwinID type * Remove some un-necessary async_trait usage * Worker rework - Separate worker to its own strcture - Use cancellation token to track if either of reader or writer routines exited and make sure connection is removed from global sessions set * Increase connection channel size * Use semaphore to track capacity Also run cargo clippy to clean up all warnings * WIP: Better worker loop * Implement back pressure for slow clients * Add some tests * Fix formatting
1 parent 4fc8b9d commit 020c891

22 files changed

Lines changed: 1647 additions & 1003 deletions

File tree

Cargo.lock

Lines changed: 818 additions & 377 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ codec = { package = "parity-scale-codec", version = "3.0.0", default-features =
4646
] }
4747
itertools = "0.11"
4848
ttl_cache = "0.5"
49+
tokio-util = "0.7"
4950

5051
# for static build
5152
openssl = { version = "0.10", features = ["vendored"] }
@@ -73,6 +74,7 @@ lru = "0.9.0"
7374
rand = "0.8.5"
7475
tokio-stream = "0.1.14"
7576
async-stream = "0.3.5"
77+
derive_more = { version = "2.0.1", features = ["full"] }
7678

7779
[build-dependencies]
7880
git-version = "0.3.5"

src/bins/rmb-peer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ async fn app(args: Params) -> Result<()> {
172172

173173
// we need to make sure our twin is up to date
174174
let twin = db
175-
.get_twin(id)
175+
.get_twin(id.into())
176176
.await
177177
.context("failed to get twin details")?
178178
.ok_or_else(|| anyhow::anyhow!("self twin not found!"))?;

src/bins/rmb-relay.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use tokio::sync::oneshot;
1515

1616
/// A peer requires only which rely to connect to, and
1717
/// which identity (mnemonics)
18-
1918
/// the reliable message bus
2019
#[derive(Parser, Debug)]
2120
#[clap(name ="rmb-rely", author, version = env!("GIT_VERSION"), about, long_about = None)]
@@ -175,8 +174,7 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
175174
limiter,
176175
ranker,
177176
)
178-
.await
179-
.unwrap();
177+
.context("failed to initialize relay")?;
180178

181179
let mut l = events::Listener::new(args.substrate, redis_cache).await?;
182180
tokio::spawn(async move {

src/cache/memory.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use tokio::sync::RwLock;
55

66
use super::Cache;
77
use anyhow::Result;
8-
use async_trait::async_trait;
98
use ttl_cache::TtlCache;
109

1110
static IN_MEMORY_CAP: usize = 500;
@@ -33,7 +32,6 @@ impl<V> MemCache<V> {
3332
}
3433
}
3534

36-
#[async_trait]
3735
impl<T> Cache<T> for MemCache<T>
3836
where
3937
T: Clone + Send + Sync + 'static,

src/cache/mod.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,24 @@ pub use memory::MemCache;
44
pub use redis::RedisCache;
55

66
use anyhow::Result;
7-
use async_trait::async_trait;
8-
use std::marker::{Send, Sync};
7+
use std::{
8+
future::Future,
9+
marker::{Send, Sync},
10+
};
911

10-
#[async_trait]
1112
pub trait Cache<T>: Send + Sync + 'static {
12-
async fn set<S: ToString + Send + Sync>(&self, id: S, obj: T) -> Result<()>;
13-
async fn get<S: ToString + Send + Sync>(&self, id: S) -> Result<Option<T>>;
14-
async fn flush(&self) -> Result<()>;
13+
fn set<S: ToString + Send + Sync>(
14+
&self,
15+
id: S,
16+
obj: T,
17+
) -> impl Future<Output = Result<()>> + Send;
18+
fn get<S: ToString + Send + Sync>(
19+
&self,
20+
id: S,
21+
) -> impl Future<Output = Result<Option<T>>> + Send;
22+
fn flush(&self) -> impl Future<Output = Result<()>> + Send;
1523
}
1624

17-
#[async_trait]
1825
impl<T, C> Cache<T> for Option<C>
1926
where
2027
C: Cache<T>,
@@ -43,7 +50,6 @@ where
4350
#[derive(Clone, Copy)]
4451
pub struct NoCache;
4552

46-
#[async_trait]
4753
impl<T> Cache<T> for NoCache
4854
where
4955
T: Send + Sync + 'static,

src/cache/redis.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use super::Cache;
22

33
use anyhow::{Context, Result};
4-
use async_trait::async_trait;
54
use bb8_redis::{
65
bb8::{Pool, PooledConnection},
76
redis::cmd,
@@ -41,7 +40,6 @@ impl RedisCache {
4140
}
4241
}
4342

44-
#[async_trait]
4543
impl<T> Cache<T> for RedisCache
4644
where
4745
T: Serialize + DeserializeOwned + Send + Sync + 'static,
@@ -53,7 +51,7 @@ where
5351
.arg(&self.prefix)
5452
.arg(key.to_string())
5553
.arg(obj)
56-
.query_async(&mut *conn)
54+
.query_async::<_, ()>(&mut *conn)
5755
.await?;
5856

5957
Ok(())

src/identity/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ where
7979
}
8080
}
8181

82-
impl<'a, S> SigningAlgorithm for JwtSigner<'a, S>
82+
impl<S> SigningAlgorithm for JwtSigner<'_, S>
8383
where
8484
S: Signer,
8585
{
@@ -108,7 +108,7 @@ where
108108
}
109109
}
110110

111-
impl<'a, I> VerifyingAlgorithm for JwtVerifier<'a, I>
111+
impl<I> VerifyingAlgorithm for JwtVerifier<'_, I>
112112
where
113113
I: Identity,
114114
{

src/peer/protocol.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ where
164164
}
165165
let twin = self
166166
.twins
167-
.get_twin(envelope.source.twin)
167+
.get_twin(envelope.source.twin.into())
168168
.await
169169
.context("failed to get twin information")?
170170
.ok_or(ProtocolError::UnknownTwin(envelope.source.twin))?;
@@ -253,7 +253,7 @@ where
253253
pub async fn write(&mut self, mut envelope: Envelope) -> Result<(), ProtocolError> {
254254
let twin = self
255255
.twins
256-
.get_twin(envelope.destination.twin)
256+
.get_twin(envelope.destination.twin.into())
257257
.await?
258258
.ok_or_else(|| ProtocolError::UnknownTwin(envelope.destination.twin))?;
259259

src/peer/storage/redis_storage.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ struct BacklogKey<'a>(&'a str);
1818

1919
const MAX_COMMANDS: isize = 10000;
2020

21-
impl<'a> ToRedisArgs for BacklogKey<'a> {
21+
impl ToRedisArgs for BacklogKey<'_> {
2222
fn write_redis_args<W>(&self, out: &mut W)
2323
where
2424
W: ?Sized + bb8_redis::redis::RedisWrite,
@@ -27,15 +27,15 @@ impl<'a> ToRedisArgs for BacklogKey<'a> {
2727
}
2828
}
2929

30-
impl<'a> Display for BacklogKey<'a> {
30+
impl Display for BacklogKey<'_> {
3131
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3232
write!(f, "msgbus.backlog.{}", self.0)
3333
}
3434
}
3535

3636
struct RunKey<'a>(&'a str);
3737

38-
impl<'a> ToRedisArgs for RunKey<'a> {
38+
impl ToRedisArgs for RunKey<'_> {
3939
fn write_redis_args<W>(&self, out: &mut W)
4040
where
4141
W: ?Sized + bb8_redis::redis::RedisWrite,
@@ -44,7 +44,7 @@ impl<'a> ToRedisArgs for RunKey<'a> {
4444
}
4545
}
4646

47-
impl<'a> Display for RunKey<'a> {
47+
impl Display for RunKey<'_> {
4848
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4949
write!(f, "msgbus.{}", self.0)
5050
}

0 commit comments

Comments
 (0)