Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
};

module
.call_identity_disconnected(caller_identity, connection_id)
// We don't clear views after reducer calls
.call_identity_disconnected(caller_identity, connection_id, false)
.await
.map_err(client_disconnected_error_to_response)?;

Expand Down Expand Up @@ -274,7 +275,8 @@ async fn procedure<S: ControlStateDelegate + NodeDelegate>(
};

module
.call_identity_disconnected(caller_identity, connection_id)
// We don't clear views after procedure calls
.call_identity_disconnected(caller_identity, connection_id, false)
.await
.map_err(client_disconnected_error_to_response)?;

Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,11 @@ impl RelationalDB {
Ok(rows_deleted)
}

/// Clear all rows from all view tables without dropping them.
pub fn clear_all_views(&self, tx: &mut MutTx) -> Result<(), DBError> {
Ok(tx.clear_all_views()?)
}

pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?)
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,10 @@ impl Host {
} = launched;

// Disconnect dangling clients.
// No need to clear view tables here since we do it in `clear_all_clients`.
for (identity, connection_id) in connected_clients {
module_host
.call_identity_disconnected(identity, connection_id)
.call_identity_disconnected(identity, connection_id, false)
.await
.with_context(|| {
format!(
Expand Down
26 changes: 22 additions & 4 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID};
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
use spacetimedb_durability::DurableOffset;
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
Expand Down Expand Up @@ -903,7 +903,7 @@ impl ModuleHost {
// Call the `client_disconnected` reducer, if it exists.
// This is a no-op if the module doesn't define such a reducer.
this.subscriptions().remove_subscriber(client_id);
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst)
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst, true)
})
.await
{
Expand Down Expand Up @@ -1024,6 +1024,7 @@ impl ModuleHost {
caller_identity: Identity,
caller_connection_id: ConnectionId,
inst: &mut Instance,
drop_view_subscribers: bool,
) -> Result<(), ReducerCallError> {
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
let reducer_name = reducer_lookup
Expand All @@ -1046,10 +1047,22 @@ impl ModuleHost {
let me = self.clone();
let stdb = me.module.replica_ctx().relational_db.clone();

// Decrement the number of subscribers for each view this caller is subscribed to
let dec_view_subscribers = |tx: &mut MutTxId| {
if drop_view_subscribers {
if let Err(err) = tx.dec_st_view_subscribers(caller_identity) {
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
}
}
};

// A fallback transaction that deletes the client from `st_client`.
let fallback = || {
let database_identity = me.info.database_identity;
stdb.with_auto_commit(workload(), |mut_tx| {

dec_view_subscribers(mut_tx);

if !is_client_exist(mut_tx) {
// The client is already gone. Nothing to do.
log::debug!(
Expand All @@ -1076,7 +1089,9 @@ impl ModuleHost {

if let Some((reducer_id, reducer_def)) = reducer_lookup {
let stdb = me.module.replica_ctx().relational_db.clone();
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());

dec_view_subscribers(&mut mut_tx);

if !is_client_exist(&mut_tx) {
// The client is already gone. Nothing to do.
Expand Down Expand Up @@ -1151,10 +1166,11 @@ impl ModuleHost {
&self,
caller_identity: Identity,
caller_connection_id: ConnectionId,
drop_view_subscribers: bool,
) -> Result<(), ReducerCallError> {
let me = self.clone();
self.call("call_identity_disconnected", move |inst| {
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst)
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst, drop_view_subscribers)
})
.await?
}
Expand All @@ -1166,8 +1182,10 @@ impl ModuleHost {
let stdb = &me.module.replica_ctx().relational_db;
let workload = Workload::Internal;
stdb.with_auto_commit(workload, |mut_tx| {
stdb.clear_all_views(mut_tx)?;
stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
stdb.clear_table(mut_tx, ST_CLIENT_ID)?;
stdb.clear_table(mut_tx, ST_VIEW_SUB_ID)?;
Ok::<(), DBError>(())
})
})
Expand Down
6 changes: 3 additions & 3 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::{
use crate::{
locking_tx_datastore::mut_tx::ReadSet,
system_tables::{
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_CLIENT_ID, ST_VIEW_CLIENT_IDX,
ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
},
};
use anyhow::anyhow;
Expand Down Expand Up @@ -304,7 +304,7 @@ impl CommittedState {
self.create_table(ST_VIEW_ID, schemas[ST_VIEW_IDX].clone());
self.create_table(ST_VIEW_PARAM_ID, schemas[ST_VIEW_PARAM_IDX].clone());
self.create_table(ST_VIEW_COLUMN_ID, schemas[ST_VIEW_COLUMN_IDX].clone());
self.create_table(ST_VIEW_CLIENT_ID, schemas[ST_VIEW_CLIENT_IDX].clone());
self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone());
self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone());

// Insert the sequences into `st_sequences`
Expand Down
34 changes: 18 additions & 16 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,13 +1252,13 @@ mod tests {
use crate::system_tables::{
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields,
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_NAME,
ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID,
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID,
ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME,
ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID,
ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID,
ST_VIEW_PARAM_NAME,
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID,
ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID,
ST_VIEW_SUB_NAME,
};
use crate::traits::{IsolationLevel, MutTx};
use crate::Result;
Expand All @@ -1272,7 +1272,7 @@ mod tests {
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
use spacetimedb_primitives::{col_list, ColId, ScheduleId, ViewId};
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::bsatn::ToBsatn;
use spacetimedb_sats::layout::RowTypeLayout;
Expand Down Expand Up @@ -1715,7 +1715,7 @@ mod tests {
TableRow { id: ST_VIEW_ID.into(), name: ST_VIEW_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewFields::ViewId.into()) },
TableRow { id: ST_VIEW_PARAM_ID.into(), name: ST_VIEW_PARAM_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_CLIENT_ID.into(), name: ST_VIEW_CLIENT_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) },

]));
Expand Down Expand Up @@ -1793,10 +1793,12 @@ mod tests {
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String },
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() },

ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 1, name: "arg_id", ty: AlgebraicType::U64 },
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 3, name: "connection_id", ty: AlgebraicType::U128 },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 1, name: "arg_id", ty: ArgId::get_type() },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 3, name: "num_subscribers", ty: AlgebraicType::U64 },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 4, name: "has_subscribers", ty: AlgebraicType::Bool },
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 5, name: "last_called", ty: AlgebraicType::I64 },

ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 },
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() },
Expand All @@ -1820,8 +1822,8 @@ mod tests {
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
]));
Expand Down Expand Up @@ -2282,8 +2284,8 @@ mod tests {
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", },
Expand Down
62 changes: 59 additions & 3 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use super::{
SharedMutexGuard, SharedWriteGuard,
};
use crate::system_tables::{
system_tables, ConnectionIdViaU128, StConnectionCredentialsFields, StConnectionCredentialsRow, StViewColumnFields,
StViewFields, StViewParamFields, StViewParamRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID,
ST_VIEW_PARAM_ID,
system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow,
StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubsFields, StViewSubsRow,
ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID,
};
use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags};
use crate::{
Expand Down Expand Up @@ -1767,6 +1767,62 @@ impl<'a, I: Iterator<Item = RowRef<'a>>> Iterator for FilterDeleted<'a, I> {
}

