Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Bug Fixes

- **WebSocket `getState` returns no response** — sending `{"type":"getState"}` over the WebSocket API would receive no reply. The handler was routing the response to a Tauri frontend IPC event (`timer:state-query`) instead of writing it back through the WebSocket connection. The fix introduces a per-connection `tokio::sync::mpsc` channel so the receive task can deliver direct replies through the send task, which holds the WebSocket sender. `getState` now correctly responds with `{"type":"state","payload":{...}}` to the requesting client only.
- **Custom notification sounds not playing on Windows** — ADPCM-encoded WAV files (the default output of Windows Sound Recorder) failed to decode because the `symphonia-codec-adpcm` crate was not in the dependency tree. The app would silently fall back to the built-in sound while still displaying the custom filename in Settings. ADPCM decoding is now enabled by adding `symphonia-codec-adpcm` as a direct dependency.
- **Custom sound file picker offered FLAC, which could never be decoded** — the file picker filter included `.flac` as a valid extension, but FLAC decoding was never compiled in. FLAC has been removed from the filter.
- **Unsupported audio format selected silently reverted to default** — when a custom sound file was copied successfully but failed to decode (unsupported encoding), the app fell back to the default sound with no indication to the user. The file is now probed immediately after being copied; if decoding fails the file is discarded and an inline error message is shown below the relevant audio row in Settings → Notifications.
Expand Down
1 change: 1 addition & 0 deletions src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ symphonia-codec-adpcm = "0.5.5"
tiny-skia = "0.12"
notify = "8"

[dev-dependencies]
tokio-tungstenite = "0.29"

[target.'cfg(target_os = "macos")'.dependencies]
objc2 = "0.6"
raw-window-handle = "0.6"
179 changes: 140 additions & 39 deletions src-tauri/src/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures_util::{SinkExt, StreamExt};
use tauri::{AppHandle, Emitter, Manager};
use tokio::{
net::TcpListener,
sync::broadcast,
sync::{broadcast, mpsc},
task::JoinHandle,
};

