diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 2e718528afd..f59f8cd68dd 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -145,7 +145,7 @@ where let identity_token = auth.creds.token().into(); - let module_rx = leader.module_watcher().await.map_err(log_and_500)?; + let mut module_rx = leader.module_watcher().await.map_err(log_and_500)?; let client_id = ClientActorId { identity: auth.identity, @@ -163,32 +163,46 @@ where let ws = match ws_upgrade.upgrade(ws_config).await { Ok(ws) => ws, Err(err) => { - log::error!("WebSocket init error: {err}"); + log::error!("websocket: WebSocket init error: {err}"); return; } }; - match forwarded_for { + let identity = client_id.identity; + let client_log_string = match forwarded_for { Some(TypedHeader(XForwardedFor(ip))) => { - log::debug!("New client connected from ip {ip}") + format!("ip {ip} with Identity {identity} and ConnectionId {connection_id}") } - None => log::debug!("New client connected from unknown ip"), - } + None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"), + }; - let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx); - let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await - { - Ok(s) => s, + log::debug!("websocket: New client connected from {client_log_string}"); + + let connected = match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await { + Ok(connected) => { + log::debug!("websocket: client_connected returned Ok for {client_log_string}"); + connected + } Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => { - log::info!("{e}"); + log::info!( + "websocket: Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}" + ); return; } Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => { - log::warn!("ModuleHost died while we were connecting: {e:#}"); + log::warn!("websocket: ModuleHost died while {client_log_string} was connecting: {e:#}"); return; } }; + log::debug!( + "websocket: Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection" + ); + + let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx); + let client = + ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await; + // Send the client their identity token message as the first message // NOTE: We're adding this to the protocol because some client libraries are // unable to access the http response headers. @@ -200,7 +214,7 @@ where connection_id, }; if let Err(e) = client.send_message(message) { - log::warn!("{e}, before identity token was sent") + log::warn!("websocket: Error sending IdentityToken message to {client_log_string}: {e}"); } }); diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index acd9466d5b1..cf3e5511fff 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -392,15 +392,49 @@ impl Drop for MeteredReceiver { const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB; const KB: usize = 1024; +/// Value returned by [`ClientConnection::call_client_connected_maybe_reject`] +/// and consumed by [`ClientConnection::spawn`] which acts as a proof that the client is authorized. +/// +/// Because this struct does not capture the module or database info or the client connection info, +/// a malicious caller could [`ClientConnected::call_client_connected_maybe_reject`] for one client +/// and then use the resulting `Connected` token to [`ClientConnection::spawn`] for a different client. +/// We're not particularly worried about that. +/// This token exists as a sanity check that non-malicious callers don't accidentally [`ClientConnection::spawn`] +/// for an unauthorized client. +#[non_exhaustive] +pub struct Connected { + _private: (), +} + impl ClientConnection { - /// Returns an error if ModuleHost closed + /// Call the database at `module_rx`'s `client_connection` reducer, if any, + /// and return `Err` if it signals rejecting this client's connection. + /// + /// Call this method before [`Self::spawn`] + /// and pass the returned [`Connected`] to [`Self::spawn`] as proof that the client is authorized. + pub async fn call_client_connected_maybe_reject( + module_rx: &mut watch::Receiver, + id: ClientActorId, + ) -> Result { + let module = module_rx.borrow_and_update().clone(); + module.call_identity_connected(id.identity, id.connection_id).await?; + Ok(Connected { _private: () }) + } + + /// Spawn a new [`ClientConnection`] for a WebSocket subscriber. + /// + /// Callers should first call [`Self::call_client_connected_maybe_reject`] + /// to verify that the database at `module_rx` approves of this connection, + /// and should not invoke this method if that call returns an error, + /// and pass the returned [`Connected`] as `_proof_of_client_connected_call`. pub async fn spawn( id: ClientActorId, config: ClientConfig, replica_id: u64, mut module_rx: watch::Receiver, actor: impl FnOnce(ClientConnection, MeteredReceiver) -> Fut, - ) -> Result + _proof_of_client_connected_call: Connected, + ) -> ClientConnection where Fut: Future + Send + 'static, { @@ -409,7 +443,6 @@ impl ClientConnection { // logically subscribed to the database, not any particular replica. We should handle failover for // them and stuff. Not right now though. let module = module_rx.borrow_and_update().clone(); - module.call_identity_connected(id.identity, id.connection_id).await?; let (sendtx, sendrx) = mpsc::channel::(CLIENT_CHANNEL_CAPACITY); @@ -455,7 +488,7 @@ impl ClientConnection { // if this fails, the actor() function called .abort(), which like... okay, I guess? let _ = fut_tx.send(actor_fut); - Ok(this) + this } pub fn dummy(