Skip to content

Commit bc0bf6c

Browse files
authored
Merge pull request #15 from JrTimha/development
Merge development branch to main
2 parents 4785efd + 7f41a03 commit bc0bf6c

12 files changed

Lines changed: 580 additions & 346 deletions

File tree

Cargo.lock

Lines changed: 389 additions & 309 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,40 @@
11
[package]
22
name = "ism"
3-
version = "0.7.2"
3+
version = "0.7.5"
44
edition = "2024"
55

66
[dependencies]
7-
log = "0.4.28"
8-
axum = { version = "0.8.6", features = ["multipart"] }
9-
tokio = {version = "1.48.0", features = ["full"]}
10-
tower = "0.5.2"
7+
log = "0.4.29"
8+
axum = { version = "0.8.8", features = ["multipart"] }
9+
tokio = {version = "1.49.0", features = ["full"]}
10+
tower = "0.5.3"
1111
config = "0.15.18"
1212
serde = "1.0.228"
13-
scylla = { version = "1.3.1", features = ["chrono-04"] }
13+
scylla = { version = "1.4.1", features = ["chrono-04"] }
1414
futures = "0.3.31"
15-
uuid = { version = "1.18.1", features = ["v4", "serde", "v7"] }
16-
chrono = { version = "0.4.42", features = ["serde"] }
17-
tower-http = { version = "0.6.6", features = ["cors", "trace"] }
18-
tracing = "0.1.41"
19-
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
15+
uuid = { version = "1.20.0", features = ["v4", "serde", "v7"] }
16+
chrono = { version = "0.4.43", features = ["serde"] }
17+
tower-http = { version = "0.6.8", features = ["cors", "trace"] }
18+
tracing = "0.1.44"
19+
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
2020
sqlx = {version = "0.8.6", features = ["runtime-tokio", "postgres", "chrono", "uuid", "macros"]}
2121
dotenv = "0.15.0"
22-
serde_json = "1.0.145"
23-
tokio-stream = { version = "0.1.17", features = ["sync"] }
24-
rdkafka = { version = "0.38.0", features = ["cmake-build", "tokio"] }
22+
serde_json = "1.0.149"
23+
tokio-stream = { version = "0.1.18", features = ["sync"] }
24+
rdkafka = { version = "0.39.0", features = ["cmake-build", "tokio"] }
2525
minio = { version = "0.3.0", features = ["default"] }
26-
image = { version = "0.25.8"}
26+
image = { version = "0.25.9"}
2727
bytes = "1.10.1"
2828
base64 = "0.22.1"
2929
validator = { version = "0.20.0", features = ["derive"] }
30-
redis = { version = "1.0.0-rc.3", features = ["tokio-comp", "connection-manager"] }
30+
redis = { version = "1.0.3", features = ["tokio-comp", "connection-manager"] }
3131

3232

3333
#keycloak:
3434
atomic-time = "0.1.5"
3535
educe = { version = "0.6.0", default-features = false, features = ["Debug"] }
36-
http = "1.3.1"
37-
jsonwebtoken = { version = "10.1.0", features = ["aws_lc_rs"] }
36+
http = "1.4.0"
37+
jsonwebtoken = { version = "10.3.0", features = ["rust_crypto"] }
3838
nonempty = { version = "0.12.0", features = ["std"] }
3939
reqwest = { version = "0.12.24", features = ["json"], default-features = false }
4040
serde-querystring = "0.3.0"
@@ -45,7 +45,7 @@ try-again = "0.2.2"
4545
typed-builder = "0.23.0"
4646
url = "2.5.7"
4747
async-trait = "0.1.89"
48-
thiserror = "2.0.9"
48+
thiserror = "2.0.18"
4949

5050
[features]
5151
default = ["default-tls", "reqwest/charset", "reqwest/http2", "reqwest/macos-system-configuration"]
@@ -55,7 +55,4 @@ rustls-tls = ["reqwest/rustls-tls"]
5555

5656
[dev-dependencies]
5757
assertr = "0.4.2"
58-
tower-http = { version = "0.6.6", features = ["trace"] }
59-
tracing-subscriber = "0.3.20"
60-
uuid = { version = "1.18.1", features = ["v7", "serde"] }
6158
sqlx-cli = { version = "0.8.6", features = ["postgres", "rustls"] }

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ ENV SQLX_OFFLINE=true
1212
RUN apt-get update && apt-get install -y --no-install-recommends \
1313
build-essential \
1414
libssl-dev \
15+
libcurl4-openssl-dev \
1516
pkg-config \
1617
cmake
1718

