Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51ad2f2
Add system table for credentials
jsdt Jul 18, 2025
cd6625a
Add jwt payload to SpacetimeAuth
jsdt Jul 21, 2025
4c1807f
Plumb the jwt payload around
jsdt Jul 23, 2025
6b98710
Merge branch 'jsdt/credential-table' into jsdt/extract-payload
jsdt Jul 23, 2025
c65bb78
Store client credentials.
jsdt Jul 23, 2025
168fd84
Tweak error handling
jsdt Jul 24, 2025
4ecac1a
Merge branch 'master' into jsdt/store-client-creds
jsdt Jul 24, 2025
a336ba8
Fix some system table issues.
jsdt Jul 30, 2025
89955d0
Merge branch 'master' into jsdt/store-client-creds
jsdt Aug 1, 2025
14a6158
fmt
jsdt Aug 1, 2025
f796f2f
Merge branch 'master' into jsdt/store-client-creds
jsdt Aug 4, 2025
5df0d93
Commit using the db, so it gets persisted.
jsdt Aug 4, 2025
37467bf
Cleanup
jsdt Aug 5, 2025
b063fe3
Rollback on errors during identity connected.
jsdt Aug 5, 2025
9272ae4
Merge branch 'master' into jsdt/store-client-creds
jsdt Aug 6, 2025
2462a95
Update comments.
jsdt Aug 6, 2025
9f4eccd
Merge branch 'master' into jsdt/store-client-creds
jsdt Aug 7, 2025
67d60ca
Merge branch 'master' into jsdt/store-client-creds-mergetest
jsdt Sep 11, 2025
7c384f3
Cleanup
jsdt Sep 11, 2025
6819fe6
Attempt to merge, but this fails some tests.
jsdt Sep 12, 2025
63f35a7
Fix inserted line.
jsdt Sep 15, 2025
f637e01
Merge branch 'master' into jsdt/store-client-creds
jsdt Sep 15, 2025
765b69d
Merge branch 'master' into jsdt/store-client-creds
jsdt Sep 16, 2025
7bdabc0
Update comment
jsdt Sep 16, 2025
fc7cd3d
Remove commented out code.
jsdt Sep 18, 2025
54a9e7e
Merge branch 'master' into jsdt/store-client-creds
jsdt Sep 18, 2025
c80ac15
Add helper function
jsdt Sep 19, 2025
392ef4e
Merge branch 'master' into jsdt/store-client-creds
jsdt Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions crates/client-api/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ impl SpacetimeCreds {
pub struct SpacetimeAuth {
pub creds: SpacetimeCreds,
pub claims: SpacetimeIdentityClaims,
// The decoded JWT payload.
pub raw_payload: String,
/// The JWT payload as a json string (after base64 decoding).
pub jwt_payload: String,
}

impl From<SpacetimeAuth> for ConnectionAuthCtx {
fn from(auth: SpacetimeAuth) -> Self {
ConnectionAuthCtx {
claims: auth.claims,
jwt_payload: auth.raw_payload.clone(),
jwt_payload: auth.jwt_payload.clone(),
}
}
}
Expand Down Expand Up @@ -131,6 +131,9 @@ impl TokenClaims {
Identity::from_claims(&self.issuer, &self.subject)
}

/// Encode the claims into a JWT token and sign it with the provided signer.
/// This also adds claims for expiry and issued at time.
/// Returns an object representing the claims and the signed token.
pub fn encode_and_sign_with_expiry(
&self,
signer: &impl TokenSigner,
Expand All @@ -150,6 +153,9 @@ impl TokenClaims {
Ok((claims, token))
}

/// Encode the claims into a JWT token and sign it with the provided signer.
/// This also adds a claim for issued at time.
/// Returns an object representing the claims and the signed token.
pub fn encode_and_sign(&self, signer: &impl TokenSigner) -> Result<(SpacetimeIdentityClaims, String), JwtError> {
Comment thread
jsdt marked this conversation as resolved.
self.encode_and_sign_with_expiry(signer, None)
}
Expand Down Expand Up @@ -177,7 +183,7 @@ impl SpacetimeAuth {
Ok(Self {
creds,
claims,
raw_payload: payload,
jwt_payload: payload,
})
}

Expand Down Expand Up @@ -351,7 +357,7 @@ impl<S: NodeDelegate + Send + Sync> axum::extract::FromRequestParts<S> for Space
let auth = SpacetimeAuth {
creds,
claims,
raw_payload: payload,
jwt_payload: payload,
};
Ok(Self { auth: Some(auth) })
}
Expand Down
29 changes: 19 additions & 10 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,19 @@ where
None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"),
};

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client = match ClientConnection::spawn(
log::debug!("websocket: New client connected from {client_log_string}");

let connected = match ClientConnection::call_client_connected_maybe_reject(
&mut module_rx,
client_id,
auth.into(),
client_config,
leader.replica_id,
module_rx,
actor,
auth.clone().into(),
)
.await
{
Ok(s) => s,
Ok(connected) => {
log::debug!("websocket: client_connected returned Ok for {client_log_string}");
connected
}
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
log::info!(
"websocket: Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}"
Expand All @@ -206,8 +207,16 @@ where
);

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client =
ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await;
let client = ClientConnection::spawn(
client_id,
auth.into(),
client_config,
leader.replica_id,
module_rx,
actor,
connected,
)
.await;

// Send the client their identity token message as the first message
// NOTE: We're adding this to the protocol because some client libraries are
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,10 @@ impl ClientConnection {
pub async fn call_client_connected_maybe_reject(
module_rx: &mut watch::Receiver<ModuleHost>,
id: ClientActorId,
auth: ConnectionAuthCtx,
) -> Result<Connected, ClientConnectedError> {
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(id.identity, id.connection_id).await?;
module.call_identity_connected(auth, id.connection_id).await?;
Ok(Connected { _private: () })
}

Expand All @@ -455,7 +456,6 @@ impl ClientConnection {
// logically subscribed to the database, not any particular replica. We should handle failover for
// them and stuff. Not right now though.
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(auth.clone(), id.connection_id).await?;

let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);

Expand Down
1 change: 0 additions & 1 deletion crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use spacetimedb_vm::expr::Crud;

pub use spacetimedb_datastore::error::{DatastoreError, IndexError, SequenceError, TableError};

// #[derive(Error, Debug, PartialEq, Eq)]
#[derive(Error, Debug)]
pub enum ClientError {
#[error("Client not found: {0}")]
Expand Down
73 changes: 41 additions & 32 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use derive_more::From;
use indexmap::IndexSet;
use itertools::Itertools;
use prometheus::{Histogram, IntGauge};
use scopeguard::ScopeGuard;
use spacetimedb_auth::identity::ConnectionAuthCtx;
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate};
use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
Expand Down Expand Up @@ -690,13 +691,38 @@ impl ModuleHost {
let me = self.clone();
self.call("call_identity_connected", move |inst| {
let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);
let stdb = &me.module.replica_ctx().relational_db;
let workload = Workload::Reducer(ReducerContext {
name: "call_identity_connected".to_owned(),
caller_identity: caller_auth.claims.identity,
caller_connection_id,
timestamp: Timestamp::now(),
arg_bsatn: Bytes::new(),
});
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload);
let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| {
// If we crash before committing, we need to ensure that the transaction is rolled back.
// This is necessary to avoid leaving the database in an inconsistent state.
log::debug!("call_identity_connected: rolling back transaction");
let (metrics, reducer_name) = mut_tx.rollback();
stdb.report_mut_tx_metrics(reducer_name, metrics, None);
});

mut_tx
.insert_st_client(
caller_auth.claims.identity,
caller_connection_id,
&caller_auth.jwt_payload,
)
.map_err(DBError::from)?;

if let Some((reducer_id, reducer_def)) = reducer_lookup {
// The module defined a lifecycle reducer to handle new connections.
// Call this reducer.
// If the call fails (as in, something unexpectedly goes wrong with WASM execution),
// abort the connection: we can't really recover.
let reducer_outcome = me.call_reducer_inner_with_inst(
Some(ScopeGuard::into_inner(mut_tx)),
caller_auth.claims.identity,
Some(caller_connection_id),
None,
Expand Down Expand Up @@ -728,38 +754,19 @@ impl ModuleHost {
}
} else {
// The module doesn't define a client_connected reducer.
// Commit a transaction to update `st_clients`
// and to ensure we always have those events paired in the commitlog.
// We need to commit the transaction to update st_clients and st_connection_credentials.
//
// This is necessary to be able to disconnect clients after a server crash.
let reducer_name = reducer_lookup
.as_ref()
.map(|(_, def)| &*def.name)
.unwrap_or("__identity_connected__");

let workload = Workload::Reducer(ReducerContext {
name: reducer_name.to_owned(),
caller_identity: caller_auth.claims.identity,
caller_connection_id,
timestamp: Timestamp::now(),
arg_bsatn: Bytes::new(),
});

let stdb = me.module.replica_ctx().relational_db.clone();
stdb.with_auto_commit(workload, |mut_tx| {
mut_tx
.insert_st_client(caller_auth.claims.identity, caller_connection_id)
.map_err(DBError::from)?;
mut_tx
.insert_st_client_credentials(caller_connection_id, &caller_auth.jwt_payload)
.map_err(DBError::from)
})
.inspect_err(|e| {
log::error!(
"`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}"
)
})
.map_err(Into::into)

// TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions
// not allowed?
// I don't think it was being broadcast previously.
Comment thread
jsdt marked this conversation as resolved.
stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(()))
.map_err(|e: DBError| {
log::error!("`call_identity_connected`: finish transaction failed: {e:#?}");
ClientConnectedError::DBError(e)
})?;
Ok(())
}
})
.await
Expand Down Expand Up @@ -813,6 +820,7 @@ impl ModuleHost {
// If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured
// that `st_client` is updated appropriately.
let result = me.call_reducer_inner_with_inst(
None,
caller_identity,
Some(caller_connection_id),
None,
Expand Down Expand Up @@ -917,6 +925,7 @@ impl ModuleHost {
}
fn call_reducer_inner_with_inst(
&self,
tx: Option<MutTxId>,
caller_identity: Identity,
caller_connection_id: Option<ConnectionId>,
client: Option<Arc<ClientConnectionSender>>,
Expand All @@ -932,7 +941,7 @@ impl ModuleHost {
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);

Ok(module_instance.call_reducer(
None,
tx,
CallReducerParams {
timestamp: Timestamp::now(),
caller_identity,
Expand Down
31 changes: 2 additions & 29 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,36 +363,9 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
.with_label_values(&database_identity, reducer_name);

let workload = Workload::Reducer(ReducerContext::from(op.clone()));
let mut tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
let _guard = metric_reducer_plus_query_duration.with_timer(tx.timer);

// For OnConnect, we insert the credentials before the reducer, so we can look them up
// inside that reducer.
// If the connection is rejected, this should get rolled back.
if let Some(Lifecycle::OnConnect) = reducer_def.lifecycle {
let client_clone = match client.clone() {
Some(client) => client,
None => {
log::error!("OnConnect reducer called without a client");
return ReducerCallResult {
outcome: ReducerOutcome::Failed("OnConnect reducer called without a client".into()),
energy_used: EnergyQuanta::ZERO,
execution_duration: Duration::ZERO,
};
}
};
if let Some(err) = tx
.insert_st_client_credentials(caller_connection_id, &client_clone.auth.jwt_payload)
.err()
{
return ReducerCallResult {
outcome: ReducerOutcome::Failed(format!("Error inserting client credentials: {err}")),
energy_used: EnergyQuanta::ZERO,
execution_duration: Duration::ZERO,
};
}
};

let mut tx_slot = self.instance.instance_env().tx.clone();

let reducer_span = tracing::trace_span!(
Expand Down Expand Up @@ -484,7 +457,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
// and conversely removing from `st_clients` on disconnect.
Ok(Ok(())) => {
let res = match reducer_def.lifecycle {
Some(Lifecycle::OnConnect) => tx.insert_st_client(caller_identity, caller_connection_id),
Some(Lifecycle::OnConnect) => Ok(()),
Some(Lifecycle::OnDisconnect) => {
tx.delete_st_client(caller_identity, caller_connection_id, database_identity)
}
Expand Down
55 changes: 25 additions & 30 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,37 +876,16 @@ impl ModuleSubscriptions {
return Ok(Err(WriteConflict));
};
*db_update = DatabaseUpdate::from_writes(&tx_data);
(read_tx, Some(tx_data), tx_metrics)
(read_tx, Arc::new(tx_data), tx_metrics)
}
EventStatus::Failed(_) | EventStatus::OutOfEnergy => {
let (tx_metrics, tx) = stdb.rollback_mut_tx_downgrade(tx, Workload::Update);
(tx, None, tx_metrics)
}
};

let tx_data = tx_data.map(Arc::new);

// When we're done with this method, release the tx and report metrics.
let mut read_tx = scopeguard::guard(read_tx, |tx| {
let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
self.relational_db
.report_tx_metrics(reducer, tx_data.clone(), Some(tx_metrics_mut), Some(tx_metrics_read));
});
// Create the delta transaction we'll use to eval updates against.
let delta_read_tx = tx_data
.as_ref()
.as_ref()
.map(|tx_data| DeltaTx::new(&read_tx, tx_data, subscriptions.index_ids_for_subscriptions()))
.unwrap_or_else(|| DeltaTx::from(&*read_tx));
// If the transaction failed, we need to rollback the mutable tx.
// We don't need to do any subscription updates in this case, so we will exit early.

let event = Arc::new(event);
let mut update_metrics: ExecutionMetrics = ExecutionMetrics::default();

match &event.status {
EventStatus::Committed(_) => {
update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller);
}
EventStatus::Failed(_) => {
let event = Arc::new(event);
let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
self.relational_db
.report_tx_metrics(reducer, None, Some(tx_metrics), None);
if let Some(client) = caller {
let message = TransactionUpdateMessage {
event: Some(event.clone()),
Expand All @@ -917,9 +896,25 @@ impl ModuleSubscriptions {
} else {
log::trace!("Reducer failed but there is no client to send the failure to!")
}
return Ok(Ok((event, ExecutionMetrics::default())));
}
EventStatus::OutOfEnergy => {} // ?
}
};
let event = Arc::new(event);

// When we're done with this method, release the tx and report metrics.
let mut read_tx = scopeguard::guard(read_tx, |tx| {
let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
self.relational_db.report_tx_metrics(
reducer,
Some(tx_data.clone()),
Some(tx_metrics_mut),
Some(tx_metrics_read),
);
});
// Create the delta transaction we'll use to eval updates against.
let delta_read_tx = DeltaTx::new(&read_tx, tx_data.as_ref(), subscriptions.index_ids_for_subscriptions());

let update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller);

// Merge in the subscription evaluation metrics.
read_tx.metrics.merge(update_metrics);
Expand Down
Loading
Loading