|
| 1 | +use interprocess::local_socket::{GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, Name, prelude::*}; |
| 2 | +use std::io::{ErrorKind, Read, Write}; |
| 3 | +use std::sync::mpsc; |
| 4 | +use std::thread; |
| 5 | +use std::time::Duration; |
| 6 | + |
| 7 | +use crate::consts::APP_SOCKET_FILE_NAME; |
| 8 | +use crate::event::{AppEvent, AppEventScheduler}; |
| 9 | + |
| 10 | +// TODO: Needs to be integrated/replaced with the action system. |
| 11 | +// TODO: At that point this should just wrap the action, meaning all actions bindable by the user can also be accessed via the socket. |
| 12 | +#[derive(serde::Serialize, serde::Deserialize)] |
| 13 | +pub(crate) enum Message { |
| 14 | + OpenFiles(Vec<std::path::PathBuf>), |
| 15 | +} |
| 16 | + |
| 17 | +fn handle_message(message: Message, app_event_scheduler: &AppEventScheduler) { |
| 18 | + match message { |
| 19 | + Message::OpenFiles(paths) => { |
| 20 | + app_event_scheduler.schedule(AppEvent::OpenFiles(paths)); |
| 21 | + } |
| 22 | + } |
| 23 | +} |
| 24 | + |
| 25 | +pub(crate) fn send(message: Message) -> std::io::Result<()> { |
| 26 | + let data = ron::ser::to_string(&message).map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?; |
| 27 | + let mut connection = interprocess::local_socket::Stream::connect(socket_name())?; |
| 28 | + connection.write_all(data.as_bytes()) |
| 29 | +} |
| 30 | + |
| 31 | +pub(crate) struct SocketHandle { |
| 32 | + thread: Option<thread::JoinHandle<()>>, |
| 33 | + shutdown_sender: mpsc::Sender<()>, |
| 34 | +} |
| 35 | +impl Drop for SocketHandle { |
| 36 | + fn drop(&mut self) { |
| 37 | + let _ = self.shutdown_sender.send(()); |
| 38 | + let _ = self.thread.take().expect("SocketHandle can only be dropped once").join(); |
| 39 | + } |
| 40 | +} |
| 41 | + |
| 42 | +pub(crate) fn start(app_event_scheduler: AppEventScheduler) -> SocketHandle { |
| 43 | + let (shutdown_sender, shutdown_receiver) = mpsc::channel(); |
| 44 | + |
| 45 | + let thread = thread::Builder::new() |
| 46 | + .name("socket".to_string()) |
| 47 | + .spawn(move || run(app_event_scheduler, shutdown_receiver)) |
| 48 | + .expect("Failed to spawn socket thread"); |
| 49 | + |
| 50 | + SocketHandle { |
| 51 | + shutdown_sender, |
| 52 | + thread: Some(thread), |
| 53 | + } |
| 54 | +} |
| 55 | + |
| 56 | +fn run(app_event_scheduler: AppEventScheduler, shutdown_receiver: mpsc::Receiver<()>) { |
| 57 | + let listener = match ListenerOptions::new() |
| 58 | + .name(socket_name()) |
| 59 | + .nonblocking(ListenerNonblockingMode::Accept) |
| 60 | + .try_overwrite(true) |
| 61 | + .max_spin_time(Duration::from_millis(100)) |
| 62 | + .create_sync() |
| 63 | + { |
| 64 | + Ok(listener) => listener, |
| 65 | + Err(error) => { |
| 66 | + tracing::error!("Failed to bind socket: {}", error); |
| 67 | + return; |
| 68 | + } |
| 69 | + }; |
| 70 | + |
| 71 | + let max_backoff = Duration::from_millis(100); |
| 72 | + let mut backoff = Duration::ZERO; |
| 73 | + |
| 74 | + loop { |
| 75 | + if backoff.is_zero() { |
| 76 | + match shutdown_receiver.try_recv() { |
| 77 | + Ok(()) | Err(mpsc::TryRecvError::Disconnected) => break, |
| 78 | + Err(mpsc::TryRecvError::Empty) => {} |
| 79 | + } |
| 80 | + backoff = Duration::from_nanos(1); |
| 81 | + } else { |
| 82 | + match shutdown_receiver.recv_timeout(backoff) { |
| 83 | + Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => break, |
| 84 | + Err(mpsc::RecvTimeoutError::Timeout) => {} |
| 85 | + } |
| 86 | + backoff = (backoff * 2).min(max_backoff); |
| 87 | + } |
| 88 | + |
| 89 | + match listener.accept() { |
| 90 | + Ok(mut connection) => { |
| 91 | + backoff = Duration::ZERO; |
| 92 | + |
| 93 | + let app_event_scheduler = app_event_scheduler.clone(); |
| 94 | + let spawn_result = thread::Builder::new().name("socket-connection".to_string()).spawn(move || { |
| 95 | + let mut data = String::new(); |
| 96 | + if let Err(error) = connection.read_to_string(&mut data) { |
| 97 | + tracing::error!("Failed to read socket message: {}", error); |
| 98 | + return; |
| 99 | + } |
| 100 | + |
| 101 | + match ron::de::from_str(&data) { |
| 102 | + Ok(message) => handle_message(message, &app_event_scheduler), |
| 103 | + Err(error) => tracing::error!("Failed to deserialize socket message: {}", error), |
| 104 | + } |
| 105 | + }); |
| 106 | + if let Err(error) = spawn_result { |
| 107 | + tracing::error!("Failed to spawn socket connection thread: {}", error); |
| 108 | + } |
| 109 | + } |
| 110 | + Err(error) if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted) => {} |
| 111 | + Err(error) => { |
| 112 | + tracing::error!("Failed to accept socket connection: {}", error); |
| 113 | + } |
| 114 | + } |
| 115 | + } |
| 116 | +} |
| 117 | + |
| 118 | +fn socket_name() -> Name<'static> { |
| 119 | + if cfg!(target_os = "windows") { |
| 120 | + let user = std::env::var("USERNAME").unwrap_or_default(); |
| 121 | + let name = format!("{user}-{app}-{APP_SOCKET_FILE_NAME}", app = crate::consts::APP_NAME); |
| 122 | + name.to_ns_name::<GenericNamespaced>().expect("valid named pipe name") |
| 123 | + } else { |
| 124 | + crate::dirs::app_data_dir().join(APP_SOCKET_FILE_NAME).to_fs_name::<GenericFilePath>().expect("valid socket path") |
| 125 | + } |
| 126 | +} |
0 commit comments