src/broadcast/event_broadcast.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub struct BroadcastChannel {
3535
push_notification_producer: PushNotificationProducer
3636
}
3737

38+
3839
type UserConnectionMap = RwLock<HashMap<Uuid, Sender<Notification>>>;
3940

4041

@@ -144,5 +145,48 @@ impl BroadcastChannel {
144145
}
145146
}
146147

147-
148148
}
149+
150+
151+
#[cfg(test)]
152+
mod tests {
153+
use super::*;
154+
use crate::cache::redis_cache::NoOpCache;
155+
use crate::kafka::PushNotificationProducer;
156+
use crate::core::KafkaConfig;
157+
use crate::broadcast::Notification;
158+
use crate::broadcast::NotificationEvent::UserReadChat;
159+
use serde_json;
160+
use std::sync::Arc;
161+
162+
#[tokio::test]
163+
async fn send_event_to_subscribed_user_delivers_notification() {
164+
// initialize broadcast channel singleton with NoOpCache and logger producer
165+
let cache: Arc<dyn crate::cache::redis_cache::Cache> = Arc::new(NoOpCache);
166+
let kafka_cfg = KafkaConfig { bootstrap_host: String::from(""), bootstrap_port: 0, topic: String::from(""), client_id: String::from(""), partition: vec![], consumer_group: String::from("") };
167+
BroadcastChannel::init(cache, PushNotificationProducer::new(false, kafka_cfg)).await;
168+
169+
let bc = BroadcastChannel::get();
170+
171+
let user_id = uuid::Uuid::new_v4();
172+
// subscribe
173+
let mut rx = bc.subscribe_to_user_events(user_id).await;
174+
175+
let notification = Notification {
176+
body: UserReadChat { user_id, room_id: uuid::Uuid::new_v4() },
177+
created_at: chrono::Utc::now()
178+
};
179+
180+
// send to all (only this user)
181+
bc.send_event_to_all(vec![user_id], notification.clone()).await;
182+
183+
// receive
184+
let received = rx.recv().await.expect("Should receive notification");
185+
186+
let sent_json = serde_json::to_string(&notification).expect("serialize sent");
187+
let recv_json = serde_json::to_string(&received).expect("serialize recv");
188+
println!("Sent: {}", sent_json);
189+
println!("Received: {}", recv_json);
190+
assert_eq!(sent_json, recv_json);
191+
}
192+
}

src/broadcast/notification.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ pub enum NotificationEvent {
5050
* Sending this event to all users in a room where a member has left
5151
*/
5252
#[serde(rename_all = "camelCase")]
53-
RoomChangeEvent {message: MessageDTO, room_preview_text: LastMessagePreviewText}
53+
RoomChangeEvent {message: MessageDTO, room_preview_text: LastMessagePreviewText},
54+
55+
/**
56+
* Sending this event to all users in a room when a user has read the latest message
57+
*/
58+
#[serde(rename_all = "camelCase")]
59+
UserReadChat {user_id: Uuid, room_id: Uuid}
5460
}
5561

5662

src/database/message_database.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,22 @@ impl MessageDatabase {
3838
if config.with_db_init {
3939
repository.create_keyspace_with_tables().await;
4040
}
41-
42-
if let Err(err) = repository.change_keyspace(&config.db_keyspace).await {
43-
panic!("Failed to use keyspace {:?}", err);
44-
}
41+
4542
repository
4643
}
4744

4845
pub async fn fetch_data(&self, timestamp: DateTime<Utc>, room_id: Uuid) -> Result<Vec<Message>, Box<dyn Error + Send + Sync>> {
4946
let session = self.session.clone();
50-
let mut iter: TypedRowStream<Message> = session.query_iter("SELECT chat_room_id, message_id, sender_id, msg_body, created_at, msg_type FROM chat_messages WHERE chat_room_id = ? AND created_at < ? ORDER BY created_at DESC LIMIT 25", (room_id, timestamp))
47+
let mut iter: TypedRowStream<Message> = session.query_iter("SELECT chat_room_id, message_id, sender_id, msg_body, created_at, msg_type FROM messaging.chat_messages WHERE chat_room_id = ? AND created_at < ? ORDER BY created_at DESC LIMIT 25", (room_id, timestamp))
5148
.await?.rows_stream::<Message>()?;
5249
let mut messages: Vec<Message> = Vec::new();
5350
while let Some(next) = iter.try_next().await? { messages.push(next) }
5451
Ok(messages)
5552
}
5653

