Skip to content

Commit 59771b6

Browse files
committed
feat: refactor federation to use Redis Streams for reliable message delivery
1 parent 4112843 commit 59771b6

10 files changed

Lines changed: 403 additions & 89 deletions

File tree

Cargo.lock

Lines changed: 23 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ path = "src/bins/rmb-relay.rs"
1616
anyhow = "1.0.56"
1717
async-trait = "0.1"
1818
base64 = "0.13.0"
19-
bb8-redis = "0.13"
19+
bb8-redis = "0.16"
20+
redis = { version = "0.26", features = ["streams", "aio", "tokio-comp"] }
2021
clap = { version = "4", features = ["derive"] }
2122
futures = "0.3"
2223
hex = { version = "0.4", features = ["alloc"] }

src/cache/redis.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ where
5353
.arg(&self.prefix)
5454
.arg(key.to_string())
5555
.arg(obj)
56-
.query_async::<_, i64>(&mut *conn)
56+
.query_async::<i64>(&mut *conn)
5757
.await?;
5858
if added == 1 {
5959
RMB_CACHE_ENTRIES.inc();

src/peer/storage/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::types::{self, Address, AddressExt, EnvelopeExt};
55
use crate::types::{Backlog, Envelope};
66
use anyhow::{Context, Result};
77
use async_trait::async_trait;
8-
use bb8_redis::redis;
98
pub use redis::*;
109
pub use redis_storage::RedisStorage;
1110
use serde::{Deserialize, Serialize};
@@ -170,7 +169,7 @@ impl redis::ToRedisArgs for JsonOutgoingRequest {
170169

171170
impl redis::FromRedisValue for JsonOutgoingRequest {
172171
fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
173-
if let redis::Value::Data(data) = v {
172+
if let redis::Value::BulkString(data) = v {
174173
serde_json::from_slice(data).map_err(|e| {
175174
redis::RedisError::from((
176175
redis::ErrorKind::TypeError,
@@ -247,7 +246,7 @@ impl redis::ToRedisArgs for JsonIncomingRequest {
247246

248247
impl redis::FromRedisValue for JsonIncomingRequest {
249248
fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
250-
if let redis::Value::Data(data) = v {
249+
if let redis::Value::BulkString(data) = v {
251250
serde_json::from_slice(data).map_err(|e| {
252251
redis::RedisError::from((
253252
redis::ErrorKind::TypeError,
@@ -318,7 +317,7 @@ impl redis::ToRedisArgs for JsonOutgoingResponse {
318317

319318
impl redis::FromRedisValue for JsonOutgoingResponse {
320319
fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
321-
if let redis::Value::Data(data) = v {
320+
if let redis::Value::BulkString(data) = v {
322321
serde_json::from_slice(data).map_err(|e| {
323322
redis::RedisError::from((
324323
redis::ErrorKind::TypeError,
@@ -433,7 +432,7 @@ impl redis::ToRedisArgs for JsonIncomingResponse {
433432

434433
impl redis::FromRedisValue for JsonIncomingResponse {
435434
fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
436-
if let redis::Value::Data(data) = v {
435+
if let redis::Value::BulkString(data) = v {
437436
serde_json::from_slice(data).map_err(|e| {
438437
redis::RedisError::from((
439438
redis::ErrorKind::TypeError,

src/peer/storage/redis_storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ impl Storage for RedisStorage {
152152
}
153153
let mut conn = self.get_connection().await?;
154154
let key = BacklogKey(&backlog.uid);
155-
let _: () = conn.set_ex(&key, backlog, backlog.ttl as usize).await?;
155+
let _: () = conn.set_ex(&key, backlog, backlog.ttl).await?;
156156

157157
Ok(())
158158
}
@@ -189,7 +189,7 @@ impl Storage for RedisStorage {
189189
let resp_queue = Queue::Response.as_ref();
190190
let queues = (req_queue, resp_queue);
191191

192-
let (queue, value): (String, Value) = conn.brpop(queues, 0).await?;
192+
let (queue, value): (String, Value) = conn.brpop(queues, 0.0).await?;
193193

194194
let msg: JsonMessage = if queue == req_queue {
195195
JsonOutgoingRequest::from_redis_value(&value)

0 commit comments

Comments
 (0)