55//! between devices on the same local network. The creator runs a WS server,
66//! the joiner connects as a client.
77//!
8- //! Phase 5c will add `WebRtcTransport` for cross-network sync.
8+ //! Phase 5d adds security hardening:
9+ //! - Heartbeat every 5s with 15s dead-man's-switch timeout
10+ //! - Session timeout (4h max, warning at 3h45m)
11+ //! - Forward secrecy via HKDF key ratchet every 30 minutes
912
1013use async_tungstenite:: tokio:: { accept_async, connect_async} ;
1114use async_tungstenite:: tungstenite:: Message ;
@@ -16,11 +19,27 @@ use tauri::{AppHandle, Emitter};
1619use tokio:: net:: TcpListener ;
1720use tokio:: sync:: mpsc;
1821use tokio:: sync:: Mutex ;
22+ use tokio:: time:: { Duration , Instant , interval} ;
1923
2024use crate :: sync:: document:: SyncDocument ;
2125use crate :: sync:: encryption:: { EncryptedEnvelope , SessionEncryption } ;
2226use crate :: sync:: pairing:: { PairingCreator , PairingJoiner } ;
2327
28+ // ---------------------------------------------------------------------------
29+ // Constants
30+ // ---------------------------------------------------------------------------
31+
32+ /// Heartbeat interval — send a keepalive every 5 seconds.
33+ const HEARTBEAT_INTERVAL : Duration = Duration :: from_secs ( 5 ) ;
34+ /// Peer timeout — if no message received for 15 seconds, disconnect.
35+ const PEER_TIMEOUT : Duration = Duration :: from_secs ( 15 ) ;
36+ /// Session maximum duration — 4 hours.
37+ const SESSION_MAX_DURATION : Duration = Duration :: from_secs ( 4 * 60 * 60 ) ;
38+ /// Session warning — 15 minutes before max duration (3h45m).
39+ const SESSION_WARNING_BEFORE : Duration = Duration :: from_secs ( 15 * 60 ) ;
40+ /// Key rotation interval — rotate encryption key every 30 minutes.
41+ const KEY_ROTATION_INTERVAL : Duration = Duration :: from_secs ( 30 * 60 ) ;
42+
2443// ---------------------------------------------------------------------------
2544// Wire protocol messages
2645// ---------------------------------------------------------------------------
@@ -50,6 +69,9 @@ pub enum SyncMessage {
5069 /// Heartbeat / keepalive.
5170 #[ serde( rename = "heartbeat" ) ]
5271 Heartbeat { timestamp : i64 } ,
72+ /// Key rotation notification — peer has rotated its encryption key.
73+ #[ serde( rename = "key_rotate" ) ]
74+ KeyRotate { epoch : u64 } ,
5375 /// Session terminated by peer.
5476 #[ serde( rename = "goodbye" ) ]
5577 Goodbye ,
@@ -83,13 +105,6 @@ impl TransportHandle {
83105
84106/// Start a local WebSocket server on a random port.
85107/// Returns the bound port and a handle for sending updates.
86- ///
87- /// Flow:
88- /// 1. Bind TCP listener on random port
89- /// 2. Wait for joiner to connect
90- /// 3. Perform SPAKE2 key exchange
91- /// 4. Exchange device info
92- /// 5. Enter sync loop: send/receive encrypted yrs updates
93108pub async fn start_creator_transport (
94109 app : AppHandle ,
95110 pairing_code : String ,
@@ -168,7 +183,7 @@ async fn handle_creator_connection(
168183 . ok_or ( "Connection closed before SPAKE2 exchange" ) ?
169184 . map_err ( |e| format ! ( "Failed to receive SPAKE2 message: {}" , e) ) ?;
170185 let joiner_payload = extract_spake2_payload ( joiner_spake) ?;
171- let encryption = Arc :: new ( creator. finish ( & joiner_payload) ?) ;
186+ let encryption = creator. finish ( & joiner_payload) ?;
172187 tracing:: info!( "Sync transport: SPAKE2 key exchange complete (creator)" ) ;
173188
174189 // --- Exchange device info ---
@@ -268,7 +283,7 @@ pub async fn start_joiner_transport(
268283 . await
269284 . map_err ( |e| format ! ( "Failed to send SPAKE2 message: {}" , e) ) ?;
270285
271- let encryption = Arc :: new ( joiner. finish ( & creator_payload) ?) ;
286+ let encryption = joiner. finish ( & creator_payload) ?;
272287 tracing:: info!( "Sync transport: SPAKE2 key exchange complete (joiner)" ) ;
273288
274289 // --- Exchange device info (joiner receives first, then sends) ---
@@ -410,30 +425,57 @@ where
410425 }
411426}
412427
428+ /// Send a JSON-serialized SyncMessage over the WebSocket.
429+ async fn send_msg < S > ( ws_write : & mut S , msg : & SyncMessage ) -> Result < ( ) , String >
430+ where
431+ S : futures_util:: Sink < Message , Error = async_tungstenite:: tungstenite:: Error > + Unpin ,
432+ {
433+ let json = serde_json:: to_string ( msg) . map_err ( |e| format ! ( "Failed to serialize: {}" , e) ) ?;
434+ ws_write
435+ . send ( Message :: Text ( json) )
436+ . await
437+ . map_err ( |e| format ! ( "Failed to send: {}" , e) )
438+ }
439+
413440// ---------------------------------------------------------------------------
414- // Shared sync loop
441+ // Shared sync loop (with security hardening)
415442// ---------------------------------------------------------------------------
416443
417444/// The main sync loop shared by both creator and joiner.
418- /// Receives plaintext updates from the local app via `outbound_rx`,
419- /// encrypts them, and sends over the WebSocket. Incoming encrypted
420- /// updates are decrypted and applied to the local CRDT document.
445+ ///
446+ /// Security features (Phase 5d):
447+ /// - **Heartbeat**: Sends keepalive every 5s, disconnects if peer silent for 15s
448+ /// - **Session timeout**: Auto-disconnect after 4h, warning at 3h45m
449+ /// - **Forward secrecy**: HKDF key ratchet every 30 minutes
421450async fn run_sync_loop < S , R > (
422451 app : AppHandle ,
423452 mut ws_write : S ,
424453 mut ws_read : R ,
425454 mut outbound_rx : mpsc:: Receiver < Vec < u8 > > ,
426- encryption : Arc < SessionEncryption > ,
455+ mut encryption : SessionEncryption ,
427456 doc : Arc < Mutex < SyncDocument > > ,
428457) -> Result < ( ) , String >
429458where
430459 S : futures_util:: Sink < Message , Error = async_tungstenite:: tungstenite:: Error > + Unpin ,
431460 R : futures_util:: Stream < Item = Result < Message , async_tungstenite:: tungstenite:: Error > > + Unpin ,
432461{
462+ let mut heartbeat_timer = interval ( HEARTBEAT_INTERVAL ) ;
463+ heartbeat_timer. tick ( ) . await ; // consume the immediate first tick
464+
465+ let session_start = Instant :: now ( ) ;
466+ let session_warning_at = SESSION_MAX_DURATION - SESSION_WARNING_BEFORE ;
467+ let mut session_warning_sent = false ;
468+
469+ let mut last_peer_activity = Instant :: now ( ) ;
470+ let mut key_rotation_epoch: u64 = 0 ;
471+ let mut last_key_rotation = Instant :: now ( ) ;
472+
433473 loop {
434474 tokio:: select! {
435475 // Inbound: message from remote peer
436476 incoming = ws_read. next( ) => {
477+ last_peer_activity = Instant :: now( ) ;
478+
437479 match incoming {
438480 Some ( Ok ( Message :: Text ( text) ) ) => {
439481 match serde_json:: from_str:: <SyncMessage >( & text) {
@@ -454,7 +496,20 @@ where
454496 }
455497 }
456498 Ok ( SyncMessage :: Heartbeat { .. } ) => {
457- // Peer is alive — nothing to do
499+ // Peer is alive — last_peer_activity already updated
500+ }
501+ Ok ( SyncMessage :: KeyRotate { epoch } ) => {
502+ // Peer rotated their key — we must rotate too
503+ if epoch > key_rotation_epoch {
504+ encryption. rotate_key( )
505+ . map_err( |e| format!( "Key rotation failed: {}" , e) ) ?;
506+ key_rotation_epoch = epoch;
507+ last_key_rotation = Instant :: now( ) ;
508+ tracing:: info!(
509+ "Key rotated to epoch {} (triggered by peer)" ,
510+ epoch
511+ ) ;
512+ }
458513 }
459514 Ok ( SyncMessage :: Goodbye ) => {
460515 tracing:: info!( "Peer disconnected gracefully" ) ;
@@ -476,7 +531,10 @@ where
476531 tracing:: error!( "Sync transport read error: {}" , e) ;
477532 break ;
478533 }
479- _ => { } // Ignore ping/pong/binary
534+ _ => {
535+ // Ping/pong/binary — still counts as activity
536+ last_peer_activity = Instant :: now( ) ;
537+ }
480538 }
481539 }
482540
@@ -487,9 +545,7 @@ where
487545 match encryption. encrypt( & plaintext) {
488546 Ok ( envelope) => {
489547 let msg = SyncMessage :: Update { envelope } ;
490- let json = serde_json:: to_string( & msg)
491- . map_err( |e| format!( "Failed to serialize: {}" , e) ) ?;
492- if ws_write. send( Message :: Text ( json) ) . await . is_err( ) {
548+ if send_msg( & mut ws_write, & msg) . await . is_err( ) {
493549 tracing:: error!( "Sync transport: failed to send update" ) ;
494550 break ;
495551 }
@@ -501,10 +557,70 @@ where
501557 }
502558 None => {
503559 // Channel closed (TransportHandle dropped) — send goodbye
504- let goodbye = serde_json:: to_string( & SyncMessage :: Goodbye ) . unwrap_or_default( ) ;
505- let _ = ws_write. send( Message :: Text ( goodbye) ) . await ;
560+ let _ = send_msg( & mut ws_write, & SyncMessage :: Goodbye ) . await ;
561+ break ;
562+ }
563+ }
564+ }
565+
566+ // Heartbeat timer
567+ _ = heartbeat_timer. tick( ) => {
568+ // --- Dead man's switch: check peer liveness ---
569+ if last_peer_activity. elapsed( ) > PEER_TIMEOUT {
570+ tracing:: warn!(
571+ "Peer heartbeat timeout ({:.1}s since last activity)" ,
572+ last_peer_activity. elapsed( ) . as_secs_f64( )
573+ ) ;
574+ app. emit( "sync-heartbeat-timeout" , ( ) ) . ok( ) ;
575+ break ;
576+ }
577+
578+ // --- Send heartbeat ---
579+ let hb = SyncMessage :: Heartbeat {
580+ timestamp: chrono:: Utc :: now( ) . timestamp( ) ,
581+ } ;
582+ if send_msg( & mut ws_write, & hb) . await . is_err( ) {
583+ tracing:: error!( "Sync transport: failed to send heartbeat" ) ;
584+ break ;
585+ }
586+
587+ // --- Session timeout check ---
588+ let elapsed = session_start. elapsed( ) ;
589+ if elapsed >= SESSION_MAX_DURATION {
590+ tracing:: info!( "Session timeout reached (4h) — disconnecting" ) ;
591+ app. emit( "sync-session-timeout" , ( ) ) . ok( ) ;
592+ let _ = send_msg( & mut ws_write, & SyncMessage :: Goodbye ) . await ;
593+ break ;
594+ }
595+ if !session_warning_sent && elapsed >= session_warning_at {
596+ let remaining_secs = ( SESSION_MAX_DURATION - elapsed) . as_secs( ) ;
597+ tracing:: info!(
598+ "Session timeout warning: {}m remaining" ,
599+ remaining_secs / 60
600+ ) ;
601+ app. emit( "sync-session-warning" , remaining_secs) . ok( ) ;
602+ session_warning_sent = true ;
603+ }
604+
605+ // --- Forward secrecy: key rotation ---
606+ if last_key_rotation. elapsed( ) >= KEY_ROTATION_INTERVAL {
607+ key_rotation_epoch += 1 ;
608+ // Notify peer BEFORE rotating (WebSocket is ordered)
609+ let rotate_msg = SyncMessage :: KeyRotate {
610+ epoch: key_rotation_epoch,
611+ } ;
612+ if send_msg( & mut ws_write, & rotate_msg) . await . is_err( ) {
613+ tracing:: error!( "Failed to send key rotation notification" ) ;
506614 break ;
507615 }
616+ // Now rotate our own key
617+ encryption. rotate_key( )
618+ . map_err( |e| format!( "Key rotation failed: {}" , e) ) ?;
619+ last_key_rotation = Instant :: now( ) ;
620+ tracing:: info!(
621+ "Key rotated to epoch {} (forward secrecy)" ,
622+ key_rotation_epoch
623+ ) ;
508624 }
509625 }
510626 }
@@ -577,4 +693,25 @@ mod tests {
577693 _ => panic ! ( "Expected DeviceInfo" ) ,
578694 }
579695 }
696+
697+ #[ test]
698+ fn test_key_rotate_serialization ( ) {
699+ let msg = SyncMessage :: KeyRotate { epoch : 42 } ;
700+ let json = serde_json:: to_string ( & msg) . unwrap ( ) ;
701+ assert ! ( json. contains( "key_rotate" ) ) ;
702+ assert ! ( json. contains( "42" ) ) ;
703+
704+ let parsed: SyncMessage = serde_json:: from_str ( & json) . unwrap ( ) ;
705+ match parsed {
706+ SyncMessage :: KeyRotate { epoch } => assert_eq ! ( epoch, 42 ) ,
707+ _ => panic ! ( "Expected KeyRotate" ) ,
708+ }
709+ }
710+
711+ #[ test]
712+ fn test_constants_sanity ( ) {
713+ assert ! ( PEER_TIMEOUT > HEARTBEAT_INTERVAL ) ;
714+ assert ! ( SESSION_MAX_DURATION > SESSION_WARNING_BEFORE ) ;
715+ assert ! ( KEY_ROTATION_INTERVAL < SESSION_MAX_DURATION ) ;
716+ }
580717}
0 commit comments