Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where

let identity_token = auth.creds.token().into();

let module_rx = leader.module_watcher().await.map_err(log_and_500)?;
let mut module_rx = leader.module_watcher().await.map_err(log_and_500)?;

let client_id = ClientActorId {
identity: auth.identity,
Expand All @@ -168,26 +168,36 @@ where
}
};

match forwarded_for {
let identity = client_id.identity;
let client_log_string = match forwarded_for {
Some(TypedHeader(XForwardedFor(ip))) => {
log::debug!("New client connected from ip {ip}")
format!("ip {ip} with Identity {identity} and ConnectionId {connection_id}")
}
None => log::debug!("New client connected from unknown ip"),
}
None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"),
};

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
{
Ok(s) => s,
log::debug!("New client connected from {client_log_string}");

match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await {
Ok(()) => log::info!("client_connected returned Ok for {client_log_string}"),
Comment thread
gefjon marked this conversation as resolved.
Outdated
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
log::info!("{e}");
log::info!(
"Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}"
Comment thread
gefjon marked this conversation as resolved.
Outdated
);
return;
}
Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => {
log::warn!("ModuleHost died while we were connecting: {e:#}");
log::warn!("ModuleHost died while {client_log_string} was connecting: {e:#}");
return;
}
};
}

log::debug!(
"Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection"
);

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client = ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await;

// Send the client their identity token message as the first message
// NOTE: We're adding this to the protocol because some client libraries are
Expand All @@ -200,7 +210,7 @@ where
connection_id,
};
if let Err(e) = client.send_message(message) {
log::warn!("{e}, before identity token was sent")
log::warn!("Error sending IdentityToken message to {client_log_string}: {e}");
}
});

Expand Down
24 changes: 20 additions & 4 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,31 @@ const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB;
const KB: usize = 1024;

impl ClientConnection {
/// Returns an error if ModuleHost closed
/// Call the database at `module_rx`'s `client_connection` reducer, if any,
/// and return `Err` if it signals rejecting this client's connection.
///
/// Call this method before [`Self::spawn`],
/// and do not call [`Self::spawn`] if this method returns `Err`.
pub async fn call_client_connected_maybe_reject(
Comment thread
gefjon marked this conversation as resolved.
module_rx: &mut watch::Receiver<ModuleHost>,
id: ClientActorId,
) -> Result<(), ClientConnectedError> {
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(id.identity, id.connection_id).await
}

/// Spawn a new [`ClientConnection`] for a WebSocket subscriber.
///
/// Callers should first call [`Self::call_client_connected_maybe_reject`]
/// to verify that the database at `module_rx` approves of this connection,
/// and should not invoke this method if that call returns an error.
pub async fn spawn<Fut>(
id: ClientActorId,
config: ClientConfig,
replica_id: u64,
mut module_rx: watch::Receiver<ModuleHost>,
actor: impl FnOnce(ClientConnection, MeteredReceiver<SerializableMessage>) -> Fut,
) -> Result<ClientConnection, ClientConnectedError>
) -> ClientConnection
where
Fut: Future<Output = ()> + Send + 'static,
{
Expand All @@ -409,7 +426,6 @@ impl ClientConnection {
// logically subscribed to the database, not any particular replica. We should handle failover for
// them and stuff. Not right now though.
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(id.identity, id.connection_id).await?;

let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);

Expand Down Expand Up @@ -455,7 +471,7 @@ impl ClientConnection {
// if this fails, the actor() function called .abort(), which like... okay, I guess?
let _ = fut_tx.send(actor_fut);

Ok(this)
this
}

pub fn dummy(
Expand Down
Loading