Expand Down Expand Up @@ -68,8 +68,8 @@ pub enum WsEvent {

#[derive(Clone)]
struct ServerState {
app: AppHandle,
broadcast_tx: broadcast::Sender<WsEvent>,
snapshot_fn: Arc<dyn Fn() -> Option<TimerSnapshot> + Send + Sync>,
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -113,9 +113,16 @@ pub async fn start(port: u16, app: AppHandle, state: &Arc<WsState>) {
}
};

let app_clone = app.clone();
let snapshot_fn = Arc::new(move || {
app_clone
.try_state::<TimerController>()
.map(|t| t.get_snapshot())
}) as Arc<dyn Fn() -> Option<TimerSnapshot> + Send + Sync>;

let server_state = ServerState {
app: app.clone(),
broadcast_tx: state.broadcast_tx.clone(),
snapshot_fn,
};

let router = Router::new()
Expand Down Expand Up @@ -154,28 +161,33 @@ async fn handle_socket(socket: WebSocket, state: ServerState) {
log::debug!("[ws] client connected");
let (mut sender, mut receiver) = socket.split();
let mut rx = state.broadcast_tx.subscribe();
let (direct_tx, mut direct_rx) = mpsc::unbounded_channel::<String>();

// Task: forward broadcast events to this client.
// Task: forward broadcast events and direct replies to this client.
let mut send_task = tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
let json = match serde_json::to_string(&event) {
Ok(s) => s,
Err(_) => continue,
};
if sender.send(Message::Text(json.into())).await.is_err() {
break;
loop {
tokio::select! {
result = rx.recv() => {
let Ok(event) = result else { break };
let Ok(json) = serde_json::to_string(&event) else { continue };
if sender.send(Message::Text(json.into())).await.is_err() { break }
}
msg = direct_rx.recv() => {
let Some(json) = msg else { break };
if sender.send(Message::Text(json.into())).await.is_err() { break }
}
}
}
});

// Main loop: handle incoming messages from this client.
let app = state.app.clone();
let broadcast_tx = state.broadcast_tx.clone();
let snapshot_fn = Arc::clone(&state.snapshot_fn);
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Text(text) => {
handle_client_message(&text, &app, &broadcast_tx).await;
let snapshot = (snapshot_fn)();
handle_client_message(&text, snapshot, &direct_tx).await;
}
Message::Close(_) => break,
_ => {}
Expand All @@ -193,24 +205,19 @@ async fn handle_socket(socket: WebSocket, state: ServerState) {

async fn handle_client_message(
text: &str,
app: &AppHandle,
_broadcast_tx: &broadcast::Sender<WsEvent>,
snapshot: Option<TimerSnapshot>,
direct_tx: &mpsc::UnboundedSender<String>,
) {
let Ok(msg) = serde_json::from_str::<serde_json::Value>(text) else {
return;
};

if let Some("getState") = msg.get("type").and_then(|t| t.as_str()) {
if let Some(timer) = app.try_state::<TimerController>() {
let snapshot = timer.get_snapshot();
let response = serde_json::json!({
"type": "state",
"payload": snapshot,
});
// Note: we can't send directly here without the sender;
// the client will receive state via the next broadcast.
// For an immediate reply, broadcast it.
let _ = app.emit("timer:state-query", response);
if let Some(snap) = snapshot {
let json = serde_json::to_string(
&serde_json::json!({ "type": "state", "payload": snap })
).unwrap_or_default();
let _ = direct_tx.send(json);
}
}
}
Expand Down Expand Up @@ -252,17 +259,8 @@ pub fn broadcast_reset(state: &Arc<WsState>) {
mod tests {
use super::*;

#[test]
fn ws_state_can_be_created() {
let state = WsState::new();
// broadcast_tx should have 0 receivers initially.
assert_eq!(state.broadcast_tx.receiver_count(), 0);
}

#[test]
fn ws_event_serializes_correctly() {
use crate::timer::TimerSnapshot;
let snap = TimerSnapshot {
fn make_snapshot() -> TimerSnapshot {
TimerSnapshot {
round_type: "work".into(),
previous_round_type: "short-break".into(),
elapsed_secs: 60,
Expand All @@ -272,8 +270,20 @@ mod tests {
work_round_number: 1,
work_rounds_total: 4,
session_work_count: 1,
};
let event = WsEvent::RoundChange { payload: snap };
}
}

// -- existing serialization tests --

#[test]
fn ws_state_can_be_created() {
let state = WsState::new();
assert_eq!(state.broadcast_tx.receiver_count(), 0);
}

#[test]
fn ws_event_serializes_correctly() {
let event = WsEvent::RoundChange { payload: make_snapshot() };
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("\"type\":\"roundChange\""));
assert!(json.contains("\"elapsed_secs\":60"));
Expand Down Expand Up @@ -309,4 +319,95 @@ mod tests {
let json = serde_json::to_string(&event).unwrap();
assert_eq!(json, r#"{"type":"reset"}"#);
}

// -- handle_client_message unit tests --

#[tokio::test]
async fn getstate_sends_state_reply() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
handle_client_message(r#"{"type":"getState"}"#, Some(make_snapshot()), &tx).await;
let reply = rx.try_recv().expect("expected a reply on direct channel");
let val: serde_json::Value = serde_json::from_str(&reply).unwrap();
assert_eq!(val["type"], "state");
assert_eq!(val["payload"]["elapsed_secs"], 60);
assert_eq!(val["payload"]["total_secs"], 1500);
assert_eq!(val["payload"]["round_type"], "work");
assert_eq!(val["payload"]["is_running"], true);
}

#[tokio::test]
async fn getstate_no_timer_state_sends_nothing() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
handle_client_message(r#"{"type":"getState"}"#, None, &tx).await;
assert!(rx.try_recv().is_err(), "expected no reply when snapshot is None");
}

#[tokio::test]
async fn malformed_json_is_silently_ignored() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
handle_client_message("not valid json {{{", Some(make_snapshot()), &tx).await;
assert!(rx.try_recv().is_err(), "expected no reply for malformed JSON");
}

#[tokio::test]
async fn unknown_message_type_is_ignored() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
handle_client_message(r#"{"type":"unknownCommand"}"#, Some(make_snapshot()), &tx).await;
assert!(rx.try_recv().is_err(), "expected no reply for unknown message type");
}

#[tokio::test]
async fn reply_uses_direct_channel_not_broadcast() {
let (broadcast_tx, _) = broadcast::channel::<WsEvent>(8);
let (direct_tx, mut direct_rx) = mpsc::unbounded_channel::<String>();
handle_client_message(r#"{"type":"getState"}"#, Some(make_snapshot()), &direct_tx).await;
// Reply appeared on the direct channel
assert!(direct_rx.try_recv().is_ok(), "expected reply on direct channel");
// Nothing sent to the broadcast channel
assert_eq!(broadcast_tx.receiver_count(), 0);
}

// -- network-level integration test --

#[tokio::test]
async fn integration_getstate_round_trip() {
use axum::Router;
use tokio::net::TcpListener;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as TungMessage;
use futures_util::{SinkExt, StreamExt};

let snap = make_snapshot();
let snap_clone = snap.clone();
let snapshot_fn = Arc::new(move || Some(snap_clone.clone()))
as Arc<dyn Fn() -> Option<TimerSnapshot> + Send + Sync>;

let (broadcast_tx, _) = broadcast::channel::<WsEvent>(8);
let server_state = ServerState { broadcast_tx, snapshot_fn };

let router = Router::new()
.route("/ws", get(ws_handler))
.with_state(server_state);

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();

tokio::spawn(async move {
axum::serve(listener, router).await.unwrap();
});

let url = format!("ws://127.0.0.1:{port}/ws");
let (mut ws, _) = connect_async(&url).await.expect("WebSocket connect failed");

ws.send(TungMessage::Text(r#"{"type":"getState"}"#.into())).await.unwrap();

let msg = ws.next().await.expect("expected a message").unwrap();
let TungMessage::Text(text) = msg else { panic!("expected text frame") };
let val: serde_json::Value = serde_json::from_str(&text).unwrap();

assert_eq!(val["type"], "state", "response type should be 'state'");
assert_eq!(val["payload"]["elapsed_secs"], snap.elapsed_secs);
assert_eq!(val["payload"]["total_secs"], snap.total_secs);
assert_eq!(val["payload"]["round_type"], snap.round_type);
}
}
Loading