Skip to content

Commit 2abd0c6

Browse files
authored
Merge pull request #18 from JrTimha/development
Refactor WebSocket connection handling and improve Delta-Syncs
2 parents d282eb8 + 709ee20 commit 2abd0c6

2 files changed

Lines changed: 30 additions & 14 deletions

File tree

src/broadcast/event_broadcast.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl BroadcastChannel {
7777
pub async fn send_event(&self, notification: Notification, to_user: &Uuid) {
7878
let lock = self.channel.read().await;
7979
if let Some(sender) = lock.get(to_user) {
80-
match sender.send(notification) {
80+
match sender.send(notification.clone()) {
8181
Ok(sc) => {
8282
info!("Successfully sent {:?} broadcast event.", sc);
8383
}
@@ -86,11 +86,11 @@ impl BroadcastChannel {
8686
}
8787
}
8888
} else {
89-
if let Err(error) = self.cache.add_notification_for_user(to_user, &notification).await {
90-
error!("Failed to cache notification: {}", error);
91-
};
92-
self.send_undeliverable_notifications(notification, vec![to_user.clone()]).await;
89+
self.send_undeliverable_notifications(notification.clone(), vec![to_user.clone()]).await;
9390
}
91+
if let Err(error) = self.cache.add_notification_for_user(to_user, &notification).await {
92+
error!("Failed to cache notification: {}", error);
93+
};
9494
}
9595

9696
pub async fn send_event_to_all(&self, user_ids: Vec<Uuid>, notification: Notification) {
@@ -107,11 +107,11 @@ impl BroadcastChannel {
107107
}
108108
}
109109
} else {
110-
if let Err(error) = self.cache.add_notification_for_user(&user_id, &notification).await {
111-
error!("Failed to cache notification: {}", error);
112-
};
113110
not_deliverable.push(user_id);
114111
}
112+
if let Err(error) = self.cache.add_notification_for_user(&user_id, &notification).await {
113+
error!("Failed to cache notification: {}", error);
114+
};
115115
}
116116
if not_deliverable.len() > 0 {
117117
self.send_undeliverable_notifications(notification, not_deliverable).await;
@@ -134,6 +134,7 @@ impl BroadcastChannel {
134134
}
135135

136136
pub async fn unsubscribe(&self, user_id: Uuid) {
137+
debug!("Unsubscribing user {:?} from broadcasting events.", user_id);
137138
let mut lock = self.channel.write().await;
138139
if let Some(sender) = lock.get(&user_id) {
139140
if sender.receiver_count() > 0 {

src/messaging/notifications.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use crate::broadcast::{BroadcastChannel, Notification};
2020
use crate::core::AppState;
2121
use crate::errors::{AppError, AppResponse};
2222
use crate::keycloak::decode::KeycloakToken;
23-
use crate::keycloak::layer::KeycloakAuthLayer;
2423

2524
struct ConnectionGuard {
2625
user_id: Uuid,
@@ -85,7 +84,8 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) {
8584

8685
let mut broadcast_events = BroadcastChannel::get().subscribe_to_user_events(user_id.clone()).await;
8786
let _guard = ConnectionGuard { user_id };
88-
let mut ping_interval = time::interval(Duration::from_secs(30));
87+
let mut ping_interval = time::interval(Duration::from_secs(15));
88+
let mut last_pong_received = time::Instant::now();
8989

9090
loop {
9191
tokio::select! {
@@ -112,6 +112,12 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) {
112112

113113
// 2. Regular ping from ism:
114114
_ = ping_interval.tick() => {
115+
116+
if last_pong_received.elapsed() > Duration::from_secs(30) {
117+
debug!("Client did not respond to ping in time, closing websocket connection");
118+
break;
119+
}
120+
115121
if socket.send(Message::Ping(Bytes::new())).await.is_err() { // connection is dead when we can't send ping
116122
break;
117123
}
@@ -120,12 +126,21 @@ async fn handle_socket(mut socket: WebSocket, user_id: Uuid) {
120126
// 3. Receive messages from the client:
121127
client_msg = socket.recv() => {
122128
match client_msg {
123-
Some(Ok(Message::Close(_))) | None => break, //client is closing connection
124-
Some(Err(_)) => break, //client error
129+
Some(Ok(Message::Close(_))) | None => {
130+
debug!("Client has closed the websocket connection, closing.");
131+
break;
132+
}, //client is closing connection
133+
Some(Err(_)) => {
134+
debug!("Client has an error with the websocket connection, closing.");
135+
break;
136+
}, //client error
125137
Some(Ok(Message::Pong(_))) => {
126-
debug!("Client has sent Pong");
138+
debug!("Client has sent Websocket-Pong");
139+
last_pong_received = time::Instant::now();
140+
}
141+
Some(Ok(_)) => {
142+
last_pong_received = time::Instant::now();
127143
}
128-
_ => {} //for the future
129144
}
130145
}
131146
}

0 commit comments

Comments
 (0)