|
| 1 | +//! Matrix channel adapter for WhatsApp bridge integration. |
| 2 | +//! |
| 3 | +//! This channel connects to a Matrix homeserver and listens for messages |
| 4 | +//! from the mautrix-whatsapp bridge, enabling WhatsApp integration. |
| 5 | +
|
| 6 | +use crate::bus::{InboundMessage, MessageBus, OutboundMessage}; |
| 7 | +use crate::channel::Channel; |
| 8 | +use crate::config::MatrixConfig; |
| 9 | +use async_trait::async_trait; |
| 10 | +use std::sync::atomic::{AtomicBool, Ordering}; |
| 11 | +use std::sync::Arc; |
| 12 | + |
| 13 | +/// Matrix channel adapter for WhatsApp bridge. |
| 14 | +pub struct MatrixChannel { |
| 15 | + config: MatrixConfig, |
| 16 | + running: Arc<AtomicBool>, |
| 17 | +} |
| 18 | + |
| 19 | +impl MatrixChannel { |
| 20 | + /// Create a new Matrix channel. |
| 21 | + pub fn new(config: MatrixConfig) -> Self { |
| 22 | + Self { |
| 23 | + config, |
| 24 | + running: Arc::new(AtomicBool::new(false)), |
| 25 | + } |
| 26 | + } |
| 27 | +} |
| 28 | + |
| 29 | +#[async_trait] |
| 30 | +impl Channel for MatrixChannel { |
| 31 | + fn name(&self) -> &str { |
| 32 | + "matrix" |
| 33 | + } |
| 34 | + |
| 35 | + async fn start(&self, _bus: Arc<MessageBus>) -> anyhow::Result<()> { |
| 36 | + log::info!("Matrix channel starting"); |
| 37 | + self.running.store(true, Ordering::SeqCst); |
| 38 | + |
| 39 | + #[cfg(feature = "matrix")] |
| 40 | + { |
| 41 | + use matrix_sdk::{ |
| 42 | + config::SyncSettings, |
| 43 | + room::Room, |
| 44 | + ruma::events::room::message::{MessageType, OriginalSyncRoomMessageEvent}, |
| 45 | + Client, |
| 46 | + }; |
| 47 | + |
| 48 | + // Build client |
| 49 | + let client = Client::builder() |
| 50 | + .homeserver_url(&self.config.homeserver_url) |
| 51 | + .build() |
| 52 | + .await?; |
| 53 | + |
| 54 | + // Login |
| 55 | + client |
| 56 | + .matrix_auth() |
| 57 | + .login_username(&self.config.username, &self.config.password) |
| 58 | + .await?; |
| 59 | + |
| 60 | + log::info!("Matrix client logged in as {}", self.config.username); |
| 61 | + |
| 62 | + let bus = _bus.clone(); |
| 63 | + let allow_from = self.config.allow_from.clone(); |
| 64 | + let running = self.running.clone(); |
| 65 | + |
| 66 | + // Register event handler for messages |
| 67 | + client.add_event_handler( |
| 68 | + move |ev: OriginalSyncRoomMessageEvent, room: Room| { |
| 69 | + let bus = bus.clone(); |
| 70 | + let allow_from = allow_from.clone(); |
| 71 | + async move { |
| 72 | + if !running.load(Ordering::SeqCst) { |
| 73 | + return; |
| 74 | + } |
| 75 | + |
| 76 | + // Get sender |
| 77 | + let sender = ev.sender.to_string(); |
| 78 | + |
| 79 | + // Check whitelist |
| 80 | + if !allow_from.contains(&sender) { |
| 81 | + log::warn!("Unauthorized Matrix message from: {}", sender); |
| 82 | + return; |
| 83 | + } |
| 84 | + |
| 85 | + // Extract content based on message type |
| 86 | + let content = match &ev.content.msgtype { |
| 87 | + MessageType::Text(text_content) => text_content.body.clone(), |
| 88 | + MessageType::Audio(audio) => { |
| 89 | + // Voice message - we'll handle transcription later |
| 90 | + format!("[Voice message: {}]", audio.body) |
| 91 | + } |
| 92 | + MessageType::Image(img) => { |
| 93 | + format!("[Image: {}]", img.body) |
| 94 | + } |
| 95 | + MessageType::File(file) => { |
| 96 | + format!("[File: {}]", file.body) |
| 97 | + } |
| 98 | + _ => { |
| 99 | + log::debug!("Unsupported Matrix message type"); |
| 100 | + return; |
| 101 | + } |
| 102 | + }; |
| 103 | + |
| 104 | + // Get room ID |
| 105 | + let room_id = room.room_id().to_string(); |
| 106 | + |
| 107 | + log::info!("Matrix message from {} in room {}", sender, room_id); |
| 108 | + |
| 109 | + // Create inbound message |
| 110 | + let inbound = InboundMessage::new("matrix", &sender, &room_id, &content); |
| 111 | + |
| 112 | + // Send to bus |
| 113 | + if let Err(e) = bus.inbound_sender().send(inbound).await { |
| 114 | + log::error!("Failed to send message to bus: {}", e); |
| 115 | + } |
| 116 | + } |
| 117 | + }, |
| 118 | + ); |
| 119 | + |
| 120 | + // Start sync |
| 121 | + let settings = SyncSettings::default(); |
| 122 | + tokio::spawn(async move { |
| 123 | + if let Err(e) = client.sync(settings).await { |
| 124 | + log::error!("Matrix sync error: {}", e); |
| 125 | + } |
| 126 | + }); |
| 127 | + |
| 128 | + Ok(()) |
| 129 | + } |
| 130 | + |
| 131 | + #[cfg(not(feature = "matrix"))] |
| 132 | + { |
| 133 | + anyhow::bail!("Matrix feature not enabled") |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + async fn stop(&self) -> anyhow::Result<()> { |
| 138 | + log::info!("Matrix channel stopping"); |
| 139 | + self.running.store(false, Ordering::SeqCst); |
| 140 | + Ok(()) |
| 141 | + } |
| 142 | + |
| 143 | + async fn send(&self, msg: OutboundMessage) -> anyhow::Result<()> { |
| 144 | + #[cfg(feature = "matrix")] |
| 145 | + { |
| 146 | + use matrix_sdk::{ |
| 147 | + room::Room, |
| 148 | + ruma::RoomId, |
| 149 | + Client, |
| 150 | + }; |
| 151 | + |
| 152 | + // Build client (reconnect for send) |
| 153 | + let client = Client::builder() |
| 154 | + .homeserver_url(&self.config.homeserver_url) |
| 155 | + .build() |
| 156 | + .await?; |
| 157 | + |
| 158 | + // Login |
| 159 | + client |
| 160 | + .matrix_auth() |
| 161 | + .login_username(&self.config.username, &self.config.password) |
| 162 | + .await?; |
| 163 | + |
| 164 | + // Parse room ID |
| 165 | + let room_id = msg |
| 166 | + .chat_id |
| 167 | + .parse::<Box<RoomId>>() |
| 168 | + .map_err(|e| anyhow::anyhow!("Invalid room ID: {}", e))?; |
| 169 | + |
| 170 | + // Get room |
| 171 | + if let Some(room) = client.get_room(&room_id) { |
| 172 | + // Send message |
| 173 | + room.send_plain_text(&msg.content).await?; |
| 174 | + log::debug!("Sent message to Matrix room {}", room_id); |
| 175 | + } else { |
| 176 | + anyhow::bail!("Room {} not found", room_id); |
| 177 | + } |
| 178 | + |
| 179 | + Ok(()) |
| 180 | + } |
| 181 | + |
| 182 | + #[cfg(not(feature = "matrix"))] |
| 183 | + { |
| 184 | + let _ = msg; |
| 185 | + anyhow::bail!("Matrix feature not enabled") |
| 186 | + } |
| 187 | + } |
| 188 | + |
| 189 | + fn is_running(&self) -> bool { |
| 190 | + self.running.load(Ordering::SeqCst) |
| 191 | + } |
| 192 | + |
| 193 | + fn is_allowed(&self, sender_id: &str) -> bool { |
| 194 | + self.config.is_allowed(sender_id) |
| 195 | + } |
| 196 | +} |
| 197 | + |
| 198 | +#[cfg(test)] |
| 199 | +mod tests { |
| 200 | + use super::*; |
| 201 | + |
| 202 | + #[test] |
| 203 | + fn test_matrix_channel_name() { |
| 204 | + let config = MatrixConfig { |
| 205 | + homeserver_url: "https://example.com".to_string(), |
| 206 | + username: "test".to_string(), |
| 207 | + password: "pass".to_string(), |
| 208 | + allow_from: vec!["@user:example.com".to_string()], |
| 209 | + }; |
| 210 | + let channel = MatrixChannel::new(config); |
| 211 | + assert_eq!(channel.name(), "matrix"); |
| 212 | + } |
| 213 | + |
| 214 | + #[test] |
| 215 | + fn test_matrix_is_allowed() { |
| 216 | + let config = MatrixConfig { |
| 217 | + homeserver_url: "https://example.com".to_string(), |
| 218 | + username: "test".to_string(), |
| 219 | + password: "pass".to_string(), |
| 220 | + allow_from: vec![ |
| 221 | + "@user1:example.com".to_string(), |
| 222 | + "@user2:example.com".to_string(), |
| 223 | + ], |
| 224 | + }; |
| 225 | + let channel = MatrixChannel::new(config); |
| 226 | + assert!(channel.is_allowed("@user1:example.com")); |
| 227 | + assert!(channel.is_allowed("@user2:example.com")); |
| 228 | + assert!(!channel.is_allowed("@user3:example.com")); |
| 229 | + } |
| 230 | +} |
0 commit comments