impl MutTxId {
/// Decrements the number of subscribers in `st_view_sub` for a client identity.
pub fn dec_st_view_subscribers(&mut self, sender: Identity) -> Result<()> {
let sender = IdentityViaU256(sender);
let cols = col_list![StViewSubsFields::Identity];
let value = sender.into();

// Collect the rows for this identity.
// These are rows for which we will decrement the subscriber count.
let rows_to_delete = self
.iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)?
.map(|row_ref| StViewSubsRow::try_from(row_ref).map(|row| (row, row_ref.pointer())))
.filter(|result| match result {
Ok((row, _)) => row.has_subscribers && row.num_subscribers > 0,
_ => true,
})
.collect::<Result<Vec<_>>>()?;

// Copy the rows to delete and decrement their subscriber count.
// These are the rows that we will insert.
let rows_to_insert = rows_to_delete
.iter()
.map(|(row, _)| row.clone())
.map(|row| StViewSubsRow {
num_subscribers: row.num_subscribers - 1,
has_subscribers: row.num_subscribers > 1,
..row
})
.collect::<Vec<_>>();

// Delete the old rows
for (_, ptr) in rows_to_delete {
self.delete(ST_VIEW_SUB_ID, ptr)?;
}

// Insert the new rows
for row in rows_to_insert {
self.insert_via_serialize_bsatn(ST_VIEW_SUB_ID, &row)?;
}

Ok(())
}

/// Clear all rows from all view tables without dropping them.
pub fn clear_all_views(&mut self) -> Result<()> {
for table_id in self
.iter(ST_VIEW_ID)?
.map(StViewRow::try_from)
.collect::<Result<Vec<_>>>()?
.into_iter()
.filter_map(|row| row.table_id)
{
self.clear_table(table_id)?;
}
Ok(())
}

pub fn insert_st_client(
&mut self,
identity: Identity,
Expand Down
Loading
Loading