57-
pub async fn fetch_specific_message(&self, message_id: &Uuid, room_id: &Uuid, created: &DateTime<Utc>) -> Result<Message, Box<dyn std::error::Error>> {
54+
pub async fn fetch_specific_message(&self, message_id: &Uuid, room_id: &Uuid, created: &DateTime<Utc>) -> Result<Message, Box<dyn Error>> {
5855
let session = self.session.clone();
59-
let mut iter = session.query_iter("SELECT chat_room_id, message_id, sender_id, msg_body, created_at, msg_type FROM chat_messages WHERE chat_room_id = ? AND created_at = ? AND message_id = ?", (room_id, created, message_id))
56+
let mut iter = session.query_iter("SELECT chat_room_id, message_id, sender_id, msg_body, created_at, msg_type FROM messaging.chat_messages WHERE chat_room_id = ? AND created_at = ? AND message_id = ?", (room_id, created, message_id))
6057
.await?.rows_stream::<Message>()?;
6158
match iter.try_next().await? {
6259
Some(message) => Ok(message),
@@ -67,7 +64,7 @@ impl MessageDatabase {
6764
pub async fn insert_data(&self, message: Message) -> Result<QueryResult, ExecutionError> {
6865
let session = self.session.clone();
6966
session.query_unpaged(
70-
"INSERT INTO chat_messages (chat_room_id, message_id, sender_id, msg_body, msg_type, created_at) VALUES (?, ?, ?, ?, ?, ?)",
67+
"INSERT INTO messaging.chat_messages (chat_room_id, message_id, sender_id, msg_body, msg_type, created_at) VALUES (?, ?, ?, ?, ?, ?)",
7168
(message.chat_room_id, message.message_id, message.sender_id, message.msg_body, message.msg_type, message.created_at)
7269
).await
7370
}
@@ -96,7 +93,7 @@ impl MessageDatabase {
9693

9794
pub async fn clear_chat_room_messages(&self, room_id: &Uuid) -> Result<(), ExecutionError> {
9895
let session = self.session.clone();
99-
session.query_unpaged("DELETE FROM chat_messages WHERE chat_room_id = ?", (room_id,)).await?;
96+
session.query_unpaged("DELETE FROM messaging.chat_messages WHERE chat_room_id = ?", (room_id,)).await?;
10097
Ok(())
10198
}
10299

src/keycloak/action.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl<I: ActionInput, O: ActionOutput> Action<I, O> {
144144
}
145145

146146
#[cfg(test)]
147+
#[allow(unused)]
147148
mod test {
148149
use assertr::prelude::*;
149150

src/keycloak/layer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ extern crate alloc;
2121
/// Authentication happens by looking for the `Authorization` header on requests and parsing the contained JWT bearer token.
2222
/// See the crate level documentation for how this layer can be created and used.
2323
#[derive(Clone, TypedBuilder)]
24+
#[allow(unused_variables)]
2425
pub struct KeycloakAuthLayer<R, Extra = ProfileAndEmail>
2526
where
2627
R: Role,

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use ism::welcome::welcome;
1212
//learn to code rust axum here:
1313
//https://gitlab.com/famedly/conduit/-/tree/next?ref_type=heads
1414
//https://github.com/AarambhDevHub/rust-backend-axum
15-
//https://github.com/rust-lang/crates.io/
15+
//https://github.com/rust-lang/crates.io/ <---- THE BEST!
1616
#[tokio::main(flavor = "multi_thread")]
1717
async fn main() {
1818

src/rooms/handler.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,4 +200,14 @@ pub async fn handle_save_room_image(
200200
} else {
201201
Err(AppError::ValidationError("Required field 'image' not found in the upload.".to_string()))
202202
}
203+
}
204+
205+
pub async fn handle_get_read_states(
206+
Extension(token): Extension<KeycloakToken<String>>,
207+
State(state): State<Arc<AppState>>,
208+
Path(room_id): Path<Uuid>
209+
) -> Result<Json<Vec<RoomMember>>, AppError> {
210+
check_user_in_room(&state, &token.subject, &room_id).await?;
211+
let read_states = RoomService::get_read_states(state, room_id).await?;
212+
Ok(Json(read_states))
203213
}

0 commit comments

Comments
 (0)